2 * Asterisk -- An open source telephony toolkit.
4 * Copyright (C) 2007, Digium, Inc.
5 * Copyright (C) 2012, Russell Bryant
7 * Russell Bryant <russell@russellbryant.net>
9 * See http://www.asterisk.org for more information about
10 * the Asterisk project. Please do not directly contact
11 * any of the maintainers of this project for assistance;
12 * the project provides a web site, mailing lists and IRC
13 * channels for your use.
15 * This program is free software, distributed under the terms of
16 * the GNU General Public License Version 2. See the LICENSE file
17 * at the top of the source tree.
22 * \author Russell Bryant <russell@russellbryant.net>
24 * This module is based on and replaces the previous res_ais module.
28 <depend>corosync</depend>
29 <support_level>extended</support_level>
34 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
36 #include <corosync/cpg.h>
37 #include <corosync/cfg.h>
39 #include "asterisk/module.h"
40 #include "asterisk/logger.h"
41 #include "asterisk/poll-compat.h"
42 #include "asterisk/config.h"
43 #include "asterisk/event.h"
44 #include "asterisk/cli.h"
45 #include "asterisk/devicestate.h"
/* Guards the publish/subscribe flags stored in event_types[]: the callbacks
 * and CLI handlers in this file take it for reading, load_general_config()
 * takes it for writing. */
47 AST_RWLOCK_DEFINE_STATIC(event_types_lock);
/* Per-event-type state (members of each event_types[] element).
 * NOTE(review): the struct's opening line and the member behind the ".name"
 * initializers are on lines omitted from this listing. */
/* Subscription whose callback forwards events of this type to the cluster;
 * created and dropped in load_general_config() according to the publish flag. */
51 struct ast_event_sub *sub;
/* Non-zero when local events of this type are multicast to the cluster. */
52 unsigned char publish;
/* Non-zero when events of this type received from the cluster are accepted. */
53 unsigned char subscribe;
/* Table indexed by event type; only the types listed here carry a name that
 * set_event() can match against a configuration value. */
55 [AST_EVENT_MWI] = { .name = "mwi", },
56 [AST_EVENT_DEVICE_STATE_CHANGE] = { .name = "device_state", },
/* Initial dispatch-thread state: no thread and both alert pipe descriptors
 * set to -1. NOTE(review): the enclosing dispatch_thread definition is on
 * lines omitted from this listing. */
64 .id = AST_PTHREADT_NULL,
65 .alert_pipe = { -1, -1 },
/* Corosync handles; initialized in load_module() and finalized in
 * cleanup_module(). */
68 static cpg_handle_t cpg_handle;
69 static corosync_cfg_handle_t cfg_handle;
/* Forward declarations of the CFG callbacks so they can be referenced by the
 * cfg_callbacks table below. NOTE(review): the tail of the first prototype is
 * on a line omitted from this listing. */
