00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024 #include "ProxyPullConsumer.h"
00025 #include "Orb.h"
00026 #include "omniEventsLog.h"
00027 #include "PersistNode.h"
00028 #include <assert.h>
00029
00030 namespace OmniEvents {
00031
00032
00033
00034
00035
00036 PortableServer::Servant
00037 ProxyPullConsumerManager::incarnate(
00038 const PortableServer::ObjectId& oid,
00039 PortableServer::POA_ptr poa
00040 )
00041 {
00042 DB(20,"ProxyPullConsumerManager::incarnate()")
00043 ProxyPullConsumer_i* result =new ProxyPullConsumer_i(_managedPoa,_queue);
00044 _servants.insert(result);
00045 return result;
00046 }
00047
00048 ProxyPullConsumerManager::ProxyPullConsumerManager(
00049 PortableServer::POA_ptr parentPoa,
00050 list<CORBA::Any*>& q
00051 )
00052 : ProxyManager(parentPoa,"ProxyPullConsumer"),
00053 _queue(q)
00054 {
00055
00056 }
00057
00058 ProxyPullConsumerManager::~ProxyPullConsumerManager()
00059 {
00060 DB(20,"~ProxyPullConsumerManager()")
00061 }
00062
00063 CosEventChannelAdmin::ProxyPullConsumer_ptr
00064 ProxyPullConsumerManager::createObject()
00065 {
00066 return createNarrowedReference<CosEventChannelAdmin::ProxyPullConsumer>(
00067 _managedPoa.in(),
00068 CosEventChannelAdmin::_tc_ProxyPullConsumer->id()
00069 );
00070 }
00071
00072 void ProxyPullConsumerManager::collect()
00073 {
00074
00075 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00076 {
00077 ProxyPullConsumer_i* proxy=dynamic_cast<ProxyPullConsumer_i*>(*i);
00078 proxy->collect();
00079 }
00080 }
00081
00082 void ProxyPullConsumerManager::triggerRequest()
00083 {
00084
00085 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00086 {
00087 ProxyPullConsumer_i* proxy=dynamic_cast<ProxyPullConsumer_i*>(*i);
00088 proxy->triggerRequest();
00089 }
00090 }
00091
00092 void ProxyPullConsumerManager::disconnect()
00093 {
00094 for(set<Proxy*>::iterator i =_servants.begin(); i!=_servants.end(); ++i)
00095 {
00096 Proxy* p =*i;
00097 ProxyPullConsumer_i* ppc =static_cast<ProxyPullConsumer_i*>(p);
00098 ppc->disconnect_pull_consumer();
00099 }
00100 }
00101
00102
00103
00104
00105
00106
00107
00108
00109 void ProxyPullConsumer_i::connect_pull_supplier(
00110 CosEventComm::PullSupplier_ptr pullSupplier
00111 )
00112 {
00113 if(CORBA::is_nil(pullSupplier))
00114 throw CORBA::BAD_PARAM();
00115 if(!CORBA::is_nil(_target) || !CORBA::is_nil(_req))
00116 throw CosEventChannelAdmin::AlreadyConnected();
00117 _target=CosEventComm::PullSupplier::_duplicate(pullSupplier);
00118
00119 if(omniEventsLog::exists())
00120 {
00121 WriteLock log;
00122 output(log.os);
00123 }
00124 }
00125
00126 void ProxyPullConsumer_i::disconnect_pull_consumer()
00127 {
00128 DB(5,"ProxyPullConsumer_i::disconnect_pull_consumer()");
00129 eraseKey("SupplierAdmin/ProxyPullConsumer");
00130 deactivateObject();
00131 if(CORBA::is_nil(_target))
00132 {
00133 throw CORBA::OBJECT_NOT_EXIST(
00134 IFELSE_OMNIORB4(omni::OBJECT_NOT_EXIST_NoMatch,0),
00135 CORBA::COMPLETED_NO
00136 );
00137 }
00138 else
00139 {
00140 CORBA::Request_var req=_target->_request("disconnect_pull_supplier");
00141 req->send_deferred();
00142 Orb::inst().deferredRequest(req._retn());
00143 _target=CosEventComm::PullSupplier::_nil();
00144 }
00145 }
00146
00147
00148
00149 ProxyPullConsumer_i::ProxyPullConsumer_i(
00150 PortableServer::POA_ptr poa,
00151 list<CORBA::Any*>& q
00152 )
00153 : Proxy(poa),
00154 _target(CosEventComm::PullSupplier::_nil()),
00155 _queue(q),
00156 _mode(Pull),
00157 _exceptionCount(0)
00158 {}
00159
00160 ProxyPullConsumer_i::~ProxyPullConsumer_i()
00161 {
00162 DB(20,"~ProxyPullConsumer_i()")
00163 }
00164
00165 void ProxyPullConsumer_i::collect()
00166 {
00167 if(!CORBA::is_nil(_req) && _req->poll_response())
00168 {
00169 const char* opname =_req->operation();
00170 assert(opname);
00171 CORBA::Environment_ptr env =_req->env();
00172
00173 if(!CORBA::is_nil(env) && env->exception())
00174 {
00175 CORBA::Exception* ex =env->exception();
00176 DB(10,"ProxyPullConsumer got exception"
00177 IF_OMNIORB4(<<": "<<ex->_name())<<", op:"<<opname);
00178 if(0==strcmp("pull",opname) || 0==strcmp("try_pull",opname))
00179 {
00180 ++_exceptionCount;
00181 _mode=( _mode==Pull? TryPull: Pull );
00182 }
00183 else
00184 DB(2,"Ignoring unrecognised response. operation:"<<opname);
00185 if(_exceptionCount>=4)
00186 {
00187 Orb::inst().reportObjectFailure(HERE,_target.in(),ex);
00188
00189
00190 CORBA::Request_var req=_target->_request("disconnect_pull_supplier");
00191 req->send_deferred();
00192 Orb::inst().deferredRequest(req._retn());
00193
00194 _target=CosEventComm::PullSupplier::_nil();
00195 eraseKey("SupplierAdmin/ProxyPullConsumer");
00196 deactivateObject();
00197 }
00198 }
00199 else
00200 {
00201
00202 bool hasEvent=false;
00203 if(0==strcmp("pull",opname))
00204 {
00205 hasEvent=true;
00206 }
00207 else if(0==strcmp("try_pull",opname))
00208 {
00209 CORBA::NVList_ptr args=_req->arguments();
00210 if(args->count()==1)
00211 {
00212 CORBA::NamedValue_var hasEventArg=args->item(0);
00213 if(0==strcmp(hasEventArg->name(),"has_event"))
00214 {
00215 CORBA::Any* a =hasEventArg->value();
00216 CORBA::Boolean b;
00217 CORBA::Any::to_boolean tb(b);
00218 hasEvent=(((*a)>>=tb) && b);
00219 }
00220 }
00221 }
00222
00223 if(hasEvent)
00224 {
00225 CORBA::Any* event =new CORBA::Any();
00226 _req->return_value() >>= (*event);
00227 _queue.push_back(event);
00228 }
00229
00230 _exceptionCount=0;
00231 }
00232 _req=CORBA::Request::_nil();
00233 }
00234 }
00235
00236 void ProxyPullConsumer_i::triggerRequest()
00237 {
00238 if(CORBA::is_nil(_req) && !CORBA::is_nil(_target))
00239 {
00240 switch(_mode)
00241 {
00242 case Pull:
00243 _req=_target->_request("pull");
00244 break;
00245 case TryPull:
00246 _req=_target->_request("try_pull");
00247 _req->add_out_arg("has_event")<<=CORBA::Any::from_boolean(1);
00248 break;
00249 default:
00250 assert(0);
00251 }
00252 _req->set_return_type(CORBA::_tc_any);
00253 _req->send_deferred();
00254 }
00255 }
00256
00257 void ProxyPullConsumer_i::reincarnate(
00258 const string& oid,
00259 const PersistNode& node
00260 )
00261 {
00262 CosEventComm::PullSupplier_var pullSupplier =
00263 string_to_<CosEventComm::PullSupplier>(node.attrString("IOR").c_str());
00264
00265 activateObjectWithId(oid.c_str());
00266 connect_pull_supplier(pullSupplier.in());
00267 }
00268
00269 void ProxyPullConsumer_i::output(ostream& os)
00270 {
00271 basicOutput(os,"SupplierAdmin/ProxyPullConsumer",_target.in());
00272 }
00273
00274 };