00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129 #ifdef HAVE_CONFIG_H
00130 # include "config.h"
00131 #endif
00132
00133 #ifdef HAVE_GETOPT
00134 # include <unistd.h>
00135 extern char* optarg;
00136 extern int optind;
00137 #else
00138 # include "getopt.h"
00139 #endif
00140
00141 #ifdef HAVE_IOSTREAM
00142 # include <iostream>
00143 #else
00144 # include <iostream.h>
00145 #endif
00146
00147 #ifdef HAVE_STD_IOSTREAM
00148 using namespace std;
00149 #endif
00150
00151 #ifdef HAVE_STDLIB_H
00152 # include <stdlib.h>
00153 #endif
00154
00155 #ifdef HAVE_SIGNAL_H
00156 # include <signal.h>
00157 #endif
00158
00159 #include "CosEventComm.hh"
00160 #include "CosEventChannelAdmin.hh"
00161 #include "naming.h"
00162
00163 static omni_mutex mutex;
00164 static omni_condition connect_cond(&mutex);
00165 static void usage(int argc, char **argv);
00166
00167 class Consumer_i : virtual public POA_CosEventComm::PushConsumer {
00168 public:
00169 Consumer_i(long disconnect=0): _disconnect(disconnect) {}
00170
00171 void push(const CORBA::Any& data);
00172 void disconnect_push_consumer ();
00173
00174 private:
00175 long _disconnect;
00176 };
00177
00178 void Consumer_i::push(const CORBA::Any& data) {
00179 CORBA::ULong l;
00180 static int i = 0;
00181
00182 i++;
00183 if( data>>=l )
00184 {
00185 cout<<"Push Consumer: push() called. Data : "<< l <<endl;
00186
00187
00188 if (i == _disconnect)
00189 {
00190 i = 0;
00191
00192
00193
00194
00195
00196 connect_cond.signal();
00197 }
00198 }
00199 else
00200 {
00201 cerr<<"Push Consumer: push() called. UNEXPECTED TYPE"<<endl;
00202 }
00203 }
00204
00205 void Consumer_i::disconnect_push_consumer () {
00206 cout << "Push Consumer: disconnected." << endl;
00207 }
00208
00209 int
00210 main(int argc, char **argv)
00211 {
00212
00213
00214 CORBA::ORB_ptr orb = CORBA::ORB_init(argc,argv);
00215
00216
00217 int discnum =0;
00218 int sleepInterval =0;
00219 const char* channelName ="EventChannel";
00220
00221 int c;
00222 while ((c = getopt(argc,argv,"hd:s:n:")) != EOF)
00223 {
00224 switch (c)
00225 {
00226 case 'd': discnum = atoi(optarg);
00227 break;
00228
00229 case 's': sleepInterval = atoi(optarg);
00230 break;
00231
00232 case 'n': channelName = optarg;
00233 break;
00234
00235 case 'h': usage(argc,argv);
00236 exit(0);
00237 default : usage(argc,argv);
00238 exit(-1);
00239 }
00240 }
00241
00242 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
00243
00244 signal(SIGPIPE, SIG_IGN);
00245 #endif
00246
00247 Consumer_i* consumer = new Consumer_i (discnum);
00248 CosEventChannelAdmin::EventChannel_var channel;
00249
00250 const char* action="";
00251 try {
00252 CORBA::Object_var obj;
00253
00254 action="resolve initial reference 'RootPOA'";
00255 obj=orb->resolve_initial_references("RootPOA");
00256 PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
00257 if(CORBA::is_nil(rootPoa))
00258 throw CORBA::OBJECT_NOT_EXIST();
00259
00260 action="activate the RootPOA's POAManager";
00261 PortableServer::POAManager_var pman =rootPoa->the_POAManager();
00262 pman->activate();
00263
00264
00265
00266
00267 if(optind<argc)
00268 {
00269 action="convert URI from command line into object reference";
00270 obj=orb->string_to_object(argv[optind]);
00271 }
00272 else
00273 {
00274 action="resolve initial reference 'NameService'";
00275 obj=orb->resolve_initial_references("NameService");
00276 CosNaming::NamingContext_var rootContext=
00277 CosNaming::NamingContext::_narrow(obj);
00278 if(CORBA::is_nil(rootContext))
00279 throw CORBA::OBJECT_NOT_EXIST();
00280
00281 action="find EventChannel in NameService";
00282 cout << action << endl;
00283 obj=rootContext->resolve(str2name(channelName));
00284 }
00285
00286 action="narrow object reference to event channel";
00287 channel=CosEventChannelAdmin::EventChannel::_narrow(obj);
00288 if(CORBA::is_nil(channel))
00289 {
00290 cerr << "Failed to narrow Event Channel reference." << endl;
00291 exit(1);
00292 }
00293
00294 }
00295 catch(CORBA::ORB::InvalidName& ex) {
00296 cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
00297 exit(1);
00298 }
00299 catch(CosNaming::NamingContext::InvalidName& ex) {
00300 cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
00301 exit(1);
00302 }
00303 catch(CosNaming::NamingContext::NotFound& ex) {
00304 cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
00305 exit(1);
00306 }
00307 catch(CosNaming::NamingContext::CannotProceed& ex) {
00308 cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
00309 exit(1);
00310 }
00311 catch(CORBA::TRANSIENT& ex) {
00312 cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
00313 exit(1);
00314 }
00315 catch(CORBA::OBJECT_NOT_EXIST& ex) {
00316 cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
00317 exit(1);
00318 }
00319 catch(CORBA::SystemException& ex) {
00320 cerr<<"Failed to "<<action<<".";
00321 #if defined(HAVE_OMNIORB4)
00322 cerr<<" "<<ex._name();
00323 if(ex.NP_minorString())
00324 cerr<<" ("<<ex.NP_minorString()<<")";
00325 #endif
00326 cerr<<endl;
00327 exit(1);
00328 }
00329 catch(CORBA::Exception& ex) {
00330 cerr<<"Failed to "<<action<<"."
00331 #if defined(HAVE_OMNIORB4)
00332 " "<<ex._name()
00333 #endif
00334 <<endl;
00335 exit(1);
00336 }
00337
00338
00339
00340 CosEventChannelAdmin::ConsumerAdmin_var consumer_admin;
00341 while (1)
00342 {
00343 try {
00344 consumer_admin = channel->for_consumers ();
00345 if (CORBA::is_nil (consumer_admin))
00346 {
00347 cerr << "Event Channel returned nil Consumer Admin!" << endl;
00348 exit(1);
00349 }
00350 break;
00351 }
00352 catch (CORBA::COMM_FAILURE& ex) {
00353 cerr << "Caught COMM_FAILURE exception "
00354 << "obtaining Consumer Admin! Retrying..."
00355 << endl;
00356 continue;
00357 }
00358 }
00359 cout << "Obtained ConsumerAdmin." << endl;
00360
00361 while (1) {
00362
00363
00364 CosEventChannelAdmin::ProxyPushSupplier_var proxy_supplier;
00365 while (1)
00366 {
00367 try {
00368 proxy_supplier = consumer_admin->obtain_push_supplier ();
00369 if (CORBA::is_nil (proxy_supplier))
00370 {
00371 cerr << "Consumer Admin returned nil proxy_supplier!"
00372 << endl;
00373 exit (1);
00374 }
00375 break;
00376 }
00377 catch (CORBA::COMM_FAILURE& ex) {
00378 cerr << "Caught COMM_FAILURE Exception "
00379 << "obtaining Push Supplier! Retrying..."
00380 << endl;
00381 continue;
00382 }
00383 }
00384 cout << "Obtained ProxyPushSupplier." << endl;
00385
00386
00387
00388 while (1)
00389 {
00390 try {
00391 proxy_supplier->connect_push_consumer(consumer->_this());
00392 break;
00393 }
00394 catch (CORBA::BAD_PARAM& ex) {
00395 cerr << "Caught BAD_PARAM Exception connecting Push Consumer!"
00396 << endl;
00397 exit (1);
00398 }
00399 catch (CosEventChannelAdmin::AlreadyConnected& ex) {
00400 cerr << "Proxy Push Supplier already connected!"
00401 << endl;
00402 break;
00403 }
00404 catch (CORBA::COMM_FAILURE& ex) {
00405 cerr << "Caught COMM_FAILURE exception "
00406 << "connecting Push Consumer! Retrying..."
00407 << endl;
00408 continue;
00409 }
00410 }
00411 cout << "Connected Push Consumer." << endl;
00412
00413
00414 {
00415 omni_mutex_lock condition_lock(mutex);
00416 connect_cond.wait();
00417 }
00418
00419
00420 while (1)
00421 {
00422 try {
00423 proxy_supplier->disconnect_push_supplier();
00424 break;
00425 }
00426 catch (CORBA::COMM_FAILURE& ex) {
00427 cerr << "Caught COMM_FAILURE Exception "
00428 << "disconnecting Push Consumer! Retrying..."
00429 << endl;
00430 continue;
00431 }
00432 }
00433 cout << "Disconnected Push Consumer." << endl;
00434
00435
00436 cout << "Sleeping " << sleepInterval << " seconds." << endl;
00437 omni_thread::sleep(sleepInterval);
00438 }
00439
00440
00441 return 0;
00442 }
00443
00444 static void
00445 usage(int argc, char **argv)
00446 {
00447 cerr<<
00448 "\nCreate a PushConsumer to receive events from a channel.\n"
00449 "syntax: "<<(argc?argv[0]:"pushcons")<<" OPTIONS [CHANNEL_URI]\n"
00450 "\n"
00451 "CHANNEL_URI: The event channel may be specified as a URI.\n"
00452 " This may be an IOR, or a corbaloc::: or corbaname::: URI.\n"
00453 "\n"
00454 "OPTIONS: DEFAULT:\n"
00455 " -d NUM disconnect after receiving NUM events [0 - never disconnect]\n"
00456 " -s SECS sleep SECS seconds after disconnecting [0]\n"
00457 " -n NAME channel name (if URI is not specified) [\"EventChannel\"]\n"
00458 " -h display this help text\n" << endl;
00459 }