Improve performance of the ast_event cache functionality.
[asterisk/asterisk.git] / res / ais / evt.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2007, Digium, Inc.
5  *
6  * Russell Bryant <russell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! 
20  * \file
21  * \author Russell Bryant <russell@digium.com>
22  *
23  * \brief Usage of the SAForum AIS (Application Interface Specification)
24  *
25  * \arg http://www.openais.org/
26  *
27  * This file contains the code specific to the use of the EVT 
28  * (Event) Service.
29  */
30
31 #include "asterisk.h"
32
33 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
34
35 #include <stdlib.h>
36 #include <stdio.h>
37 #include <string.h>
38 #include <unistd.h>
39 #include <errno.h>
40
41 #include "ais.h"
42
43 #include "asterisk/module.h"
44 #include "asterisk/utils.h"
45 #include "asterisk/cli.h"
46 #include "asterisk/logger.h"
47 #include "asterisk/event.h"
48 #include "asterisk/config.h"
49 #include "asterisk/linkedlists.h"
50 #include "asterisk/devicestate.h"
51
52 #ifndef AST_MODULE
53 /* XXX HACK */
54 #define AST_MODULE "res_ais"
55 #endif
56
57 SaEvtHandleT evt_handle;
58
59 void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
60         SaAisErrorT error);
61 void evt_event_deliver_cb(SaEvtSubscriptionIdT subscription_id,
62         const SaEvtEventHandleT event_handle, const SaSizeT event_datalen);
63
64 static const SaEvtCallbacksT evt_callbacks = {
65         .saEvtChannelOpenCallback  = evt_channel_open_cb,
66         .saEvtEventDeliverCallback = evt_event_deliver_cb, 
67 };
68
69 static const struct {
70         const char *str;
71         enum ast_event_type type;
72 } supported_event_types[] = {
73         { "mwi", AST_EVENT_MWI },
74         { "device_state", AST_EVENT_DEVICE_STATE_CHANGE },
75 };
76
77 /*! Used to provide unique id's to egress subscriptions */
78 static int unique_id;
79
80 struct subscribe_event {
81         AST_LIST_ENTRY(subscribe_event) entry;
82         /*! This is a unique identifier to identify this subscription in the event
83          *  channel through the different API calls, subscribe, unsubscribe, and
84          *  the event deliver callback. */
85         SaEvtSubscriptionIdT id;
86         enum ast_event_type type;
87 };
88
89 struct publish_event {
90         AST_LIST_ENTRY(publish_event) entry;
91         /*! We subscribe to events internally so that we can publish them
92          *  on this event channel. */
93         struct ast_event_sub *sub;
94         enum ast_event_type type;
95 };
96
97 struct event_channel {
98         AST_RWLIST_ENTRY(event_channel) entry;
99         AST_LIST_HEAD_NOLOCK(, subscribe_event) subscribe_events;
100         AST_LIST_HEAD_NOLOCK(, publish_event) publish_events;
101         SaEvtChannelHandleT handle;
102         char name[1];
103 };
104
105 static AST_RWLIST_HEAD_STATIC(event_channels, event_channel);
106
107 void evt_channel_open_cb(SaInvocationT invocation, SaEvtChannelHandleT channel_handle,
108         SaAisErrorT error)
109 {
110
111 }
112
/*!
 * \brief Hand an event received from the cluster to the Asterisk core
 *
 * \param ast_event heap-allocated event; it is passed on to
 *        ast_event_queue_and_cache() and not touched here afterwards
 */
