pacemaker  1.1.24-3850484742
Scalable High-Availability cluster resource manager
cpg.c
Go to the documentation of this file.
1 /*
2  * Copyright 2004-2019 the Pacemaker project contributors
3  *
4  * The version control history for this file may have further details.
5  *
6  * This source code is licensed under the GNU Lesser General Public License
7  * version 2.1 or later (LGPLv2.1+) WITHOUT ANY WARRANTY.
8  */
9 
10 #include <crm_internal.h>
11 #include <bzlib.h>
12 #include <sys/socket.h>
13 #include <netinet/in.h>
14 #include <arpa/inet.h>
15 #include <netdb.h>
16 
17 #include <crm/common/ipc.h>
18 #include <crm/cluster/internal.h>
19 #include <crm/common/mainloop.h>
20 #include <sys/utsname.h>
21 
22 #include <qb/qbipcc.h>
23 #include <qb/qbutil.h>
24 
25 #include <corosync/corodefs.h>
26 #include <corosync/corotypes.h>
27 #include <corosync/hdb.h>
28 #include <corosync/cpg.h>
29 
30 #include <crm/msg_xml.h>
31 
32 #include <crm/common/ipc_internal.h> /* PCMK__SPECIAL_PID* */
33 
34 cpg_handle_t pcmk_cpg_handle = 0; /* TODO: Remove, use cluster.cpg_handle */
35 
36 static bool cpg_evicted = FALSE;
37 gboolean(*pcmk_cpg_dispatch_fn) (int kind, const char *from, const char *data) = NULL;
38 
39 #define cs_repeat(counter, max, code) do { \
40  code; \
41  if(rc == CS_ERR_TRY_AGAIN || rc == CS_ERR_QUEUE_FULL) { \
42  counter++; \
43  crm_debug("Retrying operation after %ds", counter); \
44  sleep(counter); \
45  } else { \
46  break; \
47  } \
48  } while(counter < max)
49 
50 void
52 {
53  pcmk_cpg_handle = 0;
54  if (cluster->cpg_handle) {
55  crm_trace("Disconnecting CPG");
56  cpg_leave(cluster->cpg_handle, &cluster->group);
57  cpg_finalize(cluster->cpg_handle);
58  cluster->cpg_handle = 0;
59 
60  } else {
61  crm_info("No CPG connection");
62  }
63 }
64 
65 uint32_t get_local_nodeid(cpg_handle_t handle)
66 {
67  cs_error_t rc = CS_OK;
68  int retries = 0;
69  static uint32_t local_nodeid = 0;
70  cpg_handle_t local_handle = handle;
71  cpg_callbacks_t cb = { };
72  int fd = -1;
73  uid_t found_uid = 0;
74  gid_t found_gid = 0;
75  pid_t found_pid = 0;
76  int rv;
77 
78  if(local_nodeid != 0) {
79  return local_nodeid;
80  }
81 
82 #if 0
83  /* Should not be necessary */
85  get_ais_details(&local_nodeid, NULL);
86  goto done;
87  }
88 #endif
89 
90  if(handle == 0) {
91  crm_trace("Creating connection");
92  cs_repeat(retries, 5, rc = cpg_initialize(&local_handle, &cb));
93  if (rc != CS_OK) {
94  crm_err("Could not connect to the CPG API (rc=%d)", rc);
95  return 0;
96  }
97 
98  rc = cpg_fd_get(local_handle, &fd);
99  if (rc != CS_OK) {
100  crm_err("Could not obtain the CPG API connection (rc=%d)", rc);
101  goto bail;
102  }
103 
104  /* CPG provider run as root (in given user namespace, anyway)? */
105  if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
106  &found_uid, &found_gid))) {
107  crm_err("CPG provider is not authentic:"
108  " process %lld (uid: %lld, gid: %lld)",
109  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
110  (long long) found_uid, (long long) found_gid);
111  goto bail;
112  } else if (rv < 0) {
113  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
114  strerror(-rv), -rv);
115  goto bail;
116  }
117  }
118 
119  if (rc == CS_OK) {
120  retries = 0;
121  crm_trace("Performing lookup");
122  cs_repeat(retries, 5, rc = cpg_local_get(local_handle, &local_nodeid));
123  }
124 
125  if (rc != CS_OK) {
126  crm_err("Could not get local node id from the CPG API: %s (%d)", ais_error2text(rc), rc);
127  }
128 
129 bail:
130  if(handle == 0) {
131  crm_trace("Closing connection");
132  cpg_finalize(local_handle);
133  }
134  crm_debug("Local nodeid is %u", local_nodeid);
135  return local_nodeid;
136 }
137 
138 
141 
142 static ssize_t crm_cs_flush(gpointer data);
143 
144 static gboolean
145 crm_cs_flush_cb(gpointer data)
146 {
147  cs_message_timer = 0;
148  crm_cs_flush(data);
149  return FALSE;
150 }
151 
152 #define CS_SEND_MAX 200
153 static ssize_t
154 crm_cs_flush(gpointer data)
155 {
156  int sent = 0;
157  ssize_t rc = 0;
158  int queue_len = 0;
159  static unsigned int last_sent = 0;
160  cpg_handle_t *handle = (cpg_handle_t *)data;
161 
162  if (*handle == 0) {
163  crm_trace("Connection is dead");
164  return pcmk_ok;
165  }
166 
167  queue_len = g_list_length(cs_message_queue);
168  if ((queue_len % 1000) == 0 && queue_len > 1) {
169  crm_err("CPG queue has grown to %d", queue_len);
170 
171  } else if (queue_len == CS_SEND_MAX) {
172  crm_warn("CPG queue has grown to %d", queue_len);
173  }
174 
175  if (cs_message_timer) {
176  /* There is already a timer, wait until it goes off */
177  crm_trace("Timer active %d", cs_message_timer);
178  return pcmk_ok;
179  }
180 
181  while (cs_message_queue && sent < CS_SEND_MAX) {
182  struct iovec *iov = cs_message_queue->data;
183 
184  errno = 0;
185  rc = cpg_mcast_joined(*handle, CPG_TYPE_AGREED, iov, 1);
186 
187  if (rc != CS_OK) {
188  break;
189  }
190 
191  sent++;
192  last_sent++;
193  crm_trace("CPG message sent, size=%llu",
194  (unsigned long long) iov->iov_len);
195 
196  cs_message_queue = g_list_remove(cs_message_queue, iov);
197  free(iov->iov_base);
198  free(iov);
199  }
200 
201  queue_len -= sent;
202  if (sent > 1 || cs_message_queue) {
203  crm_info("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)",
204  sent, queue_len, last_sent, ais_error2text(rc),
205  (long long) rc);
206  } else {
207  crm_trace("Sent %d CPG messages (%d remaining, last=%u): %s (%lld)",
208  sent, queue_len, last_sent, ais_error2text(rc),
209  (long long) rc);
210  }
211 
212  if (cs_message_queue) {
213  uint32_t delay_ms = 100;
214  if(rc != CS_OK) {
215  /* Proportionally more if sending failed but cap at 1s */
216  delay_ms = QB_MIN(1000, CS_SEND_MAX + (10 * queue_len));
217  }
218  cs_message_timer = g_timeout_add(delay_ms, crm_cs_flush_cb, data);
219  }
220 
221  return rc;
222 }
223 
224 gboolean
225 send_cpg_iov(struct iovec * iov)
226 {
227  static unsigned int queued = 0;
228 
229  queued++;
230  crm_trace("Queueing CPG message %u (%llu bytes)",
231  queued, (unsigned long long) iov->iov_len);
232  cs_message_queue = g_list_append(cs_message_queue, iov);
233  crm_cs_flush(&pcmk_cpg_handle);
234  return TRUE;
235 }
236 
237 static int
238 pcmk_cpg_dispatch(gpointer user_data)
239 {
240  int rc = 0;
241  crm_cluster_t *cluster = (crm_cluster_t*) user_data;
242 
243  rc = cpg_dispatch(cluster->cpg_handle, CS_DISPATCH_ONE);
244  if (rc != CS_OK) {
245  crm_err("Connection to the CPG API failed: %s (%d)", ais_error2text(rc), rc);
246  cluster->cpg_handle = 0;
247  return -1;
248 
249  } else if(cpg_evicted) {
250  crm_err("Evicted from CPG membership");
251  return -1;
252  }
253  return 0;
254 }
255 
256 char *
257 pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content,
258  uint32_t *kind, const char **from)
259 {
260  char *data = NULL;
261  AIS_Message *msg = (AIS_Message *) content;
262 
263  if(handle) {
264  /* 'msg' came from CPG not the plugin
265  * Do filtering and field massaging
266  */
268  const char *local_name = get_local_node_name();
269 
270  if (msg->sender.id > 0 && msg->sender.id != nodeid) {
271  crm_err("Nodeid mismatch from %d.%d: claimed nodeid=%u", nodeid, pid, msg->sender.id);
272  return NULL;
273 
274  } else if (msg->host.id != 0 && (local_nodeid != msg->host.id)) {
275  /* Not for us */
276  crm_trace("Not for us: %u != %u", msg->host.id, local_nodeid);
277  return NULL;
278  } else if (msg->host.size != 0 && safe_str_neq(msg->host.uname, local_name)) {
279  /* Not for us */
280  crm_trace("Not for us: %s != %s", msg->host.uname, local_name);
281  return NULL;
282  }
283 
284  msg->sender.id = nodeid;
285  if (msg->sender.size == 0) {
286  crm_node_t *peer = crm_get_peer(nodeid, NULL);
287 
288  if (peer == NULL) {
289  crm_err("Peer with nodeid=%u is unknown", nodeid);
290 
291  } else if (peer->uname == NULL) {
292  crm_err("No uname for peer with nodeid=%u", nodeid);
293 
294  } else {
295  crm_notice("Fixing uname for peer with nodeid=%u", nodeid);
296  msg->sender.size = strlen(peer->uname);
297  memset(msg->sender.uname, 0, MAX_NAME);
298  memcpy(msg->sender.uname, peer->uname, msg->sender.size);
299  }
300  }
301  }
302 
303  crm_trace("Got new%s message (size=%d, %d, %d)",
304  msg->is_compressed ? " compressed" : "",
305  ais_data_len(msg), msg->size, msg->compressed_size);
306 
307  if (kind != NULL) {
308  *kind = msg->header.id;
309  }
310  if (from != NULL) {
311  *from = msg->sender.uname;
312  }
313 
314  if (msg->is_compressed && msg->size > 0) {
315  int rc = BZ_OK;
316  char *uncompressed = NULL;
317  unsigned int new_size = msg->size + 1;
318 
319  if (check_message_sanity(msg, NULL) == FALSE) {
320  goto badmsg;
321  }
322 
323  crm_trace("Decompressing message data");
324  uncompressed = calloc(1, new_size);
325  rc = BZ2_bzBuffToBuffDecompress(uncompressed, &new_size, msg->data, msg->compressed_size, 1, 0);
326 
327  if (rc != BZ_OK) {
328  crm_err("Decompression failed: %d", rc);
329  free(uncompressed);
330  goto badmsg;
331  }
332 
333  CRM_ASSERT(rc == BZ_OK);
334  CRM_ASSERT(new_size == msg->size);
335 
336  data = uncompressed;
337 
338  } else if (check_message_sanity(msg, data) == FALSE) {
339  goto badmsg;
340 
341  } else if (safe_str_eq("identify", data)) {
342  char *pid_s = crm_getpid_s();
343 
344  send_cluster_text(crm_class_cluster, pid_s, TRUE, NULL, crm_msg_ais);
345  free(pid_s);
346  return NULL;
347 
348  } else {
349  data = strdup(msg->data);
350  }
351 
352  if (msg->header.id != crm_class_members) {
353  /* Is this even needed anymore? */
354  crm_get_peer(msg->sender.id, msg->sender.uname);
355  }
356 
357  if (msg->header.id == crm_class_rmpeer) {
358  uint32_t id = crm_int_helper(data, NULL);
359 
360  crm_info("Removing peer %s/%u", data, id);
361  reap_crm_member(id, NULL);
362  free(data);
363  return NULL;
364 
365 #if SUPPORT_PLUGIN
366  } else if (is_classic_ais_cluster()) {
368 #endif
369  }
370 
371  crm_trace("Payload: %.200s", data);
372  return data;
373 
374  badmsg:
375  crm_err("Invalid message (id=%d, dest=%s:%s, from=%s:%s.%d):"
376  " min=%d, total=%d, size=%d, bz2_size=%d",
377  msg->id, ais_dest(&(msg->host)), msg_type2text(msg->host.type),
378  ais_dest(&(msg->sender)), msg_type2text(msg->sender.type),
379  msg->sender.pid, (int)sizeof(AIS_Message),
380  msg->header.size, msg->size, msg->compressed_size);
381 
382  free(data);
383  return NULL;
384 }
385 
386 static int cmp_member_list_nodeid(const void *first,
387  const void *second)
388 {
389  const struct cpg_address *const a = *((const struct cpg_address **) first),
390  *const b = *((const struct cpg_address **) second);
391  if (a->nodeid < b->nodeid) {
392  return -1;
393  } else if (a->nodeid > b->nodeid) {
394  return 1;
395  }
396  /* don't bother with "reason" nor "pid" */
397  return 0;
398 }
399 
400 static const char *
401 cpgreason2str(cpg_reason_t reason)
402 {
403  switch (reason) {
404  case CPG_REASON_JOIN: return " via cpg_join";
405  case CPG_REASON_LEAVE: return " via cpg_leave";
406  case CPG_REASON_NODEDOWN: return " via cluster exit";
407  case CPG_REASON_NODEUP: return " via cluster join";
408  case CPG_REASON_PROCDOWN: return " for unknown reason";
409  default: break;
410  }
411  return "";
412 }
413 
414 static inline const char *
415 peer_name(crm_node_t *peer)
416 {
417  if (peer == NULL) {
418  return "unknown node";
419  } else if (peer->uname == NULL) {
420  return "peer node";
421  } else {
422  return peer->uname;
423  }
424 }
425 
426 void
427 pcmk_cpg_membership(cpg_handle_t handle,
428  const struct cpg_name *groupName,
429  const struct cpg_address *member_list, size_t member_list_entries,
430  const struct cpg_address *left_list, size_t left_list_entries,
431  const struct cpg_address *joined_list, size_t joined_list_entries)
432 {
433  int i;
434  gboolean found = FALSE;
435  static int counter = 0;
437  const struct cpg_address *key, **sorted;
438 
439  sorted = malloc(member_list_entries * sizeof(const struct cpg_address *));
440  CRM_ASSERT(sorted != NULL);
441 
442  for (size_t iter = 0; iter < member_list_entries; iter++) {
443  sorted[iter] = member_list + iter;
444  }
445  /* so that the cross-matching multiply-subscribed nodes is then cheap */
446  qsort(sorted, member_list_entries, sizeof(const struct cpg_address *),
447  cmp_member_list_nodeid);
448 
449  for (i = 0; i < left_list_entries; i++) {
450  crm_node_t *peer = crm_find_peer(left_list[i].nodeid, NULL);
451  const struct cpg_address **rival = NULL;
452 
453  /* in CPG world, NODE:PROCESS-IN-MEMBERSHIP-OF-G is an 1:N relation
454  and not playing by this rule may go wild in case of multiple
455  residual instances of the same pacemaker daemon at the same node
456  -- we must ensure that the possible local rival(s) won't make us
457  cry out and bail (e.g. when they quit themselves), since all the
458  surrounding logic denies this simple fact that the full membership
459  is discriminated also per the PID of the process beside mere node
460  ID (and implicitly, group ID); practically, this will be sound in
461  terms of not preventing progress, since all the CPG joiners are
462  also API end-point carriers, and that's what matters locally
463  (who's the winner);
464  remotely, we will just compare leave_list and member_list and if
465  the left process has its node retained in member_list (under some
466  other PID, anyway) we will just ignore it as well
467  XXX: long-term fix is to establish in-out PID-aware tracking? */
468  if (peer) {
469  key = &left_list[i];
470  rival = bsearch(&key, sorted, member_list_entries,
471  sizeof(const struct cpg_address *),
472  cmp_member_list_nodeid);
473  }
474 
475  if (rival == NULL) {
476  crm_info("Group %s event %d: %s (node %u pid %u) left%s",
477  groupName->value, counter, peer_name(peer),
478  left_list[i].nodeid, left_list[i].pid,
479  cpgreason2str(left_list[i].reason));
480  if (peer) {
481  crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg,
482  OFFLINESTATUS);
483  }
484  } else if (left_list[i].nodeid == local_nodeid) {
485  crm_warn("Group %s event %d: duplicate local pid %u left%s",
486  groupName->value, counter,
487  left_list[i].pid, cpgreason2str(left_list[i].reason));
488  } else {
489  crm_warn("Group %s event %d: "
490  "%s (node %u) duplicate pid %u left%s (%u remains)",
491  groupName->value, counter, peer_name(peer),
492  left_list[i].nodeid, left_list[i].pid,
493  cpgreason2str(left_list[i].reason), (*rival)->pid);
494  }
495  }
496  free(sorted);
497  sorted = NULL;
498 
499  for (i = 0; i < joined_list_entries; i++) {
500  crm_info("Group %s event %d: node %u pid %u joined%s",
501  groupName->value, counter, joined_list[i].nodeid,
502  joined_list[i].pid, cpgreason2str(joined_list[i].reason));
503  }
504 
505  for (i = 0; i < member_list_entries; i++) {
506  crm_node_t *peer = crm_get_peer(member_list[i].nodeid, NULL);
507 
508  if (member_list[i].nodeid == local_nodeid
509  && member_list[i].pid != getpid()) {
510  /* see the note above */
511  crm_warn("Group %s event %d: detected duplicate local pid %u",
512  groupName->value, counter, member_list[i].pid);
513  continue;
514  }
515  crm_info("Group %s event %d: %s (node %u pid %u) is member",
516  groupName->value, counter, peer_name(peer),
517  member_list[i].nodeid, member_list[i].pid);
518 
519  /* Anyone that is sending us CPG messages must also be a _CPG_ member.
520  * But it's _not_ safe to assume it's in the quorum membership.
521  * We may have just found out it's dead and are processing the last couple of messages it sent
522  */
523  peer = crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
524  if(peer && peer->state && crm_is_peer_active(peer) == FALSE) {
525  time_t now = time(NULL);
526 
527  /* Co-opt the otherwise unused votes field */
528  if(peer->votes == 0) {
529  peer->votes = now;
530 
531  } else if(now > (60 + peer->votes)) {
532  /* On the otherhand, if we're still getting messages, at a certain point
533  * we need to acknowledge our internal cache is probably wrong
534  *
535  * Set the threshold to 1 minute
536  */
537  crm_warn("Node %u is member of group %s but was believed offline",
538  member_list[i].nodeid, groupName->value);
539  if (crm_update_peer_state(__FUNCTION__, peer, CRM_NODE_MEMBER, 0)) {
540  peer->votes = 0;
541  }
542  }
543  }
544 
545  if (local_nodeid == member_list[i].nodeid) {
546  found = TRUE;
547  }
548  }
549 
550  if (!found) {
551  crm_err("Local node was evicted from group %s", groupName->value);
552  cpg_evicted = TRUE;
553  }
554 
555  counter++;
556 }
557 
558 gboolean
560 {
561  cs_error_t rc;
562  int fd = -1;
563  int retries = 0;
564  uint32_t id = 0;
565  crm_node_t *peer = NULL;
566  cpg_handle_t handle = 0;
567  uid_t found_uid = 0;
568  gid_t found_gid = 0;
569  pid_t found_pid = 0;
570  int rv;
571 
572  struct mainloop_fd_callbacks cpg_fd_callbacks = {
573  .dispatch = pcmk_cpg_dispatch,
574  .destroy = cluster->destroy,
575  };
576 
577  cpg_callbacks_t cpg_callbacks = {
578  .cpg_deliver_fn = cluster->cpg.cpg_deliver_fn,
579  .cpg_confchg_fn = cluster->cpg.cpg_confchg_fn,
580  /* .cpg_deliver_fn = pcmk_cpg_deliver, */
581  /* .cpg_confchg_fn = pcmk_cpg_membership, */
582  };
583 
584  cpg_evicted = FALSE;
585  cluster->group.length = 0;
586  cluster->group.value[0] = 0;
587 
588  /* group.value is char[128] */
589  strncpy(cluster->group.value, crm_system_name?crm_system_name:"unknown", 127);
590  cluster->group.value[127] = 0;
591  cluster->group.length = 1 + QB_MIN(127, strlen(cluster->group.value));
592 
593  cs_repeat(retries, 30, rc = cpg_initialize(&handle, &cpg_callbacks));
594  if (rc != CS_OK) {
595  crm_err("Could not connect to the CPG API (rc=%d)", rc);
596  goto bail;
597  }
598 
599  rc = cpg_fd_get(handle, &fd);
600  if (rc != CS_OK) {
601  crm_err("Could not obtain the CPG API connection (rc=%d)", rc);
602  goto bail;
603  }
604 
605  /* CPG provider run as root (in given user namespace, anyway)? */
606  if (!(rv = crm_ipc_is_authentic_process(fd, (uid_t) 0,(gid_t) 0, &found_pid,
607  &found_uid, &found_gid))) {
608  crm_err("CPG provider is not authentic:"
609  " process %lld (uid: %lld, gid: %lld)",
610  (long long) PCMK__SPECIAL_PID_AS_0(found_pid),
611  (long long) found_uid, (long long) found_gid);
612  rc = CS_ERR_ACCESS;
613  goto bail;
614  } else if (rv < 0) {
615  crm_err("Could not verify authenticity of CPG provider: %s (%d)",
616  strerror(-rv), -rv);
617  rc = CS_ERR_ACCESS;
618  goto bail;
619  }
620 
621  id = get_local_nodeid(handle);
622  if (id == 0) {
623  crm_err("Could not get local node id from the CPG API");
624  goto bail;
625 
626  }
627  cluster->nodeid = id;
628 
629  retries = 0;
630  cs_repeat(retries, 30, rc = cpg_join(handle, &cluster->group));
631  if (rc != CS_OK) {
632  crm_err("Could not join the CPG group '%s': %d", crm_system_name, rc);
633  goto bail;
634  }
635 
636  pcmk_cpg_handle = handle;
637  cluster->cpg_handle = handle;
638  mainloop_add_fd("corosync-cpg", G_PRIORITY_MEDIUM, fd, cluster, &cpg_fd_callbacks);
639 
640  bail:
641  if (rc != CS_OK) {
642  cpg_finalize(handle);
643  return FALSE;
644  }
645 
646  peer = crm_get_peer(id, NULL);
647  crm_update_peer_proc(__FUNCTION__, peer, crm_proc_cpg, ONLINESTATUS);
648  return TRUE;
649 }
650 
651 gboolean
652 send_cluster_message_cs(xmlNode * msg, gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
653 {
654  gboolean rc = TRUE;
655  char *data = NULL;
656 
657  data = dump_xml_unformatted(msg);
658  rc = send_cluster_text(crm_class_cluster, data, local, node, dest);
659  free(data);
660  return rc;
661 }
662 
663 gboolean
664 send_cluster_text(int class, const char *data,
665  gboolean local, crm_node_t * node, enum crm_ais_msg_types dest)
666 {
667  static int msg_id = 0;
668  static int local_pid = 0;
669  static int local_name_len = 0;
670  static const char *local_name = NULL;
671 
672  char *target = NULL;
673  struct iovec *iov;
674  AIS_Message *msg = NULL;
676 
677  /* There are only 6 handlers registered to crm_lib_service in plugin.c */
678  CRM_CHECK(class < 6, crm_err("Invalid message class: %d", class);
679  return FALSE);
680 
681 #if !SUPPORT_PLUGIN
682  CRM_CHECK(dest != crm_msg_ais, return FALSE);
683 #endif
684 
685  if(local_name == NULL) {
686  local_name = get_local_node_name();
687  }
688  if(local_name_len == 0 && local_name) {
689  local_name_len = strlen(local_name);
690  }
691 
692  if (data == NULL) {
693  data = "";
694  }
695 
696  if (local_pid == 0) {
697  local_pid = getpid();
698  }
699 
700  if (sender == crm_msg_none) {
701  sender = local_pid;
702  }
703 
704  msg = calloc(1, sizeof(AIS_Message));
705 
706  msg_id++;
707  msg->id = msg_id;
708  msg->header.id = class;
709  msg->header.error = CS_OK;
710 
711  msg->host.type = dest;
712  msg->host.local = local;
713 
714  if (node) {
715  if (node->uname) {
716  target = strdup(node->uname);
717  msg->host.size = strlen(node->uname);
718  memset(msg->host.uname, 0, MAX_NAME);
719  memcpy(msg->host.uname, node->uname, msg->host.size);
720  } else {
721  target = crm_strdup_printf("%u", node->id);
722  }
723  msg->host.id = node->id;
724  } else {
725  target = strdup("all");
726  }
727 
728  msg->sender.id = 0;
729  msg->sender.type = sender;
730  msg->sender.pid = local_pid;
731  msg->sender.size = local_name_len;
732  memset(msg->sender.uname, 0, MAX_NAME);
733  if(local_name && msg->sender.size) {
734  memcpy(msg->sender.uname, local_name, msg->sender.size);
735  }
736 
737  msg->size = 1 + strlen(data);
738  msg->header.size = sizeof(AIS_Message) + msg->size;
739 
740  if (msg->size < CRM_BZ2_THRESHOLD) {
741  msg = realloc_safe(msg, msg->header.size);
742  memcpy(msg->data, data, msg->size);
743 
744  } else {
745  char *compressed = NULL;
746  unsigned int new_size = 0;
747  char *uncompressed = strdup(data);
748 
749  if (crm_compress_string(uncompressed, msg->size, 0, &compressed, &new_size)) {
750 
751  msg->header.size = sizeof(AIS_Message) + new_size;
752  msg = realloc_safe(msg, msg->header.size);
753  memcpy(msg->data, compressed, new_size);
754 
755  msg->is_compressed = TRUE;
756  msg->compressed_size = new_size;
757 
758  } else {
759  msg = realloc_safe(msg, msg->header.size);
760  memcpy(msg->data, data, msg->size);
761  }
762 
763  free(uncompressed);
764  free(compressed);
765  }
766 
767  iov = calloc(1, sizeof(struct iovec));
768  iov->iov_base = msg;
769  iov->iov_len = msg->header.size;
770 
771  if (msg->compressed_size) {
772  crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes compressed payload): %.200s",
773  msg->id, target, (unsigned long long) iov->iov_len,
774  msg->compressed_size, data);
775  } else {
776  crm_trace("Queueing CPG message %u to %s (%llu bytes, %d bytes payload): %.200s",
777  msg->id, target, (unsigned long long) iov->iov_len,
778  msg->size, data);
779  }
780  free(target);
781 
782 #if SUPPORT_PLUGIN
783  /* The plugin is the only time we don't use CPG messaging */
785  return send_plugin_text(class, iov);
786  }
787 #endif
788 
789  send_cpg_iov(iov);
790 
791  return TRUE;
792 }
793 
795 text2msg_type(const char *text)
796 {
797  int type = crm_msg_none;
798 
799  CRM_CHECK(text != NULL, return type);
800  if (safe_str_eq(text, "ais")) {
801  type = crm_msg_ais;
802  } else if (safe_str_eq(text, "crm_plugin")) {
803  type = crm_msg_ais;
804  } else if (safe_str_eq(text, CRM_SYSTEM_CIB)) {
805  type = crm_msg_cib;
806  } else if (safe_str_eq(text, CRM_SYSTEM_CRMD)) {
807  type = crm_msg_crmd;
808  } else if (safe_str_eq(text, CRM_SYSTEM_DC)) {
809  type = crm_msg_crmd;
810  } else if (safe_str_eq(text, CRM_SYSTEM_TENGINE)) {
811  type = crm_msg_te;
812  } else if (safe_str_eq(text, CRM_SYSTEM_PENGINE)) {
813  type = crm_msg_pe;
814  } else if (safe_str_eq(text, CRM_SYSTEM_LRMD)) {
815  type = crm_msg_lrmd;
816  } else if (safe_str_eq(text, CRM_SYSTEM_STONITHD)) {
817  type = crm_msg_stonithd;
818  } else if (safe_str_eq(text, "stonith-ng")) {
819  type = crm_msg_stonith_ng;
820  } else if (safe_str_eq(text, "attrd")) {
821  type = crm_msg_attrd;
822 
823  } else {
824  /* This will normally be a transient client rather than
825  * a cluster daemon. Set the type to the pid of the client
826  */
827  int scan_rc = sscanf(text, "%d", &type);
828 
829  if (scan_rc != 1 || type <= crm_msg_stonith_ng) {
830  /* Ensure it's sane */
831  type = crm_msg_none;
832  }
833  }
834  return type;
835 }
bool send_plugin_text(int class, struct iovec *iov)
Definition: legacy.c:135
enum crm_ais_msg_types type
Definition: internal.h:38
#define CRM_CHECK(expr, failure_action)
Definition: logging.h:190
char data[0]
Definition: internal.h:55
gboolean send_cpg_iov(struct iovec *iov)
Definition: cpg.c:225
uint32_t local_nodeid
Definition: plugin.c:65
#define crm_notice(fmt, args...)
Definition: logging.h:276
gboolean is_compressed
Definition: internal.h:47
uint32_t size
Definition: internal.h:52
gboolean safe_str_neq(const char *a, const char *b)
Definition: strings.c:182
crm_ais_msg_types
Definition: cluster.h:128
mainloop_io_t * mainloop_add_fd(const char *name, int priority, int fd, void *userdata, struct mainloop_fd_callbacks *callbacks)
Definition: mainloop.c:886
uint32_t nodeid
Definition: cluster.h:97
uint32_t id
Definition: cluster.h:73
gboolean crm_is_peer_active(const crm_node_t *node)
Definition: membership.c:297
const char * get_local_node_name(void)
Definition: cluster.c:289
void(* destroy)(gpointer)
Definition: cluster.h:99
#define pcmk_ok
Definition: error.h:45
long long crm_int_helper(const char *text, char **end_text)
Definition: strings.c:81
uint32_t id
Definition: internal.h:35
#define PCMK__SPECIAL_PID_AS_0(p)
Definition: ipc_internal.h:34
crm_node_t * crm_get_peer(unsigned int id, const char *uname)
Definition: membership.c:689
char * crm_system_name
Definition: utils.c:61
crm_node_t * crm_update_peer_state(const char *source, crm_node_t *node, const char *state, uint64_t membership)
Update a node's state and membership information.
Definition: membership.c:1090
#define CS_SEND_MAX
Definition: cpg.c:152
uint32_t pid
Definition: internal.h:77
char * strerror(int errnum)
AIS_Host sender
Definition: internal.h:81
char * pcmk_message_common_cs(cpg_handle_t handle, uint32_t nodeid, uint32_t pid, void *content, uint32_t *kind, const char **from)
Definition: cpg.c:257
Wrappers for and extensions to glib mainloop.
#define CRM_SYSTEM_DC
Definition: crm.h:76
void plugin_handle_membership(AIS_Message *msg)
Definition: legacy.c:222
void cluster_disconnect_cpg(crm_cluster_t *cluster)
Definition: cpg.c:51
int(* dispatch)(gpointer userdata)
Definition: mainloop.h:104
int cs_message_timer
Definition: cpg.c:140
#define crm_warn(fmt, args...)
Definition: logging.h:275
uint32_t id
Definition: internal.h:76
#define crm_debug(fmt, args...)
Definition: logging.h:279
GListPtr cs_message_queue
Definition: cpg.c:139
#define crm_trace(fmt, args...)
Definition: logging.h:280
gboolean local
Definition: internal.h:37
crm_node_t * crm_update_peer_proc(const char *source, crm_node_t *peer, uint32_t flag, const char *status)
Definition: membership.c:904
#define CRM_SYSTEM_PENGINE
Definition: crm.h:82
AIS_Host sender
Definition: internal.h:50
uint32_t id
Definition: internal.h:46
gboolean send_cluster_text(int class, const char *data, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:664
gboolean check_message_sanity(const AIS_Message *msg, const char *data)
Definition: plugin.c:1356
struct crm_ais_msg_s AIS_Message
Definition: internal.h:32
cpg_handle_t pcmk_cpg_handle
Definition: cpg.c:34
#define ais_data_len(msg)
Definition: internal.h:208
uint32_t size
Definition: internal.h:39
#define CRM_NODE_MEMBER
Definition: cluster.h:44
guint reap_crm_member(uint32_t id, const char *name)
Remove all peer cache entries matching a node ID and/or uname.
Definition: membership.c:354
uint32_t compressed_size
Definition: internal.h:53
uint32_t counter
Definition: internal.h:78
#define MAX_NAME
Definition: crm.h:32
#define CRM_SYSTEM_CRMD
Definition: crm.h:80
bool crm_compress_string(const char *data, int length, int max, char **result, unsigned int *result_len)
Definition: strings.c:445
#define CRM_SYSTEM_STONITHD
Definition: crm.h:84
#define CRM_SYSTEM_CIB
Definition: crm.h:79
#define CRM_SYSTEM_TENGINE
Definition: crm.h:83
uint32_t get_local_nodeid(cpg_handle_t handle)
Definition: cpg.c:65
gboolean(* pcmk_cpg_dispatch_fn)(int kind, const char *from, const char *data)
Definition: cpg.c:37
#define crm_err(fmt, args...)
Definition: logging.h:274
#define G_PRIORITY_MEDIUM
Definition: mainloop.h:141
char uname[MAX_NAME]
Definition: internal.h:40
#define OFFLINESTATUS
Definition: util.h:54
enum crm_ais_msg_types text2msg_type(const char *text)
Definition: cpg.c:795
#define CRM_BZ2_THRESHOLD
Definition: xml.h:41
char * dump_xml_unformatted(xmlNode *msg)
Definition: xml.c:3220
#define CRM_SYSTEM_LRMD
Definition: crm.h:81
gboolean send_cluster_message_cs(xmlNode *msg, gboolean local, crm_node_t *node, enum crm_ais_msg_types dest)
Definition: cpg.c:652
#define uint32_t
Definition: stdint.in.h:158
#define CRM_ASSERT(expr)
Definition: error.h:20
char data[0]
Definition: internal.h:86
char * state
Definition: cluster.h:84
Wrappers for and extensions to libqb IPC.
int32_t votes
Definition: cluster.h:78
uint32_t pid
Definition: internal.h:36
char * uname
Definition: cluster.h:82
#define cs_repeat(counter, max, code)
Definition: cpg.c:39
AIS_Host host
Definition: internal.h:49
#define safe_str_eq(a, b)
Definition: util.h:74
#define ONLINESTATUS
Definition: util.h:53
char * crm_strdup_printf(char const *format,...) __attribute__((__format__(__printf__, 1, 2)))
crm_node_t * crm_find_peer(unsigned int id, const char *uname)
Definition: membership.c:553
void pcmk_cpg_membership(cpg_handle_t handle, const struct cpg_name *groupName, const struct cpg_address *member_list, size_t member_list_entries, const struct cpg_address *left_list, size_t left_list_entries, const struct cpg_address *joined_list, size_t joined_list_entries)
Definition: cpg.c:427
GList * GListPtr
Definition: crm.h:210
#define crm_info(fmt, args...)
Definition: logging.h:277
gboolean cluster_connect_cpg(crm_cluster_t *cluster)
Definition: cpg.c:559
gboolean is_classic_ais_cluster(void)
Definition: cluster.c:624
enum crm_ais_msg_types type
Definition: internal.h:79
enum cluster_type_e get_cluster_type(void)
Definition: cluster.c:513
int crm_ipc_is_authentic_process(int sock, uid_t refuid, gid_t refgid, pid_t *gotpid, uid_t *gotuid, gid_t *gotgid)
Check the authenticity of the IPC socket peer process.
Definition: ipc.c:1422
gboolean local
Definition: internal.h:78