Merged revisions 335510 via svnmerge from
[asterisk/asterisk.git] / res / ais / evt.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2007, Digium, Inc.
5  *
6  * Russell Bryant <russell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \author Russell Bryant <russell@digium.com>
22  *
23  * \brief Usage of the SAForum AIS (Application Interface Specification)
24  *
25  * \arg http://www.openais.org/
26  *
27  * This file contains the code specific to the use of the EVT
28  * (Event) Service.
29  */
30
31 #include "asterisk.h"
32
33 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
34
35 #include <stdlib.h>
36 #include <stdio.h>
37 #include <string.h>
38 #include <unistd.h>
39 #include <errno.h>
40
41 #include "ais.h"
42
43 #include "asterisk/module.h"
44 #include "asterisk/utils.h"
45 #include "asterisk/cli.h"
46 #include "asterisk/logger.h"
47 #include "asterisk/event.h"
48 #include "asterisk/config.h"
49 #include "asterisk/linkedlists.h"
50 #include "asterisk/devicestate.h"
51
52 #ifndef AST_MODULE
53 /* XXX HACK */
54 #define AST_MODULE "res_ais"
55 #endif
56
/*! Handle for this module's session with the EVT service. It is filled in by
 *  saEvtInitialize() at load time and passed to saEvtChannelOpen() and
 *  saEvtFinalize(). Not static, so it is visible outside this file. */
SaEvtHandleT evt_handle;
/*! Result of the saEvtInitialize() call made at load time. The unload path
 *  consults it to decide whether there is anything to tear down. */
static SaAisErrorT evt_init_res;

/* Forward declarations of the EVT callbacks, needed by evt_callbacks below. */
void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
	SaAisErrorT error);
void evt_event_deliver_cb(SaEvtSubscriptionIdT subscription_id,
	const SaEvtEventHandleT event_handle, const SaSizeT event_datalen);

/*! Callback table handed to saEvtInitialize(). */
static const SaEvtCallbacksT evt_callbacks = {
	.saEvtChannelOpenCallback  = evt_channel_open_cb,
	.saEvtEventDeliverCallback = evt_event_deliver_cb,
};
69
/*!
 * Event types this module knows how to exchange, each paired with a keyword.
 *
 * The keyword is matched case-insensitively against the values of the
 * publish_event and subscribe_event configuration options, and the same
 * string is used as the pattern (when publishing) and the exact-match filter
 * (when subscribing) on the event channel.
 */
static const struct {
	const char *str;
	enum ast_event_type type;
} supported_event_types[] = {
	{ "mwi", AST_EVENT_MWI },
	{ "device_state", AST_EVENT_DEVICE_STATE_CHANGE },
};

/*! Used to provide unique id's to egress subscriptions */
static int unique_id;
80
/*! One event type that is subscribed to on an event channel, i.e. events of
 *  this type arriving from the channel are accepted by this module. */
struct subscribe_event {
	AST_LIST_ENTRY(subscribe_event) entry;
	/*! This is a unique identifier to identify this subscription in the event
	 *  channel through the different API calls, subscribe, unsubscribe, and
	 *  the event deliver callback. */
	SaEvtSubscriptionIdT id;
	/*! Event type this subscription filters for. */
	enum ast_event_type type;
};
89
/*! One event type that is published from this server onto an event channel. */
struct publish_event {
	AST_LIST_ENTRY(publish_event) entry;
	/*! We subscribe to events internally so that we can publish them
	 *  on this event channel. */
	struct ast_event_sub *sub;
	/*! Event type being published. */
	enum ast_event_type type;
};
97
/*! A configured event channel together with the event types it carries. */
struct event_channel {
	AST_RWLIST_ENTRY(event_channel) entry;
	/*! Event types accepted from this channel. */
	AST_LIST_HEAD_NOLOCK(, subscribe_event) subscribe_events;
	/*! Event types sent out on this channel. */
	AST_LIST_HEAD_NOLOCK(, publish_event) publish_events;
	/*! Handle filled in by saEvtChannelOpen(). */
	SaEvtChannelHandleT handle;
	/*! Channel name. The structure is allocated with strlen(name) extra
	 *  bytes so that the full string and its terminator fit here; this
	 *  member must therefore stay last. */
	char name[1];
};
105
/*! All event channels built from the configuration file. */
static AST_RWLIST_HEAD_STATIC(event_channels, event_channel);
107
/*!
 * \brief Channel-open callback registered in evt_callbacks.
 *
 * Intentionally empty: this file opens channels with saEvtChannelOpen() and
 * uses the result of that call directly, so there is nothing to do here.
 */
