OpenDNSSEC-signer 1.3.0
/build/buildd/opendnssec-1.3.0/signer/src/daemon/worker.c
Go to the documentation of this file.
00001 /*
00002  * $Id: worker.c 5320 2011-07-12 10:42:26Z jakob $
00003  *
00004  * Copyright (c) 2009 NLNet Labs. All rights reserved.
00005  *
00006  * Redistribution and use in source and binary forms, with or without
00007  * modification, are permitted provided that the following conditions
00008  * are met:
00009  * 1. Redistributions of source code must retain the above copyright
00010  *    notice, this list of conditions and the following disclaimer.
00011  * 2. Redistributions in binary form must reproduce the above copyright
00012  *    notice, this list of conditions and the following disclaimer in the
00013  *    documentation and/or other materials provided with the distribution.
00014  *
00015  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
00016  * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
00017  * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
00018  * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
00019  * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
00020  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE
00021  * GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
00022  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
00023  * IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
00024  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN
00025  * IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
00026  *
00027  */
00028 
00034 #include "adapter/adapi.h"
00035 #include "daemon/engine.h"
00036 #include "daemon/worker.h"
00037 #include "shared/allocator.h"
00038 #include "scheduler/schedule.h"
00039 #include "scheduler/task.h"
00040 #include "shared/locks.h"
00041 #include "shared/log.h"
00042 #include "shared/status.h"
00043 #include "shared/util.h"
00044 #include "signer/tools.h"
00045 #include "signer/zone.h"
00046 #include "signer/zonedata.h"
00047 
00048 #include <time.h> /* time() */
00049 
00050 ods_lookup_table worker_str[] = {
00051     { WORKER_WORKER, "worker" },
00052     { WORKER_DRUDGER, "drudger" },
00053     { 0, NULL }
00054 };
00055 
00056 
00061 worker_type*
00062 worker_create(allocator_type* allocator, int num, worker_id type)
00063 {
00064     worker_type* worker;
00065 
00066     if (!allocator) {
00067         return NULL;
00068     }
00069     ods_log_assert(allocator);
00070 
00071     worker = (worker_type*) allocator_alloc(allocator, sizeof(worker_type));
00072     if (!worker) {
00073         return NULL;
00074     }
00075 
00076     ods_log_debug("create worker[%i]", num +1);
00077     lock_basic_init(&worker->worker_lock);
00078     lock_basic_set(&worker->worker_alarm);
00079     lock_basic_lock(&worker->worker_lock);
00080     worker->allocator = allocator;
00081     worker->thread_num = num +1;
00082     worker->engine = NULL;
00083     worker->task = NULL;
00084     worker->working_with = TASK_NONE;
00085     worker->need_to_exit = 0;
00086     worker->type = type;
00087     worker->clock_in = 0;
00088     worker->jobs_appointed = 0;
00089     worker->jobs_completed = 0;
00090     worker->jobs_failed = 0;
00091     worker->sleeping = 0;
00092     worker->waiting = 0;
00093     lock_basic_unlock(&worker->worker_lock);
00094     return worker;
00095 }
00096 
00097 
00102 static const char*
00103 worker2str(worker_id type)
00104 {
00105     ods_lookup_table *lt = ods_lookup_by_id(worker_str, type);
00106     if (lt) {
00107         return lt->name;
00108     }
00109     return NULL;
00110 }
00111 
00112 
00117 static int
00118 worker_fulfilled(worker_type* worker)
00119 {
00120     return (worker->jobs_completed + worker->jobs_failed) ==
00121         worker->jobs_appointed;
00122 }
00123 
00124 
00129 static void
00130 worker_perform_task(worker_type* worker)
00131 {
00132     engine_type* engine = NULL;
00133     zone_type* zone = NULL;
00134     task_type* task = NULL;
00135     task_id what = TASK_NONE;
00136     time_t when = 0;
00137     time_t never = (3600*24*365);
00138     ods_status status = ODS_STATUS_OK;
00139     int fallthrough = 0;
00140     int backup = 0;
00141     char* working_dir = NULL;
00142     char* cfg_filename = NULL;
00143     uint32_t tmpserial = 0;
00144     time_t start = 0;
00145     time_t end = 0;
00146 
00147     /* sanity checking */
00148     if (!worker || !worker->task || !worker->task->zone || !worker->engine) {
00149         return;
00150     }
00151     ods_log_assert(worker);
00152     ods_log_assert(worker->task);
00153     ods_log_assert(worker->task->zone);
00154 
00155     engine = (engine_type*) worker->engine;
00156     task = (task_type*) worker->task;
00157     zone = (zone_type*) worker->task->zone;
00158     ods_log_debug("[%s[%i]] perform task %s for zone %s at %u",
00159        worker2str(worker->type), worker->thread_num, task_what2str(task->what),
00160        task_who2str(task->who), (uint32_t) worker->clock_in);
00161 
00162     /* do what you have been told to do */
00163     switch (task->what) {
00164         case TASK_SIGNCONF:
00165             worker->working_with = TASK_SIGNCONF;
00166             /* perform 'load signconf' task */
00167             ods_log_verbose("[%s[%i]] load signconf for zone %s",
00168                 worker2str(worker->type), worker->thread_num,
00169                 task_who2str(task->who));
00170             status = zone_load_signconf(zone, &what);
00171 
00172             /* what to do next */
00173             when = time_now();
00174             if (status == ODS_STATUS_UNCHANGED) {
00175                 if (task->halted != TASK_NONE) {
00176                     goto task_perform_continue;
00177                 } else {
00178                     status = ODS_STATUS_OK;
00179                 }
00180             }
00181 
00182             if (status == ODS_STATUS_OK) {
00183                 status = zone_publish_dnskeys(zone, 0);
00184             }
00185             if (status == ODS_STATUS_OK) {
00186                 status = zone_prepare_nsec3(zone, 0);
00187             }
00188             if (status == ODS_STATUS_OK) {
00189                 status = zonedata_commit(zone->zonedata);
00190             }
00191 
00192             if (status == ODS_STATUS_OK) {
00193                 task->interrupt = TASK_NONE;
00194                 task->halted = TASK_NONE;
00195             } else {
00196                 if (task->halted == TASK_NONE) {
00197                     goto task_perform_fail;
00198                 }
00199                 goto task_perform_continue;
00200             }
00201             fallthrough = 0;
00202             break;
00203         case TASK_READ:
00204             worker->working_with = TASK_READ;
00205             /* perform 'read input adapter' task */
00206             ods_log_verbose("[%s[%i]] read zone %s",
00207                 worker2str(worker->type), worker->thread_num,
00208                 task_who2str(task->who));
00209             status = tools_input(zone);
00210 
00211             /* what to do next */
00212             what = TASK_NSECIFY;
00213             when = time_now();
00214             if (status != ODS_STATUS_OK) {
00215                 if (task->halted == TASK_NONE) {
00216                     goto task_perform_fail;
00217                 }
00218                 goto task_perform_continue;
00219             }
00220             fallthrough = 1;
00221         case TASK_NSECIFY:
00222             worker->working_with = TASK_NSECIFY;
00223             ods_log_verbose("[%s[%i]] nsecify zone %s",
00224                 worker2str(worker->type), worker->thread_num,
00225                 task_who2str(task->who));
00226             status = tools_nsecify(zone);
00227 
00228             /* what to do next */
00229             what = TASK_SIGN;
00230             when = time_now();
00231             if (status == ODS_STATUS_OK) {
00232                 if (task->interrupt > TASK_SIGNCONF) {
00233                     task->interrupt = TASK_NONE;
00234                     task->halted = TASK_NONE;
00235                 }
00236             } else {
00237                 if (task->halted == TASK_NONE) {
00238                     goto task_perform_fail;
00239                 }
00240                 goto task_perform_continue;
00241             }
00242             fallthrough = 1;
00243         case TASK_SIGN:
00244             worker->working_with = TASK_SIGN;
00245             ods_log_verbose("[%s[%i]] sign zone %s",
00246                 worker2str(worker->type), worker->thread_num,
00247                 task_who2str(task->who));
00248             tmpserial = zone->zonedata->internal_serial;
00249             status = zone_update_serial(zone);
00250             if (status != ODS_STATUS_OK) {
00251                 ods_log_error("[%s[%i]] unable to sign zone %s: "
00252                     "failed to increment serial",
00253                     worker2str(worker->type), worker->thread_num,
00254                     task_who2str(task->who));
00255             } else {
00256                 /* start timer */
00257                 start = time(NULL);
00258                 if (zone->stats) {
00259                     lock_basic_lock(&zone->stats->stats_lock);
00260                     if (!zone->stats->start_time) {
00261                         zone->stats->start_time = start;
00262                     }
00263                     zone->stats->sig_count = 0;
00264                     zone->stats->sig_soa_count = 0;
00265                     zone->stats->sig_reuse = 0;
00266                     zone->stats->sig_time = 0;
00267                     lock_basic_unlock(&zone->stats->stats_lock);
00268                 }
00269 
00270                 /* queue menial, hard signing work */
00271                 status = zonedata_queue(zone->zonedata, engine->signq, worker);
00272                 ods_log_debug("[%s[%i]] wait until drudgers are finished "
00273                     " signing zone %s", worker2str(worker->type),
00274                     worker->thread_num, task_who2str(task->who));
00275 
00276                 /* sleep until work is done */
00277                 worker_sleep_unless(worker, 0);
00278                 if (worker->jobs_failed > 0) {
00279                     ods_log_error("[%s[%i]] sign zone %s failed: %u of %u "
00280                         "signatures failed", worker2str(worker->type),
00281                         worker->thread_num, task_who2str(task->who),
00282                         worker->jobs_failed, worker->jobs_appointed);
00283                     status = ODS_STATUS_ERR;
00284                 }
00285                 worker->jobs_appointed = 0;
00286                 worker->jobs_completed = 0;
00287                 worker->jobs_failed = 0;
00288 
00289                 /* stop timer */
00290                 end = time(NULL);
00291                 if (status == ODS_STATUS_OK && zone->stats) {
00292                     lock_basic_lock(&zone->stats->stats_lock);
00293                     zone->stats->sig_time = (end-start);
00294                     lock_basic_unlock(&zone->stats->stats_lock);
00295                 }
00296             }
00297 
00298             /* what to do next */
00299             if (status != ODS_STATUS_OK) {
00300                 /* rollback serial */
00301                 zone->zonedata->internal_serial = tmpserial;
00302                 if (task->halted == TASK_NONE) {
00303                     goto task_perform_fail;
00304                 }
00305                 goto task_perform_continue;
00306             } else {
00307                 if (task->interrupt > TASK_SIGNCONF) {
00308                     task->interrupt = TASK_NONE;
00309                     task->halted = TASK_NONE;
00310                 }
00311             }
00312             what = TASK_AUDIT;
00313             when = time_now();
00314             fallthrough = 1;
00315         case TASK_AUDIT:
00316             worker->working_with = TASK_AUDIT;
00317             if (zone->signconf->audit) {
00318                 ods_log_verbose("[%s[%i]] audit zone %s",
00319                     worker2str(worker->type), worker->thread_num,
00320                     task_who2str(task->who));
00321                 working_dir = strdup(engine->config->working_dir);
00322                 cfg_filename = strdup(engine->config->cfg_filename);
00323                 status = tools_audit(zone, working_dir, cfg_filename);
00324                 if (working_dir)  { free((void*)working_dir); }
00325                 if (cfg_filename) { free((void*)cfg_filename); }
00326                 working_dir = NULL;
00327                 cfg_filename = NULL;
00328             } else {
00329                 status = ODS_STATUS_OK;
00330             }
00331 
00332             /* what to do next */
00333             if (status != ODS_STATUS_OK) {
00334                 if (task->halted == TASK_NONE) {
00335                     goto task_perform_fail;
00336                 }
00337                 goto task_perform_continue;
00338             }
00339             what = TASK_WRITE;
00340             when = time_now();
00341             fallthrough = 1;
00342         case TASK_WRITE:
00343             worker->working_with = TASK_WRITE;
00344             ods_log_verbose("[%s[%i]] write zone %s",
00345                 worker2str(worker->type), worker->thread_num,
00346                 task_who2str(task->who));
00347 
00348             status = tools_output(zone);
00349             zone->processed = 1;
00350 
00351             /* what to do next */
00352             if (status != ODS_STATUS_OK) {
00353                 if (task->halted == TASK_NONE) {
00354                     goto task_perform_fail;
00355                 }
00356                 goto task_perform_continue;
00357             } else {
00358                 if (task->interrupt > TASK_SIGNCONF) {
00359                     task->interrupt = TASK_NONE;
00360                     task->halted = TASK_NONE;
00361                 }
00362             }
00363             if (duration2time(zone->signconf->sig_resign_interval)) {
00364                 what = TASK_SIGN;
00365                 when = time_now() +
00366                     duration2time(zone->signconf->sig_resign_interval);
00367             } else {
00368                 what = TASK_NONE;
00369                 when = time_now() + never;
00370             }
00371             backup = 1;
00372             fallthrough = 0;
00373             break;
00374         case TASK_NONE:
00375             worker->working_with = TASK_NONE;
00376             ods_log_warning("[%s[%i]] none task for zone %s",
00377                 worker2str(worker->type), worker->thread_num,
00378                 task_who2str(task->who));
00379             when = time_now() + never;
00380             fallthrough = 0;
00381             break;
00382         default:
00383             ods_log_warning("[%s[%i]] unknown task, trying full sign zone %s",
00384                 worker2str(worker->type), worker->thread_num,
00385                 task_who2str(task->who));
00386             what = TASK_SIGNCONF;
00387             when = time_now();
00388             fallthrough = 0;
00389             break;
00390     }
00391 
00392     /* no error, reset backoff */
00393     task->backoff = 0;
00394 
00395     /* set next task */
00396     if (fallthrough == 0 && task->interrupt != TASK_NONE &&
00397         task->interrupt != what) {
00398         ods_log_debug("[%s[%i]] interrupt task %s for zone %s",
00399             worker2str(worker->type), worker->thread_num,
00400             task_what2str(what), task_who2str(task->who));
00401 
00402         task->what = task->interrupt;
00403         task->when = time_now();
00404         task->halted = what;
00405     } else {
00406         ods_log_debug("[%s[%i]] next task %s for zone %s",
00407             worker2str(worker->type), worker->thread_num,
00408             task_what2str(what), task_who2str(task->who));
00409 
00410         task->what = what;
00411         task->when = when;
00412         if (!fallthrough) {
00413             task->interrupt = TASK_NONE;
00414             task->halted = TASK_NONE;
00415         }
00416     }
00417 
00418     /* backup the last successful run */
00419     if (backup) {
00420         status = zone_backup(zone);
00421         if (status != ODS_STATUS_OK) {
00422             ods_log_warning("[%s[%i]] unable to backup zone %s: %s",
00423             worker2str(worker->type), worker->thread_num,
00424             task_who2str(task->who), ods_status2str(status));
00425             /* just a warning */
00426             status = ODS_STATUS_OK;
00427         }
00428         backup = 0;
00429     }
00430     return;
00431 
00432 task_perform_fail:
00433     /* in case of failure, also mark zone processed (for single run usage) */
00434     zone->processed = 1;
00435 
00436     if (task->backoff) {
00437         task->backoff *= 2;
00438         if (task->backoff > ODS_SE_MAX_BACKOFF) {
00439             task->backoff = ODS_SE_MAX_BACKOFF;
00440         }
00441     } else {
00442         task->backoff = 60;
00443     }
00444     ods_log_error("[%s[%i]] backoff task %s for zone %s with %u seconds",
00445         worker2str(worker->type), worker->thread_num,
00446         task_what2str(task->what), task_who2str(task->who), task->backoff);
00447 
00448     task->when = time_now() + task->backoff;
00449     return;
00450 
00451 task_perform_continue:
00452     ods_log_info("[%s[%i]] continue task %s for zone %s",
00453         worker2str(worker->type), worker->thread_num,
00454         task_what2str(task->halted), task_who2str(task->who));
00455 
00456     what = task->halted;
00457     task->what = what;
00458     task->when = time_now();
00459     task->interrupt = TASK_NONE;
00460     task->halted = TASK_NONE;
00461     if (zone->processed) {
00462         task->when += duration2time(zone->signconf->sig_resign_interval);
00463     }
00464     return;
00465 }
00466 
00467 
00472 static void
00473 worker_work(worker_type* worker)
00474 {
00475     time_t now, timeout = 1;
00476     zone_type* zone = NULL;
00477     ods_status status = ODS_STATUS_OK;
00478 
00479     ods_log_assert(worker);
00480     ods_log_assert(worker->type == WORKER_WORKER);
00481 
00482     while (worker->need_to_exit == 0) {
00483         ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type),
00484             worker->thread_num);
00485         lock_basic_lock(&worker->engine->taskq->schedule_lock);
00486         /* [LOCK] schedule */
00487         worker->task = schedule_pop_task(worker->engine->taskq);
00488         /* [UNLOCK] schedule */
00489         if (worker->task) {
00490             worker->working_with = worker->task->what;
00491             lock_basic_unlock(&worker->engine->taskq->schedule_lock);
00492 
00493             zone = worker->task->zone;
00494             lock_basic_lock(&zone->zone_lock);
00495             /* [LOCK] zone */
00496             ods_log_debug("[%s[%i]] start working on zone %s",
00497                 worker2str(worker->type), worker->thread_num, zone->name);
00498 
00499             worker->clock_in = time(NULL);
00500             worker_perform_task(worker);
00501 
00502             zone->task = worker->task;
00503 
00504             ods_log_debug("[%s[%i]] finished working on zone %s",
00505                 worker2str(worker->type), worker->thread_num, zone->name);
00506             /* [UNLOCK] zone */
00507 
00508             lock_basic_lock(&worker->engine->taskq->schedule_lock);
00509             /* [LOCK] zone, schedule */
00510             worker->task = NULL;
00511             worker->working_with = TASK_NONE;
00512             status = schedule_task(worker->engine->taskq, zone->task, 1);
00513             /* [UNLOCK] zone, schedule */
00514             lock_basic_unlock(&worker->engine->taskq->schedule_lock);
00515             lock_basic_unlock(&zone->zone_lock);
00516 
00517             timeout = 1;
00518         } else {
00519             ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type),
00520                 worker->thread_num);
00521 
00522             /* [LOCK] schedule */
00523             worker->task = schedule_get_first_task(worker->engine->taskq);
00524             /* [UNLOCK] schedule */
00525             lock_basic_unlock(&worker->engine->taskq->schedule_lock);
00526 
00527             now = time_now();
00528             if (worker->task && !worker->engine->taskq->loading) {
00529                 timeout = (worker->task->when - now);
00530             } else {
00531                 timeout *= 2;
00532                 if (timeout > ODS_SE_MAX_BACKOFF) {
00533                     timeout = ODS_SE_MAX_BACKOFF;
00534                 }
00535             }
00536             worker->task = NULL;
00537             worker_sleep(worker, timeout);
00538         }
00539     }
00540     return;
00541 }
00542 
00543 
00548 static void
00549 worker_drudge(worker_type* worker)
00550 {
00551     zone_type* zone = NULL;
00552     task_type* task = NULL;
00553     rrset_type* rrset = NULL;
00554     ods_status status = ODS_STATUS_OK;
00555     worker_type* chief = NULL;
00556     hsm_ctx_t* ctx = NULL;
00557 
00558     ods_log_assert(worker);
00559     ods_log_assert(worker->type == WORKER_DRUDGER);
00560 
00561     ctx = hsm_create_context();
00562     if (ctx == NULL) {
00563         ods_log_error("[%s[%i]] unable to drudge: error "
00564             "creating libhsm context", worker2str(worker->type),
00565             worker->thread_num);
00566     }
00567 
00568     while (worker->need_to_exit == 0) {
00569         ods_log_debug("[%s[%i]] report for duty", worker2str(worker->type),
00570             worker->thread_num);
00571 
00572         lock_basic_lock(&worker->engine->signq->q_lock);
00573         /* [LOCK] schedule */
00574         rrset = (rrset_type*) fifoq_pop(worker->engine->signq, &chief);
00575         /* [UNLOCK] schedule */
00576         lock_basic_unlock(&worker->engine->signq->q_lock);
00577         if (rrset) {
00578             /* set up the work */
00579             if (chief) {
00580                 task = chief->task;
00581             }
00582             if (task) {
00583                 zone = task->zone;
00584             }
00585             if (!zone) {
00586                 ods_log_error("[%s[%i]] unable to drudge: no zone reference",
00587                     worker2str(worker->type), worker->thread_num);
00588             }
00589             if (zone && ctx) {
00590                 ods_log_assert(rrset);
00591                 ods_log_assert(zone);
00592                 ods_log_assert(zone->dname);
00593                 ods_log_assert(zone->signconf);
00594                 ods_log_assert(ctx);
00595 
00596                 worker->clock_in = time(NULL);
00597                 status = rrset_sign(ctx, rrset, zone->dname, zone->signconf,
00598                     chief->clock_in, zone->stats);
00599             } else {
00600                 status = ODS_STATUS_ASSERT_ERR;
00601             }
00602 
00603             if (chief) {
00604                 lock_basic_lock(&chief->worker_lock);
00605                 if (status == ODS_STATUS_OK) {
00606                     chief->jobs_completed += 1;
00607                 } else {
00608                     chief->jobs_failed += 1;
00609                     /* destroy context? */
00610                 }
00611                 lock_basic_unlock(&chief->worker_lock);
00612 
00613                 if (worker_fulfilled(chief) && chief->sleeping) {
00614                     worker_wakeup(chief);
00615                 }
00616             }
00617             rrset = NULL;
00618             zone = NULL;
00619             task = NULL;
00620             chief = NULL;
00621         } else {
00622             ods_log_debug("[%s[%i]] nothing to do", worker2str(worker->type),
00623                 worker->thread_num);
00624             worker_wait(&worker->engine->signq->q_lock,
00625                 &worker->engine->signq->q_threshold);
00626         }
00627     }
00628 
00629     /* cleanup open HSM sessions */
00630     hsm_destroy_context(ctx);
00631     ctx = NULL;
00632     return;
00633 }
00634 
00635 
00640 void
00641 worker_start(worker_type* worker)
00642 {
00643     ods_log_assert(worker);
00644     switch (worker->type) {
00645         case WORKER_DRUDGER:
00646             worker_drudge(worker);
00647             break;
00648         case WORKER_WORKER:
00649             worker_work(worker);
00650             break;
00651         default:
00652             ods_log_error("[worker] illegal worker (id=%i)", worker->type);
00653             return;
00654     }
00655     return;
00656 }
00657 
00658 
00663 void
00664 worker_sleep(worker_type* worker, time_t timeout)
00665 {
00666     ods_log_assert(worker);
00667     lock_basic_lock(&worker->worker_lock);
00668     /* [LOCK] worker */
00669     worker->sleeping = 1;
00670     lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
00671         timeout);
00672     /* [UNLOCK] worker */
00673     lock_basic_unlock(&worker->worker_lock);
00674     return;
00675 }
00676 
00677 
00682 void
00683 worker_sleep_unless(worker_type* worker, time_t timeout)
00684 {
00685     ods_log_assert(worker);
00686     lock_basic_lock(&worker->worker_lock);
00687     /* [LOCK] worker */
00688     if (!worker_fulfilled(worker)) {
00689         worker->sleeping = 1;
00690         lock_basic_sleep(&worker->worker_alarm, &worker->worker_lock,
00691             timeout);
00692     }
00693     /* [UNLOCK] worker */
00694     lock_basic_unlock(&worker->worker_lock);
00695     return;
00696 }
00697 
00698 
00703 void
00704 worker_wakeup(worker_type* worker)
00705 {
00706     ods_log_assert(worker);
00707     if (worker && worker->sleeping && !worker->waiting) {
00708         ods_log_debug("[%s[%i]] wake up", worker2str(worker->type),
00709            worker->thread_num);
00710         lock_basic_lock(&worker->worker_lock);
00711         /* [LOCK] worker */
00712         lock_basic_alarm(&worker->worker_alarm);
00713         worker->sleeping = 0;
00714         /* [UNLOCK] worker */
00715         lock_basic_unlock(&worker->worker_lock);
00716     }
00717     return;
00718 }
00719 
00720 
00725 void
00726 worker_wait(lock_basic_type* lock, cond_basic_type* condition)
00727 {
00728     lock_basic_lock(lock);
00729     /* [LOCK] worker */
00730     lock_basic_sleep(condition, lock, 0);
00731     /* [UNLOCK] worker */
00732     lock_basic_unlock(lock);
00733     return;
00734 }
00735 
00736 
00741 void
00742 worker_notify(lock_basic_type* lock, cond_basic_type* condition)
00743 {
00744     lock_basic_lock(lock);
00745     /* [LOCK] lock */
00746     lock_basic_alarm(condition);
00747     /* [UNLOCK] lock */
00748     lock_basic_unlock(lock);
00749     return;
00750 }
00751 
00752 
00757 void
00758 worker_notify_all(lock_basic_type* lock, cond_basic_type* condition)
00759 {
00760     lock_basic_lock(lock);
00761     /* [LOCK] lock */
00762     lock_basic_broadcast(condition);
00763     /* [UNLOCK] lock */
00764     lock_basic_unlock(lock);
00765     return;
00766 }
00767 
00768 
00773 void
00774 worker_cleanup(worker_type* worker)
00775 {
00776     allocator_type* allocator;
00777     cond_basic_type worker_cond;
00778     lock_basic_type worker_lock;
00779 
00780     if (!worker) {
00781         return;
00782     }
00783     allocator = worker->allocator;
00784     worker_cond = worker->worker_alarm;
00785     worker_lock = worker->worker_lock;
00786 
00787     allocator_deallocate(allocator, (void*) worker);
00788     lock_basic_destroy(&worker_lock);
00789     lock_basic_off(&worker_cond);
00790     return;
00791 }