pacemaker  1.1.24-3850484742
Scalable High-Availability cluster resource manager
mainloop.c
Go to the documentation of this file.
1 /*
2  * Copyright (C) 2004 Andrew Beekhof <andrew@beekhof.net>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Lesser General Public
6  * License as published by the Free Software Foundation; either
7  * version 2.1 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12  * Lesser General Public License for more details.
13  *
14  * You should have received a copy of the GNU Lesser General Public
15  * License along with this library; if not, write to the Free Software
16  * Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
17  */
18 
19 #include <crm_internal.h>
20 
21 #ifndef _GNU_SOURCE
22 # define _GNU_SOURCE
23 #endif
24 
25 #include <stdlib.h>
26 #include <signal.h>
27 #include <errno.h>
28 
29 #include <sys/wait.h>
30 
31 #include <crm/crm.h>
32 #include <crm/common/xml.h>
33 #include <crm/common/mainloop.h>
34 #include <crm/common/ipcs.h>
35 
36 #include <qb/qbarray.h>
37 
38 struct mainloop_child_s {
39  pid_t pid;
40  char *desc;
41  unsigned timerid;
42  unsigned watchid;
43  gboolean timeout;
44  void *privatedata;
45 
47 
48  /* Called when a process dies */
49  void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode);
50 };
51 
52 struct trigger_s {
53  GSource source;
54  gboolean running;
55  gboolean trigger;
56  void *user_data;
57  guint id;
58 
59 };
60 
61 static gboolean
62 crm_trigger_prepare(GSource * source, gint * timeout)
63 {
64  crm_trigger_t *trig = (crm_trigger_t *) source;
65 
66  /* cluster-glue's FD and IPC related sources make use of
67  * g_source_add_poll() but do not set a timeout in their prepare
68  * functions
69  *
70  * This means mainloop's poll() will block until an event for one
71  * of these sources occurs - any /other/ type of source, such as
72  * this one or g_idle_*, that doesn't use g_source_add_poll() is
73  * S-O-L and won't be processed until there is something fd-based
74  * happens.
75  *
76  * Luckily the timeout we can set here affects all sources and
77  * puts an upper limit on how long poll() can take.
78  *
79  * So unconditionally set a small-ish timeout, not too small that
80  * we're in constant motion, which will act as an upper bound on
81  * how long the signal handling might be delayed for.
82  */
83  *timeout = 500; /* Timeout in ms */
84 
85  return trig->trigger;
86 }
87 
88 static gboolean
89 crm_trigger_check(GSource * source)
90 {
91  crm_trigger_t *trig = (crm_trigger_t *) source;
92 
93  return trig->trigger;
94 }
95 
96 static gboolean
97 crm_trigger_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
98 {
99  int rc = TRUE;
100  crm_trigger_t *trig = (crm_trigger_t *) source;
101 
102  if (trig->running) {
103  /* Wait until the existing job is complete before starting the next one */
104  return TRUE;
105  }
106  trig->trigger = FALSE;
107 
108  if (callback) {
109  rc = callback(trig->user_data);
110  if (rc < 0) {
111  crm_trace("Trigger handler %p not yet complete", trig);
112  trig->running = TRUE;
113  rc = TRUE;
114  }
115  }
116  return rc;
117 }
118 
119 static void
120 crm_trigger_finalize(GSource * source)
121 {
122  crm_trace("Trigger %p destroyed", source);
123 }
124 
125 #if 0
126 struct _GSourceCopy
127 {
128  gpointer callback_data;
129  GSourceCallbackFuncs *callback_funcs;
130 
131  const GSourceFuncs *source_funcs;
132  guint ref_count;
133 
134  GMainContext *context;
135 
136  gint priority;
137  guint flags;
138  guint source_id;
139 
140  GSList *poll_fds;
141 
142  GSource *prev;
143  GSource *next;
144 
145  char *name;
146 
147  void *priv;
148 };
149 
150 static int
151 g_source_refcount(GSource * source)
152 {
153  /* Duplicating the contents of private header files is a necessary evil */
154  if (source) {
155  struct _GSourceCopy *evil = (struct _GSourceCopy*)source;
156  return evil->ref_count;
157  }
158  return 0;
159 }
160 #else
161 static int g_source_refcount(GSource * source)
162 {
163  return 0;
164 }
165 #endif
166 
167 static GSourceFuncs crm_trigger_funcs = {
168  crm_trigger_prepare,
169  crm_trigger_check,
170  crm_trigger_dispatch,
171  crm_trigger_finalize,
172 };
173 
174 static crm_trigger_t *
175 mainloop_setup_trigger(GSource * source, int priority, int (*dispatch) (gpointer user_data),
176  gpointer userdata)
177 {
178  crm_trigger_t *trigger = NULL;
179 
180  trigger = (crm_trigger_t *) source;
181 
182  trigger->id = 0;
183  trigger->trigger = FALSE;
184  trigger->user_data = userdata;
185 
186  if (dispatch) {
187  g_source_set_callback(source, dispatch, trigger, NULL);
188  }
189 
190  g_source_set_priority(source, priority);
191  g_source_set_can_recurse(source, FALSE);
192 
193  crm_trace("Setup %p with ref-count=%u", source, g_source_refcount(source));
194  trigger->id = g_source_attach(source, NULL);
195  crm_trace("Attached %p with ref-count=%u", source, g_source_refcount(source));
196 
197  return trigger;
198 }
199 
200 void
202 {
203  crm_trace("Trigger handler %p complete", trig);
204  trig->running = FALSE;
205 }
206 
207 /* If dispatch returns:
208  * -1: Job running but not complete
209  * 0: Remove the trigger from mainloop
210  * 1: Leave the trigger in mainloop
211  */
213 mainloop_add_trigger(int priority, int (*dispatch) (gpointer user_data), gpointer userdata)
214 {
215  GSource *source = NULL;
216 
217  CRM_ASSERT(sizeof(crm_trigger_t) > sizeof(GSource));
218  source = g_source_new(&crm_trigger_funcs, sizeof(crm_trigger_t));
219  CRM_ASSERT(source != NULL);
220 
221  return mainloop_setup_trigger(source, priority, dispatch, userdata);
222 }
223 
224 void
226 {
227  if(source) {
228  source->trigger = TRUE;
229  }
230 }
231 
232 gboolean
234 {
235  GSource *gs = NULL;
236 
237  if(source == NULL) {
238  return TRUE;
239  }
240 
241  gs = (GSource *)source;
242 
243  if(g_source_refcount(gs) > 2) {
244  crm_info("Trigger %p is still referenced %u times", gs, g_source_refcount(gs));
245  }
246 
247  g_source_destroy(gs); /* Remove from mainloop, ref_count-- */
248  g_source_unref(gs); /* The caller no longer carries a reference to source
249  *
250  * At this point the source should be free'd,
251  * unless we're currently processing said
252  * source, in which case mainloop holds an
253  * additional reference and it will be free'd
254  * once our processing completes
255  */
256  return TRUE;
257 }
258 
259 typedef struct signal_s {
260  crm_trigger_t trigger; /* must be first */
261  void (*handler) (int sig);
262  int signal;
263 
264 } crm_signal_t;
265 
266 static crm_signal_t *crm_signals[NSIG];
267 
268 static gboolean
269 crm_signal_dispatch(GSource * source, GSourceFunc callback, gpointer userdata)
270 {
271  crm_signal_t *sig = (crm_signal_t *) source;
272 
273  if(sig->signal != SIGCHLD) {
274  crm_notice("Caught '%s' signal "CRM_XS" %d (%s handler)",
275  strsignal(sig->signal), sig->signal,
276  (sig->handler? "invoking" : "no"));
277  }
278 
279  sig->trigger.trigger = FALSE;
280  if (sig->handler) {
281  sig->handler(sig->signal);
282  }
283  return TRUE;
284 }
285 
286 static void
287 mainloop_signal_handler(int sig)
288 {
289  if (sig > 0 && sig < NSIG && crm_signals[sig] != NULL) {
290  mainloop_set_trigger((crm_trigger_t *) crm_signals[sig]);
291  }
292 }
293 
294 static GSourceFuncs crm_signal_funcs = {
295  crm_trigger_prepare,
296  crm_trigger_check,
297  crm_signal_dispatch,
298  crm_trigger_finalize,
299 };
300 
301 gboolean
302 crm_signal(int sig, void (*dispatch) (int sig))
303 {
304  sigset_t mask;
305  struct sigaction sa;
306  struct sigaction old;
307 
308  if (sigemptyset(&mask) < 0) {
309  crm_perror(LOG_ERR, "Call to sigemptyset failed");
310  return FALSE;
311  }
312 
313  memset(&sa, 0, sizeof(struct sigaction));
314  sa.sa_handler = dispatch;
315  sa.sa_flags = SA_RESTART;
316  sa.sa_mask = mask;
317 
318  if (sigaction(sig, &sa, &old) < 0) {
319  crm_perror(LOG_ERR, "Could not install signal handler for signal %d", sig);
320  return FALSE;
321  }
322 
323  return TRUE;
324 }
325 
326 gboolean
327 mainloop_add_signal(int sig, void (*dispatch) (int sig))
328 {
329  GSource *source = NULL;
330  int priority = G_PRIORITY_HIGH - 1;
331 
332  if (sig == SIGTERM) {
333  /* TERM is higher priority than other signals,
334  * signals are higher priority than other ipc.
335  * Yes, minus: smaller is "higher"
336  */
337  priority--;
338  }
339 
340  if (sig >= NSIG || sig < 0) {
341  crm_err("Signal %d is out of range", sig);
342  return FALSE;
343 
344  } else if (crm_signals[sig] != NULL && crm_signals[sig]->handler == dispatch) {
345  crm_trace("Signal handler for %d is already installed", sig);
346  return TRUE;
347 
348  } else if (crm_signals[sig] != NULL) {
349  crm_err("Different signal handler for %d is already installed", sig);
350  return FALSE;
351  }
352 
353  CRM_ASSERT(sizeof(crm_signal_t) > sizeof(GSource));
354  source = g_source_new(&crm_signal_funcs, sizeof(crm_signal_t));
355 
356  crm_signals[sig] = (crm_signal_t *) mainloop_setup_trigger(source, priority, NULL, NULL);
357  CRM_ASSERT(crm_signals[sig] != NULL);
358 
359  crm_signals[sig]->handler = dispatch;
360  crm_signals[sig]->signal = sig;
361 
362  if (crm_signal(sig, mainloop_signal_handler) == FALSE) {
363  crm_signal_t *tmp = crm_signals[sig];
364 
365  crm_signals[sig] = NULL;
366 
368  return FALSE;
369  }
370 #if 0
371  /* If we want signals to interrupt mainloop's poll(), instead of waiting for
372  * the timeout, then we should call siginterrupt() below
373  *
374  * For now, just enforce a low timeout
375  */
376  if (siginterrupt(sig, 1) < 0) {
377  crm_perror(LOG_INFO, "Could not enable system call interruptions for signal %d", sig);
378  }
379 #endif
380 
381  return TRUE;
382 }
383 
384 gboolean
386 {
387  crm_signal_t *tmp = NULL;
388 
389  if (sig >= NSIG || sig < 0) {
390  crm_err("Signal %d is out of range", sig);
391  return FALSE;
392 
393  } else if (crm_signal(sig, NULL) == FALSE) {
394  crm_perror(LOG_ERR, "Could not uninstall signal handler for signal %d", sig);
395  return FALSE;
396 
397  } else if (crm_signals[sig] == NULL) {
398  return TRUE;
399  }
400 
401  crm_trace("Destroying signal %d", sig);
402  tmp = crm_signals[sig];
403  crm_signals[sig] = NULL;
405  return TRUE;
406 }
407 
408 static qb_array_t *gio_map = NULL;
409 
410 void
412 {
413  if(gio_map) {
414  qb_array_free(gio_map);
415  }
416 }
417 
418 /*
419  * libqb...
420  */
421 struct gio_to_qb_poll {
422  int32_t is_used;
423  guint source;
424  int32_t events;
425  void *data;
426  qb_ipcs_dispatch_fn_t fn;
427  enum qb_loop_priority p;
428 };
429 
430 static gboolean
431 gio_read_socket(GIOChannel * gio, GIOCondition condition, gpointer data)
432 {
433  struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
434  gint fd = g_io_channel_unix_get_fd(gio);
435 
436  crm_trace("%p.%d %d", data, fd, condition);
437 
438  /* if this assert get's hit, then there is a race condition between
439  * when we destroy a fd and when mainloop actually gives it up */
440  CRM_ASSERT(adaptor->is_used > 0);
441 
442  return (adaptor->fn(fd, condition, adaptor->data) == 0);
443 }
444 
445 static void
446 gio_poll_destroy(gpointer data)
447 {
448  struct gio_to_qb_poll *adaptor = (struct gio_to_qb_poll *)data;
449 
450  adaptor->is_used--;
451  CRM_ASSERT(adaptor->is_used >= 0);
452 
453  if (adaptor->is_used == 0) {
454  crm_trace("Marking adaptor %p unused", adaptor);
455  adaptor->source = 0;
456  }
457 }
458 
467 static gint
468 conv_prio_libqb2glib(enum qb_loop_priority prio)
469 {
470  gint ret = G_PRIORITY_DEFAULT;
471  switch (prio) {
472  case QB_LOOP_LOW:
473  ret = G_PRIORITY_LOW;
474  break;
475  case QB_LOOP_HIGH:
476  ret = G_PRIORITY_HIGH;
477  break;
478  default:
479  crm_trace("Invalid libqb's loop priority %d, assuming QB_LOOP_MED",
480  prio);
481  /* fall-through */
482  case QB_LOOP_MED:
483  break;
484  }
485  return ret;
486 }
487 
496 static enum qb_ipcs_rate_limit
497 conv_libqb_prio2ratelimit(enum qb_loop_priority prio)
498 {
499  /* this is an inversion of what libqb's qb_ipcs_request_rate_limit does */
500  enum qb_ipcs_rate_limit ret = QB_IPCS_RATE_NORMAL;
501  switch (prio) {
502  case QB_LOOP_LOW:
503  ret = QB_IPCS_RATE_SLOW;
504  break;
505  case QB_LOOP_HIGH:
506  ret = QB_IPCS_RATE_FAST;
507  break;
508  default:
509  crm_trace("Invalid libqb's loop priority %d, assuming QB_LOOP_MED",
510  prio);
511  /* fall-through */
512  case QB_LOOP_MED:
513  break;
514  }
515  return ret;
516 }
517 
518 static int32_t
519 gio_poll_dispatch_update(enum qb_loop_priority p, int32_t fd, int32_t evts,
520  void *data, qb_ipcs_dispatch_fn_t fn, int32_t add)
521 {
522  struct gio_to_qb_poll *adaptor;
523  GIOChannel *channel;
524  int32_t res = 0;
525 
526  res = qb_array_index(gio_map, fd, (void **)&adaptor);
527  if (res < 0) {
528  crm_err("Array lookup failed for fd=%d: %d", fd, res);
529  return res;
530  }
531 
532  crm_trace("Adding fd=%d to mainloop as adaptor %p", fd, adaptor);
533 
534  if (add && adaptor->source) {
535  crm_err("Adaptor for descriptor %d is still in-use", fd);
536  return -EEXIST;
537  }
538  if (!add && !adaptor->is_used) {
539  crm_err("Adaptor for descriptor %d is not in-use", fd);
540  return -ENOENT;
541  }
542 
543  /* channel is created with ref_count = 1 */
544  channel = g_io_channel_unix_new(fd);
545  if (!channel) {
546  crm_err("No memory left to add fd=%d", fd);
547  return -ENOMEM;
548  }
549 
550  if (adaptor->source) {
551  g_source_remove(adaptor->source);
552  adaptor->source = 0;
553  }
554 
555  /* Because unlike the poll() API, glib doesn't tell us about HUPs by default */
556  evts |= (G_IO_HUP | G_IO_NVAL | G_IO_ERR);
557 
558  adaptor->fn = fn;
559  adaptor->events = evts;
560  adaptor->data = data;
561  adaptor->p = p;
562  adaptor->is_used++;
563  adaptor->source =
564  g_io_add_watch_full(channel, conv_prio_libqb2glib(p), evts,
565  gio_read_socket, adaptor, gio_poll_destroy);
566 
567  /* Now that mainloop now holds a reference to channel,
568  * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
569  *
570  * This means that channel will be free'd by:
571  * g_main_context_dispatch()
572  * -> g_source_destroy_internal()
573  * -> g_source_callback_unref()
574  * shortly after gio_poll_destroy() completes
575  */
576  g_io_channel_unref(channel);
577 
578  crm_trace("Added to mainloop with gsource id=%d", adaptor->source);
579  if (adaptor->source > 0) {
580  return 0;
581  }
582 
583  return -EINVAL;
584 }
585 
586 static int32_t
587 gio_poll_dispatch_add(enum qb_loop_priority p, int32_t fd, int32_t evts,
588  void *data, qb_ipcs_dispatch_fn_t fn)
589 {
590  return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_TRUE);
591 }
592 
593 static int32_t
594 gio_poll_dispatch_mod(enum qb_loop_priority p, int32_t fd, int32_t evts,
595  void *data, qb_ipcs_dispatch_fn_t fn)
596 {
597  return gio_poll_dispatch_update(p, fd, evts, data, fn, QB_FALSE);
598 }
599 
600 static int32_t
601 gio_poll_dispatch_del(int32_t fd)
602 {
603  struct gio_to_qb_poll *adaptor;
604 
605  crm_trace("Looking for fd=%d", fd);
606  if (qb_array_index(gio_map, fd, (void **)&adaptor) == 0) {
607  if (adaptor->source) {
608  g_source_remove(adaptor->source);
609  adaptor->source = 0;
610  }
611  }
612  return 0;
613 }
614 
615 struct qb_ipcs_poll_handlers gio_poll_funcs = {
616  .job_add = NULL,
617  .dispatch_add = gio_poll_dispatch_add,
618  .dispatch_mod = gio_poll_dispatch_mod,
619  .dispatch_del = gio_poll_dispatch_del,
620 };
621 
622 static enum qb_ipc_type
623 pick_ipc_type(enum qb_ipc_type requested)
624 {
625  const char *env = getenv("PCMK_ipc_type");
626 
627  if (env && strcmp("shared-mem", env) == 0) {
628  return QB_IPC_SHM;
629  } else if (env && strcmp("socket", env) == 0) {
630  return QB_IPC_SOCKET;
631  } else if (env && strcmp("posix", env) == 0) {
632  return QB_IPC_POSIX_MQ;
633  } else if (env && strcmp("sysv", env) == 0) {
634  return QB_IPC_SYSV_MQ;
635  } else if (requested == QB_IPC_NATIVE) {
636  /* We prefer shared memory because the server never blocks on
637  * send. If part of a message fits into the socket, libqb
638  * needs to block until the remainder can be sent also.
639  * Otherwise the client will wait forever for the remaining
640  * bytes.
641  */
642  return QB_IPC_SHM;
643  }
644  return requested;
645 }
646 
647 qb_ipcs_service_t *
648 mainloop_add_ipc_server(const char *name, enum qb_ipc_type type,
649  struct qb_ipcs_service_handlers *callbacks)
650 {
651  return mainloop_add_ipc_server_with_prio(name, type, callbacks, QB_LOOP_MED);
652 }
653 
654 qb_ipcs_service_t *
655 mainloop_add_ipc_server_with_prio(const char *name, enum qb_ipc_type type,
656  struct qb_ipcs_service_handlers *callbacks,
657  enum qb_loop_priority prio)
658 {
659  int rc = 0;
660  qb_ipcs_service_t *server = NULL;
661 
662  if (gio_map == NULL) {
663  gio_map = qb_array_create_2(64, sizeof(struct gio_to_qb_poll), 1);
664  }
665 
666  crm_client_init();
667  server = qb_ipcs_create(name, 0, pick_ipc_type(type), callbacks);
668 
669  if (server == NULL) {
670  crm_err("Could not create %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc);
671  return NULL;
672  }
673 
674  if (prio != QB_LOOP_MED) {
675  qb_ipcs_request_rate_limit(server, conv_libqb_prio2ratelimit(prio));
676  }
677 
678 #ifdef HAVE_IPCS_GET_BUFFER_SIZE
679  /* All clients should use at least ipc_buffer_max as their buffer size */
680  qb_ipcs_enforce_buffer_size(server, crm_ipc_default_buffer_size());
681 #endif
682 
683  qb_ipcs_poll_handlers_set(server, &gio_poll_funcs);
684 
685  rc = qb_ipcs_run(server);
686  if (rc < 0) {
687  crm_err("Could not start %s IPC server: %s (%d)", name, pcmk_strerror(rc), rc);
688  return NULL;
689  }
690 
691  return server;
692 }
693 
694 void
695 mainloop_del_ipc_server(qb_ipcs_service_t * server)
696 {
697  if (server) {
698  qb_ipcs_destroy(server);
699  }
700 }
701 
702 struct mainloop_io_s {
703  char *name;
704  void *userdata;
705 
706  int fd;
707  guint source;
708  crm_ipc_t *ipc;
709  GIOChannel *channel;
710 
711  int (*dispatch_fn_ipc) (const char *buffer, ssize_t length, gpointer userdata);
712  int (*dispatch_fn_io) (gpointer userdata);
713  void (*destroy_fn) (gpointer userdata);
714 
715 };
716 
717 static gboolean
718 mainloop_gio_callback(GIOChannel * gio, GIOCondition condition, gpointer data)
719 {
720  gboolean keep = TRUE;
721  mainloop_io_t *client = data;
722 
723  CRM_ASSERT(client->fd == g_io_channel_unix_get_fd(gio));
724 
725  if (condition & G_IO_IN) {
726  if (client->ipc) {
727  long rc = 0;
728  int max = 10;
729 
730  do {
731  rc = crm_ipc_read(client->ipc);
732  if (rc <= 0) {
733  crm_trace("Message acquisition from %s[%p] failed: %s (%ld)",
734  client->name, client, pcmk_strerror(rc), rc);
735 
736  } else if (client->dispatch_fn_ipc) {
737  const char *buffer = crm_ipc_buffer(client->ipc);
738 
739  crm_trace("New message from %s[%p] = %ld (I/O condition=%d)", client->name, client, rc, condition);
740  if (client->dispatch_fn_ipc(buffer, rc, client->userdata) < 0) {
741  crm_trace("Connection to %s no longer required", client->name);
742  keep = FALSE;
743  }
744  }
745 
746  } while (keep && rc > 0 && --max > 0);
747 
748  } else {
749  crm_trace("New message from %s[%p] %u", client->name, client, condition);
750  if (client->dispatch_fn_io) {
751  if (client->dispatch_fn_io(client->userdata) < 0) {
752  crm_trace("Connection to %s no longer required", client->name);
753  keep = FALSE;
754  }
755  }
756  }
757  }
758 
759  if (client->ipc && crm_ipc_connected(client->ipc) == FALSE) {
760  crm_err("Connection to %s[%p] closed (I/O condition=%d)", client->name, client, condition);
761  keep = FALSE;
762 
763  } else if (condition & (G_IO_HUP | G_IO_NVAL | G_IO_ERR)) {
764  crm_trace("The connection %s[%p] has been closed (I/O condition=%d)",
765  client->name, client, condition);
766  keep = FALSE;
767 
768  } else if ((condition & G_IO_IN) == 0) {
769  /*
770  #define GLIB_SYSDEF_POLLIN =1
771  #define GLIB_SYSDEF_POLLPRI =2
772  #define GLIB_SYSDEF_POLLOUT =4
773  #define GLIB_SYSDEF_POLLERR =8
774  #define GLIB_SYSDEF_POLLHUP =16
775  #define GLIB_SYSDEF_POLLNVAL =32
776 
777  typedef enum
778  {
779  G_IO_IN GLIB_SYSDEF_POLLIN,
780  G_IO_OUT GLIB_SYSDEF_POLLOUT,
781  G_IO_PRI GLIB_SYSDEF_POLLPRI,
782  G_IO_ERR GLIB_SYSDEF_POLLERR,
783  G_IO_HUP GLIB_SYSDEF_POLLHUP,
784  G_IO_NVAL GLIB_SYSDEF_POLLNVAL
785  } GIOCondition;
786 
787  A bitwise combination representing a condition to watch for on an event source.
788 
789  G_IO_IN There is data to read.
790  G_IO_OUT Data can be written (without blocking).
791  G_IO_PRI There is urgent data to read.
792  G_IO_ERR Error condition.
793  G_IO_HUP Hung up (the connection has been broken, usually for pipes and sockets).
794  G_IO_NVAL Invalid request. The file descriptor is not open.
795  */
796  crm_err("Strange condition: %d", condition);
797  }
798 
799  /* keep == FALSE results in mainloop_gio_destroy() being called
800  * just before the source is removed from mainloop
801  */
802  return keep;
803 }
804 
805 static void
806 mainloop_gio_destroy(gpointer c)
807 {
808  mainloop_io_t *client = c;
809  char *c_name = strdup(client->name);
810 
811  /* client->source is valid but about to be destroyed (ref_count == 0) in gmain.c
812  * client->channel will still have ref_count > 0... should be == 1
813  */
814  crm_trace("Destroying client %s[%p]", c_name, c);
815 
816  if (client->ipc) {
817  crm_ipc_close(client->ipc);
818  }
819 
820  if (client->destroy_fn) {
821  void (*destroy_fn) (gpointer userdata) = client->destroy_fn;
822 
823  client->destroy_fn = NULL;
824  destroy_fn(client->userdata);
825  }
826 
827  if (client->ipc) {
828  crm_ipc_t *ipc = client->ipc;
829 
830  client->ipc = NULL;
831  crm_ipc_destroy(ipc);
832  }
833 
834  crm_trace("Destroyed client %s[%p]", c_name, c);
835 
836  free(client->name); client->name = NULL;
837  free(client);
838 
839  free(c_name);
840 }
841 
843 mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata,
844  struct ipc_client_callbacks *callbacks)
845 {
846  mainloop_io_t *client = NULL;
847  crm_ipc_t *conn = crm_ipc_new(name, max_size);
848 
849  if (conn && crm_ipc_connect(conn)) {
850  int32_t fd = crm_ipc_get_fd(conn);
851 
852  client = mainloop_add_fd(name, priority, fd, userdata, NULL);
853  }
854 
855  if (client == NULL) {
856  crm_perror(LOG_TRACE, "Connection to %s failed", name);
857  if (conn) {
858  crm_ipc_close(conn);
859  crm_ipc_destroy(conn);
860  }
861  return NULL;
862  }
863 
864  client->ipc = conn;
865  client->destroy_fn = callbacks->destroy;
866  client->dispatch_fn_ipc = callbacks->dispatch;
867  return client;
868 }
869 
870 void
872 {
873  mainloop_del_fd(client);
874 }
875 
876 crm_ipc_t *
878 {
879  if (client) {
880  return client->ipc;
881  }
882  return NULL;
883 }
884 
886 mainloop_add_fd(const char *name, int priority, int fd, void *userdata,
887  struct mainloop_fd_callbacks * callbacks)
888 {
889  mainloop_io_t *client = NULL;
890 
891  if (fd >= 0) {
892  client = calloc(1, sizeof(mainloop_io_t));
893  if (client == NULL) {
894  return NULL;
895  }
896  client->name = strdup(name);
897  client->userdata = userdata;
898 
899  if (callbacks) {
900  client->destroy_fn = callbacks->destroy;
901  client->dispatch_fn_io = callbacks->dispatch;
902  }
903 
904  client->fd = fd;
905  client->channel = g_io_channel_unix_new(fd);
906  client->source =
907  g_io_add_watch_full(client->channel, priority,
908  (G_IO_IN | G_IO_HUP | G_IO_NVAL | G_IO_ERR), mainloop_gio_callback,
909  client, mainloop_gio_destroy);
910 
911  /* Now that mainloop now holds a reference to channel,
912  * thanks to g_io_add_watch_full(), drop ours from g_io_channel_unix_new().
913  *
914  * This means that channel will be free'd by:
915  * g_main_context_dispatch() or g_source_remove()
916  * -> g_source_destroy_internal()
917  * -> g_source_callback_unref()
918  * shortly after mainloop_gio_destroy() completes
919  */
920  g_io_channel_unref(client->channel);
921  crm_trace("Added connection %d for %s[%p].%d", client->source, client->name, client, fd);
922  } else {
923  errno = EINVAL;
924  }
925 
926  return client;
927 }
928 
929 void
931 {
932  if (client != NULL) {
933  crm_trace("Removing client %s[%p]", client->name, client);
934  if (client->source) {
935  /* Results in mainloop_gio_destroy() being called just
936  * before the source is removed from mainloop
937  */
938  g_source_remove(client->source);
939  }
940  }
941 }
942 
943 static GListPtr child_list = NULL;
944 
945 pid_t
947 {
948  return child->pid;
949 }
950 
951 const char *
953 {
954  return child->desc;
955 }
956 
957 int
959 {
960  return child->timeout;
961 }
962 
963 void *
965 {
966  return child->privatedata;
967 }
968 
969 void
971 {
972  child->privatedata = NULL;
973 }
974 
975 /* good function name */
976 static void
977 child_free(mainloop_child_t *child)
978 {
979  if (child->timerid != 0) {
980  crm_trace("Removing timer %d", child->timerid);
981  g_source_remove(child->timerid);
982  child->timerid = 0;
983  }
984  free(child->desc);
985  free(child);
986 }
987 
988 /* terrible function name */
989 static int
990 child_kill_helper(mainloop_child_t *child)
991 {
992  int rc;
993  if (child->flags & mainloop_leave_pid_group) {
994  crm_debug("Kill pid %d only. leave group intact.", child->pid);
995  rc = kill(child->pid, SIGKILL);
996  } else {
997  crm_debug("Kill pid %d's group", child->pid);
998  rc = kill(-child->pid, SIGKILL);
999  }
1000 
1001  if (rc < 0) {
1002  if (errno != ESRCH) {
1003  crm_perror(LOG_ERR, "kill(%d, KILL) failed", child->pid);
1004  }
1005  return -errno;
1006  }
1007  return 0;
1008 }
1009 
1010 static gboolean
1011 child_timeout_callback(gpointer p)
1012 {
1013  mainloop_child_t *child = p;
1014  int rc = 0;
1015 
1016  child->timerid = 0;
1017  if (child->timeout) {
1018  crm_crit("%s process (PID %d) will not die!", child->desc, (int)child->pid);
1019  return FALSE;
1020  }
1021 
1022  rc = child_kill_helper(child);
1023  if (rc == -ESRCH) {
1024  /* Nothing left to do. pid doesn't exist */
1025  return FALSE;
1026  }
1027 
1028  child->timeout = TRUE;
1029  crm_warn("%s process (PID %d) timed out", child->desc, (int)child->pid);
1030 
1031  child->timerid = g_timeout_add(5000, child_timeout_callback, child);
1032  return FALSE;
1033 }
1034 
1035 static gboolean
1036 child_waitpid(mainloop_child_t *child, int flags)
1037 {
1038  int rc = 0;
1039  int core = 0;
1040  int signo = 0;
1041  int status = 0;
1042  int exitcode = 0;
1043 
1044  rc = waitpid(child->pid, &status, flags);
1045  if(rc == 0) {
1046  crm_perror(LOG_DEBUG, "wait(%d) = %d", child->pid, rc);
1047  return FALSE;
1048 
1049  } else if(rc != child->pid) {
1050  signo = SIGCHLD;
1051  exitcode = 1;
1052  status = 1;
1053  crm_perror(LOG_ERR, "Call to waitpid(%d) failed", child->pid);
1054 
1055  } else {
1056  crm_trace("Managed process %d exited: %p", child->pid, child);
1057 
1058  if (WIFEXITED(status)) {
1059  exitcode = WEXITSTATUS(status);
1060  crm_trace("Managed process %d (%s) exited with rc=%d", child->pid, child->desc, exitcode);
1061 
1062  } else if (WIFSIGNALED(status)) {
1063  signo = WTERMSIG(status);
1064  crm_trace("Managed process %d (%s) exited with signal=%d", child->pid, child->desc, signo);
1065  }
1066 #ifdef WCOREDUMP
1067  if (WCOREDUMP(status)) {
1068  core = 1;
1069  crm_err("Managed process %d (%s) dumped core", child->pid, child->desc);
1070  }
1071 #endif
1072  }
1073 
1074  if (child->callback) {
1075  child->callback(child, child->pid, core, signo, exitcode);
1076  }
1077  return TRUE;
1078 }
1079 
1080 static void
1081 child_death_dispatch(int signal)
1082 {
1083  GListPtr iter = child_list;
1084  gboolean exited;
1085 
1086  while(iter) {
1087  GListPtr saved = NULL;
1088  mainloop_child_t *child = iter->data;
1089  exited = child_waitpid(child, WNOHANG);
1090 
1091  saved = iter;
1092  iter = iter->next;
1093 
1094  if (exited == FALSE) {
1095  continue;
1096  }
1097  crm_trace("Removing process entry %p for %d", child, child->pid);
1098 
1099  child_list = g_list_remove_link(child_list, saved);
1100  g_list_free(saved);
1101  child_free(child);
1102  }
1103 }
1104 
1105 static gboolean
1106 child_signal_init(gpointer p)
1107 {
1108  crm_trace("Installed SIGCHLD handler");
1109  /* Do NOT use g_child_watch_add() and friends, they rely on pthreads */
1110  mainloop_add_signal(SIGCHLD, child_death_dispatch);
1111 
1112  /* In case they terminated before the signal handler was installed */
1113  child_death_dispatch(SIGCHLD);
1114  return FALSE;
1115 }
1116 
1117 int
1119 {
1120  GListPtr iter;
1121  mainloop_child_t *child = NULL;
1122  mainloop_child_t *match = NULL;
1123  /* It is impossible to block SIGKILL, this allows us to
1124  * call waitpid without WNOHANG flag.*/
1125  int waitflags = 0, rc = 0;
1126 
1127  for (iter = child_list; iter != NULL && match == NULL; iter = iter->next) {
1128  child = iter->data;
1129  if (pid == child->pid) {
1130  match = child;
1131  }
1132  }
1133 
1134  if (match == NULL) {
1135  return FALSE;
1136  }
1137 
1138  rc = child_kill_helper(match);
1139  if(rc == -ESRCH) {
1140  /* It's gone, but hasn't shown up in waitpid() yet
1141  *
1142  * Wait until we get SIGCHLD and let child_death_dispatch()
1143  * clean it up as normal (so we get the correct return
1144  * code/status)
1145  *
1146  * The blocking alternative would be to call:
1147  * child_waitpid(match, 0);
1148  */
1149  crm_trace("Waiting for child %d to be reaped by child_death_dispatch()", match->pid);
1150  return TRUE;
1151 
1152  } else if(rc != 0) {
1153  /* If KILL for some other reason set the WNOHANG flag since we
1154  * can't be certain what happened.
1155  */
1156  waitflags = WNOHANG;
1157  }
1158 
1159  if (child_waitpid(match, waitflags) == FALSE) {
1160  /* not much we can do if this occurs */
1161  return FALSE;
1162  }
1163 
1164  child_list = g_list_remove(child_list, match);
1165  child_free(match);
1166  return TRUE;
1167 }
1168 
1169 /* Create/Log a new tracked process
1170  * To track a process group, use -pid
1171  */
1172 void
1173 mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *privatedata, enum mainloop_child_flags flags,
1174  void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
1175 {
1176  static bool need_init = TRUE;
1177  mainloop_child_t *child = g_new(mainloop_child_t, 1);
1178 
1179  child->pid = pid;
1180  child->timerid = 0;
1181  child->timeout = FALSE;
1182  child->privatedata = privatedata;
1183  child->callback = callback;
1184  child->flags = flags;
1185 
1186  if(desc) {
1187  child->desc = strdup(desc);
1188  }
1189 
1190  if (timeout) {
1191  child->timerid = g_timeout_add(timeout, child_timeout_callback, child);
1192  }
1193 
1194  child_list = g_list_append(child_list, child);
1195 
1196  if(need_init) {
1197  need_init = FALSE;
1198  /* SIGCHLD processing has to be invoked from mainloop.
1199  * We do not want it to be possible to both add a child pid
1200  * to mainloop, and have the pid's exit callback invoked within
1201  * the same callstack. */
1202  g_timeout_add(1, child_signal_init, NULL);
1203  }
1204 }
1205 
1206 void
1207 mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata,
1208  void (*callback) (mainloop_child_t * p, pid_t pid, int core, int signo, int exitcode))
1209 {
1210  mainloop_child_add_with_flags(pid, timeout, desc, privatedata, 0, callback);
1211 }
1212 
1213 struct mainloop_timer_s {
1214  guint id;
1215  guint period_ms;
1216  bool repeat;
1217  char *name;
1218  GSourceFunc cb;
1219  void *userdata;
1220 };
1221 
1222 struct mainloop_timer_s mainloop;
1223 
1224 static gboolean mainloop_timer_cb(gpointer user_data)
1225 {
1226  int id = 0;
1227  bool repeat = FALSE;
1228  struct mainloop_timer_s *t = user_data;
1229 
1230  CRM_ASSERT(t != NULL);
1231 
1232  id = t->id;
1233  t->id = 0; /* Ensure it's unset during callbacks so that
1234  * mainloop_timer_running() works as expected
1235  */
1236 
1237  if(t->cb) {
1238  crm_trace("Invoking callbacks for timer %s", t->name);
1239  repeat = t->repeat;
1240  if(t->cb(t->userdata) == FALSE) {
1241  crm_trace("Timer %s complete", t->name);
1242  repeat = FALSE;
1243  }
1244  }
1245 
1246  if(repeat) {
1247  /* Restore if repeating */
1248  t->id = id;
1249  }
1250 
1251  return repeat;
1252 }
1253 
1255 {
1256  if(t && t->id != 0) {
1257  return TRUE;
1258  }
1259  return FALSE;
1260 }
1261 
1263 {
1265  if(t && t->period_ms > 0) {
1266  crm_trace("Starting timer %s", t->name);
1267  t->id = g_timeout_add(t->period_ms, mainloop_timer_cb, t);
1268  }
1269 }
1270 
1272 {
1273  if(t && t->id != 0) {
1274  crm_trace("Stopping timer %s", t->name);
1275  g_source_remove(t->id);
1276  t->id = 0;
1277  }
1278 }
1279 
1281 {
1282  guint last = 0;
1283 
1284  if(t) {
1285  last = t->period_ms;
1286  t->period_ms = period_ms;
1287  }
1288 
1289  if(t && t->id != 0 && last != t->period_ms) {
1291  }
1292  return last;
1293 }
1294 
1295 
1297 mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
1298 {
1299  mainloop_timer_t *t = calloc(1, sizeof(mainloop_timer_t));
1300 
1301  if(t) {
1302  if(name) {
1303  t->name = crm_strdup_printf("%s-%u-%d", name, period_ms, repeat);
1304  } else {
1305  t->name = crm_strdup_printf("%p-%u-%d", t, period_ms, repeat);
1306  }
1307  t->id = 0;
1308  t->period_ms = period_ms;
1309  t->repeat = repeat;
1310  t->cb = cb;
1311  t->userdata = userdata;
1312  crm_trace("Created timer %s with %p %p", t->name, userdata, t->userdata);
1313  }
1314  return t;
1315 }
1316 
1317 void
1319 {
1320  if(t) {
1321  crm_trace("Destroying timer %s", t->name);
1323  free(t->name);
1324  free(t);
1325  }
1326 }
1327 
1328 /*
1329  * Helpers to make sure certain events aren't lost at shutdown
1330  */
1331 
1332 static gboolean
1333 drain_timeout_cb(gpointer user_data)
1334 {
1335  bool *timeout_popped = (bool*) user_data;
1336 
1337  *timeout_popped = TRUE;
1338  return FALSE;
1339 }
1340 
1353 void
1354 pcmk_drain_main_loop(GMainLoop *mloop, guint timer_ms, bool (*check)(guint))
1355 {
1356  bool timeout_popped = FALSE;
1357  guint timer = 0;
1358  GMainContext *ctx = NULL;
1359 
1360  CRM_CHECK(mloop && check, return);
1361 
1362  ctx = g_main_loop_get_context(mloop);
1363  if (ctx) {
1364  time_t start_time = time(NULL);
1365 
1366  timer = g_timeout_add(timer_ms, drain_timeout_cb, &timeout_popped);
1367  while (!timeout_popped
1368  && check(timer_ms - (time(NULL) - start_time) * 1000)) {
1369  g_main_context_iteration(ctx, TRUE);
1370  }
1371  }
1372  if (!timeout_popped && (timer > 0)) {
1373  g_source_remove(timer);
1374  }
1375 }
void * mainloop_child_userdata(mainloop_child_t *child)
Definition: mainloop.c:964
#define LOG_TRACE
Definition: logging.h:29
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:190
pid_t mainloop_child_pid(mainloop_child_t *child)
Definition: mainloop.c:946
bool mainloop_timer_running(mainloop_timer_t *t)
Definition: mainloop.c:1254
bool crm_ipc_connect(crm_ipc_t *client)
Establish an IPC connection to a Pacemaker component.
Definition: ipc.c:924
A dumping ground.
void(* destroy)(gpointer)
Definition: mainloop.h:64
#define crm_notice(fmt, args...)
Definition: logging.h:276
struct signal_s crm_signal_t
#define crm_crit(fmt, args...)
Definition: logging.h:273
void mainloop_del_fd(mainloop_io_t *client)
Definition: mainloop.c:930
void mainloop_del_ipc_server(qb_ipcs_service_t *server)
Definition: mainloop.c:695
mainloop_child_flags
Definition: mainloop.h:19
gboolean mainloop_destroy_signal(int sig)
Definition: mainloop.c:385
crm_trigger_t * mainloop_add_trigger(int priority, int(*dispatch)(gpointer user_data), gpointer userdata)
Definition: mainloop.c:213
void(* destroy)(gpointer userdata)
Definition: mainloop.h:105
int crm_ipc_get_fd(crm_ipc_t *client)
Definition: ipc.c:1030
const char * pcmk_strerror(int rc)
Definition: logging.c:1017
void mainloop_trigger_complete(crm_trigger_t *trig)
Definition: mainloop.c:201
const char * mainloop_child_name(mainloop_child_t *child)
Definition: mainloop.c:952
struct mainloop_timer_s mainloop_timer_t
Definition: mainloop.h:27
struct mainloop_io_s mainloop_io_t
Definition: mainloop.h:25
struct mainloop_child_s mainloop_child_t
Definition: mainloop.h:26
gboolean crm_signal(int sig, void(*dispatch)(int sig))
Definition: mainloop.c:302
long crm_ipc_read(crm_ipc_t *client)
Definition: ipc.c:1138
mainloop_timer_t * mainloop_timer_add(const char *name, guint period_ms, bool repeat, GSourceFunc cb, void *userdata)
Definition: mainloop.c:1297
uint32_t pid
Definition: internal.h:77
mainloop_io_t * mainloop_add_ipc_client(const char *name, int priority, size_t max_size, void *userdata, struct ipc_client_callbacks *callbacks)
Definition: mainloop.c:843
struct qb_ipcs_poll_handlers gio_poll_funcs
Definition: mainloop.c:615
guint mainloop_timer_set_period(mainloop_timer_t *t, guint period_ms)
Definition: mainloop.c:1280
Wrappers for and extensions to glib mainloop.
void crm_client_init(void)
Definition: ipc.c:256
void mainloop_timer_del(mainloop_timer_t *t)
Definition: mainloop.c:1318
crm_ipc_t * mainloop_get_ipc_client(mainloop_io_t *client)
Definition: mainloop.c:877
const char * crm_ipc_buffer(crm_ipc_t *client)
Definition: ipc.c:1185
void mainloop_del_ipc_client(mainloop_io_t *client)
Definition: mainloop.c:871
struct trigger_s crm_trigger_t
Definition: mainloop.h:24
int(* dispatch)(gpointer userdata)
Definition: mainloop.h:104
#define crm_warn(fmt, args...)
Definition: logging.h:275
int mainloop_child_timeout(mainloop_child_t *child)
Definition: mainloop.c:958
uint32_t id
Definition: internal.h:76
#define crm_debug(fmt, args...)
Definition: logging.h:279
struct crm_ipc_s crm_ipc_t
Definition: ipc.h:52
void mainloop_set_trigger(crm_trigger_t *source)
Definition: mainloop.c:225
#define crm_trace(fmt, args...)
Definition: logging.h:280
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition: mainloop.c:886
void mainloop_timer_start(mainloop_timer_t *t)
Definition: mainloop.c:1262
qb_ipcs_service_t * mainloop_add_ipc_server(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks)
Definition: mainloop.c:648
Wrappers for and extensions to libxml2.
void mainloop_clear_child_userdata(mainloop_child_t *child)
Definition: mainloop.c:970
void mainloop_timer_stop(mainloop_timer_t *t)
Definition: mainloop.c:1271
void mainloop_child_add_with_flags(pid_t pid, int timeout, const char *desc, void *privatedata, enum mainloop_child_flags flags, void(*callback)(mainloop_child_t *p, pid_t pid, int core, int signo, int exitcode))
Definition: mainloop.c:1173
unsigned int crm_ipc_default_buffer_size(void)
Definition: ipc.c:71
void crm_ipc_destroy(crm_ipc_t *client)
Definition: ipc.c:1007
void mainloop_child_add(pid_t pid, int timeout, const char *desc, void *privatedata, void(*callback)(mainloop_child_t *p, pid_t pid, int core, int signo, int exitcode))
Definition: mainloop.c:1207
bool crm_ipc_connected(crm_ipc_t *client)
Definition: ipc.c:1044
#define CRM_XS
Definition: logging.h:42
#define crm_perror(level, fmt, args...)
Log a system error message.
Definition: logging.h:252
qb_ipcs_service_t * mainloop_add_ipc_server_with_prio(const char *name, enum qb_ipc_type type, struct qb_ipcs_service_handlers *callbacks, enum qb_loop_priority prio)
Start server-side API end-point, hooked into the internal event loop.
Definition: mainloop.c:655
#define crm_err(fmt, args...)
Definition: logging.h:274
struct mainloop_timer_s mainloop
Definition: mainloop.c:1222
crm_ipc_t * crm_ipc_new(const char *name, size_t max_size)
Definition: ipc.c:894
#define CRM_ASSERT(expr)
Definition: error.h:20
char data[0]
Definition: internal.h:86
void mainloop_cleanup(void)
Definition: mainloop.c:411
gboolean mainloop_add_signal(int sig, void(*dispatch)(int sig))
Definition: mainloop.c:327
int mainloop_child_kill(pid_t pid)
Definition: mainloop.c:1118
gboolean mainloop_destroy_trigger(crm_trigger_t *source)
Definition: mainloop.c:233
void pcmk_drain_main_loop(GMainLoop *mloop, guint timer_ms, bool(*check)(guint))
Process main loop events while a certain condition is met.
Definition: mainloop.c:1354
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__
void crm_ipc_close(crm_ipc_t *client)
Definition: ipc.c:992
GList * GListPtr
Definition: crm.h:210
#define crm_info(fmt, args...)
Definition: logging.h:277
uint64_t flags
Definition: remote.c:156
int(* dispatch)(const char *buffer, ssize_t length, gpointer userdata)
Definition: mainloop.h:63
enum crm_ais_msg_types type
Definition: internal.h:79
#define int32_t
Definition: stdint.in.h:157