EventChannel.cc

Go to the documentation of this file.
00001 //                            Package   : omniEvents
00002 // EventChannel.cc            Created   : 2003/12/04
00003 //                            Author    : Alex Tingle
00004 //
00005 //    Copyright (C) 2003 Alex Tingle.
00006 //
00007 //    This file is part of the omniEvents application.
00008 //
00009 //    omniEvents is free software; you can redistribute it and/or
00010 //    modify it under the terms of the GNU Lesser General Public
00011 //    License as published by the Free Software Foundation; either
00012 //    version 2.1 of the License, or (at your option) any later version.
00013 //
00014 //    omniEvents is distributed in the hope that it will be useful,
00015 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017 //    Lesser General Public License for more details.
00018 //
00019 //    You should have received a copy of the GNU Lesser General Public
00020 //    License along with this library; if not, write to the Free Software
00021 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 //
00023 
00024 #include "EventChannel.h"
00025 #include "ConsumerAdmin.h"
00026 #include "SupplierAdmin.h"
00027 #include "omniEventsLog.h"
00028 #include "Orb.h"
00029 
00030 #include <list>
00031 
00032 namespace OmniEvents {
00033 
00034 // CORBA interface methods
00035 CosEventChannelAdmin::ConsumerAdmin_ptr EventChannel_i::for_consumers()
00036 {
00037   if(!_consumerAdmin || _shutdownRequested)
00038       throw CORBA::OBJECT_NOT_EXIST();
00039   return _consumerAdmin->_this();
00040 }
00041 
00042 
00043 CosEventChannelAdmin::SupplierAdmin_ptr EventChannel_i::for_suppliers()
00044 {
00045   if(!_supplierAdmin || _shutdownRequested)
00046       throw CORBA::OBJECT_NOT_EXIST();
00047   return _supplierAdmin->_this();
00048 }
00049 
00050 
00051 void EventChannel_i::destroy()
00052 {
00053   if(_shutdownRequested)
00054       throw CORBA::OBJECT_NOT_EXIST();
00055 
00056   // Send disconnect messages to connected clients.
00057   // (Admins might be NULL if destroy() is somehow called before activate.)
00058   if(_consumerAdmin)
00059      _consumerAdmin->disconnect();
00060   if(_supplierAdmin)
00061      _supplierAdmin->disconnect();
00062   if(_mapper)
00063      _mapper->destroy();
00064 
00065   // Terminate the thread.
00066   _shutdownRequested=true;
00067   
00068   //?? There is some danger that new connections will be established between now
00069   //?? and channel shutdown, These connections should be rejected.
00070 }
00071 
00072 
// Constructs an inactive channel. The servant base is given a nil POA and
// both admin pointers and the mapper start out NULL; they are filled in
// later by activate() / setInsName(). `store` may be NULL - run() checks it
// before use.
EventChannel_i::EventChannel_i(EventChannelStore* store)
: Servant(PortableServer::POA::_nil()),
  _eventChannelStore(store),
  _consumerAdmin(NULL),
  _supplierAdmin(NULL),
  _poaManager(),
  _shutdownRequested(false),
  _properties(),
  _mapper(NULL)
{}
00083 
00084 
00085 void EventChannel_i::activate(
00086   const char*        channelName,
00087   const PersistNode* node
00088 )
00089 {
00090   // The order of these various initialization methods is very important.
00091   // I've documented dependencies as 'REQUIRES' comments.
00092 
00093   createPoa(channelName);
00094 
00095   if(node)
00096       _properties._attr=node->_attr;
00097 
00098   // REQUIRES: _properties
00099   _consumerAdmin=new ConsumerAdmin_i(*this,_poa);
00100 
00101   // REQUIRES: _consumerAdmin, _properties
00102   _supplierAdmin=new SupplierAdmin_i(*this,_poa);
00103 
00104   if(node)
00105   {
00106     PersistNode* saNode =node->child("SupplierAdmin");
00107     if(saNode)
00108         _supplierAdmin->reincarnate(*saNode);
00109 
00110     PersistNode* caNode =node->child("ConsumerAdmin");
00111     if(caNode)
00112         _consumerAdmin->reincarnate(*caNode);
00113   }
00114 
00115   activateObjectWithId("EventChannel");
00116 
00117   // REQUIRES: activate() ...since it uses _this().
00118   setInsName(_properties.attrString("InsName"));
00119 }
00120 
00121 
00122 EventChannel_i::~EventChannel_i()
00123 {
00124   if(CORBA::is_nil(_poa))
00125   {
00126     DB(20,"~EventChannel_i()")
00127   }
00128   else
00129   {
00130     DB(20,"~EventChannel_i() - destroying POA")
00131     try {
00132       _poa->destroy(
00133         CORBA::Boolean(1) /* etherealize_objects */,
00134         CORBA::Boolean(1) /* wait_for_completion */
00135       );
00136     }
00137     catch(...) {
00138       DB(2,"~EventChannel_i() - ERROR destroying POA")
00139     }
00140     _poa=PortableServer::POA::_nil();
00141   }
00142 }
00143 
00144 
00145 void EventChannel_i::run(void* arg)
00146 {
00147   // Ensure that activate() is called before start()/run().
00148   assert(!CORBA::is_nil(_poa));
00149 
00150   try
00151   {
00152     // Add this object to the store.
00153     if(_eventChannelStore)
00154     {
00155       _eventChannelStore->insert(this);
00156       if(omniEventsLog::exists())
00157       {
00158         WriteLock log;
00159         output(log.os);
00160       }
00161     }
00162 
00163     // Process events until the channel is destroyed.
00164     mainLoop();
00165 
00166     // Remove this object from the store.
00167     if(_eventChannelStore)
00168     {
00169       _eventChannelStore->erase(this);
00170       CORBA::String_var poaName =_poa->the_name();
00171       if(omniEventsLog::exists())
00172       {
00173         WriteLock log;
00174         log.os<<"-ecf/"<<poaName.in()<<'\n';
00175       }
00176     }
00177 
00178     _poa->destroy(
00179       CORBA::Boolean(1) /* etherealize_objects */,
00180       CORBA::Boolean(1) /* wait_for_completion */
00181     );
00182     _poaManager->deactivate(
00183       CORBA::Boolean(1) /* etherealize_objects */,
00184       CORBA::Boolean(1) /* wait_for_completion */
00185     );
00186     _poa=PortableServer::POA::_nil();
00187 
00188   }
00189   catch(PortableServer::POAManager::AdapterInactive& ex)
00190   {
00191     DB(0,"EventChannel_i::run() - POA deactivated from the outside.")
00192     Orb::inst().reportObjectFailure(HERE,_this(),&ex);
00193   }
00194   catch (CORBA::Exception& ex) {
00195     Orb::inst().reportObjectFailure(HERE,_this(),&ex);
00196   }
00197   catch(...)
00198   {
00199     Orb::inst().reportObjectFailure(HERE,_this(),NULL);
00200   }
00201 
00202   // Thread now exits, and this object is deleted.
00203   // Contents are cleaned up by the destructor.
00204 }
00205 
00206 
00207 void EventChannel_i::mainLoop()
00208 {
00209   _poaManager->activate();
00210   unsigned long localCyclePeriod_ns=cyclePeriod_ns();
00211   while(!_shutdownRequested)
00212   {
00213     //
00214     // TRANSFER PHASE - transfer events from SupplierAdmin to ConsumerAdmin.
00215     _poaManager->hold_requests(CORBA::Boolean(1) /* wait_for_completion */);
00216 
00217     list<CORBA::Any*> events;
00218     _supplierAdmin->collect(events);
00219     _consumerAdmin->send(events);
00220     assert(events.empty());
00221 
00222     _poaManager->activate();
00223     
00224     //
00225     // COMMUNICATION PHASE - talk with clients' suppliers & consumers.
00226     // Note: On Linux the resolution of nanosleep is a huge 10ms.
00227     omni_thread::sleep(0,localCyclePeriod_ns);
00228   }
00229 }
00230 
00231 
00232 void EventChannel_i::output(ostream& os)
00233 {
00234   CORBA::String_var poaName =_poa->the_name();
00235   string name =string("ecf/")+poaName.in();
00236   _properties.output(os,name);
00237   if(_supplierAdmin)
00238      _supplierAdmin->output(os);
00239   if(_consumerAdmin)
00240      _consumerAdmin->output(os);
00241 }
00242 
00243 
00244 void EventChannel_i::setInsName(const string v)
00245 {
00246   Mapper* newMapper =NULL;
00247   try
00248   {
00249 
00250     // If _insName is set, then create a mapper object to allow clients to
00251     // find this object with a `corbaloc' string.
00252     if(!v.empty())
00253     {
00254       // !! Throws when there is already an object named 'v' in the INSPOA.
00255       newMapper=new Mapper(v.c_str(),_this());
00256     }
00257     // Deactivate the old _mapper object.
00258     if(_mapper)
00259        _mapper->destroy();
00260     _mapper=newMapper;
00261 
00262   }
00263   catch(...)
00264   {
00265     // Can't use an auto_ptr, because MS VC++ 6 has no auto_ptr::reset()
00266     delete newMapper;
00267     throw;
00268   }
00269 }
00270 
00271 
00272 void EventChannel_i::createPoa(const char* channelName)
00273 {
00274   using namespace PortableServer;
00275   POA_ptr p=Orb::inst()._RootPOA.in();
00276 
00277   // POLICIES:
00278   //  Lifespan          =PERSISTENT             // we can persist
00279   //  Assignment        =USER_ID                // write our own oid
00280   //  Uniqueness        =[default] UNIQUE_ID    // one servant per object
00281   //  ImplicitActivation=[default] IMPLICIT_ACTIVATION // auto activation
00282   //  RequestProcessing =[default] USE_ACTIVE_OBJECT_MAP_ONLY
00283   //  ServantRetention  =[default] RETAIN       // stateless POA
00284   //  Thread            =SINGLE_THREAD_MODEL    // keep it simple
00285 
00286   CORBA::PolicyList policies;
00287   policies.length(3);
00288   policies[0]=p->create_lifespan_policy(PERSISTENT);
00289   policies[1]=p->create_id_assignment_policy(USER_ID);
00290   policies[2]=p->create_thread_policy(SINGLE_THREAD_MODEL);
00291 
00292   try // finally
00293   {
00294       try
00295       {
00296         // Create a new POA (and new POAManager) for this channel.
00297         // The POAManager will be used for all of this channel's POAs.
00298         _poa=p->create_POA(channelName,POAManager::_nil(),policies);
00299         _poaManager=_poa->the_POAManager();
00300       }
00301       catch(POA::AdapterAlreadyExists& ex) // create_POA
00302       {
00303         DB(0,"EventChannel_i::createPoa() - POA::AdapterAlreadyExists")
00304         throw;
00305       }
00306       catch(POA::InvalidPolicy& ex) // create_POA
00307       {
00308         DB(0,"EventChannel_i::createPoa() - POA::InvalidPolicy: "<<ex.index)
00309         throw;
00310       }
00311   }
00312   catch(...) // finally
00313   {
00314     // Destroy the policy objects (Not strictly necessary in omniORB)
00315     for(CORBA::ULong i=0; i<policies.length(); ++i)
00316         policies[i]->destroy();
00317     throw;
00318   }
00319 
00320   // Destroy the policy objects (Not strictly necessary in omniORB)
00321   for(CORBA::ULong i=0; i<policies.length(); ++i)
00322       policies[i]->destroy();
00323 }
00324 
00325 
00326 //
00327 // class EventChannelStore
00328 //
00329 
00330 
// Constructs a store with a default-constructed channel collection and lock.
EventChannelStore::EventChannelStore()
:_channels(),_lock()
{}
00334 
// Destructor. Currently a stub: it does nothing with any channels that are
// still registered.
EventChannelStore::~EventChannelStore()
{
  // ?? IMPLEMENT ME
}
00339 
00340 void EventChannelStore::insert(EventChannel_i* channel)
00341 {
00342   omni_mutex_lock l(_lock);
00343   bool insertOK =_channels.insert(channel).second;
00344   if(!insertOK)
00345       DB(2,"Attempted to store an EventChannel, when it is already stored.");
00346 }
00347 
00348 void EventChannelStore::erase(EventChannel_i* channel)
00349 {
00350   omni_mutex_lock l(_lock);
00351   set<EventChannel_i*>::iterator pos =_channels.find(channel);
00352   if(pos==_channels.end())
00353       DB(2,"Failed to erase unknown EventChannel.")
00354   else
00355       _channels.erase(pos);
00356 }
00357 
00358 void EventChannelStore::output(ostream &os)
00359 {
00360   omni_mutex_lock l(_lock);
00361   for(set<EventChannel_i*>::iterator i=_channels.begin();
00362       i!=_channels.end();
00363       ++i)
00364   {
00365     (*i)->output(os);
00366   }
00367 }
00368 
00369 
00370 }; // end namespace OmniEvents
00371 

Generated on Mon Jan 9 03:52:13 2006 for OmniEvents by  doxygen 1.4.6