0057f0481a1e7262f5fa27c071175986c927288c
[asterisk/asterisk.git] / res / ais / evt.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2007, Digium, Inc.
5  *
6  * Russell Bryant <russell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! 
20  * \file
21  * \author Russell Bryant <russell@digium.com>
22  *
23  * \brief Usage of the SAForum AIS (Application Interface Specification)
24  *
25  * \arg http://www.openais.org/
26  *
27  * This file contains the code specific to the use of the EVT 
28  * (Event) Service.
29  */
30
31 #include "asterisk.h"
32
33 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
34
35 #include <stdlib.h>
36 #include <stdio.h>
37 #include <string.h>
38 #include <unistd.h>
39 #include <errno.h>
40
41 #include "ais.h"
42
43 #include "asterisk/module.h"
44 #include "asterisk/utils.h"
45 #include "asterisk/cli.h"
46 #include "asterisk/logger.h"
47 #include "asterisk/event.h"
48 #include "asterisk/config.h"
49 #include "asterisk/linkedlists.h"
50
51 #ifndef AST_MODULE
52 /* XXX HACK */
53 #define AST_MODULE "res_ais"
54 #endif
55
56 SaEvtHandleT evt_handle;
57
58 void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
59         SaAisErrorT error);
60 void evt_event_deliver_cb(SaEvtSubscriptionIdT subscription_id,
61         const SaEvtEventHandleT event_handle, const SaSizeT event_datalen);
62
63 static const SaEvtCallbacksT evt_callbacks = {
64         .saEvtChannelOpenCallback  = evt_channel_open_cb,
65         .saEvtEventDeliverCallback = evt_event_deliver_cb, 
66 };
67
68 static const struct {
69         const char *str;
70         enum ast_event_type type;
71 } supported_event_types[] = {
72         { "mwi", AST_EVENT_MWI },
73         { "device_state", AST_EVENT_DEVICE_STATE_CHANGE },
74 };
75
76 /*! Used to provide unique id's to egress subscriptions */
77 static int unique_id;
78
79 struct subscribe_event {
80         AST_LIST_ENTRY(subscribe_event) entry;
81         /*! This is a unique identifier to identify this subscription in the event
82          *  channel through the different API calls, subscribe, unsubscribe, and
83          *  the event deliver callback. */
84         SaEvtSubscriptionIdT id;
85         enum ast_event_type type;
86 };
87
88 struct publish_event {
89         AST_LIST_ENTRY(publish_event) entry;
90         /*! We subscribe to events internally so that we can publish them
91          *  on this event channel. */
92         struct ast_event_sub *sub;
93         enum ast_event_type type;
94 };
95
96 struct event_channel {
97         AST_RWLIST_ENTRY(event_channel) entry;
98         AST_LIST_HEAD_NOLOCK(, subscribe_event) subscribe_events;
99         AST_LIST_HEAD_NOLOCK(, publish_event) publish_events;
100         SaEvtChannelHandleT handle;
101         char name[1];
102 };
103
104 static AST_RWLIST_HEAD_STATIC(event_channels, event_channel);
105
106 void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
107         SaAisErrorT error)
108 {
109
110 }
111
112 static void queue_event(struct ast_event *ast_event)
113 {
114         enum ast_event_type type;
115
116         /*! 
117          * \todo This hack macks me sad.  I need to come up with a better way to
118          *       figure out whether an event should be cached or not, and what
119          *       parameters to cache on.
120          *
121          *       As long as the types of events that are supported is limited,
122          *       this isn't *terrible*, I guess.  Perhaps we should just define
123          *       caching rules in the core, and make them configurable, and not
124          *       have it be the job of the event publishers.
125          */
126
127         type = ast_event_get_type(ast_event);
128
129         if (type == AST_EVENT_MWI) {
130                 ast_event_queue_and_cache(ast_event,
131                         AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR,
132                         AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR,
133                         AST_EVENT_IE_END);
134         } else if (type == AST_EVENT_DEVICE_STATE_CHANGE) {
135                 ast_event_queue_and_cache(ast_event,
136                         AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR,
137                         AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW, sizeof(struct ast_eid),
138                         AST_EVENT_IE_END);
139         } else {
140                 ast_event_queue(ast_event);
141         }
142 }
143
144 void evt_event_deliver_cb(SaEvtSubscriptionIdT sub_id,
145         const SaEvtEventHandleT event_handle, const SaSizeT event_datalen)
146 {
147         /* It is important to note that this works because we *know* that this
148          * function will only be called by a single thread, the dispatch_thread.
149          * If this module gets changed such that this is no longer the case, this
150          * should get changed to a thread-local buffer, instead. */
151         static unsigned char buf[4096];
152         struct ast_event *event_dup, *event = (void *) buf;
153         SaAisErrorT ais_res;
154         SaSizeT len = sizeof(buf);
155
156         if (event_datalen > len) {
157                 ast_log(LOG_ERROR, "Event received with size %u, which is too big\n"
158                         "for the allocated size %u. Change the code to increase the size.\n",
159                         (unsigned int) event_datalen, (unsigned int) len);
160                 return;
161         }
162
163         ais_res = saEvtEventDataGet(event_handle, event, &len);
164         if (ais_res != SA_AIS_OK) {
165                 ast_log(LOG_ERROR, "Error retrieving event payload: %s\n", 
166                         ais_err2str(ais_res));
167                 return;
168         }
169
170         if (!ast_eid_cmp(&g_eid, ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
171                 /* Don't feed events back in that originated locally. */
172                 return;
173         }
174
175         if (!(event_dup = ast_malloc(len)))
176                 return;
177         
178         memcpy(event_dup, event, len);
179
180         queue_event(event_dup);
181 }
182
183 static const char *type_to_filter_str(enum ast_event_type type)
184 {
185         const char *filter_str = NULL;
186         int i;
187
188         for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
189                 if (supported_event_types[i].type == type) {
190                         filter_str = supported_event_types[i].str;
191                         break;
192                 }
193         }
194
195         return filter_str;
196 }
197
198 static void ast_event_cb(const struct ast_event *ast_event, void *data)
199 {
200         SaEvtEventHandleT event_handle;
201         SaAisErrorT ais_res;
202         struct event_channel *event_channel = data;
203         SaClmClusterNodeT local_node;
204         SaEvtEventPatternArrayT pattern_array;
205         SaEvtEventPatternT pattern;
206         SaSizeT len;
207         const char *filter_str;
208         SaEvtEventIdT event_id;
209
210         ast_log(LOG_DEBUG, "Got an event to forward\n");
211
212         if (ast_eid_cmp(&g_eid, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID))) {
213                 /* If the event didn't originate from this server, don't send it back out. */
214                 ast_log(LOG_DEBUG, "Returning here\n");
215                 return;
216         }
217
218         ais_res = saEvtEventAllocate(event_channel->handle, &event_handle);
219         if (ais_res != SA_AIS_OK) {
220                 ast_log(LOG_ERROR, "Error allocating event: %s\n", ais_err2str(ais_res));
221                 ast_log(LOG_DEBUG, "Returning here\n");
222                 return;
223         }
224         
225         ais_res = saClmClusterNodeGet(clm_handle, SA_CLM_LOCAL_NODE_ID, 
226                 SA_TIME_ONE_SECOND, &local_node);
227         if (ais_res != SA_AIS_OK) {
228                 ast_log(LOG_ERROR, "Error getting local node name: %s\n", ais_err2str(ais_res));
229                 goto return_event_free;
230         }
231
232         filter_str = type_to_filter_str(ast_event_get_type(ast_event));
233         len = strlen(filter_str) + 1;
234         pattern.pattern = (SaUint8T *) filter_str;
235         pattern.patternSize = len;
236         pattern.allocatedSize = len;
237
238         pattern_array.allocatedNumber = 1;
239         pattern_array.patternsNumber = 1;
240         pattern_array.patterns = &pattern;
241
242         /*! 
243          * /todo Make retention time configurable 
244          * /todo Make event priorities configurable
245          */
246         ais_res = saEvtEventAttributesSet(event_handle, &pattern_array,
247                 SA_EVT_LOWEST_PRIORITY, SA_TIME_ONE_MINUTE, &local_node.nodeName);
248         if (ais_res != SA_AIS_OK) {
249                 ast_log(LOG_ERROR, "Error setting event attributes: %s\n", ais_err2str(ais_res));
250                 goto return_event_free;
251         }
252
253         ais_res = saEvtEventPublish(event_handle, 
254                 ast_event, ast_event_get_size(ast_event), &event_id);
255         if (ais_res != SA_AIS_OK) {
256                 ast_log(LOG_ERROR, "Error publishing event: %s\n", ais_err2str(ais_res));
257                 goto return_event_free;
258         }
259
260 return_event_free:
261         ais_res = saEvtEventFree(event_handle);
262         if (ais_res != SA_AIS_OK) {
263                 ast_log(LOG_ERROR, "Error freeing allocated event: %s\n", ais_err2str(ais_res));
264         }
265         ast_log(LOG_DEBUG, "Returning here (event_free)\n");
266 }
267
268 static char *ais_evt_show_event_channels(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
269 {
270         struct event_channel *event_channel;
271
272         switch (cmd) {
273         case CLI_INIT:
274                 e->command = "ais evt show event channels";
275                 e->usage =
276                         "Usage: ais evt show event channels\n"
277                         "       List configured event channels for the (EVT) Eventing service.\n";
278                 return NULL;
279
280         case CLI_GENERATE:
281                 return NULL;    /* no completion */
282         }
283
284         if (a->argc != e->args)
285                 return CLI_SHOWUSAGE;
286
287         ast_cli(a->fd, "\n"
288                     "=============================================================\n"
289                     "=== Event Channels ==========================================\n"
290                     "=============================================================\n"
291                     "===\n");
292
293         AST_RWLIST_RDLOCK(&event_channels);
294         AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) {
295                 struct publish_event *publish_event;
296                 struct subscribe_event *subscribe_event;
297
298                 ast_cli(a->fd, "=== ---------------------------------------------------------\n"
299                                "=== Event Channel Name: %s\n", event_channel->name);
300
301                 AST_LIST_TRAVERSE(&event_channel->publish_events, publish_event, entry) {
302                         ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n", 
303                                 type_to_filter_str(publish_event->type));
304                 }
305                 
306                 AST_LIST_TRAVERSE(&event_channel->subscribe_events, subscribe_event, entry) {
307                         ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n", 
308                                 type_to_filter_str(subscribe_event->type));
309                 }
310
311                 ast_cli(a->fd, "=== ---------------------------------------------------------\n"
312                                "===\n");
313         }
314         AST_RWLIST_UNLOCK(&event_channels);
315
316         ast_cli(a->fd, "=============================================================\n"
317                        "\n");
318
319         return CLI_SUCCESS;
320 }
321
322 static struct ast_cli_entry ais_cli[] = {
323         AST_CLI_DEFINE(ais_evt_show_event_channels, "Show configured event channels"),
324 };
325
326 static void add_publish_event(struct event_channel *event_channel, const char *event_type)
327 {
328         int i;
329         enum ast_event_type type = -1;
330         struct publish_event *publish_event;
331
332         for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
333                 if (!strcasecmp(event_type, supported_event_types[i].str)) {
334                         type = supported_event_types[i].type;
335                         break;
336                 }
337         }
338
339         if (type == -1) {
340                 ast_log(LOG_WARNING, "publish_event option given with invalid value '%s'\n", event_type);
341                 return;
342         }
343
344         if (!(publish_event = ast_calloc(1, sizeof(*publish_event))))
345                 return;
346         
347         publish_event->type = type;
348         ast_log(LOG_DEBUG, "Subscribing to event type %d\n", type);
349         publish_event->sub = ast_event_subscribe(type, ast_event_cb, event_channel,
350                 AST_EVENT_IE_END);
351         ast_event_dump_cache(publish_event->sub);
352
353         AST_LIST_INSERT_TAIL(&event_channel->publish_events, publish_event, entry);
354 }
355
356 static SaAisErrorT set_egress_subscription(struct event_channel *event_channel,
357         struct subscribe_event *subscribe_event)
358 {
359         SaAisErrorT ais_res;
360         SaEvtEventFilterArrayT filter_array;
361         SaEvtEventFilterT filter;
362         const char *filter_str = NULL;
363         SaSizeT len;
364
365         /* We know it's going to be valid.  It was checked earlier. */
366         filter_str = type_to_filter_str(subscribe_event->type);
367
368         filter.filterType = SA_EVT_EXACT_FILTER;
369         len = strlen(filter_str) + 1;
370         filter.filter.allocatedSize = len;
371         filter.filter.patternSize = len;
372         filter.filter.pattern = (SaUint8T *) filter_str;
373
374         filter_array.filtersNumber = 1;
375         filter_array.filters = &filter;
376
377         ais_res = saEvtEventSubscribe(event_channel->handle, &filter_array, 
378                 subscribe_event->id);
379
380         return ais_res;
381 }
382
383 static void add_subscribe_event(struct event_channel *event_channel, const char *event_type)
384 {
385         int i;
386         enum ast_event_type type = -1;
387         struct subscribe_event *subscribe_event;
388         SaAisErrorT ais_res;
389
390         for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
391                 if (!strcasecmp(event_type, supported_event_types[i].str)) {
392                         type = supported_event_types[i].type;
393                         break;
394                 }
395         }
396
397         if (type == -1) {
398                 ast_log(LOG_WARNING, "subscribe_event option given with invalid value '%s'\n", event_type);
399                 return;
400         }
401
402         if (!(subscribe_event = ast_calloc(1, sizeof(*subscribe_event))))
403                 return;
404         
405         subscribe_event->type = type;
406         subscribe_event->id = ast_atomic_fetchadd_int(&unique_id, +1);
407
408         ais_res = set_egress_subscription(event_channel, subscribe_event);
409         if (ais_res != SA_AIS_OK) {
410                 ast_log(LOG_ERROR, "Error setting up egress subscription: %s\n",
411                         ais_err2str(ais_res));
412                 free(subscribe_event);
413                 return;
414         }
415
416         AST_LIST_INSERT_TAIL(&event_channel->subscribe_events, subscribe_event, entry);
417 }
418
419 static void build_event_channel(struct ast_config *cfg, const char *cat)
420 {
421         struct ast_variable *var;
422         struct event_channel *event_channel;
423         SaAisErrorT ais_res;
424         SaNameT sa_name = { 0, };
425
426         AST_RWLIST_WRLOCK(&event_channels);
427         AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) {
428                 if (!strcasecmp(event_channel->name, cat))
429                         break;
430         }
431         AST_RWLIST_UNLOCK(&event_channels);
432         if (event_channel) {
433                 ast_log(LOG_WARNING, "Event channel '%s' was specified twice in "
434                         "configuration.  Second instance ignored.\n", cat);
435                 return;
436         }
437
438         if (!(event_channel = ast_calloc(1, sizeof(*event_channel) + strlen(cat))))
439                 return;
440
441         strcpy(event_channel->name, cat);
442         ast_copy_string((char *) sa_name.value, cat, sizeof(sa_name.value));
443         sa_name.length = strlen((char *) sa_name.value);
444         ais_res = saEvtChannelOpen(evt_handle, &sa_name, 
445                 SA_EVT_CHANNEL_PUBLISHER | SA_EVT_CHANNEL_SUBSCRIBER | SA_EVT_CHANNEL_CREATE,
446                 SA_TIME_MAX, &event_channel->handle);
447         if (ais_res != SA_AIS_OK) {
448                 ast_log(LOG_ERROR, "Error opening event channel: %s\n", ais_err2str(ais_res));
449                 free(event_channel);
450                 return;
451         }
452
453         for (var = ast_variable_browse(cfg, cat); var; var = var->next) {
454                 if (!strcasecmp(var->name, "type")) {
455                         continue;
456                 } else if (!strcasecmp(var->name, "publish_event")) {
457                         add_publish_event(event_channel, var->value);
458                 } else if (!strcasecmp(var->name, "subscribe_event")) {
459                         add_subscribe_event(event_channel, var->value);
460                 } else {
461                         ast_log(LOG_WARNING, "Event channel '%s' contains invalid option '%s'\n",
462                                 event_channel->name, var->name);
463                 }
464         }
465
466         AST_RWLIST_WRLOCK(&event_channels);
467         AST_RWLIST_INSERT_TAIL(&event_channels, event_channel, entry);
468         AST_RWLIST_UNLOCK(&event_channels);
469 }
470
471 static void load_config(void)
472 {
473         static const char filename[] = "ais.conf";
474         struct ast_config *cfg;
475         const char *cat = NULL;
476         struct ast_flags config_flags = { 0 };
477
478         if (!(cfg = ast_config_load(filename, config_flags)))
479                 return;
480
481         while ((cat = ast_category_browse(cfg, cat))) {
482                 const char *type;
483
484                 if (!strcasecmp(cat, "general"))
485                         continue;
486
487                 if (!(type = ast_variable_retrieve(cfg, cat, "type"))) {
488                         ast_log(LOG_WARNING, "Invalid entry in %s defined with no type!\n",
489                                 filename);
490                         continue;
491                 }
492
493                 if (!strcasecmp(type, "event_channel")) {
494                         build_event_channel(cfg, cat);
495                 } else {
496                         ast_log(LOG_WARNING, "Entry in %s defined with invalid type '%s'\n", 
497                                 filename, type);
498                 }
499         }
500
501         ast_config_destroy(cfg);
502 }
503
504 static void publish_event_destroy(struct publish_event *publish_event)
505 {
506         ast_event_unsubscribe(publish_event->sub);
507
508         free(publish_event);
509 }
510
511 static void subscribe_event_destroy(const struct event_channel *event_channel,
512         struct subscribe_event *subscribe_event)
513 {
514         SaAisErrorT ais_res;
515
516         /* saEvtChannelClose() will actually do this automatically, but it just
517          * feels cleaner to go ahead and do it manually ... */
518         ais_res = saEvtEventUnsubscribe(event_channel->handle, subscribe_event->id);
519         if (ais_res != SA_AIS_OK) {
520                 ast_log(LOG_ERROR, "Error unsubscribing: %s\n", ais_err2str(ais_res));
521         }
522
523         free(subscribe_event);
524 }
525
526 static void event_channel_destroy(struct event_channel *event_channel)
527 {
528         struct publish_event *publish_event;
529         struct subscribe_event *subscribe_event;
530         SaAisErrorT ais_res;
531
532         while ((publish_event = AST_LIST_REMOVE_HEAD(&event_channel->publish_events, entry)))
533                 publish_event_destroy(publish_event);
534         while ((subscribe_event = AST_LIST_REMOVE_HEAD(&event_channel->subscribe_events, entry)))
535                 subscribe_event_destroy(event_channel, subscribe_event);
536
537         ais_res = saEvtChannelClose(event_channel->handle);
538         if (ais_res != SA_AIS_OK) {
539                 ast_log(LOG_ERROR, "Error closing event channel '%s': %s\n",
540                         event_channel->name, ais_err2str(ais_res));
541         }
542
543         free(event_channel);
544 }
545
546 static void destroy_event_channels(void)
547 {
548         struct event_channel *event_channel;
549
550         AST_RWLIST_WRLOCK(&event_channels);
551         while ((event_channel = AST_RWLIST_REMOVE_HEAD(&event_channels, entry)))
552                 event_channel_destroy(event_channel);
553         AST_RWLIST_UNLOCK(&event_channels);
554 }
555
556 int ast_ais_evt_load_module(void)
557 {
558         SaAisErrorT ais_res;
559
560         ais_res = saEvtInitialize(&evt_handle, &evt_callbacks, &ais_version);
561         if (ais_res != SA_AIS_OK) {
562                 ast_log(LOG_ERROR, "Could not initialize eventing service: %s\n",
563                         ais_err2str(ais_res));
564                 return -1;
565         }
566         
567         load_config();
568
569         ast_cli_register_multiple(ais_cli, ARRAY_LEN(ais_cli));
570
571         return 0;
572 }
573
574 int ast_ais_evt_unload_module(void)
575 {
576         SaAisErrorT ais_res;
577
578         destroy_event_channels();
579
580         ais_res = saEvtFinalize(evt_handle);
581         if (ais_res != SA_AIS_OK) {
582                 ast_log(LOG_ERROR, "Problem stopping eventing service: %s\n", 
583                         ais_err2str(ais_res));
584                 return -1;
585         }
586
587         return 0;       
588 }