ProxyPushConsumer.cc

Go to the documentation of this file.
00001 //                            Package   : omniEvents
00002 // ProxyPushConsumer.cc       Created   : 2003/12/04
00003 //                            Author    : Alex Tingle
00004 //
00005 //    Copyright (C) 2003 Alex Tingle.
00006 //
00007 //    This file is part of the omniEvents application.
00008 //
00009 //    omniEvents is free software; you can redistribute it and/or
00010 //    modify it under the terms of the GNU Lesser General Public
00011 //    License as published by the Free Software Foundation; either
00012 //    version 2.1 of the License, or (at your option) any later version.
00013 //
00014 //    omniEvents is distributed in the hope that it will be useful,
00015 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017 //    Lesser General Public License for more details.
00018 //
00019 //    You should have received a copy of the GNU Lesser General Public
00020 //    License along with this library; if not, write to the Free Software
00021 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 //
00023 
00024 #include "ProxyPushConsumer.h"
00025 #include "ConsumerAdmin.h"
00026 #include "Orb.h"
00027 #include "omniEventsLog.h"
00028 #include "PersistNode.h"
00029 
00030 #include <assert.h>
00031 
00032 namespace OmniEvents {
00033 
00034 void ProxyPushConsumer_i::connect_push_supplier(
00035   CosEventComm::PushSupplier_ptr pushSupplier)
00036 {
00037   // pushSupplier is permitted to be nil.
00038   if(CORBA::is_nil(pushSupplier))
00039       return;
00040 
00041   string oidstr =currentObjectId();
00042   Connections_t::iterator pos =_connections.find(oidstr);
00043 
00044   if(pos!=_connections.end())
00045       throw CosEventChannelAdmin::AlreadyConnected();
00046 
00047   Connection& newConnection =
00048     _connections.insert(Connections_t::value_type(
00049       oidstr,
00050       Connection(
00051         _channelName.in(),
00052         oidstr,
00053         CosEventComm::PushSupplier::_duplicate(pushSupplier)
00054       )
00055     )).first->second; // insert() returns pair<iterator,bool>
00056 
00057   // Test to see whether pushSupplier is a ProxyPushSupplier.
00058   // If so, then we will aggressively try to reconnect, when we are reincarnated
00059   CORBA::Request_var req =pushSupplier->_request("_is_a");
00060   req->add_in_arg() <<= CosEventChannelAdmin::_tc_ProxyPushSupplier->id();
00061   req->set_return_type(CORBA::_tc_boolean);
00062   req->send_deferred();
00063   Orb::inst().deferredRequest(req._retn(),&newConnection); // Register callback
00064 
00065   if(omniEventsLog::exists())
00066   {
00067     WriteLock log;
00068     newConnection.output(log.os);
00069   }
00070 }
00071 
00072 
00073 void ProxyPushConsumer_i::disconnect_push_consumer()
00074 {
00075 #ifdef HAVE_OMNIORB4
00076   DB(5,"ProxyPushConsumer_i::disconnect_push_consumer()")
00077   string oidstr =currentObjectId();
00078   Connections_t::iterator pos =_connections.find(oidstr);
00079 
00080   if(pos!=_connections.end())
00081   {
00082     CORBA::Request_var req =
00083       pos->second._target->_request("disconnect_push_supplier");
00084     req->send_deferred();
00085     Orb::inst().deferredRequest(req._retn());
00086     if(omniEventsLog::exists())
00087     {
00088       // Erase this connection from the log file.
00089       WriteLock log;
00090       log.os<<"-ecf/"<<_channelName.in();
00091       log.os<<"/SupplierAdmin/ProxyPushConsumer/"<<oidstr<<'\n';
00092     }
00093     _connections.erase(pos);
00094   }
00095 #else /* Silently ignore disconnects with omniORB3 */
00096   DB(5,"Ignoring disconnect_push_consumer(). Upgrade to omniORB4!")
00097 #endif
00098 }
00099 
00100 
00101 void ProxyPushConsumer_i::push(const CORBA::Any& event)
00102 {
00103 #ifdef OMNIEVENTS_REAL_TIME_PUSH
00104   if(!_useLocalQueue)
00105   {
00106     _consumerAdmin.send(new CORBA::Any(event));
00107     _useLocalQueue=true;
00108   }
00109   else
00110 #endif
00111     _queue.push_back(new CORBA::Any(event));
00112 }
00113 
00114 
00115 ProxyPushConsumer_i::ProxyPushConsumer_i(
00116   PortableServer::POA_ptr p,
00117   list<CORBA::Any*>&      q,
00118   ConsumerAdmin_i&        consumerAdmin
00119 )
00120 : Servant(PortableServer::POA::_nil()),
00121   _connections(),
00122   _channelName(p->the_name()),
00123   _consumerAdmin(consumerAdmin),
00124   _queue(q),
00125   _useLocalQueue(false)
00126 {
00127   using namespace PortableServer;
00128 
00129   // POLICIES:
00130   //  Lifespan          =PERSISTENT             // we can persist
00131   //  Assignment        =USER_ID                // write our own oid
00132   //  Uniqueness        =MULTIPLE_ID            // only one servant
00133   //  ImplicitActivation=NO_IMPLICIT_ACTIVATION // disable auto activation
00134   //  RequestProcessing =USE_DEFAULT_SERVANT    // only one servant
00135   //  ServantRetention  =NON_RETAIN             // stateless POA
00136   //  Thread            =SINGLE_THREAD_MODEL    // keep it simple
00137 
00138   CORBA::PolicyList policies;
00139   policies.length(7);
00140   policies[0]=p->create_lifespan_policy(PERSISTENT);
00141   policies[1]=p->create_id_assignment_policy(USER_ID);
00142   policies[2]=p->create_id_uniqueness_policy(MULTIPLE_ID);
00143   policies[3]=p->create_implicit_activation_policy(NO_IMPLICIT_ACTIVATION);
00144   policies[4]=p->create_request_processing_policy(USE_DEFAULT_SERVANT);
00145   policies[5]=p->create_servant_retention_policy(NON_RETAIN);
00146   policies[6]=p->create_thread_policy(SINGLE_THREAD_MODEL);
00147 
00148   try
00149   {  
00150     // Create a POA for this proxy type in this channel.
00151     string          poaName =string(_channelName.in())+".ProxyPushConsumer";
00152     POAManager_var  parentManager =p->the_POAManager();
00153     _poa=p->create_POA(poaName.c_str(),parentManager.in(),policies);
00154 
00155 
00156   }
00157   catch(POA::AdapterAlreadyExists&) // create_POA
00158   {
00159     DB(0,"ProxyPushConsumer_i::ProxyPushConsumer_i() - "
00160           "POA::AdapterAlreadyExists")
00161   }
00162   catch(POA::InvalidPolicy& ex) // create_POA
00163   {
00164     DB(0,"ProxyPushConsumer_i::ProxyPushConsumer_i() - "
00165           "POA::InvalidPolicy: "<<ex.index)
00166   }
00167 
00168   // Destroy the policy objects (Not strictly necessary in omniORB)
00169   for(CORBA::ULong i=0; i<policies.length(); ++i)
00170       policies[i]->destroy();
00171 
00172   // This object is the POA's default servant.
00173   _poa->set_servant(this);
00174 }
00175 
00176 
00177 ProxyPushConsumer_i::~ProxyPushConsumer_i()
00178 {
00179   DB(20,"~ProxyPushConsumer_i()")
00180 }
00181 
00182 
00183 CosEventChannelAdmin::ProxyPushConsumer_ptr
00184 ProxyPushConsumer_i::createObject()
00185 {
00186   return createNarrowedReference<CosEventChannelAdmin::ProxyPushConsumer>(
00187            _poa.in(),
00188            CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
00189          );
00190 }
00191 
00192 
00193 void ProxyPushConsumer_i::disconnect()
00194 {
00195   Connections_t::iterator curr,next=_connections.begin();
00196   while(next!=_connections.end())
00197   {
00198     curr=next++;
00199     CORBA::Request_var req =
00200       curr->second._target->_request("disconnect_push_supplier");
00201     req->send_deferred();
00202     Orb::inst().deferredRequest(req._retn());
00203     _connections.erase(curr);
00204   }
00205 }
00206 
00207 
00208 void ProxyPushConsumer_i::reincarnate(const PersistNode& node)
00209 {
00210   // Reincarnate all connections from node's children.
00211   for(map<string,PersistNode*>::const_iterator i=node._child.begin();
00212       i!=node._child.end();
00213       ++i)
00214   {
00215     const char* oidstr =i->first.c_str();
00216     string      ior( i->second->attrString("IOR") );
00217     bool        isProxy( i->second->attrLong("proxy") );
00218     assert(_connections.find(oidstr)==_connections.end());
00219     try
00220     {
00221       using namespace CosEventComm;
00222       using namespace CosEventChannelAdmin;
00223 
00224       PushSupplier_var supp =string_to_<PushSupplier>(ior.c_str());
00225       _connections.insert(Connections_t::value_type(
00226         oidstr,
00227         Connection(_channelName.in(),oidstr,supp._retn(),isProxy)
00228       ));
00229       DB(5,"Reincarnated ProxyPushConsumer: "<<oidstr)
00230 
00231       // If supp is a ProxyPushSupplier, then try to reconnect.
00232       if(isProxy)
00233       {
00234         DB(15,"Attempting to reconnect ProxyPushConsumer: "<<oidstr)
00235         // This will only work if the proxy is implemented in the same way as
00236         // omniEvents, so connect_() automatically creates a proxy.
00237         ProxyPushSupplier_var proxySupp =
00238           string_to_<ProxyPushSupplier>(ior.c_str());
00239         PortableServer::ObjectId_var objectId =
00240           PortableServer::string_to_ObjectId(oidstr);
00241         CORBA::Object_var obj =
00242           _poa->create_reference_with_id(
00243             objectId.in(),
00244             CosEventChannelAdmin::_tc_ProxyPushConsumer->id()
00245           );
00246         PushConsumer_var thisCons =CosEventComm::PushConsumer::_narrow(obj);
00247         proxySupp->connect_push_consumer(thisCons.in());
00248         DB(7,"Reconnected ProxyPushConsumer: "<<oidstr)
00249       }
00250     }
00251     catch(CORBA::BAD_PARAM&) {
00252       // This will happen when IOR fails to narrow.
00253       DB(5,"Failed to reincarnate ProxyPushConsumer: "<<oidstr)
00254     }
00255     catch(CosEventChannelAdmin::AlreadyConnected&){ //connect_push_consumer()
00256       // The supplier doesn't need to be reconnected.
00257       DB(7,"Remote ProxyPushSupplier already connected: "<<oidstr)
00258     }
00259     catch(CosEventChannelAdmin::TypeError&){ // connect_push_consumer()
00260       // Don't know what to make of this...
00261       DB(2,"Remote ProxyPushSupplier threw TypeError: "<<oidstr)
00262     }
00263     catch(CORBA::OBJECT_NOT_EXIST&) {} // object 'supp' not responding.
00264     catch(CORBA::TRANSIENT&       ) {} // object 'supp' not responding.
00265     catch(CORBA::COMM_FAILURE&    ) {} // object 'supp' not responding.
00266   } // end loop for(i)
00267 }
00268 
00269 
00270 void ProxyPushConsumer_i::output(ostream& os) const
00271 {
00272   for(Connections_t::const_iterator i=_connections.begin();
00273       i!=_connections.end();
00274       ++i)
00275   {
00276     i->second.output(os);
00277   }
00278 }
00279 
00280 
00281 string ProxyPushConsumer_i::currentObjectId() const
00282 {
00283 #ifdef HAVE_OMNIORB4
00284   try
00285   {
00286     using namespace PortableServer;
00287     ObjectId_var oid =Orb::inst()._POACurrent->get_object_id();
00288     CORBA::String_var oidStr =ObjectId_to_string(oid.in());
00289     return string(oidStr.in());
00290   }
00291   catch(PortableServer::Current::NoContext&) // get_object_id()
00292   {
00293     DB(0,"No context!!")
00294   }
00295   catch(CORBA::BAD_PARAM&) // ObjectId_to_string()
00296   {
00297     // Should never get here in omniORB, because ObjectID is a char*.
00298     assert(0);
00299   }
00300   return "ERROR";
00301 #else
00302   throw CORBA::NO_IMPLEMENT();
00303 #endif
00304 }
00305 
00306 
00307 //
00308 //  ProxyPushConsumer_i::Connection
00309 //
00310 
00311 ProxyPushConsumer_i::Connection::Connection(
00312   const char*                    channelName,
00313   const string&                  oidstr,
00314   CosEventComm::PushSupplier_ptr pushSupplier,
00315   bool                           isProxy
00316 ):_channelName(channelName),
00317   _oidstr(oidstr),
00318   _target(pushSupplier),
00319   _targetIsProxy(isProxy)
00320 {
00321   // pass
00322 }
00323 
00324 void ProxyPushConsumer_i::Connection::callback(CORBA::Request_ptr req)
00325 {
00326   bool save =_targetIsProxy;
00327   if(req->return_value()>>=CORBA::Any::to_boolean(_targetIsProxy))
00328   {
00329     if(_targetIsProxy && omniEventsLog::exists())
00330     {
00331       WriteLock log;
00332       output(log.os);
00333       DB(15,"ProxyPushConsumer is federated.");
00334     }
00335   }
00336   else
00337   {
00338     DB(2,"ProxyPushConsumer got unexpected callback.");
00339     _targetIsProxy=save; // Reset it just to be sure.
00340   }
00341 }
00342 
00343 void ProxyPushConsumer_i::Connection::output(ostream& os) const
00344 {
00345   os<<"ecf/"<<_channelName;
00346   os<<"/SupplierAdmin/ProxyPushConsumer/"<<_oidstr;
00347 
00348   if(!CORBA::is_nil(_target.in()))
00349   {
00350     CORBA::String_var iorstr;
00351     iorstr = Orb::inst()._orb->object_to_string(_target.in());
00352     os<<" IOR="<<iorstr.in();
00353     if(_targetIsProxy)
00354         os<<" proxy=1";
00355   }
00356   os<<" ;;\n";
00357 }
00358 
00359 
00360 }; // end namespace OmniEvents

Generated on Mon Jan 9 03:52:14 2006 for OmniEvents by  doxygen 1.4.6