ConsumerAdmin.cc

Go to the documentation of this file.
00001 //                            Package   : omniEvents
00002 // ConsumerAdmin.cc           Created   : 2003/12/04
00003 //                            Author    : Alex Tingle
00004 //
00005 //    Copyright (C) 2003 Alex Tingle.
00006 //
00007 //    This file is part of the omniEvents application.
00008 //
00009 //    omniEvents is free software; you can redistribute it and/or
00010 //    modify it under the terms of the GNU Lesser General Public
00011 //    License as published by the Free Software Foundation; either
00012 //    version 2.1 of the License, or (at your option) any later version.
00013 //
00014 //    omniEvents is distributed in the hope that it will be useful,
00015 //    but WITHOUT ANY WARRANTY; without even the implied warranty of
00016 //    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
00017 //    Lesser General Public License for more details.
00018 //
00019 //    You should have received a copy of the GNU Lesser General Public
00020 //    License along with this library; if not, write to the Free Software
00021 //    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
00022 //
00023 
00024 #include "ConsumerAdmin.h"
00025 
00026 #include "EventChannel.h"
00027 #include "ProxyPushSupplier.h"
00028 #include "ProxyPullSupplier.h"
00029 #include "Orb.h"
00030 #include "PersistNode.h"
00031 #include "Filter.h"
00032 
00033 namespace OmniEvents {
00034 
00035 
00036 CosEventChannelAdmin::ProxyPushSupplier_ptr
00037 ConsumerAdmin_i::obtain_push_supplier()
00038 {
00039   if(!_pushSupplier)
00040   {
00041     _pushSupplier=new ProxyPushSupplierManager(_poa,_queue);
00042     _pushSupplier->_add_ref();
00043   }
00044   return _pushSupplier->createObject();
00045 }
00046 
00047 
00048 CosEventChannelAdmin::ProxyPullSupplier_ptr
00049 ConsumerAdmin_i::obtain_pull_supplier()
00050 {
00051   if(!_pullSupplier)
00052       _pullSupplier=new ProxyPullSupplierManager(_channel,_poa,_queue);
00053   return _pullSupplier->createObject();
00054 }
00055 
00056 
00057 ConsumerAdmin_i::ConsumerAdmin_i(
00058   const EventChannel_i&   channel,
00059   PortableServer::POA_ptr poa
00060 )
00061 : Servant(poa),
00062   _channel(channel),
00063   _queue(channel.maxQueueLength()),
00064   _pushSupplier(NULL),
00065   _pullSupplier(NULL)
00066 {
00067   if(_channel.properties().hasAttr("FilterId"))
00068   {
00069     string rid =_channel.properties().attrString("FilterId");
00070     _queue.setFilter(new FilterByRepositoryId(rid.c_str()));
00071   }
00072   else if(_channel.properties().hasAttr("FilterKind"))
00073   {
00074     CORBA::TCKind kind =
00075       CORBA::TCKind(_channel.properties().attrLong("FilterKind"));
00076     _queue.setFilter(new FilterByTCKind(kind));
00077   }
00078 
00079   activateObjectWithId("ConsumerAdmin");
00080   _remove_ref();
00081 }
00082 
00083 
00084 ConsumerAdmin_i::~ConsumerAdmin_i()
00085 {
00086   if(_pushSupplier)
00087   {
00088     _pushSupplier->_remove_ref(); // terminates thread.
00089     _pushSupplier=NULL;
00090   }
00091   DB(20,"~ConsumerAdmin_i()")
00092 }
00093 
00094 
00095 void ConsumerAdmin_i::send(CORBA::Any* event)
00096 {
00097   ProxyPushSupplierManager::PauseThenWake p(_pushSupplier);
00098   _queue.append(event);
00099 }
00100 
00101 
00102 void ConsumerAdmin_i::send(list<CORBA::Any*>& events)
00103 {
00104   if(!events.empty())
00105   {
00106     ProxyPushSupplierManager::PauseThenWake p(_pushSupplier);
00107     for(list<CORBA::Any*>::iterator i=events.begin(); i!=events.end(); ++i)
00108         _queue.append( *i );
00109     events.clear();
00110   }
00111 }
00112 
00113 
00114 void ConsumerAdmin_i::disconnect()
00115 {
00116   if(_pushSupplier)
00117      _pushSupplier->disconnect();
00118   if(_pullSupplier)
00119      _pullSupplier->disconnect();
00120 }
00121 
00122 
00123 void ConsumerAdmin_i::reincarnate(const PersistNode& node)
00124 {
00125   // Build Push Supplier proxies
00126   PersistNode* pushsNode =node.child("ProxyPushSupplier");
00127   if(pushsNode && !pushsNode->_child.empty())
00128   {
00129     _pushSupplier=new ProxyPushSupplierManager(_poa,_queue);
00130     _pushSupplier->_add_ref();
00131     _pushSupplier->reincarnate(*pushsNode);
00132   }
00133 
00134   // Build Pull Supplier proxies
00135   PersistNode* pullsNode =node.child("ProxyPullSupplier");
00136   if(pullsNode && !pullsNode->_child.empty())
00137   {
00138     _pullSupplier=new ProxyPullSupplierManager(_channel,_poa,_queue);
00139     _pullSupplier->reincarnate(*pullsNode);
00140   }
00141 }
00142 
00143 
00144 void ConsumerAdmin_i::output(ostream& os)
00145 {
00146   if(_pushSupplier)
00147   {
00148     omni_mutex_lock l(_pushSupplier->_lock);
00149     _pushSupplier->output(os);
00150   }
00151   if(_pullSupplier)
00152   {
00153     _pullSupplier->output(os);
00154   }
00155 }
00156 
00157 
00158 }; // end namespace OmniEvents

Generated on Mon Jan 9 03:52:13 2006 for OmniEvents by  doxygen 1.4.6