ProxyPushSupplier.cc

Go to the documentation of this file.
00001 //                            Package   : omniEvents
00002 // ProxyPushSupplier.cc       Created   : 2003/12/04
00003 //                            Author    : Alex Tingle
00004 //
00005 //    Copyright (C) 2003 Alex Tingle.
00006 //
00007 //    This file is part of the omniEvents application.
00008 //
00009 //    omniEvents is free software; you can redistribute it and/or
00010 //    modify it under the terms of the GNU Lesser General Public
00011 //    License as published by the Free Software Foundation; either
00012 //    version 2.1 of the License, or (at your option) any later version.
00013 //
00014 //    omniEvents is distributed in the hope that it will be useful,
00015 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017 //    Lesser General Public License for more details.
00018 //
00019 //    You should have received a copy of the GNU Lesser General Public
00020 //    License along with this library; if not, write to the Free Software
00021 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 //
00023 
00024 #include "ProxyPushSupplier.h"
00025 #include "Orb.h"
00026 #include "omniEventsLog.h"
00027 #include "PersistNode.h"
00028 #include <assert.h>
00029 
00030 namespace OmniEvents {
00031 
00035 class omni_mutex_kcol {
00036     omni_mutex& mutex;
00037 public:
00038     omni_mutex_kcol(omni_mutex& m) : mutex(m) { mutex.unlock(); }
00039     ~omni_mutex_kcol(void) { mutex.lock(); }
00040 private:
00041     // dummy copy constructor and operator= to prevent copying
00042     omni_mutex_kcol(const omni_mutex_kcol&);
00043     omni_mutex_kcol& operator=(const omni_mutex_kcol&);
00044 };
00045 
00046 
00047 //
00048 //  ProxyPushSupplierManager
00049 //
00050 
00051 PortableServer::Servant
00052 ProxyPushSupplierManager::incarnate(
00053   const PortableServer::ObjectId& oid,
00054   PortableServer::POA_ptr         poa
00055 )
00056 {
00057   ProxyPushSupplier_i* result =new ProxyPushSupplier_i(_managedPoa,_queue);
00058   PauseThenWake p(this);
00059   _servants.insert(result);
00060   return result;
00061 }
00062 
00063 void
00064 ProxyPushSupplierManager::etherealize(
00065   const PortableServer::ObjectId& oid,
00066   PortableServer::POA_ptr         adapter,
00067   PortableServer::Servant         serv,
00068   CORBA::Boolean                  cleanup_in_progress,
00069   CORBA::Boolean                  remaining_activations
00070 )
00071 {
00072   omni_mutex_lock pause(_lock);
00073   ProxyManager::etherealize(oid,adapter,serv,
00074     cleanup_in_progress,remaining_activations);
00075 }
00076 
00077 ProxyPushSupplierManager::ProxyPushSupplierManager(
00078   PortableServer::POA_ptr parentPoa,
00079   EventQueue& q
00080 )
00081 : ProxyManager(parentPoa,"ProxyPushSupplier"),
00082   omni_thread(NULL,PRIORITY_HIGH),
00083   _queue(q),
00084   _lock(),_condition(&_lock),
00085   _refCount(1)
00086 {
00087   start();
00088 }
00089 
00090 ProxyPushSupplierManager::~ProxyPushSupplierManager()
00091 {
00092   DB(20,"~ProxyPushSupplierManager()")
00093 }
00094 
00095 CosEventChannelAdmin::ProxyPushSupplier_ptr
00096 ProxyPushSupplierManager::createObject()
00097 {  
00098   return createNarrowedReference<CosEventChannelAdmin::ProxyPushSupplier>(
00099            _managedPoa.in(),
00100            CosEventChannelAdmin::_tc_ProxyPushSupplier->id()
00101          );
00102 }
00103 
00104 void ProxyPushSupplierManager::disconnect()
00105 {
00106   for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00107   {
00108     Proxy* p =*i; // Sun's CC requires this temporary.
00109     ProxyPushSupplier_i* pps =static_cast<ProxyPushSupplier_i*>(p);
00110     pps->disconnect_push_supplier();
00111   }
00112 }
00113 
00114 void
00115 ProxyPushSupplierManager::run(void*)
00116 {
00117   // This loop repeatedly triggers all of the servants in turn. As long as
00118   // something happens each time, then we loop as fast as we can.
00119   // As soon as activity dries up, we start to wait longer and longer between
00120   // loops (up to a maximum). When there is no work to do, just block until
00121   // a new event arrives.
00122   //
00123   // Rationale: The faster we loop the more events we can deliver to each
00124   // consumer per second. However, when nothing is happening, this busy loop
00125   // just soaks up CPU and kills performance. The optimum sleep time varies
00126   // wildly from platform to platform, and also depends upon the typical ping
00127   // time to the consumers.
00128   //
00129   // This dynamic approach should deliver reasonable performance when things
00130   // are hectic, but not soak up too much CPU when not much is happening.
00131   //
00132   const unsigned long sleepTimeNanosec0 =0x8000;   // 33us (doubled before use)
00133   const unsigned long maxSleepNanosec   =0x800000; // 8.4ms
00134   unsigned long sleepTimeNanosec =sleepTimeNanosec0;
00135 
00136   omni_mutex_lock conditionLock(_lock);
00137   while(true)
00138   {
00139     try {
00140       if(_refCount<1)
00141           break;
00142 
00143       bool busy=false;
00144       bool waiting=false;
00145 
00146       // Trigger each servant in turn.
00147       for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00148       {
00149         Proxy* p =*i; // Sun's CC requires this temporary.
00150         ProxyPushSupplier_i* pps =static_cast<ProxyPushSupplier_i*>(p);
00151         pps->trigger(busy,waiting);
00152       }
00153 
00154       if(busy)
00155       {
00156         // Something happened last time round. So we'll be optimistic and
00157         // immediately go round for another go. Yield first, just to let the
00158         // other kids get in if they need to.
00159         omni_mutex_kcol l(_lock); // 'lock' reversed!
00160         omni_thread::yield();
00161         // Reset the sleep time.
00162         sleepTimeNanosec=sleepTimeNanosec0;
00163       }
00164       else if(waiting)
00165       {
00166         // Nothing happened, so we'll wait for a bit and then give it another
00167         // go. Each time we wait for twice as long, up to the maximum.
00168         if(sleepTimeNanosec<maxSleepNanosec)
00169             sleepTimeNanosec<<=1; // (multiply by 2)
00170         unsigned long sec,nsec;
00171         omni_thread::get_time(&sec,&nsec,0,sleepTimeNanosec);
00172         _condition.timedwait(sec,nsec);
00173       }
00174       else
00175       {
00176         // There is nothing to do, so block until a new event arrives.
00177         _condition.wait();
00178       }
00179 
00180     }
00181     catch (CORBA::SystemException& ex) {
00182       DB(2,"ProxyPushSupplierManager ignoring CORBA system exception"
00183          IF_OMNIORB4(": "<<ex._name()<<" ("<<NP_MINORSTRING(ex)<<")") ".")
00184     }
00185     catch (CORBA::Exception& ex) {
00186       DB(2,"ProxyPushSupplierManager ignoring CORBA exception"
00187          IF_OMNIORB4(": "<<ex._name()<<) ".")
00188     }
00189     catch(...) {
00190       DB(2,"ProxyPushSupplierManager thread killed by unknown exception.")
00191       break;
00192     }
00193   }
00194 }
00195 
00196 void ProxyPushSupplierManager::_add_ref()
00197 {
00198   omni_mutex_lock pause(_lock);
00199   ++_refCount;
00200 }
00201 
00202 void ProxyPushSupplierManager::_remove_ref()
00203 {
00204   int myref;
00205   {
00206     PauseThenWake p(this);
00207     myref = --_refCount;
00208   }
00209   if(myref<0)
00210       DB(2,"ProxyPushSupplierManager has negative ref count! "<<myref)
00211   else if(myref==0)
00212       DB(15,"ProxyPushSupplierManager has zero ref count -- shutdown.")
00213 }
00214 
00215 
00216 //
00217 //  ProxyPushSupplier_i
00218 //
00219 
00220 void ProxyPushSupplier_i::connect_push_consumer(
00221   CosEventComm::PushConsumer_ptr pushConsumer)
00222 {
00223   if(CORBA::is_nil(pushConsumer))
00224       throw CORBA::BAD_PARAM();
00225   if(!CORBA::is_nil(_target) || !CORBA::is_nil(_req))
00226       throw CosEventChannelAdmin::AlreadyConnected();
00227   _target=CosEventComm::PushConsumer::_duplicate(pushConsumer);
00228 
00229   // Test to see whether pushSupplier is a ProxyPushSupplier.
00230   // If so, then we will aggressively try to reconnect, when we are reincarnated
00231   CORBA::Request_var req =_target->_request("_is_a");
00232   req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushConsumer->id();
00233   req->set_return_type(CORBA::_tc_boolean);
00234   req->send_deferred();
00235   Orb::inst().deferredRequest(req._retn(),this); // Register for callback
00236 
00237   if(omniEventsLog::exists())
00238   {
00239     WriteLock log;
00240     output(log.os);
00241   }
00242 }
00243 
00244 
00245 void ProxyPushSupplier_i::disconnect_push_supplier()
00246 {
00247   DB(5,"ProxyPushSupplier_i::disconnect_push_supplier()");
00248   eraseKey("ConsumerAdmin/ProxyPushSupplier");
00249   deactivateObject();
00250   if(CORBA::is_nil(_target))
00251   {
00252     throw CORBA::OBJECT_NOT_EXIST(
00253       IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
00254       CORBA::COMPLETED_NO
00255     );
00256   }
00257   else
00258   {
00259     CORBA::Request_var req=_target->_request("disconnect_push_consumer");
00260     req->send_deferred();
00261     Orb::inst().deferredRequest(req._retn());
00262     _target=CosEventComm::PushConsumer::_nil();
00263   }
00264 }
00265 
00266 
00267 ProxyPushSupplier_i::ProxyPushSupplier_i(
00268   PortableServer::POA_ptr poa,
00269   EventQueue&             q
00270 )
00271 : Proxy(poa),
00272   EventQueue::Reader(q),
00273   _target(CosEventComm::PushConsumer::_nil()),
00274   _targetIsProxy(false)
00275 {
00276   // pass
00277 }
00278 
00279 ProxyPushSupplier_i::~ProxyPushSupplier_i()
00280 {
00281   DB(20,"~ProxyPushSupplier_i()")
00282 }
00283 
00284 inline void ProxyPushSupplier_i::trigger(bool& busy, bool& waiting)
00285 {
00286   if(!CORBA::is_nil(_req) && _req->poll_response()) // response has arrived
00287   {
00288     CORBA::Environment_ptr env=_req->env(); // No need to free environment.
00289     if(!CORBA::is_nil(env) && env->exception())
00290     {
00291       // Shut down the connection
00292       CORBA::Exception* ex =env->exception(); // No need to free exception.
00293       DB(10,"ProxyPushSupplier got exception" IF_OMNIORB4(": "<<ex->_name()) );
00294       Orb::inst().reportObjectFailure(HERE,_target.in(),ex);
00295       _req=CORBA::Request::_nil();
00296 
00297       // Try to notify the Consumer that the connection is closing.
00298       CORBA::Request_var req=_target->_request("disconnect_push_consumer");
00299       req->send_deferred();
00300       Orb::inst().deferredRequest(req._retn());
00301 
00302       _target=CosEventComm::PushConsumer::_nil(); // disconnected.
00303       eraseKey("ConsumerAdmin/ProxyPushSupplier");
00304       deactivateObject();
00305       return; // No more work to do
00306     }
00307     _req=CORBA::Request::_nil();
00308     busy=true;
00309   }
00310   if(CORBA::is_nil(_req) && !CORBA::is_nil(_target) && moreEvents())
00311   {
00312     _req=_target->_request("push");
00313     _req->add_in_arg() <<= *(nextEvent());
00314     _req->send_deferred();
00315     busy=true;
00316   }
00317   if(!CORBA::is_nil(_req)) // More work to do, if _req NOT nil.
00318       waiting=true;
00319 }
00320 
00321 
00322 void ProxyPushSupplier_i::callback(CORBA::Request_ptr req)
00323 {
00324   if(_targetIsProxy)
00325   {
00326     // There should only ever be one of these callbacks per proxy,
00327     // because each proxy should only be connected once.
00328     DB(2,"WARNING: Multiple connections to ProxyPushSupplier.");
00329   }
00330   else if(req->return_value()>>=CORBA::Any::to_boolean(_targetIsProxy))
00331   {
00332     if(_targetIsProxy && omniEventsLog::exists())
00333     {
00334       WriteLock log;
00335       output(log.os);
00336       DB(15,"ProxyPushSupplier is federated.");
00337     }
00338   }
00339   else
00340   {
00341     DB(2,"ProxyPushSupplier got unexpected callback.");
00342     _targetIsProxy=false; // Reset it just to be sure.
00343   }
00344 }
00345 
00346 
00347 void ProxyPushSupplier_i::reincarnate(
00348   const string&      oid,
00349   const PersistNode& node
00350 )
00351 {
00352   try
00353   {
00354     using namespace CosEventChannelAdmin;
00355 
00356     string ior( node.attrString("IOR").c_str() );
00357     CosEventComm::PushConsumer_var pushConsumer =
00358       string_to_<CosEventComm::PushConsumer>(ior.c_str());
00359     // Do not activate until we know that we have read a valid target.
00360     activateObjectWithId(oid.c_str());
00361     _target=pushConsumer._retn();
00362     _targetIsProxy=bool(node.attrLong("proxy"));
00363 
00364     // If pushConsumer is a proxy, then try to reconnect.
00365     if(_targetIsProxy)
00366     {
00367       DB(15,"Attempting to reconnect ProxyPushSupplier: "<<oid.c_str())
00368       // This will only work if the proxy is implemented in the same way as
00369       // omniEvents, so connect_() automatically creates a proxy.
00370       ProxyPushConsumer_var proxyCons =
00371         string_to_<ProxyPushConsumer>(ior.c_str());
00372       CosEventComm::PushSupplier_var thisSupp =_this();
00373       proxyCons->connect_push_supplier(thisSupp);
00374       DB(7,"Reconnected ProxyPushSupplier: "<<oid.c_str())
00375     }
00376   }
00377   catch(CosEventChannelAdmin::AlreadyConnected&){ // connect_push_supplier()
00378     // The supplier doesn't need to be reconnected.
00379     DB(7,"Remote ProxyPushConsumer already connected: "<<oid.c_str())
00380   }
00381   catch(CosEventChannelAdmin::TypeError&){ // connect_push_supplier()
00382     // Don't know what to make of this...
00383     DB(2,"Remote ProxyPushConsumer threw TypeError: "<<oid.c_str())
00384   }
00385   catch(CORBA::OBJECT_NOT_EXIST&) {} // object 'pushConsumer' not responding.
00386   catch(CORBA::TRANSIENT&       ) {} // object 'pushConsumer' not responding.
00387   catch(CORBA::COMM_FAILURE&    ) {} // object 'pushConsumer' not responding.
00388 }
00389 
00390 
00391 void ProxyPushSupplier_i::output(ostream &os)
00392 {
00393   basicOutput(
00394     os,"ConsumerAdmin/ProxyPushSupplier",
00395     _target.in(),
00396     _targetIsProxy? " proxy=1": NULL
00397   );
00398 }
00399 
00400 
00401 }; // end namespace OmniEvents

Generated on Mon Jan 9 03:52:14 2006 for OmniEvents by  doxygen 1.4.6