void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
	SaAisErrorT error)
{

}
113
/*!
 * \brief Hand an event received from the event channel to the Asterisk core.
 *
 * \param ast_event heap-allocated event, passed on to
 *        ast_event_queue_and_cache(); the caller does not free it afterwards.
 */
static void queue_event(struct ast_event *ast_event)
{
	ast_event_queue_and_cache(ast_event);
}
118
119 void evt_event_deliver_cb(SaEvtSubscriptionIdT sub_id,
120         const SaEvtEventHandleT event_handle, const SaSizeT event_datalen)
121 {
122         /* It is important to note that this works because we *know* that this
123          * function will only be called by a single thread, the dispatch_thread.
124          * If this module gets changed such that this is no longer the case, this
125          * should get changed to a thread-local buffer, instead. */
126         static unsigned char buf[4096];
127         struct ast_event *event_dup, *event = (void *) buf;
128         SaAisErrorT ais_res;
129         SaSizeT len = sizeof(buf);
130
131         if (event_datalen > len) {
132                 ast_log(LOG_ERROR, "Event received with size %u, which is too big\n"
133                         "for the allocated size %u. Change the code to increase the size.\n",
134                         (unsigned int) event_datalen, (unsigned int) len);
135                 return;
136         }
137
138         if (event_datalen < ast_event_minimum_length()) {
139                 ast_debug(1, "Ignoring event that's too small. %u < %u\n",
140                         (unsigned int) event_datalen,
141                         (unsigned int) ast_event_minimum_length());
142                 return;
143         }
144
145         ais_res = saEvtEventDataGet(event_handle, event, &len);
146         if (ais_res != SA_AIS_OK) {
147                 ast_log(LOG_ERROR, "Error retrieving event payload: %s\n",
148                         ais_err2str(ais_res));
149                 return;
150         }
151
152         if (!ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
153                 /* Don't feed events back in that originated locally. */
154                 return;
155         }
156
157         if (!(event_dup = ast_malloc(len)))
158                 return;
159
160         memcpy(event_dup, event, len);
161
162         queue_event(event_dup);
163 }
164
165 static const char *type_to_filter_str(enum ast_event_type type)
166 {
167         const char *filter_str = NULL;
168         int i;
169
170         for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
171                 if (supported_event_types[i].type == type) {
172                         filter_str = supported_event_types[i].str;
173                         break;
174                 }
175         }
176
177         return filter_str;
178 }
179
180 static void ast_event_cb(const struct ast_event *ast_event, void *data)
181 {
182         SaEvtEventHandleT event_handle;
183         SaAisErrorT ais_res;
184         struct event_channel *event_channel = data;
185         SaClmClusterNodeT local_node;
186         SaEvtEventPatternArrayT pattern_array;
187         SaEvtEventPatternT pattern;
188         SaSizeT len;
189         const char *filter_str;
190         SaEvtEventIdT event_id;
191
192         ast_debug(1, "Got an event to forward\n");
193
194         if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID))) {
195                 /* If the event didn't originate from this server, don't send it back out. */
196                 ast_debug(1, "Returning here\n");
197                 return;
198         }
199
200         ais_res = saEvtEventAllocate(event_channel->handle, &event_handle);
201         if (ais_res != SA_AIS_OK) {
202                 ast_log(LOG_ERROR, "Error allocating event: %s\n", ais_err2str(ais_res));
203                 ast_debug(1, "Returning here\n");
204                 return;
205         }
206
207         ais_res = saClmClusterNodeGet(clm_handle, SA_CLM_LOCAL_NODE_ID,
208                 SA_TIME_ONE_SECOND, &local_node);
209         if (ais_res != SA_AIS_OK) {
210                 ast_log(LOG_ERROR, "Error getting local node name: %s\n", ais_err2str(ais_res));
211                 goto return_event_free;
212         }
213
214         filter_str = type_to_filter_str(ast_event_get_type(ast_event));
215         len = strlen(filter_str) + 1;
216         pattern.pattern = (SaUint8T *) filter_str;
217         pattern.patternSize = len;
218         pattern.allocatedSize = len;
219
220         pattern_array.allocatedNumber = 1;
221         pattern_array.patternsNumber = 1;
222         pattern_array.patterns = &pattern;
223
224         /*!
225          * /todo Make retention time configurable
226          * /todo Make event priorities configurable
227          */
228         ais_res = saEvtEventAttributesSet(event_handle, &pattern_array,
229                 SA_EVT_LOWEST_PRIORITY, SA_TIME_ONE_MINUTE, &local_node.nodeName);
230         if (ais_res != SA_AIS_OK) {
231                 ast_log(LOG_ERROR, "Error setting event attributes: %s\n", ais_err2str(ais_res));
232                 goto return_event_free;
233         }
234
235         ais_res = saEvtEventPublish(event_handle,
236                 ast_event, ast_event_get_size(ast_event), &event_id);
237         if (ais_res != SA_AIS_OK) {
238                 ast_log(LOG_ERROR, "Error publishing event: %s\n", ais_err2str(ais_res));
239                 goto return_event_free;
240         }
241
242 return_event_free:
243         ais_res = saEvtEventFree(event_handle);
244         if (ais_res != SA_AIS_OK) {
245                 ast_log(LOG_ERROR, "Error freeing allocated event: %s\n", ais_err2str(ais_res));
246         }
247         ast_debug(1, "Returning here (event_free)\n");
248 }
249
250 static char *ais_evt_show_event_channels(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
251 {
252         struct event_channel *event_channel;
253
254         switch (cmd) {
255         case CLI_INIT:
256                 e->command = "ais evt show event channels";
257                 e->usage =
258                         "Usage: ais evt show event channels\n"
259                         "       List configured event channels for the (EVT) Eventing service.\n";
260                 return NULL;
261
262         case CLI_GENERATE:
263                 return NULL;    /* no completion */
264         }
265
266         if (a->argc != e->args)
267                 return CLI_SHOWUSAGE;
268
269         ast_cli(a->fd, "\n"
270                     "=============================================================\n"
271                     "=== Event Channels ==========================================\n"
272                     "=============================================================\n"
273                     "===\n");
274
275         AST_RWLIST_RDLOCK(&event_channels);
276         AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) {
277                 struct publish_event *publish_event;
278                 struct subscribe_event *subscribe_event;
279
280                 ast_cli(a->fd, "=== ---------------------------------------------------------\n"
281                                "=== Event Channel Name: %s\n", event_channel->name);
282
283                 AST_LIST_TRAVERSE(&event_channel->publish_events, publish_event, entry) {
284                         ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n",
285                                 type_to_filter_str(publish_event->type));
286                 }
287
288                 AST_LIST_TRAVERSE(&event_channel->subscribe_events, subscribe_event, entry) {
289                         ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n",
290                                 type_to_filter_str(subscribe_event->type));
291                 }
292
293                 ast_cli(a->fd, "=== ---------------------------------------------------------\n"
294                                "===\n");
295         }
296         AST_RWLIST_UNLOCK(&event_channels);
297
298         ast_cli(a->fd, "=============================================================\n"
299                        "\n");
300
301         return CLI_SUCCESS;
302 }
303
/*! CLI commands provided by this file; registered when the EVT part of the
 *  module is loaded. */
