00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "SupplierAdmin.h"
00025
00026 #include "EventChannel.h"
00027 #include "ProxyPushConsumer.h"
00028 #include "ProxyPullConsumer.h"
00029 #include "Orb.h"
00030 #include "PersistNode.h"
00031
00032 #define MILLION 1000000
00033 #define BILLION 1000000000
00034
00035 namespace OmniEvents {
00036
00037 CosEventChannelAdmin::ProxyPushConsumer_ptr
00038 SupplierAdmin_i::obtain_push_consumer()
00039 {
00040 return _pushConsumer->createObject();
00041 }
00042
00043
00044 CosEventChannelAdmin::ProxyPullConsumer_ptr
00045 SupplierAdmin_i::obtain_pull_consumer()
00046 {
00047 if(!_pullConsumer)
00048 _pullConsumer=new ProxyPullConsumerManager(_poa,_queue);
00049 return _pullConsumer->createObject();
00050 }
00051
00052
00053 SupplierAdmin_i::SupplierAdmin_i(
00054 const EventChannel_i& channel,
00055 PortableServer::POA_ptr poa
00056 )
00057 : Servant(poa),
00058 _channel(channel),
00059 _pushConsumer(NULL),
00060 _pullConsumer(NULL),
00061 _queue(),
00062 _nextPull(0,0)
00063 {
00064
00065
00066 if(_channel.pullRetryPeriod_ms() > (_channel.cyclePeriod_ns()/MILLION))
00067 {
00068 omni_thread::get_time(&(_nextPull.first),&(_nextPull.second));
00069 }
00070
00071
00072
00073
00074
00075
00076
00077 _pushConsumer=new ProxyPushConsumer_i(_poa,_queue,_channel.consumerAdmin());
00078
00079 activateObjectWithId("SupplierAdmin");
00080 _remove_ref();
00081 }
00082
00083
00084 SupplierAdmin_i::~SupplierAdmin_i()
00085 {
00086 if(_pushConsumer)
00087 {
00088 delete _pushConsumer;
00089 _pushConsumer=NULL;
00090 }
00091 for(list<CORBA::Any*>::iterator i=_queue.begin(); i!=_queue.end(); ++i)
00092 delete *i;
00093 DB(20,"~SupplierAdmin_i()")
00094 }
00095
00096
00097 void SupplierAdmin_i::collect(list<CORBA::Any*>& events)
00098 {
00099 if(_pullConsumer)
00100 {
00101 _pullConsumer->collect();
00102 if(0==_nextPull.first)
00103 {
00104 _pullConsumer->triggerRequest();
00105 }
00106 else
00107 {
00108 pair<unsigned long,unsigned long> now;
00109 omni_thread::get_time(&(now.first),&(now.second));
00110 if(now>=_nextPull)
00111 {
00112 _pullConsumer->triggerRequest();
00113
00114 CORBA::ULong p =_channel.pullRetryPeriod_ms();
00115 do{
00116 _nextPull.second += (p%1000)*MILLION;
00117 _nextPull.first += p/1000 + _nextPull.second/BILLION;
00118 _nextPull.second %= BILLION;
00119 } while(now>=_nextPull);
00120 }
00121 }
00122 }
00123 _pushConsumer->trigger();
00124
00125 events=_queue;
00126 _queue.clear();
00127 }
00128
00129
00130 void SupplierAdmin_i::disconnect()
00131 {
00132 if(_pushConsumer)
00133 _pushConsumer->disconnect();
00134 if(_pullConsumer)
00135 _pullConsumer->disconnect();
00136 }
00137
00138
00139 void SupplierAdmin_i::reincarnate(const PersistNode& node)
00140 {
00141
00142 PersistNode* pushcNode =node.child("ProxyPushConsumer");
00143 if(pushcNode && !pushcNode->_child.empty())
00144 {
00145 assert(_pushConsumer!=NULL);
00146 _pushConsumer->reincarnate(*pushcNode);
00147 }
00148
00149
00150 PersistNode* pullcNode =node.child("ProxyPullConsumer");
00151 if(pullcNode && !pullcNode->_child.empty())
00152 {
00153 if(!_pullConsumer)
00154 _pullConsumer=new ProxyPullConsumerManager(_poa,_queue);
00155 _pullConsumer->reincarnate(*pullcNode);
00156 }
00157 }
00158
00159
00160 void SupplierAdmin_i::output(ostream& os)
00161 {
00162 if(_pushConsumer)
00163 _pushConsumer->output(os);
00164 if(_pullConsumer)
00165 _pullConsumer->output(os);
00166 }
00167
00168
00169 };