71 static void cfg_state_track_cb(
72 corosync_cfg_state_notification_buffer_t *notification_buffer,
75 static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle,
76 corosync_cfg_shutdown_flags_t flags);
/* Callback table handed to corosync_cfg_initialize() in load_module(). */
78 static corosync_cfg_callbacks_t cfg_callbacks = {
79 .corosync_cfg_state_track_callback = cfg_state_track_cb,
80 .corosync_cfg_shutdown_callback = cfg_shutdown_cb,
/* Forward declarations of the CPG callbacks: message delivery and group
 * membership (configuration) change. */
83 static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name,
84 uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len);
86 static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
87 const struct cpg_address *member_list, size_t member_list_entries,
88 const struct cpg_address *left_list, size_t left_list_entries,
89 const struct cpg_address *joined_list, size_t joined_list_entries);
/* Callback table handed to cpg_initialize() in load_module(). */
91 static cpg_callbacks_t cpg_callbacks = {
92 .cpg_deliver_fn = cpg_deliver_cb,
93 .cpg_confchg_fn = cpg_confchg_cb,
/* Event subscription callback; multicasts locally originated events to the
 * cluster (defined further down in this file). */
96 static void ast_event_cb(const struct ast_event *event, void *data);
/* CFG state-tracking callback registered through cfg_callbacks.
 * NOTE(review): the remaining parameter(s) and the body are on lines omitted
 * from this listing; the gap in the embedded numbering leaves room for only a
 * very short body -- confirm whether it is intentionally empty. */
98 static void cfg_state_track_cb(
99 corosync_cfg_state_notification_buffer_t *notification_buffer,
/* CFG shutdown callback registered through cfg_callbacks.
 * NOTE(review): the body is on lines omitted from this listing; the gap in the
 * embedded numbering leaves room for only a very short body -- confirm whether
 * it is intentionally empty. */
104 static void cfg_shutdown_cb(corosync_cfg_handle_t cfg_handle,
105 corosync_cfg_shutdown_flags_t flags)
109 static void cpg_deliver_cb(cpg_handle_t handle, const struct cpg_name *group_name,
110 uint32_t nodeid, uint32_t pid, void *msg, size_t msg_len)
112 struct ast_event *event;
114 if (msg_len < ast_event_minimum_length()) {
115 ast_debug(1, "Ignoring event that's too small. %u < %u\n",
116 (unsigned int) msg_len,
117 (unsigned int) ast_event_minimum_length());
121 if (!ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(msg, AST_EVENT_IE_EID))) {
122 /* Don't feed events back in that originated locally. */
126 ast_rwlock_rdlock(&event_types_lock);
127 if (!event_types[ast_event_get_type(msg)].subscribe) {
128 /* We are not configured to subscribe to these events. */
129 ast_rwlock_unlock(&event_types_lock);
132 ast_rwlock_unlock(&event_types_lock);
134 if (!(event = ast_malloc(msg_len))) {
138 memcpy(event, msg, msg_len);
140 ast_event_queue_and_cache(event);
143 static void cpg_confchg_cb(cpg_handle_t handle, const struct cpg_name *group_name,
144 const struct cpg_address *member_list, size_t member_list_entries,
145 const struct cpg_address *left_list, size_t left_list_entries,
146 const struct cpg_address *joined_list, size_t joined_list_entries)
150 /* If any new nodes have joined, dump our cache of events we are publishing
151 * that originated from this server. */
153 if (!joined_list_entries) {
157 for (i = 0; i < ARRAY_LEN(event_types); i++) {
158 struct ast_event_sub *event_sub;
160 ast_rwlock_rdlock(&event_types_lock);
161 if (!event_types[i].publish) {
162 ast_rwlock_unlock(&event_types_lock);
165 ast_rwlock_unlock(&event_types_lock);
167 event_sub = ast_event_subscribe_new(i, ast_event_cb, NULL);
168 ast_event_sub_append_ie_raw(event_sub, AST_EVENT_IE_EID,
169 &ast_eid_default, sizeof(ast_eid_default));
170 ast_event_dump_cache(event_sub);
171 ast_event_sub_destroy(event_sub);
/* Thread body that waits on the CPG descriptor, the CFG descriptor and the
 * read end of the alert pipe, and runs the matching corosync dispatch call
 * when a descriptor becomes readable. The loop runs until
 * dispatch_thread.stop is set.
 * NOTE(review): local declarations, the loop prologue and the bodies of the
 * error branches are on lines omitted from this listing; the comments below
 * describe only what is visible. */
175 static void *dispatch_thread_handler(void *data)
/* pfd[0]: CPG descriptor, pfd[1]: CFG descriptor, pfd[2]: alert pipe. */
178 struct pollfd pfd[3] = {
179 { .events = POLLIN, },
180 { .events = POLLIN, },
181 { .events = POLLIN, },
/* Fetch the descriptor backing the CPG handle; failure is logged as fatal
 * for the module. */
184 if ((cs_err = cpg_fd_get(cpg_handle, &pfd[0].fd)) != CS_OK) {
185 ast_log(LOG_ERROR, "Failed to get CPG fd. This module is now broken.\n");
/* Same for the CFG handle. */
189 if ((cs_err = corosync_cfg_fd_get(cfg_handle, &pfd[1].fd)) != CS_OK) {
190 ast_log(LOG_ERROR, "Failed to get CFG fd. This module is now broken.\n");
/* cleanup_module() writes a byte to the other end of this pipe after setting
 * the stop flag, which makes the blocking poll below return. */
194 pfd[2].fd = dispatch_thread.alert_pipe[0];
196 while (!dispatch_thread.stop) {
/* Block with no timeout until one of the three descriptors has activity. */
203 res = ast_poll(pfd, ARRAY_LEN(pfd), -1);
/* EINTR and EAGAIN are not reported as errors. */
204 if (res == -1 && errno != EINTR && errno != EAGAIN) {
205 ast_log(LOG_ERROR, "poll() error: %s (%d)\n", strerror(errno), errno);
/* CPG activity: run all pending CPG callbacks. */
209 if (pfd[0].revents & POLLIN) {
210 if ((cs_err = cpg_dispatch(cpg_handle, CS_DISPATCH_ALL)) != CS_OK) {
211 ast_log(LOG_WARNING, "Failed CPG dispatch: %d\n", cs_err);
/* CFG activity: run all pending CFG callbacks. */
215 if (pfd[1].revents & POLLIN) {
216 if ((cs_err = corosync_cfg_dispatch(cfg_handle, CS_DISPATCH_ALL)) != CS_OK) {
217 ast_log(LOG_WARNING, "Failed CFG dispatch: %d\n", cs_err);
225 static void ast_event_cb(const struct ast_event *event, void *data)
229 .iov_base = (void *) event,
230 .iov_len = ast_event_get_size(event),
233 if (ast_eid_cmp(&ast_eid_default,
234 ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
235 /* If the event didn't originate from this server, don't send it back out. */
239 /* The ast_event subscription will only exist if we are configured to publish
240 * these events, so just send away. */
242 if ((cs_err = cpg_mcast_joined(cpg_handle, CPG_TYPE_FIFO, &iov, 1)) != CS_OK) {
243 ast_log(LOG_WARNING, "CPG mcast failed (%d)\n", cs_err);
/* CLI handler for "corosync show members": iterates over CPG members and, for
 * each one, prints its group name and the addresses reported by CFG for its
 * node id.
 * NOTE(review): the command-dispatch labels, local declarations, the loop
 * condition and parts of the printed banner are on lines omitted from this
 * listing. */
247 static char *corosync_show_members(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
250 cpg_iteration_handle_t cpg_iter;
251 struct cpg_iteration_description_t cpg_desc;
/* Command registration data: command string and usage text. */
256 e->command = "corosync show members";
258 "Usage: corosync show members\n"
259 " Show corosync cluster members\n";
263 return NULL; /* no completion */
/* The command takes no extra arguments. */
266 if (a->argc != e->args) {
267 return CLI_SHOWUSAGE;
/* Start an iteration over all CPG groups/members. */
270 cs_err = cpg_iteration_initialize(cpg_handle, CPG_ITERATION_ALL, NULL, &cpg_iter);
272 if (cs_err != CS_OK) {
273 ast_cli(a->fd, "Failed to initialize CPG iterator.\n");
278 "=============================================================\n"
279 "=== Cluster members =========================================\n"
280 "=============================================================\n"
/* i is the 1-based node number shown to the user; it advances together with
 * the iterator. */
283 for (i = 1, cs_err = cpg_iteration_next(cpg_iter, &cpg_desc);
285 cs_err = cpg_iteration_next(cpg_iter, &cpg_desc), i++) {
/* At most ARRAY_LEN(addrs) addresses are requested per node. */
286 corosync_cfg_node_address_t addrs[8];
290 cs_err = corosync_cfg_get_node_addrs(cfg_handle, cpg_desc.nodeid,
291 ARRAY_LEN(addrs), &num_addrs, addrs);
292 if (cs_err != CS_OK) {
293 ast_log(LOG_WARNING, "Failed to get node addresses\n");
297 ast_cli(a->fd, "=== Node %d\n", i);
298 ast_cli(a->fd, "=== --> Group: %s\n", cpg_desc.group.value);
300 for (j = 0; j < num_addrs; j++) {
/* The CFG address blob is handed to getnameinfo() as a sockaddr. */
301 struct sockaddr *sa = (struct sockaddr *) addrs[j].address;
302 size_t sa_len = (size_t) addrs[j].address_length;
/* NOTE(review): the return value of getnameinfo() is not checked, so buf is
 * printed below even if the conversion failed -- confirm buf is initialized
 * on the omitted declaration line or add a check. */
305 getnameinfo(sa, sa_len, buf, sizeof(buf), NULL, 0, NI_NUMERICHOST);
307 ast_cli(a->fd, "=== --> Address %d: %s\n", j + 1, buf);
312 ast_cli(a->fd, "===\n"
313 "=============================================================\n"
/* Release the iterator obtained from cpg_iteration_initialize(). */
316 cpg_iteration_finalize(cpg_iter);
/* CLI handler for "corosync show config": prints which event types are
 * currently configured for publishing and for subscribing.
 * NOTE(review): the command-dispatch labels, local declarations and parts of
 * the printed banner are on lines omitted from this listing. */
321 static char *corosync_show_config(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
/* Command registration data: command string and usage text. */
327 e->command = "corosync show config";
329 "Usage: corosync show config\n"
330 " Show configuration loaded from res_corosync.conf\n";
334 return NULL; /* no completion */
/* The command takes no extra arguments. */
337 if (a->argc != e->args) {
338 return CLI_SHOWUSAGE;
342 "=============================================================\n"
343 "=== res_corosync config =====================================\n"
344 "=============================================================\n"
/* Hold the read lock for the whole walk so the flags are reported
 * consistently with respect to a concurrent load_general_config(). */
347 ast_rwlock_rdlock(&event_types_lock);
348 for (i = 0; i < ARRAY_LEN(event_types); i++) {
349 if (event_types[i].publish) {
350 ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n",
351 event_types[i].name);
353 if (event_types[i].subscribe) {
354 ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n",
355 event_types[i].name);
358 ast_rwlock_unlock(&event_types_lock);
360 ast_cli(a->fd, "===\n"
361 "=============================================================\n"
/* CLI commands provided by this module; registered in load_module() and
 * unregistered in unload_module(). */
367 static struct ast_cli_entry corosync_cli[] = {
368 AST_CLI_DEFINE(corosync_show_config, "Show configuration"),
369 AST_CLI_DEFINE(corosync_show_members, "Show cluster members"),
/* Enable publishing or subscribing for the event type whose configured name
 * matches event_type (case-insensitive).
 *
 * event_type: name taken from the configuration file.
 * pubsub:     selects which flag to set; load_general_config() passes PUBLISH
 *             or SUBSCRIBE. NOTE(review): the statements that pick the flag
 *             from pubsub are on lines omitted from this listing.
 *
 * Returns -1 when the loop index reached the end of event_types[] (presumably
 * meaning no entry matched -- the early exit is on an omitted line), 0
 * otherwise. The caller holds event_types_lock for writing. */
377 static int set_event(const char *event_type, int pubsub)
381 for (i = 0; i < ARRAY_LEN(event_types); i++) {
/* Skip table slots with no name and names that do not match. */
382 if (!event_types[i].name || strcasecmp(event_type, event_types[i].name)) {
388 event_types[i].publish = 1;
391 event_types[i].subscribe = 1;
398 return (i == ARRAY_LEN(event_types)) ? -1 : 0;
/* Apply the [general] section of the configuration: reset all publish and
 * subscribe flags, set them again from the "publish_event" and
 * "subscribe_event" options, then bring the event subscriptions in line with
 * the new publish flags. The whole update runs under the write lock.
 * NOTE(review): local declarations and the return statement are on lines
 * omitted from this listing. */
401 static int load_general_config(struct ast_config *cfg)
403 struct ast_variable *v;
407 ast_rwlock_wrlock(&event_types_lock);
/* Start from a clean slate so options removed from the file take effect. */
409 for (i = 0; i < ARRAY_LEN(event_types); i++) {
410 event_types[i].publish = 0;
411 event_types[i].subscribe = 0;
/* Walk the options; the loop stops at the first set_event() failure because
 * of the "!res" condition. */
414 for (v = ast_variable_browse(cfg, "general"); v && !res; v = v->next) {
415 if (!strcasecmp(v->name, "publish_event")) {
416 res = set_event(v->value, PUBLISH);
417 } else if (!strcasecmp(v->name, "subscribe_event")) {
418 res = set_event(v->value, SUBSCRIBE);
420 ast_log(LOG_WARNING, "Unknown option '%s'\n", v->name);
/* Subscribe for types that are now published and have no subscription yet;
 * drop the subscription for types that are no longer published. */
424 for (i = 0; i < ARRAY_LEN(event_types); i++) {
425 if (event_types[i].publish && !event_types[i].sub) {
426 event_types[i].sub = ast_event_subscribe(i,
427 ast_event_cb, "Corosync", NULL,
429 } else if (!event_types[i].publish && event_types[i].sub) {
430 event_types[i].sub = ast_event_unsubscribe(event_types[i].sub);
434 ast_rwlock_unlock(&event_types_lock);
/* Load res_corosync.conf and apply every section it contains. Only the
 * "general" section is understood; any other section name is logged and
 * ignored.
 * NOTE(review): the result variable, the early-exit body for a missing or
 * invalid file and the return statement are on lines omitted from this
 * listing; the reload parameter is not used on any visible line. */
439 static int load_config(unsigned int reload)
441 static const char filename[] = "res_corosync.conf";
442 struct ast_config *cfg;
443 const char *cat = NULL;
444 struct ast_flags config_flags = { 0 };
447 cfg = ast_config_load(filename, config_flags);
/* ast_config_load() reports these two conditions through sentinel values
 * instead of a usable config pointer. */
449 if (cfg == CONFIG_STATUS_FILEMISSING || cfg == CONFIG_STATUS_FILEINVALID) {
/* Iterate over every section (category) in the file. */
453 while ((cat = ast_category_browse(cfg, cat))) {
454 if (!strcasecmp(cat, "general")) {
455 res = load_general_config(cfg);
457 ast_log(LOG_WARNING, "Unknown configuration section '%s'\n", cat);
461 ast_config_destroy(cfg);
/* Undo everything load_module() set up: drop event subscriptions, stop and
 * join the dispatch thread, close the alert pipe and finalize the corosync
 * handles. Each step is guarded so the function can run after a partial
 * initialization.
 * NOTE(review): local declarations and a few statement tails are on lines
 * omitted from this listing. */
466 static void cleanup_module(void)
/* Remove subscriptions and clear both flags for every event type. */
471 for (i = 0; i < ARRAY_LEN(event_types); i++) {
472 if (event_types[i].sub) {
473 event_types[i].sub = ast_event_unsubscribe(event_types[i].sub);
475 event_types[i].publish = 0;
476 event_types[i].subscribe = 0;
/* Ask the dispatch thread to stop: set the flag, then write one byte to the
 * alert pipe so its blocking poll returns, and wait for it to exit. */
479 if (dispatch_thread.id != AST_PTHREADT_NULL) {
481 dispatch_thread.stop = 1;
482 if (ast_carefulwrite(dispatch_thread.alert_pipe[1], &meepmeep, 1,
484 ast_log(LOG_ERROR, "Failed to write to pipe: %s (%d)\n",
485 strerror(errno), errno);
487 pthread_join(dispatch_thread.id, NULL);
/* Close both pipe ends and mark them as not open. */
490 if (dispatch_thread.alert_pipe[0] != -1) {
491 close(dispatch_thread.alert_pipe[0]);
492 dispatch_thread.alert_pipe[0] = -1;
495 if (dispatch_thread.alert_pipe[1] != -1) {
496 close(dispatch_thread.alert_pipe[1]);
497 dispatch_thread.alert_pipe[1] = -1;
/* Finalize each corosync handle only if it is non-zero; failures are logged. */
500 if (cpg_handle && (cs_err = cpg_finalize(cpg_handle)) != CS_OK) {
501 ast_log(LOG_ERROR, "Failed to finalize cpg (%d)\n", (int) cs_err);
505 if (cfg_handle && (cs_err = corosync_cfg_finalize(cfg_handle)) != CS_OK) {
506 ast_log(LOG_ERROR, "Failed to finalize cfg (%d)\n", (int) cs_err);
/* Module entry point: connect to the corosync CFG and CPG services, join the
 * "asterisk" CPG group, start the dispatch thread, load the configuration and
 * register the CLI commands.
 *
 * Returns AST_MODULE_LOAD_SUCCESS when every step succeeded, and
 * AST_MODULE_LOAD_DECLINE when CFG cannot be initialized.
 * NOTE(review): the failure path taken by the later steps is on lines omitted
 * from this listing; res starts as AST_MODULE_LOAD_FAILURE and is changed to
 * AST_MODULE_LOAD_DECLINE only when load_config() fails. */
511 static int load_module(void)
514 enum ast_module_load_result res = AST_MODULE_LOAD_FAILURE;
515 struct cpg_name name;
/* Nothing has been set up yet at this point, so return directly. */
517 if ((cs_err = corosync_cfg_initialize(&cfg_handle, &cfg_callbacks)) != CS_OK) {
518 ast_log(LOG_ERROR, "Failed to initialize cfg (%d)\n", (int) cs_err);
519 return AST_MODULE_LOAD_DECLINE;
522 if ((cs_err = cpg_initialize(&cpg_handle, &cpg_callbacks)) != CS_OK) {
523 ast_log(LOG_ERROR, "Failed to initialize cpg (%d)\n", (int) cs_err);
/* All Asterisk instances exchange events through one CPG group named
 * "asterisk". */
527 ast_copy_string(name.value, "asterisk", sizeof(name.value));
528 name.length = strlen(name.value);
530 if ((cs_err = cpg_join(cpg_handle, &name)) != CS_OK) {
531 ast_log(LOG_ERROR, "Failed to join (%d)\n", (int) cs_err);
/* The pipe is what cleanup_module() uses to wake the dispatch thread. */
535 if (pipe(dispatch_thread.alert_pipe) == -1) {
536 ast_log(LOG_ERROR, "Failed to create alert pipe: %s (%d)\n",
537 strerror(errno), errno);
541 if (ast_pthread_create_background(&dispatch_thread.id, NULL,
542 dispatch_thread_handler, NULL)) {
543 ast_log(LOG_ERROR, "Error starting CPG dispatch thread.\n");
547 if (load_config(0)) {
548 /* simply not configured is not a fatal error */
549 res = AST_MODULE_LOAD_DECLINE;
553 ast_cli_register_multiple(corosync_cli, ARRAY_LEN(corosync_cli));
555 ast_enable_distributed_devstate();
557 return AST_MODULE_LOAD_SUCCESS;
/* Module exit point: unregisters the CLI commands registered by
 * load_module().
 * NOTE(review): the remainder of the body (any further teardown and the
 * return value) is on lines omitted from this listing. */
565 static int unload_module(void)
567 ast_cli_unregister_multiple(corosync_cli, ARRAY_LEN(corosync_cli));
/* Module registration with the description "Corosync" under the GPL key. */
574 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "Corosync");