[Pkg-nagios-changes] [SCM] Debian packaging for mod gearman. branch, master, updated. f5edd3e88aa2138dcf6d044c835fbd0ada944168

Mark Clarkson (none) mclarkson at hptm2.
Fri Feb 11 11:18:07 UTC 2011


The following commit has been merged in the master branch:
commit 2336370145a568ddc5790f6325de817251246ea2
Merge: 3fc5dd0267d82d7425463d45c7680a6497106cf3 47e540d71d60d3ce676ffc22b7fad1975a19c0a5
Author: Mark Clarkson <mark.clarkson at smorg.co.uk>
Date:   Wed Jan 26 20:37:39 2011 +0000

    Merge branch 'master' of git://github.com/sni/mod_gearman
    
    Conflicts:
    	worker/worker_client.c

diff --combined common/utils.c
index d8a9ed3,8a4915a..c988774
--- a/common/utils.c
+++ b/common/utils.c
@@@ -226,9 -226,6 +226,9 @@@ int set_default_options(mod_gm_opt_t *o
      opt->server_num         = 0;
      for(i=0;i<=GM_LISTSIZE;i++)
          opt->server_list[i] = NULL;
 +    opt->dupserver_num         = 0;
 +    for(i=0;i<=GM_LISTSIZE;i++)
 +        opt->dupserver_list[i] = NULL;
      opt->hostgroups_num     = 0;
      for(i=0;i<=GM_LISTSIZE;i++)
          opt->hostgroups_list[i] = NULL;
@@@ -501,18 -498,6 +501,18 @@@ int parse_args_line(mod_gm_opt_t *opt, 
          }
      }
  
 +    /* duplicate server */
 +    else if ( !strcmp( key, "dupserver" ) ) {
 +        char *servername;
 +        while ( (servername = strsep( &value, "," )) != NULL ) {
 +            servername = trim(servername);
 +            if ( strcmp( servername, "" ) ) {
 +                opt->dupserver_list[opt->dupserver_num] = strdup(servername);
 +                opt->dupserver_num++;
 +            }
 +        }
 +    }
 +
      /* servicegroups */
      else if (   !strcmp( key, "servicegroups" )
               || !strcmp( key, "servicegroup" )
@@@ -648,9 -633,6 +648,9 @@@ void dumpconfig(mod_gm_opt_t *opt, int 
      for(i=0;i<opt->server_num;i++)
          gm_log( GM_LOG_DEBUG, "server:              %s\n", opt->server_list[i]);
      gm_log( GM_LOG_DEBUG, "\n" );
 +    for(i=0;i<opt->dupserver_num;i++)
 +        gm_log( GM_LOG_DEBUG, "dupserver:           %s\n", opt->dupserver_list[i]);
 +    gm_log( GM_LOG_DEBUG, "\n" );
      if(mode == GM_NEB_MODE) {
          gm_log( GM_LOG_DEBUG, "perfdata:            %s\n", opt->perfdata     == GM_ENABLED ? "yes" : "no");
      }
@@@ -694,8 -676,6 +694,8 @@@ void mod_gm_free_opt(mod_gm_opt_t *opt
      int i;
      for(i=0;i<opt->server_num;i++)
          free(opt->server_list[i]);
 +    for(i=0;i<opt->dupserver_num;i++)
 +        free(opt->dupserver_list[i]);
      for(i=0;i<opt->hostgroups_num;i++)
          free(opt->hostgroups_list[i]);
      for(i=0;i<opt->servicegroups_num;i++)
@@@ -964,3 -944,20 +964,20 @@@ int run_check(char *processed_command, 
  
      return retval;
  }
+ 
+ 
+ /* verify if a pid is alive */
+ int pid_alive(int pid) {
+     if(pid < 0) { pid = -pid; }
+ 
+     /* 1/-1 are undefined pids in our case */
+     if(pid == 1)
+         return TRUE;
+ 
+     /* send kill 0 to verify the proc is alive */
+     if(kill(pid, 0) == 0) {
+         return TRUE;
+     }
+ 
+     return FALSE;
+ }
diff --combined include/common.h
index 8a27d20,5840a6a..082b642
--- a/include/common.h
+++ b/include/common.h
@@@ -28,11 -28,11 +28,11 @@@
  #define MOD_GM_COMMON_H
  
  /* constants */
- #define GM_VERSION                  "0.8"
+ #define GM_VERSION                  "1.0"
  #define GM_ENABLED                      1
  #define GM_DISABLED                     0
- #define GM_BUFFERSIZE               16384
- #define GM_MAX_OUTPUT               11000   /* must be ~30% below GM_BUFFERSIZE for base64/encryption */
+ #define GM_BUFFERSIZE               98304
+ #define GM_MAX_OUTPUT               65536   /* must be ~30% below GM_BUFFERSIZE for base64/encryption */
  #define GM_LISTSIZE                   512
  
  #define GM_MIN_LIB_GEARMAN_VERSION   0.14
@@@ -55,10 -55,10 +55,10 @@@
  
  #define GM_DEFAULT_JOB_TIMEOUT         60
  #define GM_DEFAULT_JOB_RETRIES          1
- #define GM_CHILD_SHUTDOWN_TIMEOUT       5
+ #define GM_CHILD_SHUTDOWN_TIMEOUT      30
  #define GM_DEFAULT_RESULT_QUEUE  "check_results"
  #define GM_DEFAULT_IDLE_TIMEOUT        10
- #define GM_DEFAULT_MAX_JOBS            20
+ #define GM_DEFAULT_MAX_JOBS          1000
  #define MAX_CMD_ARGS                 4096
  
  /* worker */
@@@ -96,8 -96,7 +96,7 @@@
  #define STATE_CRITICAL                  2
  #define STATE_UNKNOWN                   3
  
- /* size of the shared memory segment */
- #define GM_SHM_SIZE                   300
+ #define GM_SHM_SIZE                  4096
  
  /* options structure */
  typedef struct mod_gm_opt_struct {
@@@ -107,8 -106,6 +106,8 @@@
      char         * keyfile;
      char         * server_list[GM_LISTSIZE];
      int            server_num;
 +    char         * dupserver_list[GM_LISTSIZE];
 +    int            dupserver_num;
      char         * hostgroups_list[GM_LISTSIZE];
      int            hostgroups_num;
      char         * servicegroups_list[GM_LISTSIZE];
diff --combined worker/worker.c
index ef47cf0,bbb6808..5876ba0
--- a/worker/worker.c
+++ b/worker/worker.c
@@@ -33,10 -33,15 +33,15 @@@ pthread_t status_thr
  
  int     orig_argc;
  char ** orig_argv;
+ int     last_time_increased;
+ volatile sig_atomic_t shmid;
+ int   * shm;
  
  /* work starts here */
  int main (int argc, char **argv) {
-     int sid, x, status, now, last_time_checked, target_number_of_workers;
+     int sid, x;
+ 
+     last_time_increased = 0;
  
      /* store the original command line for later reloads */
      store_original_comandline(argc, argv);
@@@ -99,7 -104,7 +104,7 @@@
  
      /* check and write pid file */
      if(write_pid_file() != GM_OK) {
-         exit( EXIT_SUCCESS );
+         exit(EXIT_FAILURE);
      }
  
      /* init crypto functions */
@@@ -111,6 -116,13 +116,13 @@@
  
      gm_log( GM_LOG_DEBUG, "main process started\n");
  
+     /* start a single non forked standalone worker */
+     if(mod_gm_opt->debug_level >= 10) {
+         gm_log( GM_LOG_TRACE, "starting standalone worker\n");
+         worker_client(GM_WORKER_STANDALONE, 1, shmid);
+         exit(EXIT_SUCCESS);
+     }
+ 
      /* setup shared memory */
      setup_child_communicator();
  
@@@ -122,52 -134,131 +134,131 @@@
          make_new_child(GM_WORKER_MULTI);
      }
  
+     /* maintain worker population */
+     monitor_loop();
+ 
+     clean_exit(15);
+     exit( EXIT_SUCCESS );
+ }
+ 
+ 
+ /* main loop for checking worker */
+ void monitor_loop() {
+     int status;
+ 
      /* maintain the population */
-     now                 = (int)time(NULL);
-     last_time_checked   = now;
      while (1) {
-         /* check number of workers every 3 seconds
-          * sleep gets canceled anyway when receiving signals
-          */
-         sleep(3);
+         /* check number of workers every second */
+         sleep(1);
  
          /* collect finished workers */
-         while(waitpid(-1, &status, WNOHANG) > 0) {
-             current_number_of_workers--;
-             gm_log( GM_LOG_TRACE, "waitpid() %d\n", status);
-             update_runtime_data();
-         }
+         while(waitpid(-1, &status, WNOHANG) > 0)
+             gm_log( GM_LOG_TRACE, "waitpid() worker exited with: %d\n", status);
  
-         if(current_number_of_jobs < 0) { current_number_of_jobs = 0; }
-         if(current_number_of_jobs > current_number_of_workers) { current_number_of_jobs = current_number_of_workers; }
+         check_worker_population();
+     }
+     return;
+ }
  
-         /* keep up minimum population */
-         for (x = current_number_of_workers; x < mod_gm_opt->min_worker; x++) {
-             make_new_child(GM_WORKER_MULTI);
-         }
  
-         now = (int)time(NULL);
-         if(last_time_checked +2 > now)
-             continue;
-         last_time_checked = time(NULL);
+ /* count current worker and jobs */
+ void count_current_worker() {
+     int x;
  
-         target_number_of_workers = adjust_number_of_worker(mod_gm_opt->min_worker, mod_gm_opt->max_worker, current_number_of_workers, current_number_of_jobs);
-         for (x = current_number_of_workers; x < target_number_of_workers; x++) {
-             /* top up the worker pool */
-             make_new_child(GM_WORKER_MULTI);
+     gm_log( GM_LOG_TRACE, "count_current_worker()\n");
+     gm_log( GM_LOG_TRACE, "done jobs:     shm[0] = %d\n", shm[0]);
+ 
+     /* shm states:
+      *   0 -> undefined
+      *  -1 -> free
+      * <-1 -> used but idle
+      * > 1 -> used and working
+      */
+ 
+     /* check if status worker died */
+     if( shm[3] != -1 && pid_alive(shm[3]) == FALSE ) {
+         gm_log( GM_LOG_TRACE, "removed stale status worker, old pid: %d\n", shm[3] );
+         shm[3] = -1;
+     }
+     gm_log( GM_LOG_TRACE, "status worker: shm[3] = %d\n", shm[3]);
+ 
+     /* check all known worker */
+     current_number_of_workers = 0;
+     current_number_of_jobs    = 0;
+     for(x=4; x < mod_gm_opt->max_worker+4; x++) {
+         /* verify worker is alive */
+         if( shm[x] != -1 && pid_alive(shm[x]) == FALSE ) {
+             gm_log( GM_LOG_TRACE, "removed stale worker %d, old pid: %d\n", x, shm[x]);
+             shm[x] = -1;
+         }
+         gm_log( GM_LOG_TRACE, "worker slot:   shm[%d] = %d\n", x, shm[x]);
+         if(shm[x] != -1) {
+             current_number_of_workers++;
+         }
+         if(shm[x] > 0) {
+             current_number_of_jobs++;
          }
      }
  
-     clean_exit(15);
-     exit( EXIT_SUCCESS );
+     shm[1] = current_number_of_workers; /* total worker   */
+     shm[2] = current_number_of_jobs;    /* running worker */
+ 
+     gm_log( GM_LOG_TRACE, "worker: %d  -  running: %d\n", current_number_of_workers, current_number_of_jobs);
+ 
+     return;
+ }
+ 
+ /* start new worker if needed */
+ void check_worker_population() {
+     int x, now, target_number_of_workers;
+ 
+     gm_log( GM_LOG_TRACE, "check_worker_population()\n");
+ 
+     /* set current worker number */
+     count_current_worker();
+ 
+     /* check if status worker died */
+     if( shm[3] == -1 ) {
+         make_new_child(GM_WORKER_STATUS);
+     }
+ 
+     /* keep up minimum population */
+     for (x = current_number_of_workers; x < mod_gm_opt->min_worker; x++) {
+         make_new_child(GM_WORKER_MULTI);
+         current_number_of_workers++;
+     }
+ 
+     now = (int)time(NULL);
+     if(last_time_increased +2 > now)
+         return;
+ 
+     target_number_of_workers = adjust_number_of_worker(mod_gm_opt->min_worker, mod_gm_opt->max_worker, current_number_of_workers, current_number_of_jobs);
+     for (x = current_number_of_workers; x < target_number_of_workers; x++) {
+         last_time_increased = now;
+         /* top up the worker pool */
+         make_new_child(GM_WORKER_MULTI);
+     }
+     return;
  }
  
  
  /* start up new worker */
  int make_new_child(int mode) {
      pid_t pid = 0;
+     int next_shm_index;
  
-     gm_log( GM_LOG_TRACE, "make_new_child()\n");
+     gm_log( GM_LOG_TRACE, "make_new_child(%d)\n", mode);
+ 
+     if(mode == GM_WORKER_STATUS) {
+         gm_log( GM_LOG_TRACE, "forking status worker\n");
+         next_shm_index = 3;
+     } else {
+         gm_log( GM_LOG_TRACE, "forking worker\n");
+         next_shm_index = get_next_shm_index();
+     }
+ 
+     signal(SIGINT,  SIG_DFL);
+     signal(SIGTERM, SIG_DFL);
  
      /* fork a child process */
      pid=fork();
@@@ -181,23 -272,21 +272,21 @@@
  
      /* we are in the child process */
      else if(pid==0){
-         gm_log( GM_LOG_DEBUG, "worker started with pid: %d\n", getpid() );
  
-         signal(SIGUSR1, SIG_IGN);
-         signal(SIGINT,  SIG_DFL);
-         signal(SIGTERM, SIG_DFL);
+         gm_log( GM_LOG_DEBUG, "child started with pid: %d\n", getpid() );
+         shm[next_shm_index] = -getpid();
  
          /* do the real work */
-         worker_client(mode);
+         worker_client(mode, next_shm_index, shmid);
  
          exit(EXIT_SUCCESS);
      }
  
      /* parent  */
      else if(pid > 0){
-         if(mode != GM_WORKER_STATUS)
-             current_number_of_workers++;
-             update_runtime_data();
+         signal(SIGINT, clean_exit);
+         signal(SIGTERM,clean_exit);
+         shm[next_shm_index] = -pid;
      }
  
      return GM_OK;
@@@ -342,8 -431,6 +431,8 @@@ void print_usage() 
      printf("\n");
      printf("       [ --server=<server>     ]\n");
      printf("\n");
 +    printf("       [ --dupserver=<server>  ]\n");
 +    printf("\n");
      printf("       [ --hosts               ]\n");
      printf("       [ --services            ]\n");
      printf("       [ --events              ]\n");
@@@ -367,48 -454,31 +456,31 @@@
  }
  
  
- /* check child signal pipe */
- void check_signal(int sig) {
-     gm_log( GM_LOG_TRACE, "check_signal(%i)\n", sig);
-     update_runtime_data();
-     return;
- }
- 
  /* create shared memory segments */
  void setup_child_communicator() {
-     struct sigaction usr1_action;
-     sigset_t block_mask;
-     int shmid;
-     int * shm;
+     int x;
  
      gm_log( GM_LOG_TRACE, "setup_child_communicator()\n");
  
-     /* setup signal handler */
-     sigfillset (&block_mask); /* block all signals */
-     usr1_action.sa_handler = check_signal;
-     usr1_action.sa_mask    = block_mask;
-     usr1_action.sa_flags   = 0;
-     sigaction (SIGUSR1, &usr1_action, NULL);
- 
      /* Create the segment. */
      mod_gm_shm_key = getpid(); /* use pid as shm key */
      if ((shmid = shmget(mod_gm_shm_key, GM_SHM_SIZE, IPC_CREAT | 0600)) < 0) {
          perror("shmget");
-         exit(1);
+         exit( EXIT_FAILURE );
      }
  
      /* Now we attach the segment to our data space. */
      if ((shm = shmat(shmid, NULL, 0)) == (int *) -1) {
          perror("shmat");
-         exit(1);
+         exit( EXIT_FAILURE );
+     }
+     shm[0] = 0;  /* done jobs         */
+     shm[1] = 0;  /* total worker      */
+     shm[2] = 0;  /* running worker    */
+     shm[3] = -1; /* status worker pid */
+     for(x = 0; x < mod_gm_opt->max_worker; x++) {
+         shm[x+4] = -1; /* status worker   */
      }
-     shm[0] = 0; /* current jobs    */
-     shm[1] = 0; /* current worker  */
-     shm[2] = 0; /* done jobs       */
- 
-     /* detach from shared memory */
-     if(shmdt(shm) < 0)
-         perror("shmdt");
  
      return;
  }
@@@ -419,6 -489,12 +491,12 @@@ int adjust_number_of_worker(int min, in
      int perc_running;
      int idle;
      int target = min;
+ 
+     if(cur_workers == 0) {
+         gm_log( GM_LOG_TRACE, "adjust_number_of_worker(min %d, max %d, worker %d, jobs %d) -> %d\n", min, max, cur_workers, cur_jobs, mod_gm_opt->min_worker);
+         return mod_gm_opt->min_worker;
+     }
+ 
      perc_running = (int)cur_jobs*100/cur_workers;
      idle         = (int)cur_workers - cur_jobs;
  
@@@ -454,16 -530,12 +532,12 @@@ void clean_exit(int sig) 
      /* stop all childs */
      stop_childs(GM_WORKER_STOP);
  
-     /* kill remaining worker */
-     if(current_number_of_workers > 0) {
-         pid_t pid = getpid();
-         kill(-pid, SIGKILL);
-     }
+     /* detach shm */
+     if(shmdt(shm) < 0)
+         perror("shmdt");
  
      gm_log( GM_LOG_INFO, "mod_gearman worker exited\n");
- 
      mod_gm_free_opt(mod_gm_opt);
- 
      exit( EXIT_SUCCESS );
  }
  
@@@ -472,8 -544,9 +546,9 @@@
  void stop_childs(int mode) {
      int status, chld;
      int waited = 0;
-     int shmid;
-     int skipfirst = 0;
+     int x, curpid;
+ 
+     gm_log( GM_LOG_TRACE, "stop_childs(%d)\n", mode);
  
      /* ignore some signals for now */
      signal(SIGTERM, SIG_IGN);
@@@ -483,56 -556,81 +558,81 @@@
       * send term signal to our childs
       * children will finish the current job and exit
       */
-     gm_log( GM_LOG_TRACE, "send SIGTERM\n");
      killpg(0, SIGTERM);
- 
-     gm_log( GM_LOG_TRACE, "waiting for childs to exit...\n");
      while(current_number_of_workers > 0) {
+         gm_log( GM_LOG_TRACE, "send SIGTERM\n");
+         for(x=3; x < mod_gm_opt->max_worker+4; x++) {
+             curpid = shm[x];
+             if(curpid < 0) { curpid = -curpid; }
+             if( curpid != 0 ) {
+                 kill(curpid, SIGTERM);
+             }
+         }
          while((chld = waitpid(-1, &status, WNOHANG)) != -1 && chld > 0) {
-             current_number_of_workers--;
              gm_log( GM_LOG_TRACE, "wait() %d exited with %d\n", chld, status);
-             /* start one worker less than exited, because of the status worker */
-             if(skipfirst == 0 && mode == GM_WORKER_RESTART) {
-                 make_new_child(GM_WORKER_MULTI);
-             }
-             skipfirst = 1;
          }
          sleep(1);
          waited++;
          if(waited > GM_CHILD_SHUTDOWN_TIMEOUT) {
              break;
          }
-         gm_log( GM_LOG_TRACE, "still waiting (%d)...\n", waited);
+         count_current_worker();
+         if(current_number_of_workers == 0)
+             return;
+         gm_log( GM_LOG_TRACE, "still waiting (%d) %d childs missing...\n", waited, current_number_of_workers);
      }
  
      if(mode == GM_WORKER_STOP) {
-         if(current_number_of_workers > 0) {
-             gm_log( GM_LOG_TRACE, "sending SIGINT...\n");
-             killpg(0, SIGINT);
+         killpg(0, SIGINT);
+         count_current_worker();
+         if(current_number_of_workers == 0)
+             return;
+ 
+         gm_log( GM_LOG_TRACE, "sending SIGINT...\n");
+         for(x=3; x < mod_gm_opt->max_worker+4; x++) {
+             curpid = shm[x];
+             if(curpid < 0) { curpid = -curpid; }
+             if( curpid != 0 ) {
+                 kill(curpid, SIGINT);
+             }
          }
  
          while((chld = waitpid(-1, &status, WNOHANG)) != -1 && chld > 0) {
              gm_log( GM_LOG_TRACE, "wait() %d exited with %d\n", chld, status);
          }
  
+         /* kill them the hard way */
+         count_current_worker();
+         if(current_number_of_workers == 0)
+             return;
+         for(x=3; x < mod_gm_opt->max_worker+4; x++) {
+             if( shm[x] != 0 ) {
+                 curpid = shm[x];
+                 if(curpid < 0) { curpid = -curpid; }
+                 if( curpid != 0 ) {
+                     kill(curpid, SIGKILL);
+                 }
+             }
+         }
+ 
+         /* count childs a last time */
+         count_current_worker();
+         if(current_number_of_workers == 0)
+             return;
+ 
          /*
           * clean up shared memory
           * will be removed when last client detaches
           */
-         if ((shmid = shmget(mod_gm_shm_key, GM_SHM_SIZE, 0600)) < 0) {
-             perror("shmget");
-         }
          if( shmctl( shmid, IPC_RMID, 0 ) == -1 ) {
              perror("shmctl");
+         } else {
+             gm_log( GM_LOG_TRACE, "shared memory deleted\n");
          }
-         gm_log( GM_LOG_TRACE, "shared memory deleted\n");
  
-         if(current_number_of_workers > 0) {
-             /* this will kill us too */
-             gm_log( GM_LOG_TRACE, "sending SIGKILL...\n");
-             killpg(0, SIGKILL);
-         }
+         /* this will kill us too */
+         gm_log( GM_LOG_ERROR, "exiting by SIGKILL...\n");
+         killpg(0, SIGKILL);
      }
  
      /* restore signal handlers for a clean exit */
@@@ -623,34 -721,29 +723,29 @@@ void reload_config(int sig) 
  }
  
  
- /* update the number of worker and jobs */
- void update_runtime_data() {
-     int shmid;
-     int *shm;
+ /* return and reserve next shm index*/
+ int get_next_shm_index() {
+     int x;
+     int next_index = 0;
  
-     gm_log( GM_LOG_TRACE, "update_worker_num()\n");
+     gm_log( GM_LOG_TRACE, "get_next_shm_index()\n" );
  
-     /* Locate the segment. */
-     if ((shmid = shmget(mod_gm_shm_key, GM_SHM_SIZE, 0600)) < 0) {
-         perror("shmget");
-         exit(1);
+     for(x = 4; x < mod_gm_opt->max_worker+4; x++) {
+         if(shm[x] == -1) {
+             next_index      = x;
+             shm[next_index] = 1;
+             break;
+         }
      }
  
-     /* Now we attach the segment to our data space. */
-     if ((shm = shmat(shmid, NULL, 0)) == (int *) -1) {
-         perror("shmat");
-         exit(1);
+     if(next_index == 0) {
+         gm_log(GM_LOG_ERROR, "unable to get next shm id\n");
+         clean_exit(15);
+         exit(EXIT_FAILURE);
      }
+     gm_log( GM_LOG_TRACE, "get_next_shm_index() -> %d\n", next_index );
  
-     gm_log( GM_LOG_TRACE, "update_runtime_data: %i\n", shm[0]);
-     current_number_of_jobs = shm[0];
-     shm[1] = current_number_of_workers;
- 
-     /* detach from shared memory */
-     if(shmdt(shm) < 0)
-         perror("shmdt");
- 
-     return;
+     return next_index;
  }
  
  
diff --combined worker/worker_client.c
index f150270,0d7b460..1b6e391
--- a/worker/worker_client.c
+++ b/worker/worker_client.c
@@@ -35,48 -35,48 +35,55 @@@ char hostname[GM_BUFFERSIZE]
  
  gearman_worker_st worker;
  gearman_client_st client;
 +gearman_client_st client_dup;
  
  gm_job_t * current_job;
+ pid_t current_pid;
  pid_t current_child_pid;
  gm_job_t * exec_job;
  
  int jobs_done = 0;
  int sleep_time_after_error = 1;
  int worker_run_mode;
- 
+ int shm_index = 0;
+ volatile sig_atomic_t shmid;
  
  /* callback for task completed */
- void worker_client(int worker_mode) {
+ void worker_client(int worker_mode, int index, int shid) {
  
-     gm_log( GM_LOG_TRACE, "worker client started\n" );
+     gm_log( GM_LOG_TRACE, "%s worker client started\n", (worker_mode == GM_WORKER_STATUS ? "status" : "job" ));
  
      /* set signal handlers for a clean exit */
      signal(SIGINT, clean_worker_exit);
      signal(SIGTERM,clean_worker_exit);
  
      worker_run_mode = worker_mode;
+     shm_index       = index;
+     shmid           = shid;
+     current_pid     = getpid();
  
      gethostname(hostname, GM_BUFFERSIZE-1);
  
      /* create worker */
      if(set_worker(&worker) != GM_OK) {
          gm_log( GM_LOG_ERROR, "cannot start worker\n" );
+         clean_worker_exit(0);
          exit( EXIT_FAILURE );
      }
  
      /* create client */
      if ( create_client( mod_gm_opt->server_list, &client ) != GM_OK ) {
          gm_log( GM_LOG_ERROR, "cannot start client\n" );
+         clean_worker_exit(0);
          exit( EXIT_FAILURE );
      }
  
 +    /* create duplicate client */
 +    if ( create_client_dup( mod_gm_opt->dupserver_list, &client_dup ) != GM_OK ) {
 +        gm_log( GM_LOG_ERROR, "cannot start client for duplicate server\n" );
 +        exit( EXIT_FAILURE );
 +    }
 +
      worker_loop();
  
      return;
@@@ -89,9 -89,11 +96,11 @@@ void worker_loop() 
      while ( 1 ) {
          gearman_return_t ret;
  
-         /* wait three minutes for a job, otherwise exit */
-         if(worker_run_mode == GM_WORKER_MULTI)
+         /* wait for a job, otherwise exit when hit the idle timeout */
+         if(mod_gm_opt->idle_timeout > 0 && worker_run_mode == GM_WORKER_MULTI) {
+             signal(SIGALRM, idle_sighandler);
              alarm(mod_gm_opt->idle_timeout);
+         }
  
          signal(SIGPIPE, SIG_IGN);
          ret = gearman_worker_work( &worker );
@@@ -100,7 -102,6 +109,7 @@@
              gearman_job_free_all( &worker );
              gearman_worker_free( &worker );
              gearman_client_free( &client );
 +            if( mod_gm_opt->dupserver_num ) gearman_client_free( &client_dup );
  
              /* sleep on error to avoid cpu intensive infinite loops */
              sleep(sleep_time_after_error);
@@@ -111,7 -112,6 +120,7 @@@
              /* create new connections */
              set_worker( &worker );
              create_client( mod_gm_opt->server_list, &client );
 +            create_client( mod_gm_opt->dupserver_list, &client_dup );
          }
      }
  
@@@ -123,21 -123,21 +132,21 @@@
  void *get_job( gearman_job_st *job, void *context, size_t *result_size, gearman_return_t *ret_ptr ) {
      sigset_t block_mask;
      sigset_t old_mask;
-     int wsize;
+     int wsize, valid_lines;
      char workload[GM_BUFFERSIZE];
      char * decrypted_data;
      char * decrypted_data_c;
      char * decrypted_orig;
      char *ptr;
-     char command[GM_BUFFERSIZE];
+ 
+     /* reset timeout for now, will be set befor execution again */
+     alarm(0);
+     signal(SIGALRM, SIG_IGN);
  
      jobs_done++;
  
      /* send start signal to parent */
-     send_state_to_parent(GM_JOB_START);
- 
-     /* reset timeout for now, will be set befor execution again */
-     alarm(0);
+     set_state(GM_JOB_START);
  
      gm_log( GM_LOG_TRACE, "get_job()\n" );
  
@@@ -180,6 -180,7 +189,7 @@@
      exec_job = ( gm_job_t * )malloc( sizeof *exec_job );
      set_default_job(exec_job);
  
+     valid_lines = 0;
      while ( (ptr = strsep(&decrypted_data, "\n" )) != NULL ) {
          char *key   = strsep( &ptr, "=" );
          char *value = strsep( &ptr, "\x0" );
@@@ -192,31 -193,37 +202,37 @@@
  
          if ( !strcmp( key, "host_name" ) ) {
              exec_job->host_name = strdup(value);
+             valid_lines++;
          } else if ( !strcmp( key, "service_description" ) ) {
              exec_job->service_description = strdup(value);
+             valid_lines++;
          } else if ( !strcmp( key, "type" ) ) {
              exec_job->type = strdup(value);
+             valid_lines++;
          } else if ( !strcmp( key, "result_queue" ) ) {
              exec_job->result_queue = strdup(value);
+             valid_lines++;
          } else if ( !strcmp( key, "check_options" ) ) {
              exec_job->check_options = atoi(value);
+             valid_lines++;
          } else if ( !strcmp( key, "scheduled_check" ) ) {
              exec_job->scheduled_check = atoi(value);
+             valid_lines++;
          } else if ( !strcmp( key, "reschedule_check" ) ) {
              exec_job->reschedule_check = atoi(value);
+             valid_lines++;
          } else if ( !strcmp( key, "latency" ) ) {
              exec_job->latency = atof(value);
+             valid_lines++;
          } else if ( !strcmp( key, "start_time" ) ) {
              string2timeval(value, &exec_job->core_start_time);
+             valid_lines++;
          } else if ( !strcmp( key, "timeout" ) ) {
              exec_job->timeout = atoi(value);
+             valid_lines++;
          } else if ( !strcmp( key, "command_line" ) ) {
-             /* adding 2>&1 breaks the exec/popen check and everything will be checked by popen */
-             /*
-             snprintf(command, sizeof(command)+5, "%s 2>&1", value);
-             exec_job->command_line = strdup(command);
-             */
              exec_job->command_line = strdup(value);
+             valid_lines++;
          }
      }
  
@@@ -225,7 -232,14 +241,14 @@@
          write_debug_file(&decrypted_orig);
  #endif
  
-     do_exec_job();
+     if(valid_lines == 0) {
+         gm_log( GM_LOG_ERROR, "discarded invalid job, check your encryption settings\n" );
+     } else {
+         do_exec_job();
+     }
+ 
+     /* send finish signal to parent */
+     set_state(GM_JOB_END);
  
      /* start listening to SIGTERMs */
      sigprocmask(SIG_SETMASK, &old_mask, NULL);
@@@ -234,16 -248,9 +257,9 @@@
      free(decrypted_data_c);
      free_job(exec_job);
  
-     /* send finish signal to parent */
-     send_state_to_parent(GM_JOB_END);
- 
-     if(jobs_done >= mod_gm_opt->max_jobs) {
+     if(mod_gm_opt->max_jobs > 0 && jobs_done >= mod_gm_opt->max_jobs) {
          gm_log( GM_LOG_TRACE, "jobs done: %i -> exiting...\n", jobs_done );
-         gearman_worker_unregister_all(&worker);
-         gearman_job_free_all( &worker );
-         gearman_client_free( &client );
-         if( mod_gm_opt->dupserver_num ) gearman_client_free( &client_dup );
-         mod_gm_free_opt(mod_gm_opt);
+         clean_worker_exit(0);
          exit( EXIT_SUCCESS );
      }
  
@@@ -261,11 -268,11 +277,11 @@@ void do_exec_job( ) 
      gm_log( GM_LOG_TRACE, "do_exec_job()\n" );
  
      if(exec_job->type == NULL) {
-         gm_log( GM_LOG_ERROR, "discarded invalid job\n" );
+         gm_log( GM_LOG_ERROR, "discarded invalid job, no type given\n" );
          return;
      }
      if(exec_job->command_line == NULL) {
-         gm_log( GM_LOG_ERROR, "discarded invalid job\n" );
+         gm_log( GM_LOG_ERROR, "discarded invalid job, no command line given\n" );
          return;
      }
  
@@@ -517,29 -524,6 +533,29 @@@ void send_result_back() 
          gm_log( GM_LOG_TRACE, "send_result_back() finished unsuccessfully\n" );
      }
  
 +    if( mod_gm_opt->dupserver_num ) {
 +        strncpy(temp_buffer2, "type=passive\n", (sizeof(temp_buffer1)-2));
 +        strncat(temp_buffer2, temp_buffer1, (sizeof(temp_buffer2)-2));
 +        temp_buffer2[sizeof( temp_buffer2 )-1]='\x0';
 +        if( add_job_to_queue( &client_dup,
 +                              mod_gm_opt->dupserver_list,
 +                              exec_job->result_queue,
 +                              NULL,
 +                              temp_buffer2,
 +                              GM_JOB_PRIO_NORMAL,
 +                              GM_DEFAULT_JOB_RETRIES,
 +                              mod_gm_opt->transportmode
 +                            ) == GM_OK) {
 +            gm_log( GM_LOG_TRACE, "send_result_back() finished successfully for duplicate server.\n" );
 +        }
 +        else {
 +            gm_log( GM_LOG_TRACE, "send_result_back() finished unsuccessfully for duplicate server\n" );
 +        }
 +    }
 +    else {
 +        gm_log( GM_LOG_TRACE, "send_result_back() has no duplicate servers to send to.\n" );
 +    }
 +
      return;
  }
  
@@@ -592,6 -576,14 +608,14 @@@ int set_worker( gearman_worker_st *w ) 
  }
  
  
+ /* called when worker runs into idle timeout */
+ void idle_sighandler(int sig) {
+     gm_log( GM_LOG_TRACE, "idle_sighandler(%i)\n", sig );
+     clean_worker_exit(0);
+     exit( EXIT_SUCCESS );
+ }
+ 
+ 
  /* called when check runs into timeout */
  void alarm_sighandler(int sig) {
      pid_t pid = getpid();
@@@ -607,66 -599,73 +631,73 @@@
      kill(-pid, SIGKILL);
  
      if(worker_run_mode != GM_WORKER_STANDALONE)
+         clean_worker_exit(0);
          exit(EXIT_SUCCESS);
  
      return;
  }
  
  /* tell parent our state */
- void send_state_to_parent(int status) {
-     int shmid;
+ void set_state(int status) {
      int *shm;
  
-     gm_log( GM_LOG_TRACE, "send_state_to_parent(%d)\n", status );
- 
-     /* Locate the segment */
-     if ((shmid = shmget(mod_gm_shm_key, GM_SHM_SIZE, 0600)) < 0) {
-         perror("shmget");
-         gm_log( GM_LOG_TRACE, "worker finished: %d\n", getpid() );
-         exit( EXIT_FAILURE );
-     }
+     gm_log( GM_LOG_TRACE, "set_state(%d)\n", status );
  
      /* Now we attach the segment to our data space. */
      if ((shm = shmat(shmid, NULL, 0)) == (int *) -1) {
          perror("shmat");
          gm_log( GM_LOG_TRACE, "worker finished: %d\n", getpid() );
+         clean_worker_exit(0);
          exit( EXIT_FAILURE );
      }
  
-     /* set our counter */
      if(status == GM_JOB_START)
-         shm[0]++;
+         shm[shm_index] = current_pid;
      if(status == GM_JOB_END) {
-         shm[0]--;
-         shm[2]++; /* increase jobs done */
+         shm[0]++; /* increase jobs done */
+         /* pid in our status slot changed, this should not happen -> exit */
+         if( shm[shm_index] != current_pid && shm[shm_index] != -current_pid ) {
+             gm_log( GM_LOG_ERROR, "double used worker slot: %d != %d\n", current_pid, shm[shm_index] );
+             clean_worker_exit(0);
+             exit( EXIT_FAILURE );
+         }
+         shm[shm_index] = -current_child_pid;
      }
  
      /* detach from shared memory */
      if(shmdt(shm) < 0)
          perror("shmdt");
  
-     if(worker_run_mode != GM_WORKER_MULTI)
-         return;
- 
-     /* wake up parent */
-     kill(getppid(), SIGUSR1);
- 
      return;
  }
  
  
  /* do a clean exit */
  void clean_worker_exit(int sig) {
-     int shmid;
+     int *shm;
  
      gm_log( GM_LOG_TRACE, "clean_worker_exit(%d)\n", sig);
  
+     gm_log( GM_LOG_TRACE, "cleaning worker\n");
      gearman_worker_unregister_all(&worker);
      gearman_job_free_all( &worker );
+     gm_log( GM_LOG_TRACE, "cleaning client\n");
      gearman_client_free( &client );
- 
      mod_gm_free_opt(mod_gm_opt);
  
+     /* Now we attach the segment to our data space. */
+     if ((shm = shmat(shmid, NULL, 0)) == (int *) -1) {
+         perror("shmat");
+         gm_log( GM_LOG_TRACE, "worker finished: %d\n", getpid() );
+         clean_worker_exit(0);
+         exit( EXIT_FAILURE );
+     }
+     shm[shm_index] = -1;
+ 
+     /* detach from shared memory */
+     if(shmdt(shm) < 0)
+         perror("shmdt");
+ 
      if(worker_run_mode != GM_WORKER_STANDALONE)
          exit( EXIT_SUCCESS );
  
@@@ -674,9 -673,6 +705,6 @@@
       * clean up shared memory
       * will be removed when last client detaches
       */
-     if ((shmid = shmget(mod_gm_shm_key, GM_SHM_SIZE, 0600)) < 0) {
-         perror("shmget");
-     }
      if( shmctl( shmid, IPC_RMID, 0 ) == -1 ) {
          perror("shmctl");
      }
@@@ -689,7 -685,6 +717,6 @@@
  void *return_status( gearman_job_st *job, void *context, size_t *result_size, gearman_return_t *ret_ptr ) {
      int wsize;
      char workload[GM_BUFFERSIZE];
-     int shmid;
      int *shm;
      char * result;
  
@@@ -712,13 -707,6 +739,6 @@@
      result = malloc(GM_BUFFERSIZE);
      *result_size = GM_BUFFERSIZE;
  
-     /* Locate the segment */
-     if ((shmid = shmget(mod_gm_shm_key, GM_SHM_SIZE, 0600)) < 0) {
-         perror("shmget");
-         *result_size = 0;
-         return NULL;
-     }
- 
      /* Now we attach the segment to our data space. */
      if ((shm = shmat(shmid, NULL, 0)) == (int *) -1) {
          perror("shmat");
@@@ -726,7 -714,7 +746,7 @@@
          return NULL;
      }
  
-     snprintf(result, GM_BUFFERSIZE, "%s has %i worker and is working on %i jobs. Version: %s|worker=%i jobs=%ic", hostname, shm[1], shm[0], GM_VERSION, shm[1], shm[2] );
+     snprintf(result, GM_BUFFERSIZE, "%s has %i worker and is working on %i jobs. Version: %s|worker=%i jobs=%ic", hostname, shm[1], shm[2], GM_VERSION, shm[1], shm[0] );
  
      /* detach from shared memory */
      if(shmdt(shm) < 0)

-- 
Debian packaging for mod gearman.



More information about the Pkg-nagios-changes mailing list