static void queue_event(struct ast_event *ast_event)
{
	/* Queue for local subscribers and keep a copy in the event cache. */
	ast_event_queue_and_cache(ast_event);
}
117
118 void evt_event_deliver_cb(SaEvtSubscriptionIdT sub_id,
119         const SaEvtEventHandleT event_handle, const SaSizeT event_datalen)
120 {
121         /* It is important to note that this works because we *know* that this
122          * function will only be called by a single thread, the dispatch_thread.
123          * If this module gets changed such that this is no longer the case, this
124          * should get changed to a thread-local buffer, instead. */
125         static unsigned char buf[4096];
126         struct ast_event *event_dup, *event = (void *) buf;
127         SaAisErrorT ais_res;
128         SaSizeT len = sizeof(buf);
129
130         if (event_datalen > len) {
131                 ast_log(LOG_ERROR, "Event received with size %u, which is too big\n"
132                         "for the allocated size %u. Change the code to increase the size.\n",
133                         (unsigned int) event_datalen, (unsigned int) len);
134                 return;
135         }
136
137         ais_res = saEvtEventDataGet(event_handle, event, &len);
138         if (ais_res != SA_AIS_OK) {
139                 ast_log(LOG_ERROR, "Error retrieving event payload: %s\n", 
140                         ais_err2str(ais_res));
141                 return;
142         }
143
144         if (!ast_eid_cmp(&g_eid, ast_event_get_ie_raw(event, AST_EVENT_IE_EID))) {
145                 /* Don't feed events back in that originated locally. */
146                 return;
147         }
148
149         if (!(event_dup = ast_malloc(len)))
150                 return;
151         
152         memcpy(event_dup, event, len);
153
154         queue_event(event_dup);
155 }
156
157 static const char *type_to_filter_str(enum ast_event_type type)
158 {
159         const char *filter_str = NULL;
160         int i;
161
162         for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
163                 if (supported_event_types[i].type == type) {
164                         filter_str = supported_event_types[i].str;
165                         break;
166                 }
167         }
168
169         return filter_str;
170 }
171
172 static void ast_event_cb(const struct ast_event *ast_event, void *data)
173 {
174         SaEvtEventHandleT event_handle;
175         SaAisErrorT ais_res;
176         struct event_channel *event_channel = data;
177         SaClmClusterNodeT local_node;
178         SaEvtEventPatternArrayT pattern_array;
179         SaEvtEventPatternT pattern;
180         SaSizeT len;
181         const char *filter_str;
182         SaEvtEventIdT event_id;
183
184         ast_log(LOG_DEBUG, "Got an event to forward\n");
185
186         if (ast_eid_cmp(&g_eid, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID))) {
187                 /* If the event didn't originate from this server, don't send it back out. */
188                 ast_log(LOG_DEBUG, "Returning here\n");
189                 return;
190         }
191
192         ais_res = saEvtEventAllocate(event_channel->handle, &event_handle);
193         if (ais_res != SA_AIS_OK) {
194                 ast_log(LOG_ERROR, "Error allocating event: %s\n", ais_err2str(ais_res));
195                 ast_log(LOG_DEBUG, "Returning here\n");
196                 return;
197         }
198         
199         ais_res = saClmClusterNodeGet(clm_handle, SA_CLM_LOCAL_NODE_ID, 
200                 SA_TIME_ONE_SECOND, &local_node);
201         if (ais_res != SA_AIS_OK) {
202                 ast_log(LOG_ERROR, "Error getting local node name: %s\n", ais_err2str(ais_res));
203                 goto return_event_free;
204         }
205
206         filter_str = type_to_filter_str(ast_event_get_type(ast_event));
207         len = strlen(filter_str) + 1;
208         pattern.pattern = (SaUint8T *) filter_str;
209         pattern.patternSize = len;
210         pattern.allocatedSize = len;
211
212         pattern_array.allocatedNumber = 1;
213         pattern_array.patternsNumber = 1;
214         pattern_array.patterns = &pattern;
215
216         /*! 
217          * /todo Make retention time configurable 
218          * /todo Make event priorities configurable
219          */
220         ais_res = saEvtEventAttributesSet(event_handle, &pattern_array,
221                 SA_EVT_LOWEST_PRIORITY, SA_TIME_ONE_MINUTE, &local_node.nodeName);
222         if (ais_res != SA_AIS_OK) {
223                 ast_log(LOG_ERROR, "Error setting event attributes: %s\n", ais_err2str(ais_res));
224                 goto return_event_free;
225         }
226
227         ais_res = saEvtEventPublish(event_handle, 
228                 ast_event, ast_event_get_size(ast_event), &event_id);
229         if (ais_res != SA_AIS_OK) {
230                 ast_log(LOG_ERROR, "Error publishing event: %s\n", ais_err2str(ais_res));
231                 goto return_event_free;
232         }
233
234 return_event_free:
235         ais_res = saEvtEventFree(event_handle);
236         if (ais_res != SA_AIS_OK) {
237                 ast_log(LOG_ERROR, "Error freeing allocated event: %s\n", ais_err2str(ais_res));
238         }
239         ast_log(LOG_DEBUG, "Returning here (event_free)\n");
240 }
241
242 static char *ais_evt_show_event_channels(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
243 {
244         struct event_channel *event_channel;
245
246         switch (cmd) {
247         case CLI_INIT:
248                 e->command = "ais show evt event channels";
249                 e->usage =
250                         "Usage: ais show evt event channels\n"
251                         "       List configured event channels for the (EVT) Eventing service.\n";
252                 return NULL;
253
254         case CLI_GENERATE:
255                 return NULL;    /* no completion */
256         }
257
258         if (a->argc != e->args)
259                 return CLI_SHOWUSAGE;
260
261         ast_cli(a->fd, "\n"
262                     "=============================================================\n"
263                     "=== Event Channels ==========================================\n"
264                     "=============================================================\n"
265                     "===\n");
266
267         AST_RWLIST_RDLOCK(&event_channels);
268         AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) {
269                 struct publish_event *publish_event;
270                 struct subscribe_event *subscribe_event;
271
272                 ast_cli(a->fd, "=== ---------------------------------------------------------\n"
273                                "=== Event Channel Name: %s\n", event_channel->name);
274
275                 AST_LIST_TRAVERSE(&event_channel->publish_events, publish_event, entry) {
276                         ast_cli(a->fd, "=== ==> Publishing Event Type: %s\n", 
277                                 type_to_filter_str(publish_event->type));
278                 }
279                 
280                 AST_LIST_TRAVERSE(&event_channel->subscribe_events, subscribe_event, entry) {
281                         ast_cli(a->fd, "=== ==> Subscribing to Event Type: %s\n", 
282                                 type_to_filter_str(subscribe_event->type));
283                 }
284
285                 ast_cli(a->fd, "=== ---------------------------------------------------------\n"
286                                "===\n");
287         }
288         AST_RWLIST_UNLOCK(&event_channels);
289
290         ast_cli(a->fd, "=============================================================\n"
291                        "\n");
292
293         return CLI_SUCCESS;
294 }
295
296 static struct ast_cli_entry ais_cli[] = {
297         AST_CLI_DEFINE(ais_evt_show_event_channels, "Show configured event channels"),
298 };
299
300 static void add_publish_event(struct event_channel *event_channel, const char *event_type)
301 {
302         int i;
303         enum ast_event_type type = -1;
304         struct publish_event *publish_event;
305
306         for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
307                 if (!strcasecmp(event_type, supported_event_types[i].str)) {
308                         type = supported_event_types[i].type;
309                         break;
310                 }
311         }
312
313         if (type == -1) {
314                 ast_log(LOG_WARNING, "publish_event option given with invalid value '%s'\n", event_type);
315                 return;
316         }
317
318         if (type == AST_EVENT_DEVICE_STATE_CHANGE && ast_enable_distributed_devstate()) {
319                 return;
320         }
321
322         if (!(publish_event = ast_calloc(1, sizeof(*publish_event)))) {
323                 return;
324         }
325
326         publish_event->type = type;
327         ast_log(LOG_DEBUG, "Subscribing to event type %d\n", type);
328         publish_event->sub = ast_event_subscribe(type, ast_event_cb, event_channel,
329                 AST_EVENT_IE_END);
330         ast_event_dump_cache(publish_event->sub);
331
332         AST_LIST_INSERT_TAIL(&event_channel->publish_events, publish_event, entry);
333 }
334
335 static SaAisErrorT set_egress_subscription(struct event_channel *event_channel,
336         struct subscribe_event *subscribe_event)
337 {
338         SaAisErrorT ais_res;
339         SaEvtEventFilterArrayT filter_array;
340         SaEvtEventFilterT filter;
341         const char *filter_str = NULL;
342         SaSizeT len;
343
344         /* We know it's going to be valid.  It was checked earlier. */
345         filter_str = type_to_filter_str(subscribe_event->type);
346
347         filter.filterType = SA_EVT_EXACT_FILTER;
348         len = strlen(filter_str) + 1;
349         filter.filter.allocatedSize = len;
350         filter.filter.patternSize = len;
351         filter.filter.pattern = (SaUint8T *) filter_str;
352
353         filter_array.filtersNumber = 1;
354         filter_array.filters = &filter;
355
356         ais_res = saEvtEventSubscribe(event_channel->handle, &filter_array, 
357                 subscribe_event->id);
358
359         return ais_res;
360 }
361
362 static void add_subscribe_event(struct event_channel *event_channel, const char *event_type)
363 {
364         int i;
365         enum ast_event_type type = -1;
366         struct subscribe_event *subscribe_event;
367         SaAisErrorT ais_res;
368
369         for (i = 0; i < ARRAY_LEN(supported_event_types); i++) {
370                 if (!strcasecmp(event_type, supported_event_types[i].str)) {
371                         type = supported_event_types[i].type;
372                         break;
373                 }
374         }
375
376         if (type == -1) {
377                 ast_log(LOG_WARNING, "subscribe_event option given with invalid value '%s'\n", event_type);
378                 return;
379         }
380
381         if (type == AST_EVENT_DEVICE_STATE_CHANGE && ast_enable_distributed_devstate()) {
382                 return;
383         }
384
385         if (!(subscribe_event = ast_calloc(1, sizeof(*subscribe_event)))) {
386                 return;
387         }
388
389         subscribe_event->type = type;
390         subscribe_event->id = ast_atomic_fetchadd_int(&unique_id, +1);
391
392         ais_res = set_egress_subscription(event_channel, subscribe_event);
393         if (ais_res != SA_AIS_OK) {
394                 ast_log(LOG_ERROR, "Error setting up egress subscription: %s\n",
395                         ais_err2str(ais_res));
396                 free(subscribe_event);
397                 return;
398         }
399
400         AST_LIST_INSERT_TAIL(&event_channel->subscribe_events, subscribe_event, entry);
401 }
402
403 static void build_event_channel(struct ast_config *cfg, const char *cat)
404 {
405         struct ast_variable *var;
406         struct event_channel *event_channel;
407         SaAisErrorT ais_res;
408         SaNameT sa_name = { 0, };
409
410         AST_RWLIST_WRLOCK(&event_channels);
411         AST_RWLIST_TRAVERSE(&event_channels, event_channel, entry) {
412                 if (!strcasecmp(event_channel->name, cat))
413                         break;
414         }
415         AST_RWLIST_UNLOCK(&event_channels);
416         if (event_channel) {
417                 ast_log(LOG_WARNING, "Event channel '%s' was specified twice in "
418                         "configuration.  Second instance ignored.\n", cat);
419                 return;
420         }
421
422         if (!(event_channel = ast_calloc(1, sizeof(*event_channel) + strlen(cat))))
423                 return;
424
425         strcpy(event_channel->name, cat);
426         ast_copy_string((char *) sa_name.value, cat, sizeof(sa_name.value));
427         sa_name.length = strlen((char *) sa_name.value);
428         ais_res = saEvtChannelOpen(evt_handle, &sa_name, 
429                 SA_EVT_CHANNEL_PUBLISHER | SA_EVT_CHANNEL_SUBSCRIBER | SA_EVT_CHANNEL_CREATE,
430                 SA_TIME_MAX, &event_channel->handle);
431         if (ais_res != SA_AIS_OK) {
432                 ast_log(LOG_ERROR, "Error opening event channel: %s\n", ais_err2str(ais_res));
433                 free(event_channel);
434                 return;
435         }
436
437         for (var = ast_variable_browse(cfg, cat); var; var = var->next) {
438                 if (!strcasecmp(var->name, "type")) {
439                         continue;
440                 } else if (!strcasecmp(var->name, "publish_event")) {
441                         add_publish_event(event_channel, var->value);
442                 } else if (!strcasecmp(var->name, "subscribe_event")) {
443                         add_subscribe_event(event_channel, var->value);
444                 } else {
445                         ast_log(LOG_WARNING, "Event channel '%s' contains invalid option '%s'\n",
446                                 event_channel->name, var->name);
447                 }
448         }
449
450         AST_RWLIST_WRLOCK(&event_channels);
451         AST_RWLIST_INSERT_TAIL(&event_channels, event_channel, entry);
452         AST_RWLIST_UNLOCK(&event_channels);
453 }
454
455 static void load_config(void)
456 {
457         static const char filename[] = "ais.conf";
458         struct ast_config *cfg;
459         const char *cat = NULL;
460         struct ast_flags config_flags = { 0 };
461
462         if (!(cfg = ast_config_load(filename, config_flags)) || cfg == CONFIG_STATUS_FILEINVALID)
463                 return;
464
465         while ((cat = ast_category_browse(cfg, cat))) {
466                 const char *type;
467
468                 if (!strcasecmp(cat, "general"))
469                         continue;
470
471                 if (!(type = ast_variable_retrieve(cfg, cat, "type"))) {
472                         ast_log(LOG_WARNING, "Invalid entry in %s defined with no type!\n",
473                                 filename);
474                         continue;
475                 }
476
477                 if (!strcasecmp(type, "event_channel")) {
478                         build_event_channel(cfg, cat);
479                 } else {
480                         ast_log(LOG_WARNING, "Entry in %s defined with invalid type '%s'\n", 
481                                 filename, type);
482                 }
483         }
484
485         ast_config_destroy(cfg);
486 }
487
488 static void publish_event_destroy(struct publish_event *publish_event)
489 {
490         ast_event_unsubscribe(publish_event->sub);
491
492         free(publish_event);
493 }
494
495 static void subscribe_event_destroy(const struct event_channel *event_channel,
496         struct subscribe_event *subscribe_event)
497 {
498         SaAisErrorT ais_res;
499
500         /* saEvtChannelClose() will actually do this automatically, but it just
501          * feels cleaner to go ahead and do it manually ... */
502         ais_res = saEvtEventUnsubscribe(event_channel->handle, subscribe_event->id);
503         if (ais_res != SA_AIS_OK) {
504                 ast_log(LOG_ERROR, "Error unsubscribing: %s\n", ais_err2str(ais_res));
505         }
506
507         free(subscribe_event);
508 }
509
510 static void event_channel_destroy(struct event_channel *event_channel)
511 {
512         struct publish_event *publish_event;
513         struct subscribe_event *subscribe_event;
514         SaAisErrorT ais_res;
515
516         while ((publish_event = AST_LIST_REMOVE_HEAD(&event_channel->publish_events, entry)))
517                 publish_event_destroy(publish_event);
518         while ((subscribe_event = AST_LIST_REMOVE_HEAD(&event_channel->subscribe_events, entry)))
519                 subscribe_event_destroy(event_channel, subscribe_event);
520
521         ais_res = saEvtChannelClose(event_channel->handle);
522         if (ais_res != SA_AIS_OK) {
523                 ast_log(LOG_ERROR, "Error closing event channel '%s': %s\n",
524                         event_channel->name, ais_err2str(ais_res));
525         }
526
527         free(event_channel);
528 }
529
530 static void destroy_event_channels(void)
531 {
532         struct event_channel *event_channel;
533
534         AST_RWLIST_WRLOCK(&event_channels);
535         while ((event_channel = AST_RWLIST_REMOVE_HEAD(&event_channels, entry)))
536                 event_channel_destroy(event_channel);
537         AST_RWLIST_UNLOCK(&event_channels);
538 }
539
540 int ast_ais_evt_load_module(void)
541 {
542         SaAisErrorT ais_res;
543
544         ais_res = saEvtInitialize(&evt_handle, &evt_callbacks, &ais_version);
545         if (ais_res != SA_AIS_OK) {
546                 ast_log(LOG_ERROR, "Could not initialize eventing service: %s\n",
547                         ais_err2str(ais_res));
548                 return -1;
549         }
550         
551         load_config();
552
553         ast_cli_register_multiple(ais_cli, ARRAY_LEN(ais_cli));
554
555         return 0;
556 }
557
558 int ast_ais_evt_unload_module(void)
559 {
560         SaAisErrorT ais_res;
561
562         destroy_event_channels();
563
564         ais_res = saEvtFinalize(evt_handle);
565         if (ais_res != SA_AIS_OK) {
566                 ast_log(LOG_ERROR, "Problem stopping eventing service: %s\n", 
567                         ais_err2str(ais_res));
568                 return -1;
569         }
570
571         return 0;       
572 }