SupplierAdmin.cc

Go to the documentation of this file.
00001 //                            Package   : omniEvents
00002 // SupplierAdmin.h            Created   : 2003/12/04
00003 //                            Author    : Alex Tingle
00004 //
00005 //    Copyright (C) 2003 Alex Tingle.
00006 //
00007 //    This file is part of the omniEvents application.
00008 //
00009 //    omniEvents is free software; you can redistribute it and/or
00010 //    modify it under the terms of the GNU Lesser General Public
00011 //    License as published by the Free Software Foundation; either
00012 //    version 2.1 of the License, or (at your option) any later version.
00013 //
00014 //    omniEvents is distributed in the hope that it will be useful,
00015 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017 //    Lesser General Public License for more details.
00018 //
00019 //    You should have received a copy of the GNU Lesser General Public
00020 //    License along with this library; if not, write to the Free Software
00021 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 //
00023 
00024 #include "SupplierAdmin.h"
00025 
00026 #include "EventChannel.h"
00027 #include "ProxyPushConsumer.h"
00028 #include "ProxyPullConsumer.h"
00029 #include "Orb.h"
00030 #include "PersistNode.h"
00031 
00032 #define MILLION 1000000
00033 #define BILLION 1000000000
00034 
00035 namespace OmniEvents {
00036 
00037 CosEventChannelAdmin::ProxyPushConsumer_ptr
00038 SupplierAdmin_i::obtain_push_consumer()
00039 {
00040   return _pushConsumer->createObject();
00041 }
00042 
00043 
00044 CosEventChannelAdmin::ProxyPullConsumer_ptr
00045 SupplierAdmin_i::obtain_pull_consumer()
00046 {
00047   if(!_pullConsumer)
00048       _pullConsumer=new ProxyPullConsumerManager(_poa,_queue);
00049   return _pullConsumer->createObject();
00050 }
00051 
00052 
00053 SupplierAdmin_i::SupplierAdmin_i(
00054   const EventChannel_i&   channel,
00055   PortableServer::POA_ptr poa
00056 )
00057 : Servant(poa),
00058   _channel(channel),
00059   _pushConsumer(NULL),
00060   _pullConsumer(NULL),
00061   _queue(),
00062   _nextPull(0,0)
00063 {
00064   // Initialise _nextPull. Only set it if the cycle period is LESS than the
00065   // pull retry period - otherwise just pull every cycle.
00066   if(_channel.pullRetryPeriod_ms() > (_channel.cyclePeriod_ns()/MILLION))
00067   {
00068     omni_thread::get_time(&(_nextPull.first),&(_nextPull.second));
00069   }
00070 
00071   // Always create the ProxyPushConsumer_i default servant. This allows
00072   // lazy clients to connect suppliers without having to go through the
00073   // proper procedure - they can make up an appropriate ObjectId, call push()
00074   // and it will just work (TM).
00075   // Note: A SupplierAdmin_i is always created by the EventChannel to allow this
00076   // behaviour.
00077   _pushConsumer=new ProxyPushConsumer_i(_poa,_queue,_channel.consumerAdmin());
00078   
00079   activateObjectWithId("SupplierAdmin");
00080   _remove_ref();
00081 }
00082 
00083 
00084 SupplierAdmin_i::~SupplierAdmin_i()
00085 {
00086   if(_pushConsumer)
00087   {
00088     delete _pushConsumer;
00089     _pushConsumer=NULL;
00090   }
00091   for(list<CORBA::Any*>::iterator i=_queue.begin(); i!=_queue.end(); ++i)
00092       delete *i;
00093   DB(20,"~SupplierAdmin_i()")
00094 }
00095 
00096 
00097 void SupplierAdmin_i::collect(list<CORBA::Any*>& events)
00098 {
00099   if(_pullConsumer)
00100   {
00101     _pullConsumer->collect();
00102     if(0==_nextPull.first)
00103     { // No delay between pulls.
00104       _pullConsumer->triggerRequest();
00105     }
00106     else
00107     { // Only trigger new pull() calls if `pullRetry' ms have passed.
00108       pair<unsigned long,unsigned long> now;
00109       omni_thread::get_time(&(now.first),&(now.second));
00110       if(now>=_nextPull)
00111       {
00112         _pullConsumer->triggerRequest();
00113 
00114         CORBA::ULong p =_channel.pullRetryPeriod_ms();
00115         do{
00116           _nextPull.second += (p%1000)*MILLION;                    // nsec
00117           _nextPull.first  +=  p/1000 + _nextPull.second/BILLION;  // sec
00118           _nextPull.second %= BILLION;                             // nsec
00119         } while(now>=_nextPull);
00120       }
00121     }
00122   }
00123   _pushConsumer->trigger();
00124   // Pick up events from both pull & push consumers.
00125   events=_queue;
00126   _queue.clear();
00127 }
00128 
00129 
00130 void SupplierAdmin_i::disconnect()
00131 {
00132   if(_pushConsumer)
00133      _pushConsumer->disconnect();
00134   if(_pullConsumer)
00135      _pullConsumer->disconnect();
00136 }
00137 
00138 
00139 void SupplierAdmin_i::reincarnate(const PersistNode& node)
00140 {
00141   // Build Push Consumer proxies
00142   PersistNode* pushcNode =node.child("ProxyPushConsumer");
00143   if(pushcNode && !pushcNode->_child.empty())
00144   {
00145     assert(_pushConsumer!=NULL);
00146     _pushConsumer->reincarnate(*pushcNode);
00147   }
00148 
00149   // Build Pull Consumer proxies
00150   PersistNode* pullcNode =node.child("ProxyPullConsumer");
00151   if(pullcNode && !pullcNode->_child.empty())
00152   {
00153     if(!_pullConsumer)
00154         _pullConsumer=new ProxyPullConsumerManager(_poa,_queue);
00155     _pullConsumer->reincarnate(*pullcNode);
00156   }
00157 }
00158 
00159 
00160 void SupplierAdmin_i::output(ostream& os)
00161 {
00162   if(_pushConsumer)
00163      _pushConsumer->output(os);
00164   if(_pullConsumer)
00165      _pullConsumer->output(os);
00166 }
00167 
00168 
00169 }; // end namespace OmniEvents

Generated on Mon Jan 9 03:52:14 2006 for OmniEvents by  doxygen 1.4.6