00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "ProxyPushSupplier.h"
00025 #include "Orb.h"
00026 #include "omniEventsLog.h"
00027 #include "PersistNode.h"
00028 #include <assert.h>
00029
00030 namespace OmniEvents {
00031
00035 class omni_mutex_kcol {
00036 omni_mutex& mutex;
00037 public:
00038 omni_mutex_kcol(omni_mutex& m) : mutex(m) { mutex.unlock(); }
00039 ~omni_mutex_kcol(void) { mutex.lock(); }
00040 private:
00041
00042 omni_mutex_kcol(const omni_mutex_kcol&);
00043 omni_mutex_kcol& operator=(const omni_mutex_kcol&);
00044 };
00045
00046
00047
00048
00049
00050
00051 PortableServer::Servant
00052 ProxyPushSupplierManager::incarnate(
00053 const PortableServer::ObjectId& oid,
00054 PortableServer::POA_ptr poa
00055 )
00056 {
00057 ProxyPushSupplier_i* result =new ProxyPushSupplier_i(_managedPoa,_queue);
00058 PauseThenWake p(this);
00059 _servants.insert(result);
00060 return result;
00061 }
00062
00063 void
00064 ProxyPushSupplierManager::etherealize(
00065 const PortableServer::ObjectId& oid,
00066 PortableServer::POA_ptr adapter,
00067 PortableServer::Servant serv,
00068 CORBA::Boolean cleanup_in_progress,
00069 CORBA::Boolean remaining_activations
00070 )
00071 {
00072 omni_mutex_lock pause(_lock);
00073 ProxyManager::etherealize(oid,adapter,serv,
00074 cleanup_in_progress,remaining_activations);
00075 }
00076
00077 ProxyPushSupplierManager::ProxyPushSupplierManager(
00078 PortableServer::POA_ptr parentPoa,
00079 EventQueue& q
00080 )
00081 : ProxyManager(parentPoa,"ProxyPushSupplier"),
00082 omni_thread(NULL,PRIORITY_HIGH),
00083 _queue(q),
00084 _lock(),_condition(&_lock),
00085 _refCount(1)
00086 {
00087 start();
00088 }
00089
00090 ProxyPushSupplierManager::~ProxyPushSupplierManager()
00091 {
00092 DB(20,"~ProxyPushSupplierManager()")
00093 }
00094
00095 CosEventChannelAdmin::ProxyPushSupplier_ptr
00096 ProxyPushSupplierManager::createObject()
00097 {
00098 return createNarrowedReference<CosEventChannelAdmin::ProxyPushSupplier>(
00099 _managedPoa.in(),
00100 CosEventChannelAdmin::_tc_ProxyPushSupplier->id()
00101 );
00102 }
00103
00104 void ProxyPushSupplierManager::disconnect()
00105 {
00106 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00107 {
00108 Proxy* p =*i;
00109 ProxyPushSupplier_i* pps =static_cast<ProxyPushSupplier_i*>(p);
00110 pps->disconnect_push_supplier();
00111 }
00112 }
00113
00114 void
00115 ProxyPushSupplierManager::run(void*)
00116 {
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132 const unsigned long sleepTimeNanosec0 =0x8000;
00133 const unsigned long maxSleepNanosec =0x800000;
00134 unsigned long sleepTimeNanosec =sleepTimeNanosec0;
00135
00136 omni_mutex_lock conditionLock(_lock);
00137 while(true)
00138 {
00139 try {
00140 if(_refCount<1)
00141 break;
00142
00143 bool busy=false;
00144 bool waiting=false;
00145
00146
00147 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00148 {
00149 Proxy* p =*i;
00150 ProxyPushSupplier_i* pps =static_cast<ProxyPushSupplier_i*>(p);
00151 pps->trigger(busy,waiting);
00152 }
00153
00154 if(busy)
00155 {
00156
00157
00158
00159 omni_mutex_kcol l(_lock);
00160 omni_thread::yield();
00161
00162 sleepTimeNanosec=sleepTimeNanosec0;
00163 }
00164 else if(waiting)
00165 {
00166
00167
00168 if(sleepTimeNanosec<maxSleepNanosec)
00169 sleepTimeNanosec<<=1;
00170 unsigned long sec,nsec;
00171 omni_thread::get_time(&sec,&nsec,0,sleepTimeNanosec);
00172 _condition.timedwait(sec,nsec);
00173 }
00174 else
00175 {
00176
00177 _condition.wait();
00178 }
00179
00180 }
00181 catch (CORBA::SystemException& ex) {
00182 DB(2,"ProxyPushSupplierManager ignoring CORBA system exception"
00183 IF_OMNIORB4(": "<<ex._name()<<" ("<<NP_MINORSTRING(ex)<<")") ".")
00184 }
00185 catch (CORBA::Exception& ex) {
00186 DB(2,"ProxyPushSupplierManager ignoring CORBA exception"
00187 IF_OMNIORB4(": "<<ex._name()<<) ".")
00188 }
00189 catch(...) {
00190 DB(2,"ProxyPushSupplierManager thread killed by unknown exception.")
00191 break;
00192 }
00193 }
00194 }
00195
00196 void ProxyPushSupplierManager::_add_ref()
00197 {
00198 omni_mutex_lock pause(_lock);
00199 ++_refCount;
00200 }
00201
00202 void ProxyPushSupplierManager::_remove_ref()
00203 {
00204 int myref;
00205 {
00206 PauseThenWake p(this);
00207 myref = --_refCount;
00208 }
00209 if(myref<0)
00210 DB(2,"ProxyPushSupplierManager has negative ref count! "<<myref)
00211 else if(myref==0)
00212 DB(15,"ProxyPushSupplierManager has zero ref count -- shutdown.")
00213 }
00214
00215
00216
00217
00218
00219
00220 void ProxyPushSupplier_i::connect_push_consumer(
00221 CosEventComm::PushConsumer_ptr pushConsumer)
00222 {
00223 if(CORBA::is_nil(pushConsumer))
00224 throw CORBA::BAD_PARAM();
00225 if(!CORBA::is_nil(_target) || !CORBA::is_nil(_req))
00226 throw CosEventChannelAdmin::AlreadyConnected();
00227 _target=CosEventComm::PushConsumer::_duplicate(pushConsumer);
00228
00229
00230
00231 CORBA::Request_var req =_target->_request("_is_a");
00232 req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushConsumer->id();
00233 req->set_return_type(CORBA::_tc_boolean);
00234 req->send_deferred();
00235 Orb::inst().deferredRequest(req._retn(),this);
00236
00237 if(omniEventsLog::exists())
00238 {
00239 WriteLock log;
00240 output(log.os);
00241 }
00242 }
00243
00244
00245 void ProxyPushSupplier_i::disconnect_push_supplier()
00246 {
00247 DB(5,"ProxyPushSupplier_i::disconnect_push_supplier()");
00248 eraseKey("ConsumerAdmin/ProxyPushSupplier");
00249 deactivateObject();
00250 if(CORBA::is_nil(_target))
00251 {
00252 throw CORBA::OBJECT_NOT_EXIST(
00253 IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
00254 CORBA::COMPLETED_NO
00255 );
00256 }
00257 else
00258 {
00259 CORBA::Request_var req=_target->_request("disconnect_push_consumer");
00260 req->send_deferred();
00261 Orb::inst().deferredRequest(req._retn());
00262 _target=CosEventComm::PushConsumer::_nil();
00263 }
00264 }
00265
00266
00267 ProxyPushSupplier_i::ProxyPushSupplier_i(
00268 PortableServer::POA_ptr poa,
00269 EventQueue& q
00270 )
00271 : Proxy(poa),
00272 EventQueue::Reader(q),
00273 _target(CosEventComm::PushConsumer::_nil()),
00274 _targetIsProxy(false)
00275 {
00276
00277 }
00278
00279 ProxyPushSupplier_i::~ProxyPushSupplier_i()
00280 {
00281 DB(20,"~ProxyPushSupplier_i()")
00282 }
00283
00284 inline void ProxyPushSupplier_i::trigger(bool& busy, bool& waiting)
00285 {
00286 if(!CORBA::is_nil(_req) && _req->poll_response())
00287 {
00288 CORBA::Environment_ptr env=_req->env();
00289 if(!CORBA::is_nil(env) && env->exception())
00290 {
00291
00292 CORBA::Exception* ex =env->exception();
00293 DB(10,"ProxyPushSupplier got exception" IF_OMNIORB4(": "<<ex->_name()) );
00294 Orb::inst().reportObjectFailure(HERE,_target.in(),ex);
00295 _req=CORBA::Request::_nil();
00296
00297
00298 CORBA::Request_var req=_target->_request("disconnect_push_consumer");
00299 req->send_deferred();
00300 Orb::inst().deferredRequest(req._retn());
00301
00302 _target=CosEventComm::PushConsumer::_nil();
00303 eraseKey("ConsumerAdmin/ProxyPushSupplier");
00304 deactivateObject();
00305 return;
00306 }
00307 _req=CORBA::Request::_nil();
00308 busy=true;
00309 }
00310 if(CORBA::is_nil(_req) && !CORBA::is_nil(_target) && moreEvents())
00311 {
00312 _req=_target->_request("push");
00313 _req->add_in_arg() <<= *(nextEvent());
00314 _req->send_deferred();
00315 busy=true;
00316 }
00317 if(!CORBA::is_nil(_req))
00318 waiting=true;
00319 }
00320
00321
00322 void ProxyPushSupplier_i::callback(CORBA::Request_ptr req)
00323 {
00324 if(_targetIsProxy)
00325 {
00326
00327
00328 DB(2,"WARNING: Multiple connections to ProxyPushSupplier.");
00329 }
00330 else if(req->return_value()>>=CORBA::Any::to_boolean(_targetIsProxy))
00331 {
00332 if(_targetIsProxy && omniEventsLog::exists())
00333 {
00334 WriteLock log;
00335 output(log.os);
00336 DB(15,"ProxyPushSupplier is federated.");
00337 }
00338 }
00339 else
00340 {
00341 DB(2,"ProxyPushSupplier got unexpected callback.");
00342 _targetIsProxy=false;
00343 }
00344 }
00345
00346
00347 void ProxyPushSupplier_i::reincarnate(
00348 const string& oid,
00349 const PersistNode& node
00350 )
00351 {
00352 try
00353 {
00354 using namespace CosEventChannelAdmin;
00355
00356 string ior( node.attrString("IOR").c_str() );
00357 CosEventComm::PushConsumer_var pushConsumer =
00358 string_to_<CosEventComm::PushConsumer>(ior.c_str());
00359
00360 activateObjectWithId(oid.c_str());
00361 _target=pushConsumer._retn();
00362 _targetIsProxy=bool(node.attrLong("proxy"));
00363
00364
00365 if(_targetIsProxy)
00366 {
00367 DB(15,"Attempting to reconnect ProxyPushSupplier: "<<oid.c_str())
00368
00369
00370 ProxyPushConsumer_var proxyCons =
00371 string_to_<ProxyPushConsumer>(ior.c_str());
00372 CosEventComm::PushSupplier_var thisSupp =_this();
00373 proxyCons->connect_push_supplier(thisSupp);
00374 DB(7,"Reconnected ProxyPushSupplier: "<<oid.c_str())
00375 }
00376 }
00377 catch(CosEventChannelAdmin::AlreadyConnected&){
00378
00379 DB(7,"Remote ProxyPushConsumer already connected: "<<oid.c_str())
00380 }
00381 catch(CosEventChannelAdmin::TypeError&){
00382
00383 DB(2,"Remote ProxyPushConsumer threw TypeError: "<<oid.c_str())
00384 }
00385 catch(CORBA::OBJECT_NOT_EXIST&) {}
00386 catch(CORBA::TRANSIENT& ) {}
00387 catch(CORBA::COMM_FAILURE& ) {}
00388 }
00389
00390
00391 void ProxyPushSupplier_i::output(ostream &os)
00392 {
00393 basicOutput(
00394 os,"ConsumerAdmin/ProxyPushSupplier",
00395 _target.in(),
00396 _targetIsProxy? " proxy=1": NULL
00397 );
00398 }
00399
00400
00401 };