00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "ProxyPushConsumer.h"
00025 #include "ConsumerAdmin.h"
00026 #include "Orb.h"
00027 #include "omniEventsLog.h"
00028 #include "PersistNode.h"
00029
00030 #include <assert.h>
00031
00032 namespace OmniEvents {
00033
00034 void ProxyPushConsumer_i::connect_push_supplier(
00035 CosEventComm::PushSupplier_ptr pushSupplier)
00036 {
00037
00038 if(CORBA::is_nil(pushSupplier))
00039 return;
00040
00041 string oidstr =currentObjectId();
00042 Connections_t::iterator pos =_connections.find(oidstr);
00043
00044 if(pos!=_connections.end())
00045 throw CosEventChannelAdmin::AlreadyConnected();
00046
00047 Connection& newConnection =
00048 _connections.insert(Connections_t::value_type(
00049 oidstr,
00050 Connection(
00051 _channelName.in(),
00052 oidstr,
00053 CosEventComm::PushSupplier::_duplicate(pushSupplier)
00054 )
00055 )).first->second;
00056
00057
00058
00059 CORBA::Request_var req =pushSupplier->_request("_is_a");
00060 req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushSupplier->id();
00061 req->set_return_type(CORBA::_tc_boolean);
00062 req->send_deferred();
00063 Orb::inst().deferredRequest(req._retn(),&newConnection);
00064
00065 if(omniEventsLog::exists())
00066 {
00067 WriteLock log;
00068 newConnection.output(log.os);
00069 }
00070 }
00071
00072
00073 void ProxyPushConsumer_i::disconnect_push_consumer()
00074 {
00075 #ifdef HAVE_OMNIORB4
00076 DB(5,"ProxyPushConsumer_i::disconnect_push_consumer()")
00077 string oidstr =currentObjectId();
00078 Connections_t::iterator pos =_connections.find(oidstr);
00079
00080 if(pos!=_connections.end())
00081 {
00082 CORBA::Request_var req =
00083 pos->second._target->_request("disconnect_push_supplier");
00084 req->send_deferred();
00085 Orb::inst().deferredRequest(req._retn());
00086 if(omniEventsLog::exists())
00087 {
00088
00089 WriteLock log;
00090 log.os<<"-ecf/"<<_channelName.in();
00091 log.os<<"/SupplierAdmin/ProxyPushConsumer/"<<oidstr<<'\n';
00092 }
00093 _connections.erase(pos);
00094 }
00095 #else
00096 DB(5,"Ignoring disconnect_push_consumer(). Upgrade to omniORB4!")
00097 #endif
00098 }
00099
00100
00101 void ProxyPushConsumer_i::push(const CORBA::Any& event)
00102 {
00103 #ifdef OMNIEVENTS_REAL_TIME_PUSH
00104 if(!_useLocalQueue)
00105 {
00106 _consumerAdmin.send(new CORBA::Any(event));
00107 _useLocalQueue=true;
00108 }
00109 else
00110 #endif
00111 _queue.push_back(new CORBA::Any(event));
00112 }
00113
00114
00115 ProxyPushConsumer_i::ProxyPushConsumer_i(
00116 PortableServer::POA_ptr p,
00117 list<CORBA::Any*>& q,
00118 ConsumerAdmin_i& consumerAdmin
00119 )
00120 : Servant(PortableServer::POA::_nil()),
00121 _connections(),
00122 _channelName(p->the_name()),
00123 _consumerAdmin(consumerAdmin),
00124 _queue(q),
00125 _useLocalQueue(false)
00126 {
00127 using namespace PortableServer;
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138 CORBA::PolicyList policies;
00139 policies.length(7);
00140 policies[0]=p->create_lifespan_policy(PERSISTENT);
00141 policies[1]=p->create_id_assignment_policy(USER_ID);
00142 policies[2]=p->create_id_uniqueness_policy(MULTIPLE_ID);
00143 policies[3]=p->create_implicit_activation_policy(NO_IMPLICIT_ACTIVATION);
00144 policies[4]=p->create_request_processing_policy(USE_DEFAULT_SERVANT);
00145 policies[5]=p->create_servant_retention_policy(NON_RETAIN);
00146 policies[6]=p->create_thread_policy(SINGLE_THREAD_MODEL);
00147
00148 try
00149 {
00150
00151 string poaName =string(_channelName.in())+".ProxyPushConsumer";
00152 POAManager_var parentManager =p->the_POAManager();
00153 _poa=p->create_POA(poaName.c_str(),parentManager.in(),policies);
00154
00155
00156 }
00157 catch(POA::AdapterAlreadyExists&)
00158 {
00159 DB(0,"ProxyPushConsumer_i::ProxyPushConsumer_i() - "
00160 "POA::AdapterAlreadyExists")
00161 }
00162 catch(POA::InvalidPolicy& ex)
00163 {
00164 DB(0,"ProxyPushConsumer_i::ProxyPushConsumer_i() - "
00165 "POA::InvalidPolicy: "<<ex.index)
00166 }
00167
00168
00169 for(CORBA::ULong i=0; i<policies.length(); ++i)
00170 policies[i]->destroy();
00171
00172
00173 _poa->set_servant(this);
00174 }
00175
00176
00177 ProxyPushConsumer_i::~ProxyPushConsumer_i()
00178 {
00179 DB(20,"~ProxyPushConsumer_i()")
00180 }
00181
00182
00183 CosEventChannelAdmin::ProxyPushConsumer_ptr
00184 ProxyPushConsumer_i::createObject()
00185 {
00186 return createNarrowedReference<CosEventChannelAdmin::ProxyPushConsumer>(
00187 _poa.in(),
00188 CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
00189 );
00190 }
00191
00192
00193 void ProxyPushConsumer_i::disconnect()
00194 {
00195 Connections_t::iterator curr,next=_connections.begin();
00196 while(next!=_connections.end())
00197 {
00198 curr=next++;
00199 CORBA::Request_var req =
00200 curr->second._target->_request("disconnect_push_supplier");
00201 req->send_deferred();
00202 Orb::inst().deferredRequest(req._retn());
00203 _connections.erase(curr);
00204 }
00205 }
00206
00207
00208 void ProxyPushConsumer_i::reincarnate(const PersistNode& node)
00209 {
00210
00211 for(map<string,PersistNode*>::const_iterator i=node._child.begin();
00212 i!=node._child.end();
00213 ++i)
00214 {
00215 const char* oidstr =i->first.c_str();
00216 string ior( i->second->attrString("IOR") );
00217 bool isProxy( i->second->attrLong("proxy") );
00218 assert(_connections.find(oidstr)==_connections.end());
00219 try
00220 {
00221 using namespace CosEventComm;
00222 using namespace CosEventChannelAdmin;
00223
00224 PushSupplier_var supp =string_to_<PushSupplier>(ior.c_str());
00225 _connections.insert(Connections_t::value_type(
00226 oidstr,
00227 Connection(_channelName.in(),oidstr,supp._retn(),isProxy)
00228 ));
00229 DB(5,"Reincarnated ProxyPushConsumer: "<<oidstr)
00230
00231
00232 if(isProxy)
00233 {
00234 DB(15,"Attempting to reconnect ProxyPushConsumer: "<<oidstr)
00235
00236
00237 ProxyPushSupplier_var proxySupp =
00238 string_to_<ProxyPushSupplier>(ior.c_str());
00239 PortableServer::ObjectId_var objectId =
00240 PortableServer::string_to_ObjectId(oidstr);
00241 CORBA::Object_var obj =
00242 _poa->create_reference_with_id(
00243 objectId.in(),
00244 CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
00245 );
00246 PushConsumer_var thisCons =CosEventComm::PushConsumer::_narrow(obj);
00247 proxySupp->connect_push_consumer(thisCons.in());
00248 DB(7,"Reconnected ProxyPushConsumer: "<<oidstr)
00249 }
00250 }
00251 catch(CORBA::BAD_PARAM&) {
00252
00253 DB(5,"Failed to reincarnate ProxyPushConsumer: "<<oidstr)
00254 }
00255 catch(CosEventChannelAdmin::AlreadyConnected&){
00256
00257 DB(7,"Remote ProxyPushSupplier already connected: "<<oidstr)
00258 }
00259 catch(CosEventChannelAdmin::TypeError&){
00260
00261 DB(2,"Remote ProxyPushSupplier threw TypeError: "<<oidstr)
00262 }
00263 catch(CORBA::OBJECT_NOT_EXIST&) {}
00264 catch(CORBA::TRANSIENT& ) {}
00265 catch(CORBA::COMM_FAILURE& ) {}
00266 }
00267 }
00268
00269
00270 void ProxyPushConsumer_i::output(ostream& os) const
00271 {
00272 for(Connections_t::const_iterator i=_connections.begin();
00273 i!=_connections.end();
00274 ++i)
00275 {
00276 i->second.output(os);
00277 }
00278 }
00279
00280
00281 string ProxyPushConsumer_i::currentObjectId() const
00282 {
00283 #ifdef HAVE_OMNIORB4
00284 try
00285 {
00286 using namespace PortableServer;
00287 ObjectId_var oid =Orb::inst()._POACurrent->get_object_id();
00288 CORBA::String_var oidStr =ObjectId_to_string(oid.in());
00289 return string(oidStr.in());
00290 }
00291 catch(PortableServer::Current::NoContext&)
00292 {
00293 DB(0,"No context!!")
00294 }
00295 catch(CORBA::BAD_PARAM&)
00296 {
00297
00298 assert(0);
00299 }
00300 return "ERROR";
00301 #else
00302 throw CORBA::NO_IMPLEMENT();
00303 #endif
00304 }
00305
00306
00307
00308
00309
00310
00311 ProxyPushConsumer_i::Connection::Connection(
00312 const char* channelName,
00313 const string& oidstr,
00314 CosEventComm::PushSupplier_ptr pushSupplier,
00315 bool isProxy
00316 ):_channelName(channelName),
00317 _oidstr(oidstr),
00318 _target(pushSupplier),
00319 _targetIsProxy(isProxy)
00320 {
00321
00322 }
00323
00324 void ProxyPushConsumer_i::Connection::callback(CORBA::Request_ptr req)
00325 {
00326 bool save =_targetIsProxy;
00327 if(req->return_value()>>=CORBA::Any::to_boolean(_targetIsProxy))
00328 {
00329 if(_targetIsProxy && omniEventsLog::exists())
00330 {
00331 WriteLock log;
00332 output(log.os);
00333 DB(15,"ProxyPushConsumer is federated.");
00334 }
00335 }
00336 else
00337 {
00338 DB(2,"ProxyPushConsumer got unexpected callback.");
00339 _targetIsProxy=save;
00340 }
00341 }
00342
00343 void ProxyPushConsumer_i::Connection::output(ostream& os) const
00344 {
00345 os<<"ecf/"<<_channelName;
00346 os<<"/SupplierAdmin/ProxyPushConsumer/"<<_oidstr;
00347
00348 if(!CORBA::is_nil(_target.in()))
00349 {
00350 CORBA::String_var iorstr;
00351 iorstr = Orb::inst()._orb->object_to_string(_target.in());
00352 os<<" IOR="<<iorstr.in();
00353 if(_targetIsProxy)
00354 os<<" proxy=1";
00355 }
00356 os<<" ;;\n";
00357 }
00358
00359
00360 };