ProxyPullSupplier.cc

Go to the documentation of this file.
00001 //                            Package   : omniEvents
00002 // ProxyPullSupplier.cc       Created   : 2003/12/04
00003 //                            Author    : Alex Tingle
00004 //
00005 //    Copyright (C) 2003 Alex Tingle.
00006 //
00007 //    This file is part of the omniEvents application.
00008 //
00009 //    omniEvents is free software; you can redistribute it and/or
00010 //    modify it under the terms of the GNU Lesser General Public
00011 //    License as published by the Free Software Foundation; either
00012 //    version 2.1 of the License, or (at your option) any later version.
00013 //
00014 //    omniEvents is distributed in the hope that it will be useful,
00015 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017 //    Lesser General Public License for more details.
00018 //
00019 //    You should have received a copy of the GNU Lesser General Public
00020 //    License along with this library; if not, write to the Free Software
00021 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 //
00023 
00024 #include "ProxyPullSupplier.h"
00025 #include "EventChannel.h"
00026 #include "Orb.h"
00027 #include "omniEventsLog.h"
00028 #include "PersistNode.h"
00029 #include <assert.h>
00030 
00031 namespace OmniEvents {
00032 
00033 //
00034 //  ProxyPullSupplierManager
00035 //
00036 
00037 PortableServer::Servant ProxyPullSupplierManager::incarnate(
00038   const PortableServer::ObjectId& oid,
00039   PortableServer::POA_ptr         poa
00040 )
00041 {
00042   // Evict the oldest proxy servant, if we have reached the maximum number.
00043   if(_servants.size()>=_channel.maxNumProxies())
00044   {
00045     ProxyPullSupplier_i* oldest =NULL;
00046     unsigned long        age    =0;
00047     for(set<Proxy*>::iterator i=_servants.begin(); i!=_servants.end(); ++i)
00048         if(!oldest || dynamic_cast<ProxyPullSupplier_i*>(*i)->timestamp()<age)
00049         {
00050           oldest=dynamic_cast<ProxyPullSupplier_i*>(*i);
00051           age=oldest->timestamp();
00052         }
00053     DB(5,"Evicting oldest ProxyPullSupplier to make space for a new one")
00054     try{ oldest->disconnect_pull_supplier(); }catch(CORBA::OBJECT_NOT_EXIST&){}
00055   }
00056   // Make a new servant.
00057   ProxyPullSupplier_i* result =new ProxyPullSupplier_i(_managedPoa,_queue);
00058   _servants.insert(result);
00059   return result;
00060 }
00061 
00062 ProxyPullSupplierManager::ProxyPullSupplierManager(
00063   const EventChannel_i&   channel,
00064   PortableServer::POA_ptr parentPoa,
00065   EventQueue&             q
00066 )
00067 : ProxyManager(parentPoa,"ProxyPullSupplier"),
00068   _queue(q),
00069   _channel(channel)
00070 {
00071   // pass
00072 }
00073 
00074 ProxyPullSupplierManager::~ProxyPullSupplierManager()
00075 {
00076   DB(20,"~ProxyPullSupplierManager()")
00077 }
00078 
00079 CosEventChannelAdmin::ProxyPullSupplier_ptr
00080 ProxyPullSupplierManager::createObject()
00081 {  
00082   return createNarrowedReference<CosEventChannelAdmin::ProxyPullSupplier>(
00083            _managedPoa.in(),
00084            CosEventChannelAdmin::_tc_ProxyPullSupplier->id()
00085          );
00086 }
00087 
00088 void ProxyPullSupplierManager::disconnect()
00089 {
00090   for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00091   {
00092     ProxyPullSupplier_i* narrowed =dynamic_cast<ProxyPullSupplier_i*>(*i);
00093     narrowed->disconnect_pull_supplier();
00094   }
00095 }
00096 
00097 
00098 //
00099 //  ProxyPullSupplier_i
00100 //
00101 
00102 // CORBA interface methods
00103 
00104 void ProxyPullSupplier_i::connect_pull_consumer(
00105   CosEventComm::PullConsumer_ptr pullConsumer
00106 )
00107 {
00108   if(_connected || !CORBA::is_nil(_target) || !CORBA::is_nil(_req))
00109       throw CosEventChannelAdmin::AlreadyConnected();
00110   touch();
00111   _connected=true;
00112   if(!CORBA::is_nil(pullConsumer))
00113       _target=CosEventComm::PullConsumer::_duplicate(pullConsumer);
00114 
00115   if(omniEventsLog::exists())
00116   {
00117     WriteLock log;
00118     output(log.os);
00119   }
00120 }
00121 
00122 void ProxyPullSupplier_i::disconnect_pull_supplier()
00123 {
00124   DB(5,"ProxyPullSupplier_i::disconnect_pull_supplier()");
00125   touch();
00126   eraseKey("ConsumerAdmin/ProxyPullSupplier");
00127   deactivateObject();
00128   if(!_connected)
00129   {
00130     throw CORBA::OBJECT_NOT_EXIST(
00131       IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
00132       CORBA::COMPLETED_NO
00133     );
00134   }
00135   else if(!CORBA::is_nil(_target))
00136   {
00137     CORBA::Request_var req=_target->_request("disconnect_pull_consumer");
00138     req->send_deferred();
00139     Orb::inst().deferredRequest(req._retn());
00140   }
00141   _target=CosEventComm::PullConsumer::_nil();
00142 }
00143 
00144 CORBA::Any* ProxyPullSupplier_i::pull()
00145 {
00146   if(!_connected)
00147       throw CosEventComm::Disconnected();
00148   touch();
00149   if(moreEvents())
00150       return new CORBA::Any(*nextEvent());
00151   else
00152       throw CORBA::TRANSIENT(
00153         IFELSE_OMNIORB4(omni::TRANSIENT_CallTimedout,0),
00154         CORBA::COMPLETED_NO
00155       );
00156 }
00157 
00158 CORBA::Any* ProxyPullSupplier_i::try_pull(CORBA::Boolean& has_event)
00159 {
00160   if(!_connected)
00161       throw CosEventComm::Disconnected();
00162   touch();
00163   if(moreEvents())
00164   {
00165     has_event=1;
00166     return new CORBA::Any(*nextEvent());
00167   }
00168   else
00169   {
00170     has_event=0;
00171     return new CORBA::Any();
00172   }
00173 }
00174 
00175 //
00176 
00177 ProxyPullSupplier_i::ProxyPullSupplier_i(
00178   PortableServer::POA_ptr poa,
00179   EventQueue& q
00180 )
00181 : Proxy(poa),
00182   EventQueue::Reader(q),
00183   _target(CosEventComm::PullConsumer::_nil()),
00184   _connected(false),
00185   _timestamp(0)
00186 {
00187   touch();
00188 }
00189 
00190 ProxyPullSupplier_i::~ProxyPullSupplier_i()
00191 {
00192   DB(20,"~ProxyPullSupplier_i()")
00193 }
00194 
00195 void ProxyPullSupplier_i::reincarnate(
00196   const string&      oid,
00197   const PersistNode& node
00198 )
00199 {
00200   CosEventComm::PullConsumer_var pullConsumer =
00201     string_to_<CosEventComm::PullConsumer>(node.attrString("IOR").c_str());
00202   // Do not activate until we know that we have read a valid target.
00203   activateObjectWithId(oid.c_str());
00204   connect_pull_consumer(pullConsumer.in());
00205 }
00206 
00207 void ProxyPullSupplier_i::output(ostream& os)
00208 {
00209   basicOutput(os,"ConsumerAdmin/ProxyPullSupplier",_target.in());
00210 }
00211 
00212 inline void ProxyPullSupplier_i::touch()
00213 {
00214   unsigned long nsec; // dummy
00215   omni_thread::get_time(&_timestamp,&nsec);
00216 }
00217 
00218 }; // end namespace OmniEvents

Generated on Mon Jan 9 03:52:13 2006 for OmniEvents by  doxygen 1.4.6