00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "ProxyPullSupplier.h"
00025 #include "EventChannel.h"
00026 #include "Orb.h"
00027 #include "omniEventsLog.h"
00028 #include "PersistNode.h"
00029 #include <assert.h>
00030
00031 namespace OmniEvents {
00032
00033
00034
00035
00036
00037 PortableServer::Servant ProxyPullSupplierManager::incarnate(
00038 const PortableServer::ObjectId& oid,
00039 PortableServer::POA_ptr poa
00040 )
00041 {
00042
00043 if(_servants.size()>=_channel.maxNumProxies())
00044 {
00045 ProxyPullSupplier_i* oldest =NULL;
00046 unsigned long age =0;
00047 for(set<Proxy*>::iterator i=_servants.begin(); i!=_servants.end(); ++i)
00048 if(!oldest || dynamic_cast<ProxyPullSupplier_i*>(*i)->timestamp()<age)
00049 {
00050 oldest=dynamic_cast<ProxyPullSupplier_i*>(*i);
00051 age=oldest->timestamp();
00052 }
00053 DB(5,"Evicting oldest ProxyPullSupplier to make space for a new one")
00054 try{ oldest->disconnect_pull_supplier(); }catch(CORBA::OBJECT_NOT_EXIST&){}
00055 }
00056
00057 ProxyPullSupplier_i* result =new ProxyPullSupplier_i(_managedPoa,_queue);
00058 _servants.insert(result);
00059 return result;
00060 }
00061
00062 ProxyPullSupplierManager::ProxyPullSupplierManager(
00063 const EventChannel_i& channel,
00064 PortableServer::POA_ptr parentPoa,
00065 EventQueue& q
00066 )
00067 : ProxyManager(parentPoa,"ProxyPullSupplier"),
00068 _queue(q),
00069 _channel(channel)
00070 {
00071
00072 }
00073
00074 ProxyPullSupplierManager::~ProxyPullSupplierManager()
00075 {
00076 DB(20,"~ProxyPullSupplierManager()")
00077 }
00078
00079 CosEventChannelAdmin::ProxyPullSupplier_ptr
00080 ProxyPullSupplierManager::createObject()
00081 {
00082 return createNarrowedReference<CosEventChannelAdmin::ProxyPullSupplier>(
00083 _managedPoa.in(),
00084 CosEventChannelAdmin::_tc_ProxyPullSupplier->id()
00085 );
00086 }
00087
00088 void ProxyPullSupplierManager::disconnect()
00089 {
00090 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00091 {
00092 ProxyPullSupplier_i* narrowed =dynamic_cast<ProxyPullSupplier_i*>(*i);
00093 narrowed->disconnect_pull_supplier();
00094 }
00095 }
00096
00097
00098
00099
00100
00101
00102
00103
00104 void ProxyPullSupplier_i::connect_pull_consumer(
00105 CosEventComm::PullConsumer_ptr pullConsumer
00106 )
00107 {
00108 if(_connected || !CORBA::is_nil(_target) || !CORBA::is_nil(_req))
00109 throw CosEventChannelAdmin::AlreadyConnected();
00110 touch();
00111 _connected=true;
00112 if(!CORBA::is_nil(pullConsumer))
00113 _target=CosEventComm::PullConsumer::_duplicate(pullConsumer);
00114
00115 if(omniEventsLog::exists())
00116 {
00117 WriteLock log;
00118 output(log.os);
00119 }
00120 }
00121
00122 void ProxyPullSupplier_i::disconnect_pull_supplier()
00123 {
00124 DB(5,"ProxyPullSupplier_i::disconnect_pull_supplier()");
00125 touch();
00126 eraseKey("ConsumerAdmin/ProxyPullSupplier");
00127 deactivateObject();
00128 if(!_connected)
00129 {
00130 throw CORBA::OBJECT_NOT_EXIST(
00131 IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
00132 CORBA::COMPLETED_NO
00133 );
00134 }
00135 else if(!CORBA::is_nil(_target))
00136 {
00137 CORBA::Request_var req=_target->_request("disconnect_pull_consumer");
00138 req->send_deferred();
00139 Orb::inst().deferredRequest(req._retn());
00140 }
00141 _target=CosEventComm::PullConsumer::_nil();
00142 }
00143
00144 CORBA::Any* ProxyPullSupplier_i::pull()
00145 {
00146 if(!_connected)
00147 throw CosEventComm::Disconnected();
00148 touch();
00149 if(moreEvents())
00150 return new CORBA::Any(*nextEvent());
00151 else
00152 throw CORBA::TRANSIENT(
00153 IFELSE_OMNIORB4(omni::TRANSIENT_CallTimedout,0),
00154 CORBA::COMPLETED_NO
00155 );
00156 }
00157
00158 CORBA::Any* ProxyPullSupplier_i::try_pull(CORBA::Boolean& has_event)
00159 {
00160 if(!_connected)
00161 throw CosEventComm::Disconnected();
00162 touch();
00163 if(moreEvents())
00164 {
00165 has_event=1;
00166 return new CORBA::Any(*nextEvent());
00167 }
00168 else
00169 {
00170 has_event=0;
00171 return new CORBA::Any();
00172 }
00173 }
00174
00175
00176
00177 ProxyPullSupplier_i::ProxyPullSupplier_i(
00178 PortableServer::POA_ptr poa,
00179 EventQueue& q
00180 )
00181 : Proxy(poa),
00182 EventQueue::Reader(q),
00183 _target(CosEventComm::PullConsumer::_nil()),
00184 _connected(false),
00185 _timestamp(0)
00186 {
00187 touch();
00188 }
00189
00190 ProxyPullSupplier_i::~ProxyPullSupplier_i()
00191 {
00192 DB(20,"~ProxyPullSupplier_i()")
00193 }
00194
00195 void ProxyPullSupplier_i::reincarnate(
00196 const string& oid,
00197 const PersistNode& node
00198 )
00199 {
00200 CosEventComm::PullConsumer_var pullConsumer =
00201 string_to_<CosEventComm::PullConsumer>(node.attrString("IOR").c_str());
00202
00203 activateObjectWithId(oid.c_str());
00204 connect_pull_consumer(pullConsumer.in());
00205 }
00206
00207 void ProxyPullSupplier_i::output(ostream& os)
00208 {
00209 basicOutput(os,"ConsumerAdmin/ProxyPullSupplier",_target.in());
00210 }
00211
00212 inline void ProxyPullSupplier_i::touch()
00213 {
00214 unsigned long nsec;
00215 omni_thread::get_time(&_timestamp,&nsec);
00216 }
00217
00218 };