00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "ConsumerAdmin.h"
00025
00026 #include "EventChannel.h"
00027 #include "ProxyPushSupplier.h"
00028 #include "ProxyPullSupplier.h"
00029 #include "Orb.h"
00030 #include "PersistNode.h"
00031 #include "Filter.h"
00032
00033 namespace OmniEvents {
00034
00035
00036 CosEventChannelAdmin::ProxyPushSupplier_ptr
00037 ConsumerAdmin_i::obtain_push_supplier()
00038 {
00039 if(!_pushSupplier)
00040 {
00041 _pushSupplier=new ProxyPushSupplierManager(_poa,_queue);
00042 _pushSupplier->_add_ref();
00043 }
00044 return _pushSupplier->createObject();
00045 }
00046
00047
00048 CosEventChannelAdmin::ProxyPullSupplier_ptr
00049 ConsumerAdmin_i::obtain_pull_supplier()
00050 {
00051 if(!_pullSupplier)
00052 _pullSupplier=new ProxyPullSupplierManager(_channel,_poa,_queue);
00053 return _pullSupplier->createObject();
00054 }
00055
00056
00057 ConsumerAdmin_i::ConsumerAdmin_i(
00058 const EventChannel_i& channel,
00059 PortableServer::POA_ptr poa
00060 )
00061 : Servant(poa),
00062 _channel(channel),
00063 _queue(channel.maxQueueLength()),
00064 _pushSupplier(NULL),
00065 _pullSupplier(NULL)
00066 {
00067 if(_channel.properties().hasAttr("FilterId"))
00068 {
00069 string rid =_channel.properties().attrString("FilterId");
00070 _queue.setFilter(new FilterByRepositoryId(rid.c_str()));
00071 }
00072 else if(_channel.properties().hasAttr("FilterKind"))
00073 {
00074 CORBA::TCKind kind =
00075 CORBA::TCKind(_channel.properties().attrLong("FilterKind"));
00076 _queue.setFilter(new FilterByTCKind(kind));
00077 }
00078
00079 activateObjectWithId("ConsumerAdmin");
00080 _remove_ref();
00081 }
00082
00083
00084 ConsumerAdmin_i::~ConsumerAdmin_i()
00085 {
00086 if(_pushSupplier)
00087 {
00088 _pushSupplier->_remove_ref();
00089 _pushSupplier=NULL;
00090 }
00091 DB(20,"~ConsumerAdmin_i()")
00092 }
00093
00094
00095 void ConsumerAdmin_i::send(CORBA::Any* event)
00096 {
00097 ProxyPushSupplierManager::PauseThenWake p(_pushSupplier);
00098 _queue.append(event);
00099 }
00100
00101
00102 void ConsumerAdmin_i::send(list<CORBA::Any*>& events)
00103 {
00104 if(!events.empty())
00105 {
00106 ProxyPushSupplierManager::PauseThenWake p(_pushSupplier);
00107 for(list<CORBA::Any*>::iterator i=events.begin(); i!=events.end(); ++i)
00108 _queue.append( *i );
00109 events.clear();
00110 }
00111 }
00112
00113
00114 void ConsumerAdmin_i::disconnect()
00115 {
00116 if(_pushSupplier)
00117 _pushSupplier->disconnect();
00118 if(_pullSupplier)
00119 _pullSupplier->disconnect();
00120 }
00121
00122
00123 void ConsumerAdmin_i::reincarnate(const PersistNode& node)
00124 {
00125
00126 PersistNode* pushsNode =node.child("ProxyPushSupplier");
00127 if(pushsNode && !pushsNode->_child.empty())
00128 {
00129 _pushSupplier=new ProxyPushSupplierManager(_poa,_queue);
00130 _pushSupplier->_add_ref();
00131 _pushSupplier->reincarnate(*pushsNode);
00132 }
00133
00134
00135 PersistNode* pullsNode =node.child("ProxyPullSupplier");
00136 if(pullsNode && !pullsNode->_child.empty())
00137 {
00138 _pullSupplier=new ProxyPullSupplierManager(_channel,_poa,_queue);
00139 _pullSupplier->reincarnate(*pullsNode);
00140 }
00141 }
00142
00143
00144 void ConsumerAdmin_i::output(ostream& os)
00145 {
00146 if(_pushSupplier)
00147 {
00148 omni_mutex_lock l(_pushSupplier->_lock);
00149 _pushSupplier->output(os);
00150 }
00151 if(_pullSupplier)
00152 {
00153 _pullSupplier->output(os);
00154 }
00155 }
00156
00157
00158 };