Removing registrar_expire from basic-pbx config
[asterisk/asterisk.git] / res / res_corosync.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2007, Digium, Inc.
5  * Copyright (C) 2012, Russell Bryant
6  *
7  * Russell Bryant <russell@russellbryant.net>
8  *
9  * See http://www.asterisk.org for more information about
10  * the Asterisk project. Please do not directly contact
11  * any of the maintainers of this project for assistance;
12  * the project provides a web site, mailing lists and IRC
13  * channels for your use.
14  *
15  * This program is free software, distributed under the terms of
16  * the GNU General Public License Version 2. See the LICENSE file
17  * at the top of the source tree.
18  */
19
20 /*!
21  * \file
22  * \author Russell Bryant <russell@russellbryant.net>
23  *
24  * This module is based on and replaces the previous res_ais module.
25  */
26
27 /*** MODULEINFO
28         <depend>corosync</depend>
29         <defaultenabled>no</defaultenabled>
30         <support_level>extended</support_level>
31  ***/
32
33 #include "asterisk.h"
34
35 #include <corosync/cpg.h>
36 #include <corosync/cfg.h>
37
38 #include "asterisk/module.h"
39 #include "asterisk/logger.h"
40 #include "asterisk/poll-compat.h"
41 #include "asterisk/config.h"
42 #include "asterisk/event.h"
43 #include "asterisk/cli.h"
44 #include "asterisk/devicestate.h"
45 #include "asterisk/app.h"
46 #include "asterisk/stasis.h"
47 #include "asterisk/stasis_message_router.h"
48 #include "asterisk/stasis_system.h"
49
50 AST_RWLOCK_DEFINE_STATIC(event_types_lock);
51
52 static void publish_mwi_to_stasis(struct ast_event *event);
53 static void publish_device_state_to_stasis(struct ast_event *event);
54 static void publish_cluster_discovery_to_stasis(struct ast_event *event);
55
56 /*! \brief All the nodes that we're aware of */
57 static struct ao2_container *nodes;
58
59 /*! \brief The internal topic used for message forwarding and pings */
60 static struct stasis_topic *corosync_aggregate_topic;
61
62 /*! \brief Our \ref stasis message router */
63 static struct stasis_message_router *stasis_router;
64
65 /*! \brief Internal accessor for our topic */
66 static struct stasis_topic *corosync_topic(void)
67 {
68         return corosync_aggregate_topic;
69 }
70
71 struct corosync_node {
72         /*! The corosync ID */
73         int id;
74         /*! The Asterisk EID */
75         struct ast_eid eid;
76         /*! The IP address of the node */
77         struct ast_sockaddr addr;
78 };
79
80 /*! \brief Corosync ipc dispatch/request and reply size */
81 #define COROSYNC_IPC_BUFFER_SIZE                                (8192 * 128)
82
83 /*! \brief Version of pthread_create to ensure stack is large enough */
84 #define corosync_pthread_create_background(a, b, c, d)                          \
85         ast_pthread_create_stack(a, b, c, d,                                    \
86                 (AST_BACKGROUND_STACKSIZE + (3 * COROSYNC_IPC_BUFFER_SIZE)),    \
87                 __FILE__, __FUNCTION__, __LINE__, #c)
88
89 static struct corosync_node *corosync_node_alloc(struct ast_event *event)
90 {
91         struct corosync_node *node;
92
93         node = ao2_alloc_options(sizeof(*node), NULL, AO2_ALLOC_OPT_LOCK_NOLOCK);
94         if (!node) {
95                 return NULL;
96         }
97
98         memcpy(&node->eid, (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID), sizeof(node->eid));
99         node->id = ast_event_get_ie_uint(event, AST_EVENT_IE_NODE_ID);
100         ast_sockaddr_parse(&node->addr, ast_event_get_ie_str(event, AST_EVENT_IE_LOCAL_ADDR), PARSE_PORT_IGNORE);
101
102         return node;
103 }
104
105 static int corosync_node_hash_fn(const void *obj, const int flags)
106 {
107         const struct corosync_node *node;
108         const int *id;
109
110         switch (flags & OBJ_SEARCH_MASK) {
111         case OBJ_SEARCH_KEY:
112                 id = obj;
113                 break;
114         case OBJ_SEARCH_OBJECT:
115                 node = obj;
116                 id = &node->id;
117                 break;
118         default:
119                 ast_assert(0);
120                 return 0;
121         }
122         return *id;
123 }
124
125 static int corosync_node_cmp_fn(void *obj, void *arg, int flags)
126 {
127         struct corosync_node *left = obj;
128         struct corosync_node *right = arg;
129         const int *id = arg;
130         int cmp;
131
132         switch (flags & OBJ_SEARCH_MASK) {
133         case OBJ_SEARCH_OBJECT:
134                 id = &right->id;
135                 /* Fall through */
136         case OBJ_SEARCH_KEY:
137                 cmp = (left->id == *id);
138                 break;
139         case OBJ_SEARCH_PARTIAL_KEY:
140                 cmp = (left->id == right->id);
141                 break;
142         default:
143                 /* Sort can only work on something with a full or partial key. */
144                 ast_assert(0);
145                 cmp = 1;
146                 break;
147         }
148         return cmp ? CMP_MATCH : 0;
149 }
150
151
152 /*! \brief A payload wrapper around a corosync ping event */
153 struct corosync_ping_payload {
154         /*! The corosync ping event being passed over \ref stasis */
155         struct ast_event *event;
156 };
157
158 /*! \brief Destructor for the \ref corosync_ping_payload wrapper object */
159 static void corosync_ping_payload_dtor(void *obj)
160 {
161         struct corosync_ping_payload *payload = obj;
162
163         ast_free(payload->event);
164 }
165
166 /*! \brief Convert a Corosync PING to a \ref ast_event */
167 static struct ast_event *corosync_ping_to_event(struct stasis_message *message)
168 {
169         struct corosync_ping_payload *payload;
170         struct ast_event *event;
171         struct ast_eid *event_eid;
172
173         if (!message) {
174                 return NULL;
175         }
176
177         payload = stasis_message_data(message);
178
179         if (!payload->event) {
180                 return NULL;
181         }
182
183         event_eid = (struct ast_eid *)ast_event_get_ie_raw(payload->event, AST_EVENT_IE_EID);
184
185         event = ast_event_new(AST_EVENT_PING,
186                                 AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, event_eid, sizeof(*event_eid),
187                                 AST_EVENT_IE_END);
188
189         return event;
190 }
191
192 STASIS_MESSAGE_TYPE_DEFN_LOCAL(corosync_ping_message_type,
193         .to_event = corosync_ping_to_event, );
194
195 /*! \brief Publish a Corosync ping to \ref stasis */
196 static void publish_corosync_ping_to_stasis(struct ast_event *event)
197 {
198         struct corosync_ping_payload *payload;
199         struct stasis_message *message;
200
201         ast_assert(ast_event_get_type(event) == AST_EVENT_PING);
202         ast_assert(event != NULL);
203
204         if (!corosync_ping_message_type()) {
205                 return;
206         }
207
208         payload = ao2_t_alloc(sizeof(*payload), corosync_ping_payload_dtor, "Create ping payload");
209         if (!payload) {
210                 return;
211         }
212         payload->event = event;
213
214         message = stasis_message_create(corosync_ping_message_type(), payload);
215         if (!message) {
216                 ao2_t_ref(payload, -1, "Destroy payload on off nominal");
217                 return;
218         }
219
220         stasis_publish(corosync_topic(), message);
221
222         ao2_t_ref(payload, -1, "Hand ref to stasis");
223         ao2_t_ref(message, -1, "Hand ref to stasis");
224 }
225
226 static struct {
227         const char *name;
228         struct stasis_forward *sub;
229         unsigned char publish;
230         unsigned char publish_default;
231         unsigned char subscribe;
232         unsigned char subscribe_default;
233         struct stasis_topic *(* topic_fn)(void);
234         struct stasis_cache *(* cache_fn)(void);
235         struct stasis_message_type *(* message_type_fn)(void);
236         void (* publish_to_stasis)(struct ast_event *);
237 } event_types[] = {
238         [AST_EVENT_MWI] = { .name = "mwi",
239                             .topic_fn = ast_mwi_topic_all,
240                             .cache_fn = ast_mwi_state_cache,
241                             .message_type_fn = ast_mwi_state_type,
242                             .publish_to_stasis = publish_mwi_to_stasis, },
243         [AST_EVENT_DEVICE_STATE_CHANGE] = { .name = "device_state",
244                                             .topic_fn = ast_device_state_topic_all,
245                                             .cache_fn = ast_device_state_cache,
246                                             .message_type_fn = ast_device_state_message_type,
247                                             .publish_to_stasis = publish_device_state_to_stasis, },
248         [AST_EVENT_PING] = { .name = "ping",
249                              .publish_default = 1,
250                              .subscribe_default = 1,
251                              .topic_fn = corosync_topic,
252                              .message_type_fn = corosync_ping_message_type,
253                              .publish_to_stasis = publish_corosync_ping_to_stasis, },
254         [AST_EVENT_CLUSTER_DISCOVERY] = { .name = "cluster_discovery",
255                                           .publish_default = 1,
256                                           .subscribe_default = 1,
257                                           .topic_fn = ast_system_topic,
258                                           .message_type_fn = ast_cluster_discovery_type,
259                                           .publish_to_stasis = publish_cluster_discovery_to_stasis, },
260 };
261
262 static struct {
263         pthread_t id;
264         int alert_pipe[2];
265         unsigned int stop:1;
266 } dispatch_thread = {
267         .id = AST_PTHREADT_NULL,
268         .alert_pipe = { -1, -1 },
269 };
270
271 static cpg_handle_t cpg_handle;
272 static corosync_cfg_handle_t cfg_handle;
273
274 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
275 static void cfg_state_track_cb(
276                 corosync_cfg_state_notification_buffer_t *notification_buffer,
277                 cs_error_t error);
278 #endif /* HAVE_COROSYNC_CFG_STATE_TRACK */
279
280 static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle,
281                 corosync_cfg_shutdown_flags_t flags);
282
283 static corosync_cfg_callbacks_t cfg_callbacks = {
284 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
285         .corosync_cfg_state_track_callback = cfg_state_track_cb,
286 #endif /* HAVE_COROSYNC_CFG_STATE_TRACK */
287         .corosync_cfg_shutdown_callback = cfg_shutdown_cb,
288 };
289
290 /*! \brief Publish cluster discovery to \ref stasis */
291 static void publish_cluster_discovery_to_stasis_full(struct corosync_node *node, int joined)
292 {
293         struct ast_json *json;
294         struct ast_json_payload *payload;
295         struct stasis_message *message;
296         char eid[18];
297         const char *addr;
298
299         ast_eid_to_str(eid, sizeof(eid), &node->eid);
300         addr = ast_sockaddr_stringify_addr(&node->addr);
301
302         ast_log(AST_LOG_NOTICE, "Node %u (%s) at %s %s the cluster\n",
303                 node->id,
304                 eid,
305                 addr,
306                 joined ? "joined" : "left");
307
308         json = ast_json_pack("{s: s, s: i, s: s, s: i}",
309                 "address", addr,
310                 "node_id", node->id,
311                 "eid", eid,
312                 "joined", joined);
313         if (!json) {
314                 return;
315         }
316
317         payload = ast_json_payload_create(json);
318         if (!payload) {
319                 ast_json_unref(json);
320                 return;
321         }
322
323         message = stasis_message_create(ast_cluster_discovery_type(), payload);
324         if (!message) {
325                 ast_json_unref(json);
326                 ao2_ref(payload, -1);
327                 return;
328         }
329
330         stasis_publish(ast_system_topic(), message);
331         ast_json_unref(json);
332         ao2_ref(payload, -1);
333         ao2_ref(message, -1);
334 }
335
336 static void send_cluster_notify(void);
337
338 /*! \brief Publish a received cluster discovery \ref ast_event to \ref stasis */
339 static void publish_cluster_discovery_to_stasis(struct ast_event *event)
340 {
341         struct corosync_node *node;
342         int id = ast_event_get_ie_uint(event, AST_EVENT_IE_NODE_ID);
343         struct ast_eid *event_eid;
344
345         ast_assert(ast_event_get_type(event) == AST_EVENT_CLUSTER_DISCOVERY);
346
347         event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
348         if (!ast_eid_cmp(&ast_eid_default, event_eid)) {
349                 /* Don't feed events back in that originated locally. */
350                 return;
351         }
352
353         ao2_lock(nodes);
354         node = ao2_find(nodes, &id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
355         if (node) {
356                 /* We already know about this node */
357                 ao2_unlock(nodes);
358                 ao2_ref(node, -1);
359                 return;
360         }
361
362         node = corosync_node_alloc(event);
363         if (!node) {
364                 ao2_unlock(nodes);
365                 return;
366         }
367         ao2_link_flags(nodes, node, OBJ_NOLOCK);
368         ao2_unlock(nodes);
369
370         publish_cluster_discovery_to_stasis_full(node, 1);
371
372         ao2_ref(node, -1);
373
374         /*
375          * When we get news that someone else has joined, we need to let them
376          * know we exist as well.
377          */
378         send_cluster_notify();
379 }
380
381 /*! \brief Publish a received MWI \ref ast_event to \ref stasis */
382 static void publish_mwi_to_stasis(struct ast_event *event)
383 {
384         const char *mailbox;
385         const char *context;
386         unsigned int new_msgs;
387         unsigned int old_msgs;
388         struct ast_eid *event_eid;
389
390         ast_assert(ast_event_get_type(event) == AST_EVENT_MWI);
391
392         mailbox = ast_event_get_ie_str(event, AST_EVENT_IE_MAILBOX);
393         context = ast_event_get_ie_str(event, AST_EVENT_IE_CONTEXT);
394         new_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS);
395         old_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_OLDMSGS);
396         event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
397
398         if (ast_strlen_zero(mailbox) || ast_strlen_zero(context)) {
399                 return;
400         }
401
402         if (new_msgs > INT_MAX) {
403                 new_msgs = INT_MAX;
404         }
405
406         if (old_msgs > INT_MAX) {
407                 old_msgs = INT_MAX;
408         }
409
410         if (ast_publish_mwi_state_full(mailbox, context, (int)new_msgs,
411                                        (int)old_msgs, NULL, event_eid)) {
412                 char eid[18];
413                 ast_eid_to_str(eid, sizeof(eid), event_eid);
414                 ast_log(LOG_WARNING, "Failed to publish MWI message for %s@%s from %s\n",
415                         mailbox, context, eid);
416         }
417 }
418
419 /*! \brief Publish a received device state \ref ast_event to \ref stasis */
420 static void publish_device_state_to_stasis(struct ast_event *event)
421 {
422         const char *device;
423         enum ast_device_state state;
424         unsigned int cachable;
425         struct ast_eid *event_eid;
426
427         ast_assert(ast_event_get_type(event) == AST_EVENT_DEVICE_STATE_CHANGE);
428
429         device = ast_event_get_ie_str(event, AST_EVENT_IE_DEVICE);
430         state = ast_event_get_ie_uint(event, AST_EVENT_IE_STATE);
431         cachable = ast_event_get_ie_uint(event, AST_EVENT_IE_CACHABLE);
432         event_eid = (struct ast_eid *)ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
433
434         if (ast_strlen_zero(device)) {
435                 return;
436         }
437
438         if (ast_publish_device_state_full(device, state, cachable, event_eid)) {
439                 char eid[18];
440                 ast_eid_to_str(eid, sizeof(eid), event_eid);
441                 ast_log(LOG_WARNING, "Failed to publish device state message for %s from %s\n",
442                         device, eid);
443         }
444 }
445
446 static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name,
447                 uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len);
448
449 static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
450                 const struct cpg_address *member_list, size_t member_list_entries,
451                 const struct cpg_address *left_list, size_t left_list_entries,
452                 const struct cpg_address *joined_list, size_t joined_list_entries);
453
454 static cpg_callbacks_t cpg_callbacks = {
455         .cpg_deliver_fn = cpg_deliver_cb,
456         .cpg_confchg_fn = cpg_confchg_cb,
457 };
458
459 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
460 static void cfg_state_track_cb(
461                 corosync_cfg_state_notification_buffer_t *notification_buffer,
462                 cs_error_t error)
463 {
464 }
465 #endif /* HAVE_COROSYNC_CFG_STATE_TRACK */
466
467 static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle,
468                 corosync_cfg_shutdown_flags_t flags)
469 {
470 }
471
472 static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name,
473                 uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
474 {
475         struct ast_event *event;
476         void (*publish_handler)(struct ast_event *) = NULL;
477         enum ast_event_type event_type;
478
479         if (msg_len < ast_event_minimum_length()) {
480                 ast_debug(1, "Ignoring event that's too small. %u < %u\n",
481                         (unsigned int) msg_len,
482                         (unsigned int) ast_event_minimum_length());
483                 return;
484         }
485
486         if (!ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(msg, AST_EVENT_IE_EID))) {
487                 /* Don't feed events back in that originated locally. */
488                 return;
489         }
490
491         event_type = ast_event_get_type(msg);
492         if (event_type > AST_EVENT_TOTAL) {
493                 /* Egads, we don't support this */
494                 return;
495         }
496
497         ast_rwlock_rdlock(&event_types_lock);
498         publish_handler = event_types[event_type].publish_to_stasis;
499         if (!event_types[event_type].subscribe || !publish_handler) {
500                 /* We are not configured to subscribe to these events or
501                    we have no way to publish it internally. */
502                 ast_rwlock_unlock(&event_types_lock);
503                 return;
504         }
505         ast_rwlock_unlock(&event_types_lock);
506
507         if (!(event = ast_malloc(msg_len))) {
508                 return;
509         }
510
511         memcpy(event, msg, msg_len);
512
513         if (event_type == AST_EVENT_PING) {
514                 const struct ast_eid *eid;
515                 char buf[128] = "";
516
517                 eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
518                 ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
519                 ast_log(LOG_NOTICE, "Got event PING from server with EID: '%s'\n", buf);
520         }
521         ast_debug(5, "Publishing event %s (%u) to stasis\n",
522                 ast_event_get_type_name(event), event_type);
523         publish_handler(event);
524 }
525
526 static void publish_event_to_corosync(struct ast_event *event)
527 {
528         cs_error_t cs_err;
529         struct iovec iov;
530
531         iov.iov_base = (void *)event;
532         iov.iov_len = ast_event_get_size(event);
533
534         ast_debug(5, "Publishing event %s (%u) to corosync\n",
535                 ast_event_get_type_name(event), ast_event_get_type(event));
536
537         /* The stasis subscription will only exist if we are configured to publish
538          * these events, so just send away. */
539         if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
540                 ast_log(LOG_WARNING, "CPG mcast failed (%u) for event %s (%u)\n",
541                         cs_err, ast_event_get_type_name(event), ast_event_get_type(event));
542         }
543 }
544
545 static void publish_to_corosync(struct stasis_message *message)
546 {
547         struct ast_event *event;
548
549         event = stasis_message_to_event(message);
550         if (!event) {
551                 return;
552         }
553
554         if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
555                 /* If the event didn't originate from this server, don't send it back out. */
556                 ast_event_destroy(event);
557                 return;
558         }
559
560         if (ast_event_get_type(event) == AST_EVENT_PING) {
561                 const struct ast_eid *eid;
562                 char buf[128] = "";
563
564                 eid = ast_event_get_ie_raw(event, AST_EVENT_IE_EID);
565                 ast_eid_to_str(buf, sizeof(buf), (struct ast_eid *) eid);
566                 ast_log(LOG_NOTICE, "Sending event PING from this server with EID: '%s'\n", buf);
567         }
568
569         publish_event_to_corosync(event);
570 }
571
572 static void stasis_message_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
573 {
574         if (!message) {
575                 return;
576         }
577
578         publish_to_corosync(message);
579 }
580
581 static int dump_cache_cb(void *obj, void *arg, int flags)
582 {
583         struct stasis_message *message = obj;
584
585         if (!message) {
586                 return 0;
587         }
588
589         publish_to_corosync(message);
590
591         return 0;
592 }
593
594 static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
595                 const struct cpg_address *member_list, size_t member_list_entries,
596                 const struct cpg_address *left_list, size_t left_list_entries,
597                 const struct cpg_address *joined_list, size_t joined_list_entries)
598 {
599         unsigned int i;
600
601
602         for (i = 0; i < left_list_entries; i++) {
603                 const struct cpg_address *cpg_node = &left_list[i];
604                 struct corosync_node* node;
605
606                 node = ao2_find(nodes, &cpg_node->nodeid, OBJ_UNLINK | OBJ_SEARCH_KEY);
607                 if (!node) {
608                         continue;
609                 }
610
611                 publish_cluster_discovery_to_stasis_full(node, 0);
612                 ao2_ref(node, -1);
613         }
614
615         /* If any new nodes have joined, dump our cache of events we are publishing
616          * that originated from this server. */
617         if (!joined_list_entries) {
618                 return;
619         }
620
621         for (i = 0; i < ARRAY_LEN(event_types); i++) {
622                 struct ao2_container *messages;
623
624                 ast_rwlock_rdlock(&event_types_lock);
625                 if (!event_types[i].publish) {
626                         ast_rwlock_unlock(&event_types_lock);
627                         continue;
628                 }
629
630                 if (!event_types[i].cache_fn || !event_types[i].message_type_fn) {
631                         ast_rwlock_unlock(&event_types_lock);
632                         continue;
633                 }
634
635                 messages = stasis_cache_dump_by_eid(event_types[i].cache_fn(),
636                         event_types[i].message_type_fn(),
637                         &ast_eid_default);
638                 ast_rwlock_unlock(&event_types_lock);
639
640                 ao2_callback(messages, OBJ_NODATA, dump_cache_cb, NULL);
641
642                 ao2_t_ref(messages, -1, "Dispose of dumped cache");
643         }
644 }
645
646 /*! \brief Informs the cluster of our EID and our IP addresses */
647 static void send_cluster_notify(void)
648 {
649         struct ast_event *event;
650         unsigned int node_id;
651         cs_error_t cs_err;
652         corosync_cfg_node_address_t corosync_addr;
653         int num_addrs = 0;
654         struct sockaddr *sa;
655         size_t sa_len;
656         char buf[128];
657         int res;
658
659         if ((cs_err = corosync_cfg_local_get(cfg_handle, &node_id)) != CS_OK) {
660                 ast_log(LOG_WARNING, "Failed to extract Corosync node ID for this node. Not informing cluster of existance.\n");
661                 return;
662         }
663
664         if (((cs_err = corosync_cfg_get_node_addrs(cfg_handle, node_id, 1, &num_addrs, &corosync_addr)) != CS_OK) || (num_addrs < 1)) {
665                 ast_log(LOG_WARNING, "Failed to get local Corosync address. Not informing cluster of existance.\n");
666                 return;
667         }
668
669         sa = (struct sockaddr *)corosync_addr.address;
670         sa_len = (size_t)corosync_addr.address_length;
671         if ((res = getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST))) {
672                 ast_log(LOG_WARNING, "Failed to determine name of local Corosync address: %s (%d). Not informing cluster of existance.\n",
673                         gai_strerror(res), res);
674                 return;
675         }
676
677         event = ast_event_new(AST_EVENT_CLUSTER_DISCOVERY,
678                                     AST_EVENT_IE_NODE_ID, AST_EVENT_IE_PLTYPE_UINT, node_id,
679                                     AST_EVENT_IE_LOCAL_ADDR, AST_EVENT_IE_PLTYPE_STR, buf,
680                                     AST_EVENT_IE_END);
681         publish_event_to_corosync(event);
682         ast_free(event);
683 }
684
685 static void *dispatch_thread_handler(void *data)
686 {
687         cs_error_t cs_err;
688         struct pollfd pfd[3] = {
689                 { .events = POLLIN, },
690                 { .events = POLLIN, },
691                 { .events = POLLIN, },
692         };
693
694         if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
695                 ast_log(LOG_ERROR, "Failed to get CPG fd.  This module is now broken.\n");
696                 return NULL;
697         }
698
699         if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
700                 ast_log(LOG_ERROR, "Failed to get CFG fd.  This module is now broken.\n");
701                 return NULL;
702         }
703
704         pfd[2].fd = dispatch_thread.alert_pipe[0];
705
706         send_cluster_notify();
707         while (!dispatch_thread.stop) {
708                 int res;
709
710                 cs_err = CS_OK;
711
712                 pfd[0].revents = 0;
713                 pfd[1].revents = 0;
714                 pfd[2].revents = 0;
715
716                 res = ast_poll(pfd, ARRAY_LEN(pfd), -1);
717                 if (res == -1 && errno != EINTR && errno != EAGAIN) {
718                         ast_log(LOG_ERROR, "poll() error: %s (%d)\n", strerror(errno), errno);
719                         continue;
720                 }
721
722                 if (pfd[0].revents & POLLIN) {
723                         if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) {
724                                 ast_log(LOG_WARNING, "Failed CPG dispatch: %u\n", cs_err);
725                         }
726                 }
727
728                 if (pfd[1].revents & POLLIN) {
729                         if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) {
730                                 ast_log(LOG_WARNING, "Failed CFG dispatch: %u\n", cs_err);
731                         }
732                 }
733
734                 if (cs_err == CS_ERR_LIBRARY || cs_err == CS_ERR_BAD_HANDLE) {
735                         struct cpg_name name;
736
737                         /* If corosync gets restarted out from under Asterisk, try to recover. */
738
739                         ast_log(LOG_NOTICE, "Attempting to recover from corosync failure.\n");
740
741                         if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
742                                 ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err);
743                                 sleep(5);
744                                 continue;
745                         }
746
747                         if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks) != CS_OK)) {
748                                 ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err);
749                                 sleep(5);
750                                 continue;
751                         }
752
753                         if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
754                                 ast_log(LOG_ERROR, "Failed to get CPG fd.\n");
755                                 sleep(5);
756                                 continue;
757                         }
758
759                         if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
760                                 ast_log(LOG_ERROR, "Failed to get CFG fd.\n");
761                                 sleep(5);
762                                 continue;
763                         }
764
765                         ast_copy_string(name.value, "asterisk", sizeof(name.value));
766                         name.length = strlen(name.value);
767                         if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
768                                 ast_log(LOG_ERROR, "Failed to join cpg (%d)\n", (int) cs_err);
769                                 sleep(5);
770                                 continue;
771                         }
772
773                         ast_log(LOG_NOTICE, "Corosync recovery complete.\n");
774                         send_cluster_notify();
775                 }
776         }
777
778         return NULL;
779 }
780
781 static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
782 {
783         cs_error_t cs_err;
784         cpg_iteration_handle_t cpg_iter;
785         struct cpg_iteration_description_t cpg_desc;
786         unsigned int i;
787
788         switch (cmd) {
789         case CLI_INIT:
790                 e->command = "corosync show members";
791                 e->usage =
792                         "Usage: corosync show members\n"
793                         "       Show corosync cluster members\n";
794                 return NULL;
795
796         case CLI_GENERATE:
797                 return NULL;    /* no completion */
798         }
799
800         if (a->argc != e->args) {
801                 return CLI_SHOWUSAGE;
802         }
803
804         cs_err = cpg_iteration_initialize(cpg_handle, CPG_ITERATION_ALL, NULL, &cpg_iter);
805
806         if (cs_err != CS_OK) {
807                 ast_cli(a->fd, "Failed to initialize CPG iterator.\n");
808                 return CLI_FAILURE;
809         }
810
811         ast_cli(a->fd, "\n"
812                     "=============================================================\n"
813                     "=== Cluster members =========================================\n"
814                     "=============================================================\n"
815                     "===\n");
816
817         for (i = 1, cs_err = cpg_iteration_next(cpg_iter, &cpg_desc);
818                         cs_err == CS_OK;
819                         cs_err = cpg_iteration_next(cpg_iter, &cpg_desc), i++) {
820 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
821                 corosync_cfg_node_address_t addrs[8];
822                 int num_addrs = 0;
823                 unsigned int j;
824 #endif
825
826                 ast_cli(a->fd, "=== Node %u\n", i);
827                 ast_cli(a->fd, "=== --> Group: %s\n", cpg_desc.group.value);
828
829 #ifdef HAVE_COROSYNC_CFG_STATE_TRACK
830                 /*
831                  * Corosync 2.x cfg lib needs to allocate 1M on stack after calling
832                  * corosync_cfg_get_node_addrs. netconsole thread has allocated only 0.5M
833                  * resulting in crash.
834                  */
835                 cs_err = corosync_cfg_get_node_addrs(cfg_handle, cpg_desc.nodeid,
836                                 ARRAY_LEN(addrs), &num_addrs, addrs);
837                 if (cs_err != CS_OK) {
838                         ast_log(LOG_WARNING, "Failed to get node addresses\n");
839                         continue;
840                 }
841
842                 for (j = 0; j < num_addrs; j++) {
843                         struct sockaddr *sa = (struct sockaddr *) addrs[j].address;
844                         size_t sa_len = (size_t) addrs[j].address_length;
845                         char buf[128];
846
847                         getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST);
848
849                         ast_cli(a->fd, "=== --> Address %u: %s\n", j + 1, buf);
850                 }
851 #else
852                 ast_cli(a->fd, "=== --> Nodeid: %"PRIu32"\n", cpg_desc.nodeid);
853 #endif
854         }
855
856         ast_cli(a->fd, "===\n"
857                        "=============================================================\n"
858                        "\n");
859
860         cpg_iteration_finalize(cpg_iter);
861
862         return CLI_SUCCESS;
863 }
864
865 static char *corosync_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
866 {
867         struct ast_event *event;
868
869         switch (cmd) {
870         case CLI_INIT:
871                 e->command = "corosync ping";
872                 e->usage =
873                         "Usage: corosync ping\n"
874                         "       Send a test ping to the cluster.\n"
875                         "A NOTICE will be in the log for every ping received\n"
876                         "on a server.\n  If you send a ping, you should see a NOTICE\n"
877                         "in the log for every server in the cluster.\n";
878                 return NULL;
879
880         case CLI_GENERATE:
881                 return NULL;    /* no completion */
882         }
883
884         if (a->argc != e->args) {
885                 return CLI_SHOWUSAGE;
886         }
887
888         event = ast_event_new(AST_EVENT_PING, AST_EVENT_IE_END);
889
890         if (!event) {
891                 return CLI_FAILURE;
892         }
893
894         ast_rwlock_rdlock(&event_types_lock);
895         event_types[AST_EVENT_PING].publish_to_stasis(event);
896         ast_rwlock_unlock(&event_types_lock);
897
898         return CLI_SUCCESS;
899 }
900
901 static char *corosync_show_config(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
902 {
903         unsigned int i;
904
905         switch (cmd) {
906         case CLI_INIT:
907                 e->command = "corosync show config";
908                 e->usage =
909                         "Usage: corosync show config\n"
910                         "       Show configuration loaded from res_corosync.conf\n";
911                 return NULL;
912
913         case CLI_GENERATE:
914                 return NULL;    /* no completion */
915         }
916
917         if (a->argc != e->args) {
918                 return CLI_SHOWUSAGE;
919         }
920
921         ast_cli(a->fd, "\n"
922                     "=============================================================\n"
923                     "=== res_corosync config =====================================\n"
924                     "=============================================================\n"
925                     "===\n");
926
927         ast_rwlock_rdlock(&event_types_lock);
928         for (i = 0; i < ARRAY_LEN(event_types); i++) {
929                 if (event_types[i].publish) {
930                         ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n",
931                                         event_types[i].name);
932                 }
933                 if (event_types[i].subscribe) {
934                         ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n",
935                                         event_types[i].name);
936                 }
937         }
938         ast_rwlock_unlock(&event_types_lock);
939
940         ast_cli(a->fd, "===\n"
941                        "=============================================================\n"
942                        "\n");
943
944         return CLI_SUCCESS;
945 }
946
947 static struct ast_cli_entry corosync_cli[] = {
948         AST_CLI_DEFINE(corosync_show_config, "Show configuration"),
949         AST_CLI_DEFINE(corosync_show_members, "Show cluster members"),
950         AST_CLI_DEFINE(corosync_ping, "Send a test ping to the cluster"),
951 };
952
953 enum {
954         PUBLISH,
955         SUBSCRIBE,
956 };
957
958 static int set_event(const char *event_type, int pubsub)
959 {
960         unsigned int i;
961
962         for (i = 0; i < ARRAY_LEN(event_types); i++) {
963                 if (!event_types[i].name || strcasecmp(event_type, event_types[i].name)) {
964                         continue;
965                 }
966
967                 switch (pubsub) {
968                 case PUBLISH:
969                         event_types[i].publish = 1;
970                         break;
971                 case SUBSCRIBE:
972                         event_types[i].subscribe = 1;
973                         break;
974                 }
975
976                 break;
977         }
978
979         return (i == ARRAY_LEN(event_types)) ? -1 : 0;
980 }
981
982 static int load_general_config(struct ast_config *cfg)
983 {
984         struct ast_variable *v;
985         int res = 0;
986         unsigned int i;
987
988         ast_rwlock_wrlock(&event_types_lock);
989
990         for (i = 0; i < ARRAY_LEN(event_types); i++) {
991                 event_types[i].publish = event_types[i].publish_default;
992                 event_types[i].subscribe = event_types[i].subscribe_default;
993         }
994
995         for (v = ast_variable_browse(cfg, "general"); v && !res; v = v->next) {
996                 if (!strcasecmp(v->name, "publish_event")) {
997                         res = set_event(v->value, PUBLISH);
998                 } else if (!strcasecmp(v->name, "subscribe_event")) {
999                         res = set_event(v->value, SUBSCRIBE);
1000                 } else {
1001                         ast_log(LOG_WARNING, "Unknown option '%s'\n", v->name);
1002                 }
1003         }
1004
1005         for (i = 0; i < ARRAY_LEN(event_types); i++) {
1006                 if (event_types[i].publish && !event_types[i].sub) {
1007                         event_types[i].sub = stasis_forward_all(event_types[i].topic_fn(),
1008                                                                                                         corosync_topic());
1009                         stasis_message_router_add(stasis_router,
1010                                                   event_types[i].message_type_fn(),
1011                                                   stasis_message_cb,
1012                                                   NULL);
1013                 } else if (!event_types[i].publish && event_types[i].sub) {
1014                         event_types[i].sub = stasis_forward_cancel(event_types[i].sub);
1015                         stasis_message_router_remove(stasis_router,
1016                                                      event_types[i].message_type_fn());
1017                 }
1018         }
1019
1020         ast_rwlock_unlock(&event_types_lock);
1021
1022         return res;
1023 }
1024
1025 static int load_config(unsigned int reload)
1026 {
1027         static const char filename[] = "res_corosync.conf";
1028         struct ast_config *cfg;
1029         const char *cat = NULL;
1030         struct ast_flags config_flags = { 0 };
1031         int res = 0;
1032
1033         cfg = ast_config_load(filename, config_flags);
1034
1035         if (cfg == CONFIG_STATUS_FILEMISSING || cfg == CONFIG_STATUS_FILEINVALID) {
1036                 return -1;
1037         }
1038
1039         while ((cat = ast_category_browse(cfg, cat))) {
1040                 if (!strcasecmp(cat, "general")) {
1041                         res = load_general_config(cfg);
1042                 } else {
1043                         ast_log(LOG_WARNING, "Unknown configuration section '%s'\n", cat);
1044                 }
1045         }
1046
1047         ast_config_destroy(cfg);
1048
1049         return res;
1050 }
1051
1052 static void cleanup_module(void)
1053 {
1054         cs_error_t cs_err;
1055         unsigned int i;
1056
1057         if (stasis_router) {
1058
1059                 /* Unsubscribe all topic forwards and cancel all message routes */
1060                 ast_rwlock_wrlock(&event_types_lock);
1061                 for (i = 0; i < ARRAY_LEN(event_types); i++) {
1062                         if (event_types[i].sub) {
1063                                 event_types[i].sub = stasis_forward_cancel(event_types[i].sub);
1064                                 stasis_message_router_remove(stasis_router,
1065                                                              event_types[i].message_type_fn());
1066                         }
1067                         event_types[i].publish = 0;
1068                         event_types[i].subscribe = 0;
1069                 }
1070                 ast_rwlock_unlock(&event_types_lock);
1071
1072                 stasis_message_router_unsubscribe_and_join(stasis_router);
1073                 stasis_router = NULL;
1074         }
1075
1076         if (corosync_aggregate_topic) {
1077                 ao2_t_ref(corosync_aggregate_topic, -1, "Dispose of topic on cleanup");
1078                 corosync_aggregate_topic = NULL;
1079         }
1080
1081         STASIS_MESSAGE_TYPE_CLEANUP(corosync_ping_message_type);
1082
1083         if (dispatch_thread.id != AST_PTHREADT_NULL) {
1084                 char meepmeep = 'x';
1085                 dispatch_thread.stop = 1;
1086                 if (ast_carefulwrite(dispatch_thread.alert_pipe[1], &meepmeep, 1,
1087                                         5000) == -1) {
1088                         ast_log(LOG_ERROR, "Failed to write to pipe: %s (%d)\n",
1089                                         strerror(errno), errno);
1090                 }
1091                 pthread_join(dispatch_thread.id, NULL);
1092         }
1093
1094         if (dispatch_thread.alert_pipe[0] != -1) {
1095                 close(dispatch_thread.alert_pipe[0]);
1096                 dispatch_thread.alert_pipe[0] = -1;
1097         }
1098
1099         if (dispatch_thread.alert_pipe[1] != -1) {
1100                 close(dispatch_thread.alert_pipe[1]);
1101                 dispatch_thread.alert_pipe[1] = -1;
1102         }
1103
1104         if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) {
1105                 ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err);
1106         }
1107         cpg_handle = 0;
1108
1109         if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) {
1110                 ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err);
1111         }
1112         cfg_handle = 0;
1113
1114         ao2_cleanup(nodes);
1115         nodes = NULL;
1116 }
1117
1118 static int load_module(void)
1119 {
1120         cs_error_t cs_err;
1121         struct cpg_name name;
1122
1123         if (ast_eid_is_empty(&ast_eid_default)) {
1124                 ast_log(LOG_ERROR, "Entity ID is not set.\n");
1125                 return AST_MODULE_LOAD_DECLINE;
1126         }
1127
1128         nodes = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, 23,
1129                 corosync_node_hash_fn, NULL, corosync_node_cmp_fn);
1130         if (!nodes) {
1131                 goto failed;
1132         }
1133
1134         corosync_aggregate_topic = stasis_topic_create("corosync_aggregate_topic");
1135         if (!corosync_aggregate_topic) {
1136                 ast_log(AST_LOG_ERROR, "Failed to create stasis topic for corosync\n");
1137                 goto failed;
1138         }
1139
1140         stasis_router = stasis_message_router_create(corosync_aggregate_topic);
1141         if (!stasis_router) {
1142                 ast_log(AST_LOG_ERROR, "Failed to create message router for corosync topic\n");
1143                 goto failed;
1144         }
1145
1146         if (STASIS_MESSAGE_TYPE_INIT(corosync_ping_message_type) != 0) {
1147                 ast_log(AST_LOG_ERROR, "Failed to initialize corosync ping message type\n");
1148                 goto failed;
1149         }
1150
1151         if (load_config(0)) {
1152                 /* simply not configured is not a fatal error */
1153                 goto failed;
1154         }
1155
1156         if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
1157                 ast_log(LOG_ERROR, "Failed to initialize cfg: (%d)\n", (int) cs_err);
1158                 goto failed;
1159         }
1160
1161         if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) {
1162                 ast_log(LOG_ERROR, "Failed to initialize cpg: (%d)\n", (int) cs_err);
1163                 goto failed;
1164         }
1165
1166         ast_copy_string(name.value, "asterisk", sizeof(name.value));
1167         name.length = strlen(name.value);
1168
1169         if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
1170                 ast_log(LOG_ERROR, "Failed to join: (%d)\n", (int) cs_err);
1171                 goto failed;
1172         }
1173
1174         if (pipe(dispatch_thread.alert_pipe) == -1) {
1175                 ast_log(LOG_ERROR, "Failed to create alert pipe: %s (%d)\n",
1176                                 strerror(errno), errno);
1177                 goto failed;
1178         }
1179
1180         if (corosync_pthread_create_background(&dispatch_thread.id, NULL,
1181                         dispatch_thread_handler, NULL)) {
1182                 ast_log(LOG_ERROR, "Error starting CPG dispatch thread.\n");
1183                 goto failed;
1184         }
1185
1186         ast_cli_register_multiple(corosync_cli, ARRAY_LEN(corosync_cli));
1187
1188
1189         return AST_MODULE_LOAD_SUCCESS;
1190
1191 failed:
1192         cleanup_module();
1193
1194         return AST_MODULE_LOAD_DECLINE;
1195 }
1196
1197 static int unload_module(void)
1198 {
1199         ast_cli_unregister_multiple(corosync_cli, ARRAY_LEN(corosync_cli));
1200
1201         cleanup_module();
1202
1203         return 0;
1204 }
1205
1206 AST_MODULE_INFO_STANDARD_EXTENDED(ASTERISK_GPL_KEY, "Corosync");