[Pkg-nagios-changes] [SCM] Debian packaging for mod gearman. branch, master, updated. f5edd3e88aa2138dcf6d044c835fbd0ada944168
Mark Clarkson (none)
mclarkson at hptm2.
Fri Feb 11 11:18:07 UTC 2011
The following commit has been merged in the master branch:
commit 2336370145a568ddc5790f6325de817251246ea2
Merge: 3fc5dd0267d82d7425463d45c7680a6497106cf3 47e540d71d60d3ce676ffc22b7fad1975a19c0a5
Author: Mark Clarkson <mark.clarkson at smorg.co.uk>
Date: Wed Jan 26 20:37:39 2011 +0000
Merge branch 'master' of git://github.com/sni/mod_gearman
Conflicts:
worker/worker_client.c
diff --combined common/utils.c
index d8a9ed3,8a4915a..c988774
--- a/common/utils.c
+++ b/common/utils.c
@@@ -226,9 -226,6 +226,9 @@@ int set_default_options(mod_gm_opt_t *o
opt->server_num = 0;
for(i=0;i<=GM_LISTSIZE;i++)
opt->server_list[i] = NULL;
+ opt->dupserver_num = 0;
+ for(i=0;i<=GM_LISTSIZE;i++)
+ opt->dupserver_list[i] = NULL;
opt->hostgroups_num = 0;
for(i=0;i<=GM_LISTSIZE;i++)
opt->hostgroups_list[i] = NULL;
@@@ -501,18 -498,6 +501,18 @@@ int parse_args_line(mod_gm_opt_t *opt,
}
}
+ /* duplicate server */
+ else if ( !strcmp( key, "dupserver" ) ) {
+ char *servername;
+ while ( (servername = strsep( &value, "," )) != NULL ) {
+ servername = trim(servername);
+ if ( strcmp( servername, "" ) ) {
+ opt->dupserver_list[opt->dupserver_num] = strdup(servername);
+ opt->dupserver_num++;
+ }
+ }
+ }
+
/* servicegroups */
else if ( !strcmp( key, "servicegroups" )
|| !strcmp( key, "servicegroup" )
@@@ -648,9 -633,6 +648,9 @@@ void dumpconfig(mod_gm_opt_t *opt, int
for(i=0;i<opt->server_num;i++)
gm_log( GM_LOG_DEBUG, "server: %s\n", opt->server_list[i]);
gm_log( GM_LOG_DEBUG, "\n" );
+ for(i=0;i<opt->dupserver_num;i++)
+ gm_log( GM_LOG_DEBUG, "dupserver: %s\n", opt->dupserver_list[i]);
+ gm_log( GM_LOG_DEBUG, "\n" );
if(mode == GM_NEB_MODE) {
gm_log( GM_LOG_DEBUG, "perfdata: %s\n", opt->perfdata == GM_ENABLED ? "yes" : "no");
}
@@@ -694,8 -676,6 +694,8 @@@ void mod_gm_free_opt(mod_gm_opt_t *opt
int i;
for(i=0;i<opt->server_num;i++)
free(opt->server_list[i]);
+ for(i=0;i<opt->dupserver_num;i++)
+ free(opt->dupserver_list[i]);
for(i=0;i<opt->hostgroups_num;i++)
free(opt->hostgroups_list[i]);
for(i=0;i<opt->servicegroups_num;i++)
@@@ -964,3 -944,20 +964,20 @@@ int run_check(char *processed_command,
return retval;
}
+
+
+ /* verify if a pid is alive: returns TRUE if the pid (sign ignored) is 1 or can be signalled, FALSE otherwise */
+ int pid_alive(int pid) {
+ if(pid < 0) { pid = -pid; } /* negative values are checked by their absolute value */
+
+ /* 1/-1 are undefined pids in our case: never probe them with kill(), report them as alive */
+ if(pid == 1)
+ return TRUE;
+
+ /* signal 0 delivers nothing, kill() only performs its existence/permission checks */
+ if(kill(pid, 0) == 0) {
+ return TRUE;
+ }
+
+ return FALSE; /* NOTE(review): also reached when kill() fails with EPERM although the process exists */
+ }
diff --combined include/common.h
index 8a27d20,5840a6a..082b642
--- a/include/common.h
+++ b/include/common.h
@@@ -28,11 -28,11 +28,11 @@@
#define MOD_GM_COMMON_H
/* constants */
- #define GM_VERSION "0.8"
+ #define GM_VERSION "1.0"
#define GM_ENABLED 1
#define GM_DISABLED 0
- #define GM_BUFFERSIZE 16384
- #define GM_MAX_OUTPUT 11000 /* must be ~30% below GM_BUFFERSIZE for base64/encryption */
+ #define GM_BUFFERSIZE 98304
+ #define GM_MAX_OUTPUT 65536 /* must be ~30% below GM_BUFFERSIZE for base64/encryption */
#define GM_LISTSIZE 512
#define GM_MIN_LIB_GEARMAN_VERSION 0.14
@@@ -55,10 -55,10 +55,10 @@@
#define GM_DEFAULT_JOB_TIMEOUT 60
#define GM_DEFAULT_JOB_RETRIES 1
- #define GM_CHILD_SHUTDOWN_TIMEOUT 5
+ #define GM_CHILD_SHUTDOWN_TIMEOUT 30
#define GM_DEFAULT_RESULT_QUEUE "check_results"
#define GM_DEFAULT_IDLE_TIMEOUT 10
- #define GM_DEFAULT_MAX_JOBS 20
+ #define GM_DEFAULT_MAX_JOBS 1000
#define MAX_CMD_ARGS 4096
/* worker */
@@@ -96,8 -96,7 +96,7 @@@
#define STATE_CRITICAL 2
#define STATE_UNKNOWN 3
- /* size of the shared memory segment */
- #define GM_SHM_SIZE 300
+ #define GM_SHM_SIZE 4096
/* options structure */
typedef struct mod_gm_opt_struct {
@@@ -107,8 -106,6 +106,8 @@@
char * keyfile;
char * server_list[GM_LISTSIZE];
int server_num;
+ char * dupserver_list[GM_LISTSIZE];
+ int dupserver_num;
char * hostgroups_list[GM_LISTSIZE];
int hostgroups_num;
char * servicegroups_list[GM_LISTSIZE];
diff --combined worker/worker.c
index ef47cf0,bbb6808..5876ba0
--- a/worker/worker.c
+++ b/worker/worker.c
@@@ -33,10 -33,15 +33,15 @@@ pthread_t status_thr
int orig_argc;
char ** orig_argv;
+ int last_time_increased;
+ volatile sig_atomic_t shmid;
+ int * shm;
/* work starts here */
int main (int argc, char **argv) {
- int sid, x, status, now, last_time_checked, target_number_of_workers;
+ int sid, x;
+
+ last_time_increased = 0;
/* store the original command line for later reloads */
store_original_comandline(argc, argv);
@@@ -99,7 -104,7 +104,7 @@@
/* check and write pid file */
if(write_pid_file() != GM_OK) {
- exit( EXIT_SUCCESS );
+ exit(EXIT_FAILURE);
}
/* init crypto functions */
@@@ -111,6 -116,13 +116,13 @@@
gm_log( GM_LOG_DEBUG, "main process started\n");
+ /* start a single non forked standalone worker */
+ if(mod_gm_opt->debug_level >= 10) {
+ gm_log( GM_LOG_TRACE, "starting standalone worker\n");
+ worker_client(GM_WORKER_STANDALONE, 1, shmid);
+ exit(EXIT_SUCCESS);
+ }
+
/* setup shared memory */
setup_child_communicator();
@@@ -122,52 -134,131 +134,131 @@@
make_new_child(GM_WORKER_MULTI);
}
+ /* maintain worker population */
+ monitor_loop();
+
+ clean_exit(15);
+ exit( EXIT_SUCCESS );
+ }
+
+
+ /* main loop of the parent: once per second reap exited children and check the worker population */
+ void monitor_loop() {
+ int status;
+
/* maintain the population */
- now = (int)time(NULL);
- last_time_checked = now;
while (1) {
- /* check number of workers every 3 seconds
- * sleep gets canceled anyway when receiving signals
- */
- sleep(3);
+ /* check number of workers every second */
+ sleep(1);
/* collect finished workers without blocking (WNOHANG); the exit status is only logged */
- while(waitpid(-1, &status, WNOHANG) > 0) {
- current_number_of_workers--;
- gm_log( GM_LOG_TRACE, "waitpid() %d\n", status);
- update_runtime_data();
- }
+ while(waitpid(-1, &status, WNOHANG) > 0)
+ gm_log( GM_LOG_TRACE, "waitpid() worker exited with: %d\n", status);
- if(current_number_of_jobs < 0) { current_number_of_jobs = 0; }
- if(current_number_of_jobs > current_number_of_workers) { current_number_of_jobs = current_number_of_workers; }
+ check_worker_population();
+ }
+ return; /* not reached: the loop above has no exit */
+ }
- /* keep up minimum population */
- for (x = current_number_of_workers; x < mod_gm_opt->min_worker; x++) {
- make_new_child(GM_WORKER_MULTI);
- }
- now = (int)time(NULL);
- if(last_time_checked +2 > now)
- continue;
- last_time_checked = time(NULL);
+ /* count current worker and jobs from the shm slots, freeing slots whose process is gone */
+ void count_current_worker() {
+ int x;
- target_number_of_workers = adjust_number_of_worker(mod_gm_opt->min_worker, mod_gm_opt->max_worker, current_number_of_workers, current_number_of_jobs);
- for (x = current_number_of_workers; x < target_number_of_workers; x++) {
- /* top up the worker pool */
- make_new_child(GM_WORKER_MULTI);
+ gm_log( GM_LOG_TRACE, "count_current_worker()\n");
+ gm_log( GM_LOG_TRACE, "done jobs: shm[0] = %d\n", shm[0]);
+
+ /* shm slot states (shm[3] is the status worker, shm[4] and up are the workers):
+ * 0 -> undefined
+ * -1 -> free
+ * <-1 -> used but idle (negated pid)
+ * 1 -> reserved by get_next_shm_index(), > 1 -> used and working (pid)
+ */
+
+ /* check if status worker died, free its slot if so */
+ if( shm[3] != -1 && pid_alive(shm[3]) == FALSE ) {
+ gm_log( GM_LOG_TRACE, "removed stale status worker, old pid: %d\n", shm[3] );
+ shm[3] = -1;
+ }
+ gm_log( GM_LOG_TRACE, "status worker: shm[3] = %d\n", shm[3]);
+
+ /* check all known worker and recount from scratch */
+ current_number_of_workers = 0;
+ current_number_of_jobs = 0;
+ for(x=4; x < mod_gm_opt->max_worker+4; x++) {
+ /* verify worker is alive, otherwise mark its slot as free */
+ if( shm[x] != -1 && pid_alive(shm[x]) == FALSE ) {
+ gm_log( GM_LOG_TRACE, "removed stale worker %d, old pid: %d\n", x, shm[x]);
+ shm[x] = -1;
+ }
+ gm_log( GM_LOG_TRACE, "worker slot: shm[%d] = %d\n", x, shm[x]);
+ if(shm[x] != -1) {
+ current_number_of_workers++;
+ }
+ if(shm[x] > 0) { /* positive slot value counts as a running job */
+ current_number_of_jobs++;
}
}
- clean_exit(15);
- exit( EXIT_SUCCESS );
+ shm[1] = current_number_of_workers; /* total worker, published in shm */
+ shm[2] = current_number_of_jobs; /* running worker, published in shm */
+
+ gm_log( GM_LOG_TRACE, "worker: %d - running: %d\n", current_number_of_workers, current_number_of_jobs);
+
+ return;
+ }
+
+ /* start new worker if needed: status worker, minimum population, then load based top up */
+ void check_worker_population() {
+ int x, now, target_number_of_workers;
+
+ gm_log( GM_LOG_TRACE, "check_worker_population()\n");
+
+ /* refresh current_number_of_workers and current_number_of_jobs from shm */
+ count_current_worker();
+
+ /* status worker slot is free (never started or died): start a status worker */
+ if( shm[3] == -1 ) {
+ make_new_child(GM_WORKER_STATUS);
+ }
+
+ /* keep up minimum population, counting the new workers locally */
+ for (x = current_number_of_workers; x < mod_gm_opt->min_worker; x++) {
+ make_new_child(GM_WORKER_MULTI);
+ current_number_of_workers++;
+ }
+
+ now = (int)time(NULL);
+ if(last_time_increased +2 > now) /* the pool was topped up less than 2 seconds ago: do not grow it again yet */
+ return;
+
+ target_number_of_workers = adjust_number_of_worker(mod_gm_opt->min_worker, mod_gm_opt->max_worker, current_number_of_workers, current_number_of_jobs);
+ for (x = current_number_of_workers; x < target_number_of_workers; x++) {
+ last_time_increased = now;
+ /* top up the worker pool towards the computed target */
+ make_new_child(GM_WORKER_MULTI);
+ }
+ return;
}
/* start up new worker */
int make_new_child(int mode) {
pid_t pid = 0;
+ int next_shm_index;
- gm_log( GM_LOG_TRACE, "make_new_child()\n");
+ gm_log( GM_LOG_TRACE, "make_new_child(%d)\n", mode);
+
+ if(mode == GM_WORKER_STATUS) {
+ gm_log( GM_LOG_TRACE, "forking status worker\n");
+ next_shm_index = 3;
+ } else {
+ gm_log( GM_LOG_TRACE, "forking worker\n");
+ next_shm_index = get_next_shm_index();
+ }
+
+ signal(SIGINT, SIG_DFL);
+ signal(SIGTERM, SIG_DFL);
/* fork a child process */
pid=fork();
@@@ -181,23 -272,21 +272,21 @@@
/* we are in the child process */
else if(pid==0){
- gm_log( GM_LOG_DEBUG, "worker started with pid: %d\n", getpid() );
- signal(SIGUSR1, SIG_IGN);
- signal(SIGINT, SIG_DFL);
- signal(SIGTERM, SIG_DFL);
+ gm_log( GM_LOG_DEBUG, "child started with pid: %d\n", getpid() );
+ shm[next_shm_index] = -getpid();
/* do the real work */
- worker_client(mode);
+ worker_client(mode, next_shm_index, shmid);
exit(EXIT_SUCCESS);
}
/* parent */
else if(pid > 0){
- if(mode != GM_WORKER_STATUS)
- current_number_of_workers++;
- update_runtime_data();
+ signal(SIGINT, clean_exit);
+ signal(SIGTERM,clean_exit);
+ shm[next_shm_index] = -pid;
}
return GM_OK;
@@@ -342,8 -431,6 +431,8 @@@ void print_usage()
printf("\n");
printf(" [ --server=<server> ]\n");
printf("\n");
+ printf(" [ --dupserver=<server> ]\n");
+ printf("\n");
printf(" [ --hosts ]\n");
printf(" [ --services ]\n");
printf(" [ --events ]\n");
@@@ -367,48 -454,31 +456,31 @@@
}
- /* check child signal pipe */
- void check_signal(int sig) {
- gm_log( GM_LOG_TRACE, "check_signal(%i)\n", sig);
- update_runtime_data();
- return;
- }
-
/* create the shared memory segment, attach it and initialize counters and worker slots */
void setup_child_communicator() {
- struct sigaction usr1_action;
- sigset_t block_mask;
- int shmid;
- int * shm;
+ int x;
gm_log( GM_LOG_TRACE, "setup_child_communicator()\n");
- /* setup signal handler */
- sigfillset (&block_mask); /* block all signals */
- usr1_action.sa_handler = check_signal;
- usr1_action.sa_mask = block_mask;
- usr1_action.sa_flags = 0;
- sigaction (SIGUSR1, &usr1_action, NULL);
-
/* Create the segment. */
mod_gm_shm_key = getpid(); /* use pid as shm key */
if ((shmid = shmget(mod_gm_shm_key, GM_SHM_SIZE, IPC_CREAT | 0600)) < 0) {
perror("shmget");
- exit(1);
+ exit( EXIT_FAILURE );
}
/* Now we attach the segment to our data space. */
if ((shm = shmat(shmid, NULL, 0)) == (int *) -1) {
perror("shmat");
- exit(1);
+ exit( EXIT_FAILURE );
+ }
+ shm[0] = 0; /* done jobs */
+ shm[1] = 0; /* total worker */
+ shm[2] = 0; /* running worker */
+ shm[3] = -1; /* status worker pid, -1 = not running */
+ for(x = 0; x < mod_gm_opt->max_worker; x++) { /* NOTE(review): no check that max_worker+4 ints fit into GM_SHM_SIZE bytes */
+ shm[x+4] = -1; /* worker slot, -1 = free */
}
- shm[0] = 0; /* current jobs */
- shm[1] = 0; /* current worker */
- shm[2] = 0; /* done jobs */
-
- /* detach from shared memory */
- if(shmdt(shm) < 0)
- perror("shmdt");
return;
}
@@@ -419,6 -489,12 +491,12 @@@ int adjust_number_of_worker(int min, in
int perc_running;
int idle;
int target = min;
+
+ if(cur_workers == 0) {
+ gm_log( GM_LOG_TRACE, "adjust_number_of_worker(min %d, max %d, worker %d, jobs %d) -> %d\n", min, max, cur_workers, cur_jobs, mod_gm_opt->min_worker);
+ return mod_gm_opt->min_worker;
+ }
+
perc_running = (int)cur_jobs*100/cur_workers;
idle = (int)cur_workers - cur_jobs;
@@@ -454,16 -530,12 +532,12 @@@ void clean_exit(int sig)
/* stop all childs */
stop_childs(GM_WORKER_STOP);
- /* kill remaining worker */
- if(current_number_of_workers > 0) {
- pid_t pid = getpid();
- kill(-pid, SIGKILL);
- }
+ /* detach shm */
+ if(shmdt(shm) < 0)
+ perror("shmdt");
gm_log( GM_LOG_INFO, "mod_gearman worker exited\n");
-
mod_gm_free_opt(mod_gm_opt);
-
exit( EXIT_SUCCESS );
}
@@@ -472,8 -544,9 +546,9 @@@
void stop_childs(int mode) {
int status, chld;
int waited = 0;
- int shmid;
- int skipfirst = 0;
+ int x, curpid;
+
+ gm_log( GM_LOG_TRACE, "stop_childs(%d)\n", mode);
/* ignore some signals for now */
signal(SIGTERM, SIG_IGN);
@@@ -483,56 -556,81 +558,81 @@@
* send term signal to our childs
* children will finish the current job and exit
*/
- gm_log( GM_LOG_TRACE, "send SIGTERM\n");
killpg(0, SIGTERM);
-
- gm_log( GM_LOG_TRACE, "waiting for childs to exit...\n");
while(current_number_of_workers > 0) {
+ gm_log( GM_LOG_TRACE, "send SIGTERM\n");
+ for(x=3; x < mod_gm_opt->max_worker+4; x++) {
+ curpid = shm[x];
+ if(curpid < 0) { curpid = -curpid; }
+ if( curpid != 0 ) {
+ kill(curpid, SIGTERM);
+ }
+ }
while((chld = waitpid(-1, &status, WNOHANG)) != -1 && chld > 0) {
- current_number_of_workers--;
gm_log( GM_LOG_TRACE, "wait() %d exited with %d\n", chld, status);
- /* start one worker less than exited, because of the status worker */
- if(skipfirst == 0 && mode == GM_WORKER_RESTART) {
- make_new_child(GM_WORKER_MULTI);
- }
- skipfirst = 1;
}
sleep(1);
waited++;
if(waited > GM_CHILD_SHUTDOWN_TIMEOUT) {
break;
}
- gm_log( GM_LOG_TRACE, "still waiting (%d)...\n", waited);
+ count_current_worker();
+ if(current_number_of_workers == 0)
+ return;
+ gm_log( GM_LOG_TRACE, "still waiting (%d) %d childs missing...\n", waited, current_number_of_workers);
}
if(mode == GM_WORKER_STOP) {
- if(current_number_of_workers > 0) {
- gm_log( GM_LOG_TRACE, "sending SIGINT...\n");
- killpg(0, SIGINT);
+ killpg(0, SIGINT);
+ count_current_worker();
+ if(current_number_of_workers == 0)
+ return;
+
+ gm_log( GM_LOG_TRACE, "sending SIGINT...\n");
+ for(x=3; x < mod_gm_opt->max_worker+4; x++) {
+ curpid = shm[x];
+ if(curpid < 0) { curpid = -curpid; }
+ if( curpid != 0 ) {
+ kill(curpid, SIGINT);
+ }
}
while((chld = waitpid(-1, &status, WNOHANG)) != -1 && chld > 0) {
gm_log( GM_LOG_TRACE, "wait() %d exited with %d\n", chld, status);
}
+ /* kill them the hard way */
+ count_current_worker();
+ if(current_number_of_workers == 0)
+ return;
+ for(x=3; x < mod_gm_opt->max_worker+4; x++) {
+ if( shm[x] != 0 ) {
+ curpid = shm[x];
+ if(curpid < 0) { curpid = -curpid; }
+ if( curpid != 0 ) {
+ kill(curpid, SIGKILL);
+ }
+ }
+ }
+
+ /* count childs a last time */
+ count_current_worker();
+ if(current_number_of_workers == 0)
+ return;
+
/*
* clean up shared memory
* will be removed when last client detaches
*/
- if ((shmid = shmget(mod_gm_shm_key, GM_SHM_SIZE, 0600)) < 0) {
- perror("shmget");
- }
if( shmctl( shmid, IPC_RMID, 0 ) == -1 ) {
perror("shmctl");
+ } else {
+ gm_log( GM_LOG_TRACE, "shared memory deleted\n");
}
- gm_log( GM_LOG_TRACE, "shared memory deleted\n");
- if(current_number_of_workers > 0) {
- /* this will kill us too */
- gm_log( GM_LOG_TRACE, "sending SIGKILL...\n");
- killpg(0, SIGKILL);
- }
+ /* this will kill us too */
+ gm_log( GM_LOG_ERROR, "exiting by SIGKILL...\n");
+ killpg(0, SIGKILL);
}
/* restore signal handlers for a clean exit */
@@@ -623,34 -721,29 +723,29 @@@ void reload_config(int sig)
}
- /* update the number of worker and jobs */
- void update_runtime_data() {
- int shmid;
- int *shm;
+ /* find the first free worker slot (value -1) in shm, reserve it by storing 1 and return its index */
+ int get_next_shm_index() {
+ int x;
+ int next_index = 0;
- gm_log( GM_LOG_TRACE, "update_worker_num()\n");
+ gm_log( GM_LOG_TRACE, "get_next_shm_index()\n" );
- /* Locate the segment. */
- if ((shmid = shmget(mod_gm_shm_key, GM_SHM_SIZE, 0600)) < 0) {
- perror("shmget");
- exit(1);
+ for(x = 4; x < mod_gm_opt->max_worker+4; x++) { /* worker slots start at index 4 */
+ if(shm[x] == -1) {
+ next_index = x;
+ shm[next_index] = 1;
+ break;
+ }
}
- /* Now we attach the segment to our data space. */
- if ((shm = shmat(shmid, NULL, 0)) == (int *) -1) {
- perror("shmat");
- exit(1);
+ if(next_index == 0) { /* still 0: no free slot found, index 0 is never a worker slot */
+ gm_log(GM_LOG_ERROR, "unable to get next shm id\n");
+ clean_exit(15);
+ exit(EXIT_FAILURE);
}
+ gm_log( GM_LOG_TRACE, "get_next_shm_index() -> %d\n", next_index );
- gm_log( GM_LOG_TRACE, "update_runtime_data: %i\n", shm[0]);
- current_number_of_jobs = shm[0];
- shm[1] = current_number_of_workers;
-
- /* detach from shared memory */
- if(shmdt(shm) < 0)
- perror("shmdt");
-
- return;
+ return next_index;
}
diff --combined worker/worker_client.c
index f150270,0d7b460..1b6e391
--- a/worker/worker_client.c
+++ b/worker/worker_client.c
@@@ -35,48 -35,48 +35,55 @@@ char hostname[GM_BUFFERSIZE]
gearman_worker_st worker;
gearman_client_st client;
+gearman_client_st client_dup;
gm_job_t * current_job;
+ pid_t current_pid;
pid_t current_child_pid;
gm_job_t * exec_job;
int jobs_done = 0;
int sleep_time_after_error = 1;
int worker_run_mode;
-
+ int shm_index = 0;
+ volatile sig_atomic_t shmid;
/* worker entry point: install signal handlers, create worker and clients, then run worker_loop() */
- void worker_client(int worker_mode) {
+ void worker_client(int worker_mode, int index, int shid) {
- gm_log( GM_LOG_TRACE, "worker client started\n" );
+ gm_log( GM_LOG_TRACE, "%s worker client started\n", (worker_mode == GM_WORKER_STATUS ? "status" : "job" ));
/* set signal handlers for a clean exit */
signal(SIGINT, clean_worker_exit);
signal(SIGTERM,clean_worker_exit);
worker_run_mode = worker_mode;
+ shm_index = index;
+ shmid = shid;
+ current_pid = getpid();
gethostname(hostname, GM_BUFFERSIZE-1);
/* create worker */
if(set_worker(&worker) != GM_OK) {
gm_log( GM_LOG_ERROR, "cannot start worker\n" );
+ clean_worker_exit(0);
exit( EXIT_FAILURE );
}
/* create client */
if ( create_client( mod_gm_opt->server_list, &client ) != GM_OK ) {
gm_log( GM_LOG_ERROR, "cannot start client\n" );
+ clean_worker_exit(0);
exit( EXIT_FAILURE );
}
+ /* create duplicate client */
+ if ( create_client_dup( mod_gm_opt->dupserver_list, &client_dup ) != GM_OK ) {
+ gm_log( GM_LOG_ERROR, "cannot start client for duplicate server\n" );
+ exit( EXIT_FAILURE );
+ }
+
worker_loop();
return;
@@@ -89,9 -89,11 +96,11 @@@ void worker_loop()
while ( 1 ) {
gearman_return_t ret;
- /* wait three minutes for a job, otherwise exit */
- if(worker_run_mode == GM_WORKER_MULTI)
+ /* wait for a job; if an idle timeout is set, a multi worker exits once the timeout is hit */
+ if(mod_gm_opt->idle_timeout > 0 && worker_run_mode == GM_WORKER_MULTI) {
+ signal(SIGALRM, idle_sighandler);
alarm(mod_gm_opt->idle_timeout);
+ }
signal(SIGPIPE, SIG_IGN);
ret = gearman_worker_work( &worker );
@@@ -100,7 -102,6 +109,7 @@@
gearman_job_free_all( &worker );
gearman_worker_free( &worker );
gearman_client_free( &client );
+ if( mod_gm_opt->dupserver_num ) gearman_client_free( &client_dup );
/* sleep on error to avoid cpu intensive infinite loops */
sleep(sleep_time_after_error);
@@@ -111,7 -112,6 +120,7 @@@
/* create new connections */
set_worker( &worker );
create_client( mod_gm_opt->server_list, &client );
+ create_client( mod_gm_opt->dupserver_list, &client_dup );
}
}
@@@ -123,21 -123,21 +132,21 @@@
void *get_job( gearman_job_st *job, void *context, size_t *result_size, gearman_return_t *ret_ptr ) {
sigset_t block_mask;
sigset_t old_mask;
- int wsize;
+ int wsize, valid_lines;
char workload[GM_BUFFERSIZE];
char * decrypted_data;
char * decrypted_data_c;
char * decrypted_orig;
char *ptr;
- char command[GM_BUFFERSIZE];
+
+ /* reset timeout for now, will be set again before execution */
+ alarm(0);
+ signal(SIGALRM, SIG_IGN);
jobs_done++;
/* send start signal to parent */
- send_state_to_parent(GM_JOB_START);
-
- /* reset timeout for now, will be set befor execution again */
- alarm(0);
+ set_state(GM_JOB_START);
gm_log( GM_LOG_TRACE, "get_job()\n" );
@@@ -180,6 -180,7 +189,7 @@@
exec_job = ( gm_job_t * )malloc( sizeof *exec_job );
set_default_job(exec_job);
+ valid_lines = 0;
while ( (ptr = strsep(&decrypted_data, "\n" )) != NULL ) {
char *key = strsep( &ptr, "=" );
char *value = strsep( &ptr, "\x0" );
@@@ -192,31 -193,37 +202,37 @@@
if ( !strcmp( key, "host_name" ) ) {
exec_job->host_name = strdup(value);
+ valid_lines++;
} else if ( !strcmp( key, "service_description" ) ) {
exec_job->service_description = strdup(value);
+ valid_lines++;
} else if ( !strcmp( key, "type" ) ) {
exec_job->type = strdup(value);
+ valid_lines++;
} else if ( !strcmp( key, "result_queue" ) ) {
exec_job->result_queue = strdup(value);
+ valid_lines++;
} else if ( !strcmp( key, "check_options" ) ) {
exec_job->check_options = atoi(value);
+ valid_lines++;
} else if ( !strcmp( key, "scheduled_check" ) ) {
exec_job->scheduled_check = atoi(value);
+ valid_lines++;
} else if ( !strcmp( key, "reschedule_check" ) ) {
exec_job->reschedule_check = atoi(value);
+ valid_lines++;
} else if ( !strcmp( key, "latency" ) ) {
exec_job->latency = atof(value);
+ valid_lines++;
} else if ( !strcmp( key, "start_time" ) ) {
string2timeval(value, &exec_job->core_start_time);
+ valid_lines++;
} else if ( !strcmp( key, "timeout" ) ) {
exec_job->timeout = atoi(value);
+ valid_lines++;
} else if ( !strcmp( key, "command_line" ) ) {
- /* adding 2>&1 breaks the exec/popen check and everything will be checked by popen */
- /*
- snprintf(command, sizeof(command)+5, "%s 2>&1", value);
- exec_job->command_line = strdup(command);
- */
exec_job->command_line = strdup(value);
+ valid_lines++;
}
}
@@@ -225,7 -232,14 +241,14 @@@
write_debug_file(&decrypted_orig);
#endif
- do_exec_job();
+ if(valid_lines == 0) {
+ gm_log( GM_LOG_ERROR, "discarded invalid job, check your encryption settings\n" );
+ } else {
+ do_exec_job();
+ }
+
+ /* send finish signal to parent */
+ set_state(GM_JOB_END);
/* start listening to SIGTERMs */
sigprocmask(SIG_SETMASK, &old_mask, NULL);
@@@ -234,16 -248,9 +257,9 @@@
free(decrypted_data_c);
free_job(exec_job);
- /* send finish signal to parent */
- send_state_to_parent(GM_JOB_END);
-
- if(jobs_done >= mod_gm_opt->max_jobs) {
+ if(mod_gm_opt->max_jobs > 0 && jobs_done >= mod_gm_opt->max_jobs) {
gm_log( GM_LOG_TRACE, "jobs done: %i -> exiting...\n", jobs_done );
- gearman_worker_unregister_all(&worker);
- gearman_job_free_all( &worker );
- gearman_client_free( &client );
- if( mod_gm_opt->dupserver_num ) gearman_client_free( &client_dup );
- mod_gm_free_opt(mod_gm_opt);
+ clean_worker_exit(0);
exit( EXIT_SUCCESS );
}
@@@ -261,11 -268,11 +277,11 @@@ void do_exec_job( )
gm_log( GM_LOG_TRACE, "do_exec_job()\n" );
if(exec_job->type == NULL) {
- gm_log( GM_LOG_ERROR, "discarded invalid job\n" );
+ gm_log( GM_LOG_ERROR, "discarded invalid job, no type given\n" );
return;
}
if(exec_job->command_line == NULL) {
- gm_log( GM_LOG_ERROR, "discarded invalid job\n" );
+ gm_log( GM_LOG_ERROR, "discarded invalid job, no command line given\n" );
return;
}
@@@ -517,29 -524,6 +533,29 @@@ void send_result_back()
gm_log( GM_LOG_TRACE, "send_result_back() finished unsuccessfully\n" );
}
+ if( mod_gm_opt->dupserver_num ) {
+ strncpy(temp_buffer2, "type=passive\n", (sizeof(temp_buffer1)-2));
+ strncat(temp_buffer2, temp_buffer1, (sizeof(temp_buffer2)-2));
+ temp_buffer2[sizeof( temp_buffer2 )-1]='\x0';
+ if( add_job_to_queue( &client_dup,
+ mod_gm_opt->dupserver_list,
+ exec_job->result_queue,
+ NULL,
+ temp_buffer2,
+ GM_JOB_PRIO_NORMAL,
+ GM_DEFAULT_JOB_RETRIES,
+ mod_gm_opt->transportmode
+ ) == GM_OK) {
+ gm_log( GM_LOG_TRACE, "send_result_back() finished successfully for duplicate server.\n" );
+ }
+ else {
+ gm_log( GM_LOG_TRACE, "send_result_back() finished unsuccessfully for duplicate server\n" );
+ }
+ }
+ else {
+ gm_log( GM_LOG_TRACE, "send_result_back() has no duplicate servers to send to.\n" );
+ }
+
return;
}
@@@ -592,6 -576,14 +608,14 @@@ int set_worker( gearman_worker_st *w )
}
+ /* SIGALRM handler installed by worker_loop(): called when worker runs into idle timeout, cleans up and exits */
+ void idle_sighandler(int sig) {
+ gm_log( GM_LOG_TRACE, "idle_sighandler(%i)\n", sig );
+ clean_worker_exit(0);
+ exit( EXIT_SUCCESS );
+ }
+
+
/* called when check runs into timeout */
void alarm_sighandler(int sig) {
pid_t pid = getpid();
@@@ -607,66 -599,73 +631,73 @@@
kill(-pid, SIGKILL);
if(worker_run_mode != GM_WORKER_STANDALONE)
+ clean_worker_exit(0);
exit(EXIT_SUCCESS);
return;
}
/* tell parent our state by writing it into our own shm slot (shm_index) */
- void send_state_to_parent(int status) {
- int shmid;
+ void set_state(int status) {
int *shm;
- gm_log( GM_LOG_TRACE, "send_state_to_parent(%d)\n", status );
-
- /* Locate the segment */
- if ((shmid = shmget(mod_gm_shm_key, GM_SHM_SIZE, 0600)) < 0) {
- perror("shmget");
- gm_log( GM_LOG_TRACE, "worker finished: %d\n", getpid() );
- exit( EXIT_FAILURE );
- }
+ gm_log( GM_LOG_TRACE, "set_state(%d)\n", status );
/* Now we attach the segment to our data space. */
if ((shm = shmat(shmid, NULL, 0)) == (int *) -1) {
perror("shmat");
gm_log( GM_LOG_TRACE, "worker finished: %d\n", getpid() );
+ clean_worker_exit(0);
exit( EXIT_FAILURE );
}
- /* set our counter */
if(status == GM_JOB_START)
- shm[0]++;
+ shm[shm_index] = current_pid; /* positive pid marks the slot as working */
if(status == GM_JOB_END) {
- shm[0]--;
- shm[2]++; /* increase jobs done */
+ shm[0]++; /* increase jobs done */
+ /* pid in our status slot changed, this should not happen -> exit */
+ if( shm[shm_index] != current_pid && shm[shm_index] != -current_pid ) {
+ gm_log( GM_LOG_ERROR, "double used worker slot: %d != %d\n", current_pid, shm[shm_index] );
+ clean_worker_exit(0);
+ exit( EXIT_FAILURE );
+ }
+ shm[shm_index] = -current_child_pid; /* NOTE(review): stores the negated current_child_pid, while the check above compares against current_pid - confirm this is intended */
}
/* detach from shared memory */
if(shmdt(shm) < 0)
perror("shmdt");
- if(worker_run_mode != GM_WORKER_MULTI)
- return;
-
- /* wake up parent */
- kill(getppid(), SIGUSR1);
-
return;
}
/* do a clean exit */
void clean_worker_exit(int sig) {
- int shmid;
+ int *shm;
gm_log( GM_LOG_TRACE, "clean_worker_exit(%d)\n", sig);
+ gm_log( GM_LOG_TRACE, "cleaning worker\n");
gearman_worker_unregister_all(&worker);
gearman_job_free_all( &worker );
+ gm_log( GM_LOG_TRACE, "cleaning client\n");
gearman_client_free( &client );
-
mod_gm_free_opt(mod_gm_opt);
+ /* Now we attach the segment to our data space. */
+ if ((shm = shmat(shmid, NULL, 0)) == (int *) -1) {
+ perror("shmat");
+ gm_log( GM_LOG_TRACE, "worker finished: %d\n", getpid() );
+ clean_worker_exit(0);
+ exit( EXIT_FAILURE );
+ }
+ shm[shm_index] = -1;
+
+ /* detach from shared memory */
+ if(shmdt(shm) < 0)
+ perror("shmdt");
+
if(worker_run_mode != GM_WORKER_STANDALONE)
exit( EXIT_SUCCESS );
@@@ -674,9 -673,6 +705,6 @@@
* clean up shared memory
* will be removed when last client detaches
*/
- if ((shmid = shmget(mod_gm_shm_key, GM_SHM_SIZE, 0600)) < 0) {
- perror("shmget");
- }
if( shmctl( shmid, IPC_RMID, 0 ) == -1 ) {
perror("shmctl");
}
@@@ -689,7 -685,6 +717,6 @@@
void *return_status( gearman_job_st *job, void *context, size_t *result_size, gearman_return_t *ret_ptr ) {
int wsize;
char workload[GM_BUFFERSIZE];
- int shmid;
int *shm;
char * result;
@@@ -712,13 -707,6 +739,6 @@@
result = malloc(GM_BUFFERSIZE);
*result_size = GM_BUFFERSIZE;
- /* Locate the segment */
- if ((shmid = shmget(mod_gm_shm_key, GM_SHM_SIZE, 0600)) < 0) {
- perror("shmget");
- *result_size = 0;
- return NULL;
- }
-
/* Now we attach the segment to our data space. */
if ((shm = shmat(shmid, NULL, 0)) == (int *) -1) {
perror("shmat");
@@@ -726,7 -714,7 +746,7 @@@
return NULL;
}
- snprintf(result, GM_BUFFERSIZE, "%s has %i worker and is working on %i jobs. Version: %s|worker=%i jobs=%ic", hostname, shm[1], shm[0], GM_VERSION, shm[1], shm[2] );
+ snprintf(result, GM_BUFFERSIZE, "%s has %i worker and is working on %i jobs. Version: %s|worker=%i jobs=%ic", hostname, shm[1], shm[2], GM_VERSION, shm[1], shm[0] );
/* detach from shared memory */
if(shmdt(shm) < 0)
--
Debian packaging for mod gearman.
More information about the Pkg-nagios-changes
mailing list