static struct ast_cli_entry ais_cli[] = {
	AST_CLI_DEFINE(ais_evt_show_event_channels, "Show configured event channels"),
};
307
308 static void add_publish_event(struct event_channel *event_channel, const char *event_type)
309 {
310         int i;
311         enum ast_event_type type = -1;
312         struct publish_event *publish_event;
313
314         for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
315                 if (!strcasecmp(event_type, supported_event_types[i].str)) {
316                         type = supported_event_types[i].type;
317                         break;
318                 }
319         }
320
321         if (type == -1) {
322                 ast_log(LOG_WARNING, "publish_event option given with invalid value '%s'\n", event_type);
323                 return;
324         }
325
326         if (type == AST_EVENT_DEVICE_STATE_CHANGE && ast_enable_distributed_devstate()) {
327                 return;
328         }
329
330         if (!(publish_event = ast_calloc(1, sizeof(*publish_event)))) {
331                 return;
332         }
333
334         publish_event->type = type;
335         ast_debug(1, "Subscribing to event type %d\n", type);
336         publish_event->sub = ast_event_subscribe(type, ast_event_cb, "AIS", event_channel,
337                 AST_EVENT_IE_END);
338         ast_event_dump_cache(publish_event->sub);
339
340         AST_LIST_INSERT_TAIL(&event_channel->publish_events, publish_event, entry);
341 }
342
343 static SaAisErrorT set_egress_subscription(struct event_channel *event_channel,
344         struct subscribe_event *subscribe_event)
345 {
346         SaAisErrorT ais_res;
347         SaEvtEventFilterArrayT filter_array;
348         SaEvtEventFilterT filter;
349         const char *filter_str = NULL;
350         SaSizeT len;
351
352         /* We know it's going to be valid.  It was checked earlier. */
353         filter_str = type_to_filter_str(subscribe_event->type);
354
355         filter.filterType = SA_EVT_EXACT_FILTER;
356         len = strlen(filter_str) + 1;
357         filter.filter.allocatedSize = len;
358         filter.filter.patternSize = len;
359         filter.filter.pattern = (SaUint8T *) filter_str;
360
361         filter_array.filtersNumber = 1;
362         filter_array.filters = &filter;
363
364         ais_res = saEvtEventSubscribe(event_channel->handle, &filter_array,
365                 subscribe_event->id);
366
367         return ais_res;
368 }
369
370 static void add_subscribe_event(struct event_channel *event_channel, const char *event_type)
371 {
372         int i;
373         enum ast_event_type type = -1;
374         struct subscribe_event *subscribe_event;
375         SaAisErrorT ais_res;
376
377         for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
378                 if (!strcasecmp(event_type, supported_event_types[i].str)) {
379                         type = supported_event_types[i].type;
380                         break;
381                 }
382         }
383
384         if (type == -1) {
385                 ast_log(LOG_WARNING, "subscribe_event option given with invalid value '%s'\n", event_type);
386                 return;
387         }
388
389         if (type == AST_EVENT_DEVICE_STATE_CHANGE && ast_enable_distributed_devstate()) {
390                 return;
391         }
392
393         if (!(subscribe_event = ast_calloc(1, sizeof(*subscribe_event)))) {
394                 return;
395         }
396
397         subscribe_event->type = type;
398         subscribe_event->id = ast_atomic_fetchadd_int(&unique_id, +1);
399
400         ais_res = set_egress_subscription(event_channel, subscribe_event);
401         if (ais_res != SA_AIS_OK) {
402                 ast_log(LOG_ERROR, "Error setting up egress subscription: %s\n",
403                         ais_err2str(ais_res));
404                 free(subscribe_event);
405                 return;
406         }
407
408         AST_LIST_INSERT_TAIL(&event_channel->subscribe_events, subscribe_event, entry);
409 }
410
411 static void build_event_channel(struct ast_config *cfg, const char *cat)
412 {
413         struct ast_variable *var;
414         struct event_channel *event_channel;
415         SaAisErrorT ais_res;
416         SaNameT sa_name = { 0, };
417
418         AST_RWLIST_WRLOCK(&event_channels);
419         AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) {
420                 if (!strcasecmp(event_channel->name, cat))
421                         break;
422         }
423         AST_RWLIST_UNLOCK(&event_channels);
424         if (event_channel) {
425                 ast_log(LOG_WARNING, "Event channel '%s' was specified twice in "
426                         "configuration.  Second instance ignored.\n", cat);
427                 return;
428         }
429
430         if (!(event_channel = ast_calloc(1, sizeof(*event_channel) + strlen(cat))))
431                 return;
432
433         strcpy(event_channel->name, cat);
434         ast_copy_string((char *) sa_name.value, cat, sizeof(sa_name.value));
435         sa_name.length = strlen((char *) sa_name.value);
436         ais_res = saEvtChannelOpen(evt_handle, &sa_name,
437                 SA_EVT_CHANNEL_PUBLISHER | SA_EVT_CHANNEL_SUBSCRIBER | SA_EVT_CHANNEL_CREATE,
438                 SA_TIME_MAX, &event_channel->handle);
439         if (ais_res != SA_AIS_OK) {
440                 ast_log(LOG_ERROR, "Error opening event channel: %s\n", ais_err2str(ais_res));
441                 free(event_channel);
442                 return;
443         }
444
445         for (var = ast_variable_browse(cfg, cat); var; var = var->next) {
446                 if (!strcasecmp(var->name, "type")) {
447                         continue;
448                 } else if (!strcasecmp(var->name, "publish_event")) {
449                         add_publish_event(event_channel, var->value);
450                 } else if (!strcasecmp(var->name, "subscribe_event")) {
451                         add_subscribe_event(event_channel, var->value);
452                 } else {
453                         ast_log(LOG_WARNING, "Event channel '%s' contains invalid option '%s'\n",
454                                 event_channel->name, var->name);
455                 }
456         }
457
458         AST_RWLIST_WRLOCK(&event_channels);
459         AST_RWLIST_INSERT_TAIL(&event_channels, event_channel, entry);
460         AST_RWLIST_UNLOCK(&event_channels);
461 }
462
463 static void load_config(void)
464 {
465         static const char filename[] = "ais.conf";
466         struct ast_config *cfg;
467         const char *cat = NULL;
468         struct ast_flags config_flags = { 0 };
469
470         if (!(cfg = ast_config_load(filename, config_flags)) || cfg == CONFIG_STATUS_FILEINVALID)
471                 return;
472
473         while ((cat = ast_category_browse(cfg, cat))) {
474                 const char *type;
475
476                 if (!strcasecmp(cat, "general"))
477                         continue;
478
479                 if (!(type = ast_variable_retrieve(cfg, cat, "type"))) {
480                         ast_log(LOG_WARNING, "Invalid entry in %s defined with no type!\n",
481                                 filename);
482                         continue;
483                 }
484
485                 if (!strcasecmp(type, "event_channel")) {
486                         build_event_channel(cfg, cat);
487                 } else {
488                         ast_log(LOG_WARNING, "Entry in %s defined with invalid type '%s'\n",
489                                 filename, type);
490                 }
491         }
492
493         ast_config_destroy(cfg);
494 }
495
496 static void publish_event_destroy(struct publish_event *publish_event)
497 {
498         ast_event_unsubscribe(publish_event->sub);
499
500         free(publish_event);
501 }
502
503 static void subscribe_event_destroy(const struct event_channel *event_channel,
504         struct subscribe_event *subscribe_event)
505 {
506         SaAisErrorT ais_res;
507
508         /* saEvtChannelClose() will actually do this automatically, but it just
509          * feels cleaner to go ahead and do it manually ... */
510         ais_res = saEvtEventUnsubscribe(event_channel->handle, subscribe_event->id);
511         if (ais_res != SA_AIS_OK) {
512                 ast_log(LOG_ERROR, "Error unsubscribing: %s\n", ais_err2str(ais_res));
513         }
514
515         free(subscribe_event);
516 }
517
518 static void event_channel_destroy(struct event_channel *event_channel)
519 {
520         struct publish_event *publish_event;
521         struct subscribe_event *subscribe_event;
522         SaAisErrorT ais_res;
523
524         while ((publish_event = AST_LIST_REMOVE_HEAD(&event_channel->publish_events, entry)))
525                 publish_event_destroy(publish_event);
526         while ((subscribe_event = AST_LIST_REMOVE_HEAD(&event_channel->subscribe_events, entry)))
527                 subscribe_event_destroy(event_channel, subscribe_event);
528
529         ais_res = saEvtChannelClose(event_channel->handle);
530         if (ais_res != SA_AIS_OK) {
531                 ast_log(LOG_ERROR, "Error closing event channel '%s': %s\n",
532                         event_channel->name, ais_err2str(ais_res));
533         }
534
535         free(event_channel);
536 }
537
538 static void destroy_event_channels(void)
539 {
540         struct event_channel *event_channel;
541
542         AST_RWLIST_WRLOCK(&event_channels);
543         while ((event_channel = AST_RWLIST_REMOVE_HEAD(&event_channels, entry))) {
544                 event_channel_destroy(event_channel);
545         }
546         AST_RWLIST_UNLOCK(&event_channels);
547 }
548
549 int ast_ais_evt_load_module(void)
550 {
551         evt_init_res = saEvtInitialize(&evt_handle, &evt_callbacks, &ais_version);
552         if (evt_init_res != SA_AIS_OK) {
553                 ast_log(LOG_ERROR, "Could not initialize eventing service: %s\n",
554                         ais_err2str(evt_init_res));
555                 return -1;
556         }
557
558         load_config();
559
560         ast_cli_register_multiple(ais_cli, ARRAY_LEN(ais_cli));
561
562         return 0;
563 }
564
565 int ast_ais_evt_unload_module(void)
566 {
567         SaAisErrorT ais_res;
568
569         if (evt_init_res != SA_AIS_OK) {
570                 return 0;
571         }
572
573         destroy_event_channels();
574
575         ais_res = saEvtFinalize(evt_handle);
576         if (ais_res != SA_AIS_OK) {
577                 ast_log(LOG_ERROR, "Problem stopping eventing service: %s\n",
578                         ais_err2str(ais_res));
579                 return -1;
580         }
581
582         return 0;
583 }