ProxyPullConsumer.cc

Go to the documentation of this file.
00001 //                            Package   : omniEvents
00002 // ProxyPullConsumer.cc       Created   : 2003/12/04
00003 //                            Author    : Alex Tingle
00004 //
00005 //    Copyright (C) 2003 Alex Tingle.
00006 //
00007 //    This file is part of the omniEvents application.
00008 //
00009 //    omniEvents is free software; you can redistribute it and/or
00010 //    modify it under the terms of the GNU Lesser General Public
00011 //    License as published by the Free Software Foundation; either
00012 //    version 2.1 of the License, or (at your option) any later version.
00013 //
00014 //    omniEvents is distributed in the hope that it will be useful,
00015 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017 //    Lesser General Public License for more details.
00018 //
00019 //    You should have received a copy of the GNU Lesser General Public
00020 //    License along with this library; if not, write to the Free Software
00021 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 //
00023 
00024 #include "ProxyPullConsumer.h"
00025 #include "Orb.h"
00026 #include "omniEventsLog.h"
00027 #include "PersistNode.h"
00028 #include <assert.h>
00029 
00030 namespace OmniEvents {
00031 
00032 //
00033 //  ProxyPullConsumerManager
00034 //
00035 
00036 PortableServer::Servant
00037 ProxyPullConsumerManager::incarnate(
00038   const PortableServer::ObjectId& oid,
00039   PortableServer::POA_ptr         poa
00040 )
00041 {
00042   DB(20,"ProxyPullConsumerManager::incarnate()")
00043   ProxyPullConsumer_i* result =new ProxyPullConsumer_i(_managedPoa,_queue);
00044   _servants.insert(result);
00045   return result;
00046 }
00047 
00048 ProxyPullConsumerManager::ProxyPullConsumerManager(
00049   PortableServer::POA_ptr parentPoa,
00050   list<CORBA::Any*>&      q
00051 )
00052 : ProxyManager(parentPoa,"ProxyPullConsumer"),
00053   _queue(q)
00054 {
00055   // pass
00056 }
00057 
00058 ProxyPullConsumerManager::~ProxyPullConsumerManager()
00059 {
00060   DB(20,"~ProxyPullConsumerManager()")
00061 }
00062 
00063 CosEventChannelAdmin::ProxyPullConsumer_ptr
00064 ProxyPullConsumerManager::createObject()
00065 {
00066   return createNarrowedReference<CosEventChannelAdmin::ProxyPullConsumer>(
00067            _managedPoa.in(),
00068            CosEventChannelAdmin::_tc_ProxyPullConsumer->id()
00069          );
00070 }
00071 
00072 void ProxyPullConsumerManager::collect()
00073 {
00074   // Collect events from each servant in turn.
00075   for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00076   {
00077     ProxyPullConsumer_i* proxy=dynamic_cast<ProxyPullConsumer_i*>(*i);
00078     proxy->collect();
00079   }
00080 }
00081 
00082 void ProxyPullConsumerManager::triggerRequest()
00083 {
00084   // Trigger each servant in turn.
00085   for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00086   {
00087     ProxyPullConsumer_i* proxy=dynamic_cast<ProxyPullConsumer_i*>(*i);
00088     proxy->triggerRequest();
00089   }
00090 }
00091 
00092 void ProxyPullConsumerManager::disconnect()
00093 {
00094   for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00095   {
00096     Proxy* p =*i; // Sun's CC requires this temporary.
00097     ProxyPullConsumer_i* ppc =static_cast<ProxyPullConsumer_i*>(p);
00098     ppc->disconnect_pull_consumer();
00099   }
00100 }
00101 
00102 
00103 //
00104 //  ProxyPullConsumer_i
00105 //
00106 
00107 // CORBA interface methods
00108 
00109 void ProxyPullConsumer_i::connect_pull_supplier(
00110   CosEventComm::PullSupplier_ptr pullSupplier
00111 )
00112 {
00113   if(CORBA::is_nil(pullSupplier))
00114       throw CORBA::BAD_PARAM();
00115   if(!CORBA::is_nil(_target) || !CORBA::is_nil(_req))
00116       throw CosEventChannelAdmin::AlreadyConnected();
00117   _target=CosEventComm::PullSupplier::_duplicate(pullSupplier);
00118 
00119   if(omniEventsLog::exists())
00120   {
00121     WriteLock log;
00122     output(log.os);
00123   }
00124 }
00125 
00126 void ProxyPullConsumer_i::disconnect_pull_consumer()
00127 {
00128   DB(5,"ProxyPullConsumer_i::disconnect_pull_consumer()");
00129   eraseKey("SupplierAdmin/ProxyPullConsumer");
00130   deactivateObject();
00131   if(CORBA::is_nil(_target))
00132   {
00133     throw CORBA::OBJECT_NOT_EXIST(
00134       IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
00135       CORBA::COMPLETED_NO
00136     );
00137   }
00138   else
00139   {
00140     CORBA::Request_var req=_target->_request("disconnect_pull_supplier");
00141     req->send_deferred();
00142     Orb::inst().deferredRequest(req._retn());
00143     _target=CosEventComm::PullSupplier::_nil();
00144   }
00145 }
00146 
00147 //
00148 
00149 ProxyPullConsumer_i::ProxyPullConsumer_i(
00150   PortableServer::POA_ptr poa,
00151   list<CORBA::Any*>&      q
00152 )
00153 : Proxy(poa),
00154   _target(CosEventComm::PullSupplier::_nil()),
00155   _queue(q),
00156   _mode(Pull), // Prefer 'pull' method calls.
00157   _exceptionCount(0)
00158 {}
00159 
00160 ProxyPullConsumer_i::~ProxyPullConsumer_i()
00161 {
00162   DB(20,"~ProxyPullConsumer_i()")
00163 }
00164 
00165 void ProxyPullConsumer_i::collect()
00166 {
00167   if(!CORBA::is_nil(_req) && _req->poll_response()) 
00168   {
00169     const char* opname =_req->operation();
00170     assert(opname);
00171     CORBA::Environment_ptr env =_req->env(); // No need to release environment.
00172 
00173     if(!CORBA::is_nil(env) && env->exception()) 
00174     {
00175       CORBA::Exception* ex =env->exception(); // No need to free exception.
00176       DB(10,"ProxyPullConsumer got exception"
00177            IF_OMNIORB4(<<": "<<ex->_name())<<", op:"<<opname);
00178       if(0==strcmp("pull",opname) || 0==strcmp("try_pull",opname))
00179       {
00180         ++_exceptionCount;
00181         _mode=( _mode==Pull? TryPull: Pull ); // Try something else next time.
00182       }
00183       else
00184           DB(2,"Ignoring unrecognised response. operation:"<<opname);
00185       if(_exceptionCount>=4)
00186       {
00187         Orb::inst().reportObjectFailure(HERE,_target.in(),ex);
00188 
00189         // Try to notify the Supplier that the connection is closing.
00190         CORBA::Request_var req=_target->_request("disconnect_pull_supplier");
00191         req->send_deferred();
00192         Orb::inst().deferredRequest(req._retn());
00193 
00194         _target=CosEventComm::PullSupplier::_nil(); // disconnected
00195         eraseKey("SupplierAdmin/ProxyPullConsumer");
00196         deactivateObject();
00197       }
00198     }
00199     else  
00200     {
00201       // Do we have an event?
00202       bool hasEvent=false;
00203       if(0==strcmp("pull",opname))
00204       {
00205         hasEvent=true;
00206       }
00207       else if(0==strcmp("try_pull",opname))
00208       {
00209         CORBA::NVList_ptr args=_req->arguments(); // No need to release args.
00210         if(args->count()==1)
00211         {
00212           CORBA::NamedValue_var hasEventArg=args->item(0);
00213           if(0==strcmp(hasEventArg->name(),"has_event"))
00214           {
00215             CORBA::Any* a =hasEventArg->value();
00216             CORBA::Boolean b;
00217             CORBA::Any::to_boolean tb(b); //MS VC++6 is on drugs!
00218             hasEvent=(((*a)>>=tb) && b);
00219           }
00220         }
00221       }
00222       // Pick up an event, if we have one.
00223       if(hasEvent)
00224       {
00225         CORBA::Any* event =new CORBA::Any();
00226         _req->return_value() >>= (*event);
00227         _queue.push_back(event);
00228       }
00229       // Reset the exception count.
00230       _exceptionCount=0;
00231     }
00232     _req=CORBA::Request::_nil();
00233   }
00234 } // ProxyPullConsumer_i::end collect()
00235 
00236 void ProxyPullConsumer_i::triggerRequest()
00237 {
00238   if(CORBA::is_nil(_req) && !CORBA::is_nil(_target))
00239   {
00240     switch(_mode)
00241     {
00242       case Pull:
00243           _req=_target->_request("pull");
00244           break;
00245       case TryPull:
00246           _req=_target->_request("try_pull");
00247           _req->add_out_arg("has_event")<<=CORBA::Any::from_boolean(1);
00248           break;
00249       default:
00250           assert(0);
00251     }
00252     _req->set_return_type(CORBA::_tc_any);
00253     _req->send_deferred();
00254   }
00255 }
00256 
00257 void ProxyPullConsumer_i::reincarnate(
00258   const string&      oid,
00259   const PersistNode& node
00260 )
00261 {
00262   CosEventComm::PullSupplier_var pullSupplier =
00263     string_to_<CosEventComm::PullSupplier>(node.attrString("IOR").c_str());
00264   // Do not activate until we know that we have read a valid target.
00265   activateObjectWithId(oid.c_str());
00266   connect_pull_supplier(pullSupplier.in());
00267 }
00268 
00269 void ProxyPullConsumer_i::output(ostream& os)
00270 {
00271   basicOutput(os,"SupplierAdmin/ProxyPullConsumer",_target.in());
00272 }
00273 
00274 }; // end namespace OmniEvents

Generated on Mon Jan 9 03:52:13 2006 for OmniEvents by  doxygen 1.4.6