pushcons.cc

Go to the documentation of this file.
00001 // -*- Mode: C++; -*-
00002 //                            Package   : omniEvents
00003 //   pushcons.cc              Created   : 1/4/98
00004 //                            Author    : Paul Nader (pwn)
00005 //
00006 //    Copyright (C) 1998 Paul Nader, 2003-2004 Alex Tingle
00007 //
00008 //    This file is part of the omniEvents application.
00009 //
00010 //    omniEvents is free software; you can redistribute it and/or
00011 //    modify it under the terms of the GNU Lesser General Public
00012 //    License as published by the Free Software Foundation; either
00013 //    version 2.1 of the License, or (at your option) any later version.
00014 //
00015 //    omniEvents is distributed in the hope that it will be useful,
00016 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00017 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00018 //    Lesser General Public License for more details.
00019 //
00020 //    You should have received a copy of the GNU Lesser General Public
00021 //    License along with this library; if not, write to the Free Software
00022 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00023 //
00024 // Description:
00025 //    Push Model consumer implementation
00026 //      
00027 
00028 /*
00029   $Log: pushcons.cc,v $
00030   Revision 1.12  2004/10/08 09:06:08  alextingle
00031   More robust exception minor code handling.
00032 
00033   Revision 1.11  2004/08/18 17:49:45  alextingle
00034   Added check for SIGPIPE before trying to use it.
00035 
00036   Revision 1.10  2004/08/06 16:19:23  alextingle
00037   -k & -K options removed.
00038   Naming service names may now be as complex as you like.
00039 
00040   Revision 1.9  2004/04/30 17:54:47  alextingle
00041   Corrected handling of CORBA::Any.
00042 
00043   Revision 1.8  2004/04/20 16:52:17  alextingle
00044   All examples updated for latest version of omniEvents. Server may now be
00045   specified as a 'corbaloc' string or IOR, instead of as naming service id/kind.
00046 
00047   Revision 1.7  2004/04/01 22:28:36  alextingle
00048   Corrected usage message.
00049 
00050   Revision 1.6  2004/03/23 19:09:26  alextingle
00051   Fixed typos.
00052 
00053   Revision 1.5  2004/02/21 19:07:45  alextingle
00054   Corrected servants to use POA instead of BOA.
00055 
00056   Revision 1.4  2004/02/04 22:29:55  alextingle
00057   Reworked all C++ examples.
00058   Removed catch(...) as it tends to make it harder to see what's going on.
00059   Now uses POA instead of BOA.
00060   Uses omniORB4's Exception name probing.
00061   No longer uses 'naming.h/cc' utility code.
00062 
00063   Revision 1.3  2003/11/03 22:19:56  alextingle
00064   Removed all platform specific switches. Now uses autoconf, config.h.
00065   Removed stub header in order to allow makefile dependency checking to work
00066   correctly.
00067   Corrected usage of omni_condition/omni_mutex. Mutexes are now always unlocked by
00068   the same thread that locked them.
00069 
00070   Revision 1.1.1.1.2.1  2002/09/28 22:20:51  shamus13
00071   Added ifdefs to enable omniEvents to compile
00072   with both omniORB3 and omniORB4. If __OMNIORB4__
00073   is defined during compilation, omniORB4 headers
00074   and command line option syntax is used, otherwise
00075   fall back to omniORB3 style.
00076 
00077   Revision 1.1.1.1  2002/09/25 19:00:26  shamus13
00078   Import of OmniEvents source tree from release 2.1.1
00079 
00080   Revision 0.13  2000/08/30 04:39:48  naderp
00081   Port to omniORB 3.0.1.
00082 
00083   Revision 0.12  2000/03/16 05:37:27  naderp
00084   Added stdlib.h for getopt.
00085 
00086   Revision 0.11  2000/03/06 13:27:02  naderp
00087   Using util getRootNamingContext function.
00088   Using stub headers.
00089   Fixed error messages.
00090 
00091   Revision 0.10  2000/03/02 03:20:24  naderp
00092   Added retry resiliency for handling COMM_FAILURE exceptions.
00093 
00094   Revision 0.9  1999/11/02 13:39:15  naderp
00095   Added <signal.h>
00096 
00097   Revision 0.8  1999/11/02 07:57:04  naderp
00098   Updated usage.
00099 
00100 Revision 0.7  99/11/01  18:10:29  18:10:29  naderp (Paul Nader)
00101 Added handling of COMM_FAILURE exception for connect_push_consumer.
00102 
00103 Revision 0.6  99/11/01  16:11:03  16:11:03  naderp (Paul Nader)
00104 omniEvents 2.0 Release.
00105 
00106 Revision 0.5  99/10/27  19:46:01  19:46:01  naderp (Paul Nader)
00107 Ignoring Unix SIGPIPE signal.
00108 Catching COMM_FAILURE exception for obtain_push_supplier.
00109 Continuing if it fails to obtain Proxy Supplier.
00110 Try/Catch block for disconnect_push_supplier.
00111 
00112 Revision 0.4  99/04/23  16:05:46  16:05:46  naderp (Paul Nader)
00113 gcc port.
00114 
00115 Revision 0.3  99/04/23  09:34:03  09:34:03  naderp (Paul Nader)
00116 Windows Port.
00117 
00118 Revision 0.2  99/04/21  18:06:26  18:06:26  naderp (Paul Nader)
00119 *** empty log message ***
00120 
00121 Revision 0.1.1.1  98/11/27  16:59:37  16:59:37  naderp (Paul Nader)
00122 Added -s option to sleep after disconnecting.
00123 
00124 Revision 0.1  98/11/25  14:08:21  14:08:21  naderp (Paul Nader)
00125 Initial Revision
00126 
00127 */
00128 
00129 #ifdef HAVE_CONFIG_H
00130 #  include "config.h"
00131 #endif
00132 
00133 #ifdef HAVE_GETOPT
00134 #  include <unistd.h>
00135 extern char* optarg;
00136 extern int optind;
00137 #else
00138 #  include "getopt.h"
00139 #endif
00140 
00141 #ifdef HAVE_IOSTREAM
00142 #  include <iostream>
00143 #else
00144 #  include <iostream.h>
00145 #endif
00146 
00147 #ifdef HAVE_STD_IOSTREAM
00148 using namespace std;
00149 #endif
00150 
00151 #ifdef HAVE_STDLIB_H
00152 #  include <stdlib.h>
00153 #endif
00154 
00155 #ifdef HAVE_SIGNAL_H
00156 #  include <signal.h>
00157 #endif
00158 
00159 #include "CosEventComm.hh"
00160 #include "CosEventChannelAdmin.hh"
00161 #include "naming.h"
00162 
00163 static omni_mutex mutex;
00164 static omni_condition connect_cond(&mutex);
00165 static void usage(int argc, char **argv);
00166 
00167 class Consumer_i : virtual public POA_CosEventComm::PushConsumer {
00168 public:
00169   Consumer_i(long disconnect=0): _disconnect(disconnect) {}
00170 
00171   void push(const CORBA::Any& data);
00172   void disconnect_push_consumer ();
00173 
00174 private:
00175   long _disconnect;
00176 };
00177 
00178 void Consumer_i::push(const CORBA::Any& data) {
00179   CORBA::ULong l;
00180   static int i = 0;
00181 
00182   i++;
00183   if( data>>=l )
00184   {
00185     cout<<"Push Consumer: push() called. Data : "<< l <<endl;
00186 
00187     // Exercise Disconnect
00188     if (i == _disconnect)
00189     {
00190        i = 0;
00191        // NOTE : The proxy_supplier object is disposed at the server
00192        //        during the disconnect_push_supplier call. Do NOT
00193        //        use the proxy_supplier reference after disconnecting.
00194 
00195        // Signal main thread to disconnect and re-connect.
00196        connect_cond.signal();
00197     }
00198   }
00199   else
00200   {
00201     cerr<<"Push Consumer: push() called. UNEXPECTED TYPE"<<endl;
00202   }
00203 }
00204 
00205 void Consumer_i::disconnect_push_consumer () {
00206   cout << "Push Consumer: disconnected." << endl;
00207 }
00208 
00209 int
00210 main(int argc, char **argv)
00211 {
00212   //
00213   // Start orb.
00214   CORBA::ORB_ptr orb = CORBA::ORB_init(argc,argv);
00215 
00216   // Process Options
00217   int         discnum       =0;
00218   int         sleepInterval =0;
00219   const char* channelName   ="EventChannel";
00220 
00221   int c;
00222   while ((c = getopt(argc,argv,"hd:s:n:")) != EOF)
00223   {
00224      switch (c)
00225      {
00226         case 'd': discnum = atoi(optarg);
00227                   break;
00228 
00229         case 's': sleepInterval = atoi(optarg);
00230                   break;
00231 
00232         case 'n': channelName = optarg;
00233                   break;
00234 
00235         case 'h': usage(argc,argv);
00236                   exit(0);
00237         default : usage(argc,argv);
00238                   exit(-1);
00239      }
00240   }
00241 
00242 #if defined(HAVE_SIGNAL_H) && defined(SIGPIPE)
00243   // Ignore broken pipes
00244   signal(SIGPIPE, SIG_IGN);
00245 #endif
00246 
00247   Consumer_i* consumer = new Consumer_i (discnum);
00248   CosEventChannelAdmin::EventChannel_var channel;
00249 
00250   const char* action=""; // Use this variable to help report errors.
00251   try {
00252     CORBA::Object_var obj;
00253 
00254     action="resolve initial reference 'RootPOA'";
00255     obj=orb->resolve_initial_references("RootPOA");
00256     PortableServer::POA_var rootPoa =PortableServer::POA::_narrow(obj);
00257     if(CORBA::is_nil(rootPoa))
00258         throw CORBA::OBJECT_NOT_EXIST();
00259 
00260     action="activate the RootPOA's POAManager";
00261     PortableServer::POAManager_var pman =rootPoa->the_POAManager();
00262     pman->activate();
00263 
00264     //
00265     // Obtain object reference to EventChannel
00266     // (from command-line argument or from the Naming Service).
00267     if(optind<argc)
00268     {
00269       action="convert URI from command line into object reference";
00270       obj=orb->string_to_object(argv[optind]);
00271     }
00272     else
00273     {
00274       action="resolve initial reference 'NameService'";
00275       obj=orb->resolve_initial_references("NameService");
00276       CosNaming::NamingContext_var rootContext=
00277         CosNaming::NamingContext::_narrow(obj);
00278       if(CORBA::is_nil(rootContext))
00279           throw CORBA::OBJECT_NOT_EXIST();
00280 
00281       action="find EventChannel in NameService";
00282       cout << action << endl;
00283       obj=rootContext->resolve(str2name(channelName));
00284     }
00285 
00286     action="narrow object reference to event channel";
00287     channel=CosEventChannelAdmin::EventChannel::_narrow(obj);
00288     if(CORBA::is_nil(channel))
00289     {
00290        cerr << "Failed to narrow Event Channel reference." << endl;
00291        exit(1);
00292     }
00293 
00294   }
00295   catch(CORBA::ORB::InvalidName& ex) { // resolve_initial_references
00296      cerr<<"Failed to "<<action<<". ORB::InvalidName"<<endl;
00297      exit(1);
00298   }
00299   catch(CosNaming::NamingContext::InvalidName& ex) { // resolve
00300      cerr<<"Failed to "<<action<<". NamingContext::InvalidName"<<endl;
00301      exit(1);
00302   }
00303   catch(CosNaming::NamingContext::NotFound& ex) { // resolve
00304      cerr<<"Failed to "<<action<<". NamingContext::NotFound"<<endl;
00305      exit(1);
00306   }
00307   catch(CosNaming::NamingContext::CannotProceed& ex) { // resolve
00308      cerr<<"Failed to "<<action<<". NamingContext::CannotProceed"<<endl;
00309      exit(1);
00310   }
00311   catch(CORBA::TRANSIENT& ex) { // _narrow()
00312      cerr<<"Failed to "<<action<<". TRANSIENT"<<endl;
00313      exit(1);
00314   }
00315   catch(CORBA::OBJECT_NOT_EXIST& ex) { // _narrow()
00316      cerr<<"Failed to "<<action<<". OBJECT_NOT_EXIST"<<endl;
00317      exit(1);
00318   }
00319   catch(CORBA::SystemException& ex) {
00320      cerr<<"Failed to "<<action<<".";
00321 #if defined(HAVE_OMNIORB4)
00322      cerr<<" "<<ex._name();
00323      if(ex.NP_minorString())
00324          cerr<<" ("<<ex.NP_minorString()<<")";
00325 #endif
00326      cerr<<endl;
00327      exit(1);
00328   }
00329   catch(CORBA::Exception& ex) {
00330      cerr<<"Failed to "<<action<<"."
00331 #if defined(HAVE_OMNIORB4)
00332        " "<<ex._name()
00333 #endif
00334        <<endl;
00335      exit(1);
00336   }
00337 
00338   //
00339   // Get Consumer admin interface - retrying on Comms Failure.
00340   CosEventChannelAdmin::ConsumerAdmin_var consumer_admin;
00341   while (1)
00342   {
00343      try {
00344         consumer_admin = channel->for_consumers ();
00345         if (CORBA::is_nil (consumer_admin))
00346         {
00347            cerr << "Event Channel returned nil Consumer Admin!" << endl;
00348            exit(1);
00349         }
00350         break;
00351      }
00352      catch (CORBA::COMM_FAILURE& ex) {
00353         cerr << "Caught COMM_FAILURE exception "
00354              << "obtaining Consumer Admin! Retrying..."
00355              << endl;
00356         continue;
00357      }
00358   }
00359   cout << "Obtained ConsumerAdmin." << endl;
00360 
00361   while (1) {
00362      //
00363      // Get proxy supplier - retrying on Comms Failure.
00364      CosEventChannelAdmin::ProxyPushSupplier_var proxy_supplier;
00365      while (1)
00366      {
00367         try {
00368            proxy_supplier = consumer_admin->obtain_push_supplier ();
00369            if (CORBA::is_nil (proxy_supplier))
00370            {
00371               cerr << "Consumer Admin returned nil proxy_supplier!"
00372                    << endl;
00373               exit (1);
00374            }
00375            break;
00376         }
00377         catch (CORBA::COMM_FAILURE& ex) {
00378            cerr << "Caught COMM_FAILURE Exception "
00379                 << "obtaining Push Supplier! Retrying..."
00380                 << endl;
00381            continue;
00382         }
00383      }
00384      cout << "Obtained ProxyPushSupplier." << endl;
00385    
00386      //
00387      // Connect Push Consumer - retrying on Comms Failure.
00388      while (1)
00389      {
00390         try {
00391            proxy_supplier->connect_push_consumer(consumer->_this());
00392            break;
00393         }
00394         catch (CORBA::BAD_PARAM& ex) {
00395            cerr << "Caught BAD_PARAM Exception connecting Push Consumer!"
00396                 << endl;
00397            exit (1);
00398         }
00399         catch (CosEventChannelAdmin::AlreadyConnected& ex) {
00400            cerr << "Proxy Push Supplier already connected!"
00401                 << endl;
00402            break;
00403         }
00404         catch (CORBA::COMM_FAILURE& ex) {
00405            cerr << "Caught COMM_FAILURE exception "
00406                 << "connecting Push Consumer! Retrying..."
00407                 << endl;
00408            continue;
00409         }
00410      }
00411      cout << "Connected Push Consumer." << endl;
00412 
00413      // Wait for indication to disconnect before re-connecting.
00414      {
00415        omni_mutex_lock condition_lock(mutex);
00416        connect_cond.wait();
00417      }
00418 
00419      // Disconnect - retrying on Comms Failure.
00420      while (1)
00421      {
00422         try {
00423            proxy_supplier->disconnect_push_supplier();
00424            break;
00425         }
00426         catch (CORBA::COMM_FAILURE& ex) {
00427            cerr << "Caught COMM_FAILURE Exception "
00428                 << "disconnecting Push Consumer! Retrying..."
00429                 << endl;
00430            continue;
00431         }
00432      }
00433      cout << "Disconnected Push Consumer." << endl;
00434    
00435      // Yawn
00436      cout << "Sleeping " << sleepInterval << " seconds." << endl;
00437      omni_thread::sleep(sleepInterval);
00438   }
00439 
00440   // NEVER GET HERE
00441   return 0;
00442 }
00443 
00444 static void
00445 usage(int argc, char **argv)
00446 {
00447   cerr<<
00448 "\nCreate a PushConsumer to receive events from a channel.\n"
00449 "syntax: "<<(argc?argv[0]:"pushcons")<<" OPTIONS [CHANNEL_URI]\n"
00450 "\n"
00451 "CHANNEL_URI: The event channel may be specified as a URI.\n"
00452 " This may be an IOR, or a corbaloc::: or corbaname::: URI.\n"
00453 "\n"
00454 "OPTIONS:                                         DEFAULT:\n"
00455 " -d NUM   disconnect after receiving NUM events   [0 - never disconnect]\n"
00456 " -s SECS  sleep SECS seconds after disconnecting  [0]\n"
00457 " -n NAME  channel name (if URI is not specified)  [\"EventChannel\"]\n"
00458 " -h       display this help text\n" << endl;
00459 }

Generated on Mon Jan 9 03:52:14 2006 for OmniEvents by  doxygen 1.4.6