6c4a3d1e71eb888af753bbcdb5179b0d15e7bead
[asterisk/asterisk.git] / res / res_corosync.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2007, Digium, Inc.
5  * Copyright (C) 2012, Russell Bryant
6  *
7  * Russell Bryant <russell@russellbryant.net>
8  *
9  * See http://www.asterisk.org for more information about
10  * the Asterisk project. Please do not directly contact
11  * any of the maintainers of this project for assistance;
12  * the project provides a web site, mailing lists and IRC
13  * channels for your use.
14  *
15  * This program is free software, distributed under the terms of
16  * the GNU General Public License Version 2. See the LICENSE file
17  * at the top of the source tree.
18  */
19
20 /*!
21  * \file
22  * \author Russell Bryant <russell@russellbryant.net>
23  *
24  * This module is based on and replaces the previous res_ais module.
25  */
26
27 /*** MODULEINFO
28         <depend>corosync</depend>
29         <defaultenabled>no</defaultenabled>
30         <support_level>extended</support_level>
31  ***/
32
33 #include "asterisk.h"
34
35 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
36
37 #include <corosync/cpg.h>
38 #include <corosync/cfg.h>
39
40 #include "asterisk/module.h"
41 #include "asterisk/logger.h"
42 #include "asterisk/poll-compat.h"
43 #include "asterisk/config.h"
44 #include "asterisk/event.h"
45 #include "asterisk/cli.h"
46 #include "asterisk/devicestate.h"
47 #include "asterisk/app.h"
48 #include "asterisk/stasis.h"
49 #include "asterisk/stasis_message_router.h"
50
51 AST_RWLOCK_DEFINE_STATIC(event_types_lock);
52
53 static void publish_mwi_to_stasis(struct ast_event *event);
54 static void publish_device_state_to_stasis(struct ast_event *event);
55
56 /*! \brief The internal topic used for message forwarding and pings */
57 static struct stasis_topic *corosync_aggregate_topic;
58
59 /*! \brief Our \ref stasis message router */
60 static struct stasis_message_router *stasis_router;
61
62 /*! \brief Internal accessor for our topic */
63 static struct stasis_topic *corosync_topic(void)
64 {
65         return corosync_aggregate_topic;
66 }
67
68 /*! \brief A payload wrapper around a corosync ping event */
69 struct corosync_ping_payload {
70         /*! The corosync ping event being passed over \ref stasis */
71         struct ast_event *event;
72 };
73
74 /*! \brief Destructor for the \ref corosync_ping_payload wrapper object */
75 static void corosync_ping_payload_dtor(void *obj)
76 {
77         struct corosync_ping_payload *payload = obj;
78
79         ast_free(payload->event);
80 }
81
82 /*! \brief Convert a Corosync PING to a \ref ast_event */
83 static struct ast_event *corosync_ping_to_event(struct stasis_message *message)
84 {
85         struct corosync_ping_payload *payload;
86         struct ast_event *event;
87         struct ast_eid *event_eid;
88
89         if (!message) {
90                 return NULL;
91         }
92
93         payload = stasis_message_data(message);
94
95         if (!payload->event) {
96                 return NULL;
97         }
98
99         event_eid = (struct ast_eid *)ast_event_get_ie_raw(payload->event, AST_EVENT_IE_EID);
100
101         event = ast_event_new(AST_EVENT_PING,
102                                 AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, event_eid, sizeof(*event_eid),
103                                 AST_EVENT_IE_END);
104
105         return event;
106 }
107
108 STASIS_MESSAGE_TYPE_DEFN_LOCAL(corosync_ping_message_type,
109         .to_event = corosync_ping_to_event, );
110
111 /*! \brief Publish a Corosync ping to \ref stasis */
112 static void publish_corosync_ping_to_stasis(struct ast_event *event)
113 {
114         struct corosync_ping_payload *payload;
115         struct stasis_message *message;
116
117         ast_assert(ast_event_get_type(event) == AST_EVENT_PING);
118         ast_assert(event != NULL);
119
120         payload = ao2_t_alloc(sizeof(*payload), corosync_ping_payload_dtor, "Create ping payload");
121         if (!payload) {
122                 return;
123         }
124         payload->event = event;
125
126         message = stasis_message_create(corosync_ping_message_type(), payload);
127         if (!message) {
128                 ao2_t_ref(payload, -1, "Destroy payload on off nominal");
129                 return;
130         }
131
132         stasis_publish(corosync_topic(), message);
133
134         ao2_t_ref(payload, -1, "Hand ref to stasis");
135         ao2_t_ref(message, -1, "Hand ref to stasis");
136 }
137
138 static struct {
139         const char *name;
140         struct stasis_forward *sub;
141         unsigned char publish;
142         unsigned char publish_default;
143         unsigned char subscribe;
144         unsigned char subscribe_default;
145         struct stasis_topic *(* topic_fn)(void);
146         struct stasis_cache *(* cache_fn)(void);
147         struct stasis_message_type *(* message_type_fn)(void);
148         void (* publish_to_stasis)(struct ast_event *);
149 } event_types[] = {
150         [AST_EVENT_MWI] = { .name = "mwi",
151                             .topic_fn = ast_mwi_topic_all,
152                             .cache_fn = ast_mwi_state_cache,
153                             .message_type_fn = ast_mwi_state_type,
154                             .publish_to_stasis = publish_mwi_to_stasis, },
155         [AST_EVENT_DEVICE_STATE_CHANGE] = { .name = "device_state",
156                                             .topic_fn = ast_device_state_topic_all,
157                                             .cache_fn = ast_device_state_cache,
158                                             .message_type_fn = ast_device_state_message_type,
159                                             .publish_to_stasis = publish_device_state_to_stasis, },
160         [AST_EVENT_PING] = { .name = "ping",
161                              .publish_default = 1,
162                              .subscribe_default = 1,
163                              .topic_fn = corosync_topic,
164                              .message_type_fn = corosync_ping_message_type,
165                              .publish_to_stasis = publish_corosync_ping_to_stasis, },
166 };
167
168 static struct {
169         pthread_t id;
170         int alert_pipe[2];
171         unsigned int stop:1;
172 } dispatch_thread = {
173         .id = AST_PTHREADT_NULL,
174         .alert_pipe = { -1, -1 },
175 };
176
177 static cpg_handle_t cpg_handle;
178 static corosync_cfg_handle_t cfg_handle;
179
180 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
181 static void cfg_state_track_cb(
182                 corosync_cfg_state_notification_buffer_t *notification_buffer,
183                 cs_error_t error);
184 #endif /* HAVE_COROSYNC_CFG_STATE_TRACK */
185
186 static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle,
187                 corosync_cfg_shutdown_flags_t flags);
188
189 static corosync_cfg_callbacks_t cfg_callbacks = {
190 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
191         .corosync_cfg_state_track_callback = cfg_state_track_cb,
192 #endif /* HAVE_COROSYNC_CFG_STATE_TRACK */
193         .corosync_cfg_shutdown_callback = cfg_shutdown_cb,
194 };
195
196 /*! \brief Publish a received MWI \ref ast_event to \ref stasis */
197 static void publish_mwi_to_stasis(struct ast_event *event)
198 {
199         const char *mailbox;
200         const char *context;
201         unsigned int new_msgs;
202         unsigned int old_msgs;
203         struct ast_eid *event_eid;
204
205         ast_assert(ast_event_get_type(event) == AST_EVENT_MWI);
206
207         mailbox = ast_event_get_ie_str(event, AST_EVENT_IE_MAILBOX);
208         context = ast_event_get_ie_str(event, AST_EVENT_IE_CONTEXT);
209         new_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS);
210         old_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_OLDMSGS);
211         event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
212
213         if (ast_strlen_zero(mailbox) || ast_strlen_zero(context)) {
214                 return;
215         }
216
217         if (new_msgs > INT_MAX) {
218                 new_msgs = INT_MAX;
219         }
220
221         if (old_msgs > INT_MAX) {
222                 old_msgs = INT_MAX;
223         }
224
225         if (ast_publish_mwi_state_full(mailbox, context, (int)new_msgs,
226                                        (int)old_msgs, NULL, event_eid)) {
227                 char eid[16];
228                 ast_eid_to_str(eid, sizeof(eid), event_eid);
229                 ast_log(LOG_WARNING, "Failed to publish MWI message for %s@%s from %s\n",
230                         mailbox, context, eid);
231         }
232 }
233
234 /*! \brief Publish a received device state \ref ast_event to \ref stasis */
235 static void publish_device_state_to_stasis(struct ast_event *event)
236 {
237         const char *device;
238         enum ast_device_state state;
239         unsigned int cachable;
240         struct ast_eid *event_eid;
241
242         ast_assert(ast_event_get_type(event) == AST_EVENT_DEVICE_STATE_CHANGE);
243
244         device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
245         state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
246         cachable = ast_event_get_ie_uint(event, AST_EVENT_IE_CACHABLE);
247         event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
248
249         if (ast_strlen_zero(device)) {
250                 return;
251         }
252
253         if (ast_publish_device_state_full(device, state, cachable, event_eid)) {
254                 char eid[16];
255                 ast_eid_to_str(eid, sizeof(eid), event_eid);
256                 ast_log(LOG_WARNING, "Failed to publish device state message for %s from %s\n",
257                         device, eid);
258         }
259 }
260
261 static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name,
262                 uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len);
263
264 static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
265                 const struct cpg_address *member_list, size_t member_list_entries,
266                 const struct cpg_address *left_list, size_t left_list_entries,
267                 const struct cpg_address *joined_list, size_t joined_list_entries);
268
269 static cpg_callbacks_t cpg_callbacks = {
270         .cpg_deliver_fn = cpg_deliver_cb,
271         .cpg_confchg_fn = cpg_confchg_cb,
272 };
273
274 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
275 static void cfg_state_track_cb(
276                 corosync_cfg_state_notification_buffer_t *notification_buffer,
277                 cs_error_t error)
278 {
279 }
280 #endif /* HAVE_COROSYNC_CFG_STATE_TRACK */
281
282 static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle,
283                 corosync_cfg_shutdown_flags_t flags)
284 {
285 }
286
287 static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name,
288                 uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
289 {
290         struct ast_event *event;
291         void (*publish_handler)(struct ast_event *) = NULL;
292         enum ast_event_type event_type;
293
294         if (msg_len < ast_event_minimum_length()) {
295                 ast_debug(1, "Ignoring event that's too small. %u < %u\n",
296                         (unsigned int) msg_len,
297                         (unsigned int) ast_event_minimum_length());
298                 return;
299         }
300
301         if (!ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(msg, AST_EVENT_IE_EID))) {
302                 /* Don't feed events back in that originated locally. */
303                 return;
304         }
305
306         event_type = ast_event_get_type(msg);
307         if (event_type > AST_EVENT_TOTAL) {
308                 /* Egads, we don't support this */
309                 return;
310         }
311
312         ast_rwlock_rdlock(&event_types_lock);
313         publish_handler = event_types[event_type].publish_to_stasis;
314         if (!event_types[event_type].subscribe || !publish_handler) {
315                 /* We are not configured to subscribe to these events or
316                    we have no way to publish it internally. */
317                 ast_rwlock_unlock(&event_types_lock);
318                 return;
319         }
320         ast_rwlock_unlock(&event_types_lock);
321
322         if (!(event = ast_malloc(msg_len))) {
323                 return;
324         }
325
326         memcpy(event, msg, msg_len);
327
328         if (event_type == AST_EVENT_PING) {
329                 const struct ast_eid *eid;
330                 char buf[128] = "";
331
332                 eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
333                 ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
334                 ast_log(LOG_NOTICE, "Got event PING from server with EID: '%s'\n", buf);
335         }
336         ast_debug(5, "Publishing event %s (%d) to stasis\n",
337                 ast_event_get_type_name(event), event_type);
338         publish_handler(event);
339 }
340
341 static void publish_to_corosync(struct stasis_message *message)
342 {
343         cs_error_t cs_err;
344         struct iovec iov;
345         struct ast_event *event;
346
347         event = stasis_message_to_event(message);
348         if (!event) {
349                 return;
350         }
351
352         if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
353                 /* If the event didn't originate from this server, don't send it back out. */
354                 ast_event_destroy(event);
355                 return;
356         }
357
358         if (ast_event_get_type(event) == AST_EVENT_PING) {
359                 const struct ast_eid *eid;
360                 char buf[128] = "";
361
362                 eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
363                 ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
364                 ast_log(LOG_NOTICE, "Sending event PING from this server with EID: '%s'\n", buf);
365         }
366
367         iov.iov_base = (void *)event;
368         iov.iov_len = ast_event_get_size(event);
369
370         ast_debug(5, "Publishing event %s (%d) to corosync\n",
371                 ast_event_get_type_name(event), ast_event_get_type(event));
372
373         /* The stasis subscription will only exist if we are configured to publish
374          * these events, so just send away. */
375         if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
376                 ast_log(LOG_WARNING, "CPG mcast failed (%d)\n", cs_err);
377         }
378 }
379
380 static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
381 {
382         if (!message) {
383                 return;
384         }
385
386         publish_to_corosync(message);
387 }
388
389 static int dump_cache_cb(void *obj, void *arg, int flags)
390 {
391         struct stasis_message *message = obj;
392
393         if (!message) {
394                 return 0;
395         }
396
397         publish_to_corosync(message);
398
399         return 0;
400 }
401
402 static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
403                 const struct cpg_address *member_list, size_t member_list_entries,
404                 const struct cpg_address *left_list, size_t left_list_entries,
405                 const struct cpg_address *joined_list, size_t joined_list_entries)
406 {
407         unsigned int i;
408
409         /* If any new nodes have joined, dump our cache of events we are publishing
410          * that originated from this server. */
411
412         if (!joined_list_entries) {
413                 return;
414         }
415
416         for (i = 0; i < ARRAY_LEN(event_types); i++) {
417                 struct ao2_container *messages;
418
419                 ast_rwlock_rdlock(&event_types_lock);
420                 if (!event_types[i].publish) {
421                         ast_rwlock_unlock(&event_types_lock);
422                         continue;
423                 }
424
425                 if (!event_types[i].cache_fn || !event_types[i].message_type_fn) {
426                         ast_rwlock_unlock(&event_types_lock);
427                         continue;
428                 }
429
430                 messages = stasis_cache_dump_by_eid(event_types[i].cache_fn(),
431                         event_types[i].message_type_fn(),
432                         &ast_eid_default);
433                 ast_rwlock_unlock(&event_types_lock);
434
435                 ao2_callback(messages, OBJ_NODATA, dump_cache_cb, NULL);
436
437                 ao2_t_ref(messages, -1, "Dispose of dumped cache");
438         }
439 }
440
441 static void *dispatch_thread_handler(void *data)
442 {
443         cs_error_t cs_err;
444         struct pollfd pfd[3] = {
445                 { .events = POLLIN, },
446                 { .events = POLLIN, },
447                 { .events = POLLIN, },
448         };
449
450         if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
451                 ast_log(LOG_ERROR, "Failed to get CPG fd.  This module is now broken.\n");
452                 return NULL;
453         }
454
455         if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
456                 ast_log(LOG_ERROR, "Failed to get CFG fd.  This module is now broken.\n");
457                 return NULL;
458         }
459
460         pfd[2].fd = dispatch_thread.alert_pipe[0];
461
462         while (!dispatch_thread.stop) {
463                 int res;
464
465                 cs_err = CS_OK;
466
467                 pfd[0].revents = 0;
468                 pfd[1].revents = 0;
469                 pfd[2].revents = 0;
470
471                 res = ast_poll(pfd, ARRAY_LEN(pfd), -1);
472                 if (res == -1 && errno != EINTR && errno != EAGAIN) {
473                         ast_log(LOG_ERROR, "poll() error: %s (%d)\n", strerror(errno), errno);
474                         continue;
475                 }
476
477                 if (pfd[0].revents & POLLIN) {
478                         if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) {
479                                 ast_log(LOG_WARNING, "Failed CPG dispatch: %d\n", cs_err);
480                         }
481                 }
482
483                 if (pfd[1].revents & POLLIN) {
484                         if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) {
485                                 ast_log(LOG_WARNING, "Failed CFG dispatch: %d\n", cs_err);
486                         }
487                 }
488
489                 if (cs_err == CS_ERR_LIBRARY || cs_err == CS_ERR_BAD_HANDLE) {
490                         struct cpg_name name;
491
492                         /* If corosync gets restarted out from under Asterisk, try to recover. */
493
494                         ast_log(LOG_NOTICE, "Attempting to recover from corosync failure.\n");
495
496                         if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
497                                 ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err);
498                                 sleep(5);
499                                 continue;
500                         }
501
502                         if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks) != CS_OK)) {
503                                 ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err);
504                                 sleep(5);
505                                 continue;
506                         }
507
508                         if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
509                                 ast_log(LOG_ERROR, "Failed to get CPG fd.\n");
510                                 sleep(5);
511                                 continue;
512                         }
513
514                         if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
515                                 ast_log(LOG_ERROR, "Failed to get CFG fd.\n");
516                                 sleep(5);
517                                 continue;
518                         }
519
520                         ast_copy_string(name.value, "asterisk", sizeof(name.value));
521                         name.length = strlen(name.value);
522                         if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
523                                 ast_log(LOG_ERROR, "Failed to join cpg (%d)\n", (int) cs_err);
524                                 sleep(5);
525                                 continue;
526                         }
527
528                         ast_log(LOG_NOTICE, "Corosync recovery complete.\n");
529                 }
530         }
531
532         return NULL;
533 }
534
535 static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
536 {
537         cs_error_t cs_err;
538         cpg_iteration_handle_t cpg_iter;
539         struct cpg_iteration_description_t cpg_desc;
540         unsigned int i;
541
542         switch (cmd) {
543         case CLI_INIT:
544                 e->command = "corosync show members";
545                 e->usage =
546                         "Usage: corosync show members\n"
547                         "       Show corosync cluster members\n";
548                 return NULL;
549
550         case CLI_GENERATE:
551                 return NULL;    /* no completion */
552         }
553
554         if (a->argc != e->args) {
555                 return CLI_SHOWUSAGE;
556         }
557
558         cs_err = cpg_iteration_initialize(cpg_handle, CPG_ITERATION_ALL, NULL, &cpg_iter);
559
560         if (cs_err != CS_OK) {
561                 ast_cli(a->fd, "Failed to initialize CPG iterator.\n");
562                 return CLI_FAILURE;
563         }
564
565         ast_cli(a->fd, "\n"
566                     "=============================================================\n"
567                     "=== Cluster members =========================================\n"
568                     "=============================================================\n"
569                     "===\n");
570
571         for (i = 1, cs_err = cpg_iteration_next(cpg_iter, &cpg_desc);
572                         cs_err == CS_OK;
573                         cs_err = cpg_iteration_next(cpg_iter, &cpg_desc), i++) {
574                 corosync_cfg_node_address_t addrs[8];
575                 int num_addrs = 0;
576                 unsigned int j;
577
578                 cs_err = corosync_cfg_get_node_addrs(cfg_handle, cpg_desc.nodeid,
579                                 ARRAY_LEN(addrs), &num_addrs, addrs);
580                 if (cs_err != CS_OK) {
581                         ast_log(LOG_WARNING, "Failed to get node addresses\n");
582                         continue;
583                 }
584
585                 ast_cli(a->fd, "=== Node %d\n", i);
586                 ast_cli(a->fd, "=== --> Group: %s\n", cpg_desc.group.value);
587
588                 for (j = 0; j < num_addrs; j++) {
589                         struct sockaddr *sa = (struct sockaddr *) addrs[j].address;
590                         size_t sa_len = (size_t) addrs[j].address_length;
591                         char buf[128];
592
593                         getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST);
594
595                         ast_cli(a->fd, "=== --> Address %d: %s\n", j + 1, buf);
596                 }
597
598         }
599
600         ast_cli(a->fd, "===\n"
601                        "=============================================================\n"
602                        "\n");
603
604         cpg_iteration_finalize(cpg_iter);
605
606         return CLI_SUCCESS;
607 }
608
609 static char *corosync_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
610 {
611         struct ast_event *event;
612
613         switch (cmd) {
614         case CLI_INIT:
615                 e->command = "corosync ping";
616                 e->usage =
617                         "Usage: corosync ping\n"
618                         "       Send a test ping to the cluster.\n"
619                         "A NOTICE will be in the log for every ping received\n"
620                         "on a server.\n  If you send a ping, you should see a NOTICE\n"
621                         "in the log for every server in the cluster.\n";
622                 return NULL;
623
624         case CLI_GENERATE:
625                 return NULL;    /* no completion */
626         }
627
628         if (a->argc != e->args) {
629                 return CLI_SHOWUSAGE;
630         }
631
632         event = ast_event_new(AST_EVENT_PING, AST_EVENT_IE_END);
633
634         if (!event) {
635                 return CLI_FAILURE;
636         }
637
638         ast_rwlock_rdlock(&event_types_lock);
639         event_types[AST_EVENT_PING].publish_to_stasis(event);
640         ast_rwlock_unlock(&event_types_lock);
641
642         return CLI_SUCCESS;
643 }
644
645 static char *corosync_show_config(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
646 {
647         unsigned int i;
648
649         switch (cmd) {
650         case CLI_INIT:
651                 e->command = "corosync show config";
652                 e->usage =
653                         "Usage: corosync show config\n"
654                         "       Show configuration loaded from res_corosync.conf\n";
655                 return NULL;
656
657         case CLI_GENERATE:
658                 return NULL;    /* no completion */
659         }
660
661         if (a->argc != e->args) {
662                 return CLI_SHOWUSAGE;
663         }
664
665         ast_cli(a->fd, "\n"
666                     "=============================================================\n"
667                     "=== res_corosync config =====================================\n"
668                     "=============================================================\n"
669                     "===\n");
670
671         ast_rwlock_rdlock(&event_types_lock);
672         for (i = 0; i < ARRAY_LEN(event_types); i++) {
673                 if (event_types[i].publish) {
674                         ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n",
675                                         event_types[i].name);
676                 }
677                 if (event_types[i].subscribe) {
678                         ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n",
679                                         event_types[i].name);
680                 }
681         }
682         ast_rwlock_unlock(&event_types_lock);
683
684         ast_cli(a->fd, "===\n"
685                        "=============================================================\n"
686                        "\n");
687
688         return CLI_SUCCESS;
689 }
690
691 static struct ast_cli_entry corosync_cli[] = {
692         AST_CLI_DEFINE(corosync_show_config, "Show configuration"),
693         AST_CLI_DEFINE(corosync_show_members, "Show cluster members"),
694         AST_CLI_DEFINE(corosync_ping, "Send a test ping to the cluster"),
695 };
696
697 enum {
698         PUBLISH,
699         SUBSCRIBE,
700 };
701
702 static int set_event(const char *event_type, int pubsub)
703 {
704         unsigned int i;
705
706         for (i = 0; i < ARRAY_LEN(event_types); i++) {
707                 if (!event_types[i].name || strcasecmp(event_type, event_types[i].name)) {
708                         continue;
709                 }
710
711                 switch (pubsub) {
712                 case PUBLISH:
713                         event_types[i].publish = 1;
714                         break;
715                 case SUBSCRIBE:
716                         event_types[i].subscribe = 1;
717                         break;
718                 }
719
720                 break;
721         }
722
723         return (i == ARRAY_LEN(event_types)) ? -1 : 0;
724 }
725
726 static int load_general_config(struct ast_config *cfg)
727 {
728         struct ast_variable *v;
729         int res = 0;
730         unsigned int i;
731
732         ast_rwlock_wrlock(&event_types_lock);
733
734         for (i = 0; i < ARRAY_LEN(event_types); i++) {
735                 event_types[i].publish = event_types[i].publish_default;
736                 event_types[i].subscribe = event_types[i].subscribe_default;
737         }
738
739         for (v = ast_variable_browse(cfg, "general"); v && !res; v = v->next) {
740                 if (!strcasecmp(v->name, "publish_event")) {
741                         res = set_event(v->value, PUBLISH);
742                 } else if (!strcasecmp(v->name, "subscribe_event")) {
743                         res = set_event(v->value, SUBSCRIBE);
744                 } else {
745                         ast_log(LOG_WARNING, "Unknown option '%s'\n", v->name);
746                 }
747         }
748
749         for (i = 0; i < ARRAY_LEN(event_types); i++) {
750                 if (event_types[i].publish && !event_types[i].sub) {
751                         event_types[i].sub = stasis_forward_all(event_types[i].topic_fn(),
752                                                                                                         corosync_topic());
753                         stasis_message_router_add(stasis_router,
754                                                   event_types[i].message_type_fn(),
755                                                   stasis_message_cb,
756                                                   NULL);
757                 } else if (!event_types[i].publish && event_types[i].sub) {
758                         event_types[i].sub = stasis_forward_cancel(event_types[i].sub);
759                         stasis_message_router_remove(stasis_router,
760                                                      event_types[i].message_type_fn());
761                 }
762         }
763
764         ast_rwlock_unlock(&event_types_lock);
765
766         return res;
767 }
768
769 static int load_config(unsigned int reload)
770 {
771         static const char filename[] = "res_corosync.conf";
772         struct ast_config *cfg;
773         const char *cat = NULL;
774         struct ast_flags config_flags = { 0 };
775         int res = 0;
776
777         cfg = ast_config_load(filename, config_flags);
778
779         if (cfg == CONFIG_STATUS_FILEMISSING || cfg == CONFIG_STATUS_FILEINVALID) {
780                 return -1;
781         }
782
783         while ((cat = ast_category_browse(cfg, cat))) {
784                 if (!strcasecmp(cat, "general")) {
785                         res = load_general_config(cfg);
786                 } else {
787                         ast_log(LOG_WARNING, "Unknown configuration section '%s'\n", cat);
788                 }
789         }
790
791         ast_config_destroy(cfg);
792
793         return res;
794 }
795
796 static void cleanup_module(void)
797 {
798         cs_error_t cs_err;
799         unsigned int i;
800
801         if (stasis_router) {
802
803                 /* Unsubscribe all topic forwards and cancel all message routes */
804                 ast_rwlock_wrlock(&event_types_lock);
805                 for (i = 0; i < ARRAY_LEN(event_types); i++) {
806                         if (event_types[i].sub) {
807                                 event_types[i].sub = stasis_forward_cancel(event_types[i].sub);
808                                 stasis_message_router_remove(stasis_router,
809                                                              event_types[i].message_type_fn());
810                         }
811                         event_types[i].publish = 0;
812                         event_types[i].subscribe = 0;
813                 }
814                 ast_rwlock_unlock(&event_types_lock);
815
816                 stasis_message_router_unsubscribe_and_join(stasis_router);
817                 stasis_router = NULL;
818         }
819
820         if (corosync_aggregate_topic) {
821                 ao2_t_ref(corosync_aggregate_topic, -1, "Dispose of topic on cleanup");
822                 corosync_aggregate_topic = NULL;
823         }
824
825         STASIS_MESSAGE_TYPE_CLEANUP(corosync_ping_message_type);
826
827         if (dispatch_thread.id != AST_PTHREADT_NULL) {
828                 char meepmeep = 'x';
829                 dispatch_thread.stop = 1;
830                 if (ast_carefulwrite(dispatch_thread.alert_pipe[1], &meepmeep, 1,
831                                         5000) == -1) {
832                         ast_log(LOG_ERROR, "Failed to write to pipe: %s (%d)\n",
833                                         strerror(errno), errno);
834                 }
835                 pthread_join(dispatch_thread.id, NULL);
836         }
837
838         if (dispatch_thread.alert_pipe[0] != -1) {
839                 close(dispatch_thread.alert_pipe[0]);
840                 dispatch_thread.alert_pipe[0] = -1;
841         }
842
843         if (dispatch_thread.alert_pipe[1] != -1) {
844                 close(dispatch_thread.alert_pipe[1]);
845                 dispatch_thread.alert_pipe[1] = -1;
846         }
847
848         if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) {
849                 ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err);
850         }
851         cpg_handle = 0;
852
853         if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) {
854                 ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err);
855         }
856         cfg_handle = 0;
857 }
858
859 static int load_module(void)
860 {
861         cs_error_t cs_err;
862         enum ast_module_load_result res = AST_MODULE_LOAD_FAILURE;
863         struct cpg_name name;
864
865         corosync_aggregate_topic = stasis_topic_create("corosync_aggregate_topic");
866         if (!corosync_aggregate_topic) {
867                 ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n");
868                 goto failed;
869         }
870
871         stasis_router = stasis_message_router_create(corosync_aggregate_topic);
872         if (!stasis_router) {
873                 ast_log(AST_LOG_ERROR, "Failed to create message router for corosync topic\n");
874                 goto failed;
875         }
876
877         if (STASIS_MESSAGE_TYPE_INIT(corosync_ping_message_type) != 0) {
878                 ast_log(AST_LOG_ERROR, "Failed to initialize corosync ping message type\n");
879                 goto failed;
880         }
881
882         if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
883                 ast_log(LOG_ERROR, "Failed to initialize cfg: (%d)\n", (int) cs_err);
884                 goto failed;
885         }
886
887         if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) {
888                 ast_log(LOG_ERROR, "Failed to initialize cpg: (%d)\n", (int) cs_err);
889                 goto failed;
890         }
891
892         ast_copy_string(name.value, "asterisk", sizeof(name.value));
893         name.length = strlen(name.value);
894
895         if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
896                 ast_log(LOG_ERROR, "Failed to join: (%d)\n", (int) cs_err);
897                 goto failed;
898         }
899
900         if (pipe(dispatch_thread.alert_pipe) == -1) {
901                 ast_log(LOG_ERROR, "Failed to create alert pipe: %s (%d)\n",
902                                 strerror(errno), errno);
903                 goto failed;
904         }
905
906         if (ast_pthread_create_background(&dispatch_thread.id, NULL,
907                         dispatch_thread_handler, NULL)) {
908                 ast_log(LOG_ERROR, "Error starting CPG dispatch thread.\n");
909                 goto failed;
910         }
911
912         if (load_config(0)) {
913                 /* simply not configured is not a fatal error */
914                 res = AST_MODULE_LOAD_DECLINE;
915                 goto failed;
916         }
917
918         ast_cli_register_multiple(corosync_cli, ARRAY_LEN(corosync_cli));
919
920         return AST_MODULE_LOAD_SUCCESS;
921
922 failed:
923         cleanup_module();
924
925         return res;
926 }
927
928 static int unload_module(void)
929 {
930         ast_cli_unregister_multiple(corosync_cli, ARRAY_LEN(corosync_cli));
931
932         cleanup_module();
933
934         return 0;
935 }
936
937 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Corosync");