00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "EventChannel.h"
00025 #include "ConsumerAdmin.h"
00026 #include "SupplierAdmin.h"
00027 #include "omniEventsLog.h"
00028 #include "Orb.h"
00029
00030 #include <list>
00031
00032 namespace OmniEvents {
00033
00034
00035 CosEventChannelAdmin::ConsumerAdmin_ptr EventChannel_i::for_consumers()
00036 {
00037 if(!_consumerAdmin || _shutdownRequested)
00038 throw CORBA::OBJECT_NOT_EXIST();
00039 return _consumerAdmin->_this();
00040 }
00041
00042
00043 CosEventChannelAdmin::SupplierAdmin_ptr EventChannel_i::for_suppliers()
00044 {
00045 if(!_supplierAdmin || _shutdownRequested)
00046 throw CORBA::OBJECT_NOT_EXIST();
00047 return _supplierAdmin->_this();
00048 }
00049
00050
00051 void EventChannel_i::destroy()
00052 {
00053 if(_shutdownRequested)
00054 throw CORBA::OBJECT_NOT_EXIST();
00055
00056
00057
00058 if(_consumerAdmin)
00059 _consumerAdmin->disconnect();
00060 if(_supplierAdmin)
00061 _supplierAdmin->disconnect();
00062 if(_mapper)
00063 _mapper->destroy();
00064
00065
00066 _shutdownRequested=true;
00067
00068
00069
00070 }
00071
00072
00073 EventChannel_i::EventChannel_i(EventChannelStore* store)
00074 : Servant(PortableServer::POA::_nil()),
00075 _eventChannelStore(store),
00076 _consumerAdmin(NULL),
00077 _supplierAdmin(NULL),
00078 _poaManager(),
00079 _shutdownRequested(false),
00080 _properties(),
00081 _mapper(NULL)
00082 {}
00083
00084
00085 void EventChannel_i::activate(
00086 const char* channelName,
00087 const PersistNode* node
00088 )
00089 {
00090
00091
00092
00093 createPoa(channelName);
00094
00095 if(node)
00096 _properties._attr=node->_attr;
00097
00098
00099 _consumerAdmin=new ConsumerAdmin_i(*this,_poa);
00100
00101
00102 _supplierAdmin=new SupplierAdmin_i(*this,_poa);
00103
00104 if(node)
00105 {
00106 PersistNode* saNode =node->child("SupplierAdmin");
00107 if(saNode)
00108 _supplierAdmin->reincarnate(*saNode);
00109
00110 PersistNode* caNode =node->child("ConsumerAdmin");
00111 if(caNode)
00112 _consumerAdmin->reincarnate(*caNode);
00113 }
00114
00115 activateObjectWithId("EventChannel");
00116
00117
00118 setInsName(_properties.attrString("InsName"));
00119 }
00120
00121
00122 EventChannel_i::~EventChannel_i()
00123 {
00124 if(CORBA::is_nil(_poa))
00125 {
00126 DB(20,"~EventChannel_i()")
00127 }
00128 else
00129 {
00130 DB(20,"~EventChannel_i() - destroying POA")
00131 try {
00132 _poa->destroy(
00133 CORBA::Boolean(1) ,
00134 CORBA::Boolean(1)
00135 );
00136 }
00137 catch(...) {
00138 DB(2,"~EventChannel_i() - ERROR destroying POA")
00139 }
00140 _poa=PortableServer::POA::_nil();
00141 }
00142 }
00143
00144
00145 void EventChannel_i::run(void* arg)
00146 {
00147
00148 assert(!CORBA::is_nil(_poa));
00149
00150 try
00151 {
00152
00153 if(_eventChannelStore)
00154 {
00155 _eventChannelStore->insert(this);
00156 if(omniEventsLog::exists())
00157 {
00158 WriteLock log;
00159 output(log.os);
00160 }
00161 }
00162
00163
00164 mainLoop();
00165
00166
00167 if(_eventChannelStore)
00168 {
00169 _eventChannelStore->erase(this);
00170 CORBA::String_var poaName =_poa->the_name();
00171 if(omniEventsLog::exists())
00172 {
00173 WriteLock log;
00174 log.os<<"-ecf/"<<poaName.in()<<'\n';
00175 }
00176 }
00177
00178 _poa->destroy(
00179 CORBA::Boolean(1) ,
00180 CORBA::Boolean(1)
00181 );
00182 _poaManager->deactivate(
00183 CORBA::Boolean(1) ,
00184 CORBA::Boolean(1)
00185 );
00186 _poa=PortableServer::POA::_nil();
00187
00188 }
00189 catch(PortableServer::POAManager::AdapterInactive& ex)
00190 {
00191 DB(0,"EventChannel_i::run() - POA deactivated from the outside.")
00192 Orb::inst().reportObjectFailure(HERE,_this(),&ex);
00193 }
00194 catch (CORBA::Exception& ex) {
00195 Orb::inst().reportObjectFailure(HERE,_this(),&ex);
00196 }
00197 catch(...)
00198 {
00199 Orb::inst().reportObjectFailure(HERE,_this(),NULL);
00200 }
00201
00202
00203
00204 }
00205
00206
00207 void EventChannel_i::mainLoop()
00208 {
00209 _poaManager->activate();
00210 unsigned long localCyclePeriod_ns=cyclePeriod_ns();
00211 while(!_shutdownRequested)
00212 {
00213
00214
00215 _poaManager->hold_requests(CORBA::Boolean(1) );
00216
00217 list<CORBA::Any*> events;
00218 _supplierAdmin->collect(events);
00219 _consumerAdmin->send(events);
00220 assert(events.empty());
00221
00222 _poaManager->activate();
00223
00224
00225
00226
00227 omni_thread::sleep(0,localCyclePeriod_ns);
00228 }
00229 }
00230
00231
00232 void EventChannel_i::output(ostream& os)
00233 {
00234 CORBA::String_var poaName =_poa->the_name();
00235 string name =string("ecf/")+poaName.in();
00236 _properties.output(os,name);
00237 if(_supplierAdmin)
00238 _supplierAdmin->output(os);
00239 if(_consumerAdmin)
00240 _consumerAdmin->output(os);
00241 }
00242
00243
00244 void EventChannel_i::setInsName(const string v)
00245 {
00246 Mapper* newMapper =NULL;
00247 try
00248 {
00249
00250
00251
00252 if(!v.empty())
00253 {
00254
00255 newMapper=new Mapper(v.c_str(),_this());
00256 }
00257
00258 if(_mapper)
00259 _mapper->destroy();
00260 _mapper=newMapper;
00261
00262 }
00263 catch(...)
00264 {
00265
00266 delete newMapper;
00267 throw;
00268 }
00269 }
00270
00271
00272 void EventChannel_i::createPoa(const char* channelName)
00273 {
00274 using namespace PortableServer;
00275 POA_ptr p=Orb::inst()._RootPOA.in();
00276
00277
00278
00279
00280
00281
00282
00283
00284
00285
00286 CORBA::PolicyList policies;
00287 policies.length(3);
00288 policies[0]=p->create_lifespan_policy(PERSISTENT);
00289 policies[1]=p->create_id_assignment_policy(USER_ID);
00290 policies[2]=p->create_thread_policy(SINGLE_THREAD_MODEL);
00291
00292 try
00293 {
00294 try
00295 {
00296
00297
00298 _poa=p->create_POA(channelName,POAManager::_nil(),policies);
00299 _poaManager=_poa->the_POAManager();
00300 }
00301 catch(POA::AdapterAlreadyExists& ex)
00302 {
00303 DB(0,"EventChannel_i::createPoa() - POA::AdapterAlreadyExists")
00304 throw;
00305 }
00306 catch(POA::InvalidPolicy& ex)
00307 {
00308 DB(0,"EventChannel_i::createPoa() - POA::InvalidPolicy: "<<ex.index)
00309 throw;
00310 }
00311 }
00312 catch(...)
00313 {
00314
00315 for(CORBA::ULong i=0; i<policies.length(); ++i)
00316 policies[i]->destroy();
00317 throw;
00318 }
00319
00320
00321 for(CORBA::ULong i=0; i<policies.length(); ++i)
00322 policies[i]->destroy();
00323 }
00324
00325
00326
00327
00328
00329
00330
00331 EventChannelStore::EventChannelStore()
00332 :_channels(),_lock()
00333 {}
00334
00335 EventChannelStore::~EventChannelStore()
00336 {
00337
00338 }
00339
00340 void EventChannelStore::insert(EventChannel_i* channel)
00341 {
00342 omni_mutex_lock l(_lock);
00343 bool insertOK =_channels.insert(channel).second;
00344 if(!insertOK)
00345 DB(2,"Attempted to store an EventChannel, when it is already stored.");
00346 }
00347
00348 void EventChannelStore::erase(EventChannel_i* channel)
00349 {
00350 omni_mutex_lock l(_lock);
00351 set<EventChannel_i*>::iterator pos =_channels.find(channel);
00352 if(pos==_channels.end())
00353 DB(2,"Failed to erase unknown EventChannel.")
00354 else
00355 _channels.erase(pos);
00356 }
00357
00358 void EventChannelStore::output(ostream &os)
00359 {
00360 omni_mutex_lock l(_lock);
00361 for(set<EventChannel_i*>::iterator i=_channels.begin();
00362 i!=_channels.end();
00363 ++i)
00364 {
00365 (*i)->output(os);
00366 }
00367 }
00368
00369
00370 };
00371