00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030
00031
00032
00033
00034
00035
00036
00037
00038
00039
00040
00041
00042
00043
00044
00045
00046
00047
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081
00082
00083
00084
00085
00086
00087
00088
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144 #include "omniEventsLog.h"
00145
00146 #ifdef HAVE_CONFIG_H
00147 # include "config.h"
00148 #endif
00149
00150 #include <stdio.h>
00151
00152 #ifdef HAVE_STDLIB_H
00153 # include <stdlib.h>
00154 #endif
00155
00156 #ifdef HAVE_SYS_TYPES_H
00157 # include <sys/types.h>
00158 #endif
00159
00160 #ifdef HAVE_SYS_STAT_H
00161 # include <sys/stat.h>
00162 #endif
00163
00164 #ifdef HAVE_FCNTL_H
00165 # include <fcntl.h>
00166 #endif
00167
00168 #if defined(__VMS) && __CRTL_VER < 70000000
00169 # include <omniVMS/unlink.hxx>
00170 #endif
00171
00172 #ifdef __WIN32__
00173 # include <io.h>
00174 # include <winbase.h>
00175 # define stat(x,y) _stat(x,y)
00176 # define unlink(x) _unlink(x)
00177 # define STRUCT_STAT struct _stat
00178 #else
00179 # define STRUCT_STAT struct stat
00180 #endif // __WIN32__
00181
00182 #ifdef HAVE_UNISTD_H
00183 # include <unistd.h>
00184 #endif
00185
00186 #ifdef HAVE_LIBC_H
00187 # include <libc.h>
00188 #endif
00189
00190 #ifdef HAVE_SYS_PARAM_H
00191 # include <sys/param.h>
00192 #endif
00193
00194 #include <errno.h>
00195 #include <time.h>
00196 #include <assert.h>
00197 #include "gethostname.h"
00198
00199 #include "EventChannelFactory.h"
00200 #include "Orb.h"
00201 #include "defaults.h"
00202
00203
00204
00205
00206
00207 #if defined(HAVE_FSTREAM_OPEN)
00208 # define FLAG_TRUNCATE ios::trunc
00209 # define FLAG_APPEND ios::app
00210 # define FLAG_SYNC 0
00211 #elif defined(HAVE_FSTREAM_ATTACH)
00212 # if defined(__WIN32__)
00213 # define FLAG_SYNC 0
00214 # elif defined(O_SYNC)
00215 # define FLAG_SYNC O_SYNC
00216 # else
00217 # define FLAG_SYNC O_FSYNC // FreeBSD 3.2 does not have O_SYNC???
00218 # endif
00219 # define FLAG_TRUNCATE O_CREAT|O_TRUNC
00220 # define FLAG_APPEND O_APPEND
00221 #else
00222 # error "Can't open a file without ofstream::open() or ofstream::attach()"
00223 #endif
00224
00225
00226
00227
00228
00229 #ifdef __VMS
00230 # define VMS_SEMICOLON ";"
00231 #else
00232 # define VMS_SEMICOLON
00233 #endif
00234
00235 extern int yyparse();
00236 extern int yydebug;
00237 extern FILE *yyin;
00238
00239 namespace OmniEvents {
00240
00246 class timestamp
00247 {
00248 char str[29];
00249 public:
00250 timestamp(void)
00251 {
00252 str[0] = '[';
00253 str[1] = str[28] = '\0';
00254 }
00255 const char* t(void)
00256 {
00257 time_t t =time(NULL);
00258 char* p =ctime(&t);
00259 if(strncmp(p, &str[1], 24) == 0)
00260 return "";
00261 strncpy(&str[1], p, 24);
00262 str[25] = ']';
00263 str[26] = ' ';
00264 str[27] = ' ';
00265 return str;
00266 }
00267 };
00268
00269 timestamp ts;
00270
00271
00272
00273
00274
00275 omniEventsLog *omniEventsLog::theLog = NULL;
00276
00277 omniEventsLog::omniEventsLog(const char* logdir) :
00278 _logstream(),
00279 _activeFilename(NULL),
00280 _backupFilename(NULL),
00281 _checkpointFilename(NULL),
00282 _workerThread(NULL),
00283 _factory(NULL),
00284 _checkpointNeeded(true),
00285 _lock()
00286 {
00287 omniEventsLog::theLog = this;
00288 initializeFileNames(logdir);
00289 }
00290
00291
00292 omniEventsLog::~omniEventsLog()
00293 {
00294 omniEventsLog::theLog = NULL;
00295 }
00296
00297
00298 bool omniEventsLog::fileExists(const char* filename) const
00299 {
00300 STRUCT_STAT sb;
00301 return(::stat(filename,&sb) == 0);
00302 }
00303
00304
00305 PersistNode* omniEventsLog::bootstrap(int port, const char* endPointNoListen)
00306 {
00307
00308
00309 PersistNode* initialState=new PersistNode();
00310 PersistNode* ecf =initialState->addnode("ecf");
00311 ecf->addattr("port",port);
00312 if(endPointNoListen && endPointNoListen[0])
00313 ecf->addattr(string("endPointNoListen=")+endPointNoListen);
00314 return initialState;
00315 }
00316
00317
00318 PersistNode* omniEventsLog::parse()
00319 {
00320
00321
00322 ifstream persiststream(_activeFilename);
00323 if(!persiststream)
00324 {
00325 cerr << "Error: cannot read database file '"
00326 << _activeFilename << "'." << endl;
00327 if( fileExists(_backupFilename) )
00328 {
00329 cerr <<
00330 " Backup file '" << _backupFilename << "' exists.\n"
00331 " Either rename it to '" << _activeFilename << "' to\n"
00332 " to recover the server's state, or delete it to create a new\n"
00333 " database file." << endl;
00334 }
00335 exit(1);
00336 }
00337 PersistNode* initialState=new PersistNode(persiststream);
00338 persiststream.close();
00339
00340
00341
00342 const char* errorStr =NULL;
00343 PersistNode* ecf=initialState->child("ecf");
00344 if(!ecf)
00345 errorStr="Can't find EventChannelFactory.";
00346 else if(ecf->attrLong("port",-1)<=0)
00347 errorStr="EventChannelFactory is not assigned a valid port.";
00348
00349 if(errorStr)
00350 {
00351 cerr<<"Error parsing database '"<<_activeFilename<<"'.\n"
00352 <<errorStr<<" Try deleting the file (and any backup)."<<endl;
00353 exit(1);
00354 }
00355
00356 return initialState;
00357 }
00358
00359
00360 void omniEventsLog::incarnateFactory(PersistNode* initialState)
00361 {
00362 assert(initialState!=NULL);
00363
00364
00365
00366 try
00367 {
00368 openOfstream(_logstream,_activeFilename,FLAG_APPEND);
00369 }
00370 catch (IOError& ex)
00371 {
00372 cerr << "Error: cannot "
00373 << (fileExists(_activeFilename)?"write to":"create new")
00374 << " database file '" << _activeFilename
00375 << "': " << strerror(errno) << endl;
00376 cerr << "\nUse option '-l' or set the environment variable "
00377 << OMNIEVENTS_LOGDIR_ENV_VAR
00378 << "\nto specify the directory where the files are kept.\n"
00379 << endl;
00380 _logstream.close();
00381 unlink(_activeFilename);
00382 exit(1);
00383 }
00384
00385
00386
00387 PersistNode* ecf=initialState->child("ecf");
00388 assert(ecf!=NULL);
00389 _factory =new EventChannelFactory_i(*ecf);
00390 assert(!CORBA::is_nil(_factory->_this()));
00391 }
00392
00393
00394 void omniEventsLog::runWorker()
00395 {
00396 assert(_factory!=NULL);
00397
00398 _workerThread=new omniEventsLogWorker(
00399 this,
00400 &omniEventsLog::checkpoint,
00401 omni_thread::PRIORITY_NORMAL
00402 );
00403 }
00404
00405
00406 void omniEventsLog::output(ostream& os)
00407 {
00408 _factory->output(os);
00409 os<<endl;
00410 }
00411
00412
00413 void omniEventsLog::checkpoint(void)
00414 {
00415 int idle_time_btw_chkpt;
00416 static int firstCheckPoint = 1;
00417 char *itbc = getenv("OMNIEVENTS_ITBC");
00418 if (itbc == NULL || sscanf(itbc,"%d",&idle_time_btw_chkpt) != 1)
00419 {
00420 idle_time_btw_chkpt=OMNIEVENTS_LOG_CHECKPOINT_PERIOD;
00421 }
00422
00423 omni_mutex mutex;
00424 omni_condition cond(&mutex);
00425
00426 mutex.lock();
00427 while (1) {
00428
00429
00430
00431
00432
00433 if (! firstCheckPoint)
00434 {
00435 unsigned long s, n;
00436 omni_thread::get_time(&s, &n, idle_time_btw_chkpt);
00437 cond.timedwait(s,n);
00438
00439 _lock.lock();
00440 if(!_checkpointNeeded)
00441 {
00442 _lock.unlock();
00443 continue;
00444 }
00445 }
00446 else
00447 {
00448 _lock.lock();
00449 firstCheckPoint = 0;
00450 }
00451
00452 DB(1,ts.t() << "Checkpointing Phase 1: Prepare.")
00453
00454 ofstream ckpf;
00455 int fd = -1;
00456
00457 try
00458 {
00459 try
00460 {
00461 openOfstream(ckpf,_checkpointFilename,FLAG_TRUNCATE|FLAG_SYNC,&fd);
00462 }
00463 catch(IOError& ex)
00464 {
00465 DB(0,ts.t() << "Error: cannot open checkpoint file '"
00466 << _checkpointFilename << "' for writing.")
00467 throw;
00468 }
00469
00470 output(ckpf);
00471
00472 ckpf.close();
00473 if(!ckpf)
00474 throw IOError();
00475
00476
00477 #if defined(__sunos__) && defined(__SUNPRO_CC) && __SUNPRO_CC < 0x500
00478 if(close(fd) < 0)
00479 throw IOError();
00480 #endif
00481
00482 }
00483 catch(IOError& ex)
00484 {
00485 DB(0,ts.t()<<"I/O error writing checkpoint file: "<<strerror(errno)
00486 <<"\nAbandoning checkpoint")
00487 ckpf.close();
00488
00489 #if defined(__sunos__) && defined(__SUNPRO_CC) && __SUNPRO_CC < 0x500
00490 close(fd);
00491 #endif
00492 unlink(_checkpointFilename);
00493 _lock.unlock();
00494 continue;
00495 }
00496
00497
00498
00499
00500
00501 DB(1,ts.t() << "Checkpointing Phase 2: Commit.")
00502
00503
00504 #if defined(__sunos__) && defined(__SUNPRO_CC) && __SUNPRO_CC < 0x500
00505 close(_logstream.rdbuf()->fd());
00506 #endif
00507
00508 _logstream.close();
00509
00510 unlink(_backupFilename);
00511
00512 #if defined(__WIN32__)
00513 if(rename(_activeFilename, _backupFilename) != 0)
00514 #elif defined(__VMS)
00515 if(rename(_activeFilename, _backupFilename) < 0)
00516 #else
00517 if(link(_activeFilename,_backupFilename) < 0)
00518 #endif
00519 {
00520
00521 DB(0,ts.t() << "Error: failed to link backup file '"
00522 << _backupFilename << "' to old log file '"
00523 << _activeFilename << "'.")
00524 exit(1);
00525 }
00526
00527 #if !defined( __VMS) && !defined(__WIN32__)
00528 if(unlink(_activeFilename) < 0)
00529 {
00530
00531 DB(0,ts.t() << "Error: failed to unlink old log file '"
00532 << _activeFilename << "': " << strerror(errno))
00533 exit(1);
00534 }
00535 #endif
00536
00537 #if defined(__WIN32__)
00538 if(rename(_checkpointFilename,_activeFilename) != 0)
00539 #elif defined(__VMS)
00540 if(rename(_checkpointFilename,_activeFilename) < 0)
00541 #else
00542 if(link(_checkpointFilename,_activeFilename) < 0)
00543 #endif
00544 {
00545
00546 DB(0,ts.t() << "Error: failed to link log file '" << _activeFilename
00547 << "' to checkpoint file '" << _checkpointFilename << "'.")
00548 exit(1);
00549 }
00550
00551 #if !defined( __VMS) && !defined(__WIN32__)
00552 if (unlink(_checkpointFilename) < 0)
00553 {
00554
00555 DB(0,ts.t() << "Error: failed to unlink checkpoint file '"
00556 << _checkpointFilename << "'.")
00557 exit(1);
00558 }
00559 #endif
00560
00561 try
00562 {
00563 openOfstream(_logstream,_activeFilename,FLAG_APPEND|FLAG_SYNC,&fd);
00564 }
00565 catch (IOError& ex)
00566 {
00567 DB(0,ts.t() << "Error: cannot open new log file '" << _activeFilename
00568 << "' for writing.")
00569 exit(1);
00570 }
00571
00572 DB(1,ts.t() << "Checkpointing completed.")
00573
00574 _checkpointNeeded=false;
00575 _lock.unlock();
00576 }
00577 mutex.unlock();
00578 }
00579
00580
00591 void omniEventsLog::initializeFileNames(const char* logdir)
00592 {
00593 if(!logdir)
00594 logdir=getenv(OMNIEVENTS_LOGDIR_ENV_VAR);
00595 if(!logdir)
00596 logdir=OMNIEVENTS_LOG_DEFAULT_LOCATION;
00597
00598 const char* logname ="omnievents-";
00599 char hostname[MAXHOSTNAMELEN];
00600 if (0!=gethostname(hostname,MAXHOSTNAMELEN))
00601 {
00602 cerr << "Error: cannot get the name of this host." << endl;
00603 exit(1);
00604 }
00605 const char* sep ="";
00606
00607 #if defined(__WIN32__)
00608 sep="\\";
00609 #elif defined(__VMS)
00610 char last( logdir[strlen(logdir)-1] );
00611 if (last != ':' && last != ']')
00612 {
00613 cerr << "Error: " << OMNIEVENTS_LOGDIR_ENV_VAR << " (" << logdir
00614 << ") is not a directory name." << endl;
00615 exit(1);
00616 }
00617 #else // Unix
00618 if (logdir[0] != '/')
00619 {
00620 cerr << "Error: " << OMNIEVENTS_LOGDIR_ENV_VAR << " (" << logdir
00621 << ") is not an absolute path name." << endl;
00622 exit(1);
00623 }
00624 if (logdir[strlen(logdir)-1] != '/')
00625 sep="/";
00626 #endif
00627
00628
00629
00630
00631 setFilename(_activeFilename,logdir,sep,logname,hostname,".log" VMS_SEMICOLON);
00632 setFilename(_backupFilename,logdir,sep,logname,hostname,".bak" VMS_SEMICOLON);
00633 setFilename(
00634 _checkpointFilename,logdir,sep,logname,hostname,".ckp" VMS_SEMICOLON);
00635 }
00636
00637
00641 void omniEventsLog::setFilename(
00642 char*& filename, const char* logdir, const char* sep,
00643 const char* logname, const char* hostname, const char* ext)
00644 {
00645 size_t len=1+
00646 strlen(logdir)+strlen(sep)+strlen(logname)+strlen(hostname)+strlen(ext);
00647 filename=new char[len];
00648 sprintf(filename,"%s%s%s%s%s",logdir,sep,logname,hostname,ext);
00649 }
00650
00651
00665 void omniEventsLog::openOfstream(
00666 ofstream& s, const char* filename, int flags, int* fd)
00667 {
00668 #if defined(HAVE_FSTREAM_OPEN)
00669 # ifdef HAVE_STD_IOSTREAM
00670 ios::openmode openmodeflags =ios::out|ios::openmode(flags);
00671 # else
00672 int openmodeflags =ios::out|flags;
00673 # endif
00674
00675 # ifdef FSTREAM_OPEN_PROT
00676 s.open(filename,openmodeflags,0644);
00677 # else
00678 s.open(filename,openmodeflags);
00679 # endif
00680 if (!s)
00681 throw IOError();
00682
00683 #elif defined(HAVE_FSTREAM_ATTACH)
00684 # ifdef __WIN32__
00685 int localFd = _open(filename, O_WRONLY | flags, _S_IWRITE);
00686 # else
00687 int localFd = open(filename, O_WRONLY | flags, 0644);
00688 # endif
00689 if (localFd < 0)
00690 throw IOError();
00691 if(fd)
00692 (*fd)=localFd;
00693 s.attach(localFd);
00694 #endif
00695 }
00696
00697
00698
00699
00700
00701 omniEventsLogWorker::omniEventsLogWorker(
00702 omniEventsLog* object,
00703 Method method,
00704 priority_t priority
00705 ):omni_thread(NULL,priority)
00706 {
00707 DB(15, "omniEventsLogWorker::omniEventsLogWorker()");
00708
00709 _method=method;
00710 _object=object;
00711
00712 start_undetached();
00713 }
00714
00715
00716 void* omniEventsLogWorker::run_undetached(void *)
00717 {
00718 try {
00719 DB(15, "omniEventsLogWorker : run_undetached Start");
00720 (_object->*_method)();
00721 DB(15, "omniEventsLogWorker : run_undetached End");
00722 }
00723 catch (CORBA::SystemException& ex) {
00724 DB(0,"omniEventsLogWorker killed by CORBA system exception"
00725 IF_OMNIORB4(": "<<ex._name()<<" ("<<NP_MINORSTRING(ex)<<")") ".")
00726 }
00727 catch (CORBA::Exception& ex) {
00728 DB(0,"omniEventsLogWorker killed by CORBA exception"
00729 IF_OMNIORB4(": "<<ex._name()<<) ".")
00730 }
00731 catch(...) {
00732 DB(0,"omniEventsLogWorker killed by unknown exception.")
00733 }
00734 return NULL;
00735 }
00736
00737 omniEventsLogWorker::~omniEventsLogWorker()
00738 {
00739 DB(20, "omniEventsLogWorker::~omniEventsLogWorker()");
00740 }
00741
00742
00743 };