2ad0caf93b4699ce714a9ae6d1028042a9bbde45
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis.h"
36 #include "asterisk/threadpool.h"
37 #include "asterisk/taskprocessor.h"
38 #include "asterisk/utils.h"
39 #include "asterisk/uuid.h"
40
41 /*! Initial size of the subscribers list. */
42 #define INITIAL_SUBSCRIBERS_MAX 4
43
44 /*! The number of buckets to use for topic pools */
45 #define TOPIC_POOL_BUCKETS 57
46
47 /*! Threadpool for dispatching notifications to subscribers */
48 static struct ast_threadpool *pool;
49
50 static struct stasis_message_type *__subscription_change_message_type;
51
52 /*! \internal */
53 struct stasis_topic {
54         char *name;
55         /*! Variable length array of the subscribers (raw pointer to avoid cyclic references) */
56         struct stasis_subscription **subscribers;
57         /*! Allocated length of the subscribers array */
58         size_t num_subscribers_max;
59         /*! Current size of the subscribers array */
60         size_t num_subscribers_current;
61 };
62
63 /* Forward declarations for the tightly-coupled subscription object */
64 struct stasis_subscription;
65 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
66
67 static void topic_dtor(void *obj)
68 {
69         struct stasis_topic *topic = obj;
70         ast_free(topic->name);
71         topic->name = NULL;
72         ast_free(topic->subscribers);
73         topic->subscribers = NULL;
74 }
75
76 struct stasis_topic *stasis_topic_create(const char *name)
77 {
78         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
79
80         topic = ao2_alloc(sizeof(*topic), topic_dtor);
81
82         if (!topic) {
83                 return NULL;
84         }
85
86         topic->name = ast_strdup(name);
87         if (!topic->name) {
88                 return NULL;
89         }
90
91         topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
92         topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(*topic->subscribers));
93         if (!topic->subscribers) {
94                 return NULL;
95         }
96
97         ao2_ref(topic, +1);
98         return topic;
99 }
100
101 const char *stasis_topic_name(const struct stasis_topic *topic)
102 {
103         return topic->name;
104 }
105
106 /*! \internal */
107 struct stasis_subscription {
108         /*! Unique ID for this subscription */
109         char *uniqueid;
110         /*! Topic subscribed to. */
111         struct stasis_topic *topic;
112         /*! Mailbox for processing incoming messages. */
113         struct ast_taskprocessor *mailbox;
114         /*! Callback function for incoming message processing. */
115         stasis_subscription_cb callback;
116         /*! Data pointer to be handed to the callback. */
117         void *data;
118 };
119
120 static void subscription_dtor(void *obj)
121 {
122         struct stasis_subscription *sub = obj;
123         ast_assert(!stasis_subscription_is_subscribed(sub));
124         ast_free(sub->uniqueid);
125         sub->uniqueid = NULL;
126         ao2_cleanup(sub->topic);
127         sub->topic = NULL;
128         ast_taskprocessor_unreference(sub->mailbox);
129         sub->mailbox = NULL;
130 }
131
132 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
133
134 static struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox)
135 {
136         RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
137         RAII_VAR(struct ast_uuid *, id, NULL, ast_free);
138         char uniqueid[AST_UUID_STR_LEN];
139
140         sub = ao2_alloc(sizeof(*sub), subscription_dtor);
141         if (!sub) {
142                 return NULL;
143         }
144
145         id = ast_uuid_generate();
146         if (!id) {
147                 ast_log(LOG_ERROR, "UUID generation failed\n");
148                 return NULL;
149         }
150         ast_uuid_to_str(id, uniqueid, sizeof(uniqueid));
151         if (needs_mailbox) {
152                 sub->mailbox = ast_threadpool_serializer(uniqueid, pool);
153                 if (!sub->mailbox) {
154                         return NULL;
155                 }
156         }
157
158         sub->uniqueid = ast_strdup(uniqueid);
159         ao2_ref(topic, +1);
160         sub->topic = topic;
161         sub->callback = callback;
162         sub->data = data;
163
164         if (topic_add_subscription(topic, sub) != 0) {
165                 return NULL;
166         }
167         send_subscription_change_message(topic, uniqueid, "Subscribe");
168
169         ao2_ref(sub, +1);
170         return sub;
171 }
172
173 struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
174 {
175         return __stasis_subscribe(topic, callback, data, 1);
176 }
177
178 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
179 {
180         if (sub) {
181                 size_t i;
182                 struct stasis_topic *topic = sub->topic;
183                 SCOPED_AO2LOCK(lock_topic, topic);
184
185                 for (i = 0; i < topic->num_subscribers_current; ++i) {
186                         if (topic->subscribers[i] == sub) {
187                                 send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
188                                 /* swap [i] with last entry; remove last entry */
189                                 topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
190                                 /* Unsubscribing unrefs the subscription */
191                                 ao2_cleanup(sub);
192                                 return NULL;
193                         }
194                 }
195
196                 ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
197         }
198         return NULL;
199 }
200
201 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
202 {
203         if (sub) {
204                 size_t i;
205                 struct stasis_topic *topic = sub->topic;
206                 SCOPED_AO2LOCK(lock_topic, topic);
207
208                 for (i = 0; i < topic->num_subscribers_current; ++i) {
209                         if (topic->subscribers[i] == sub) {
210                                 return 1;
211                         }
212                 }
213         }
214
215         return 0;
216 }
217
218 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
219 {
220         return sub->uniqueid;
221 }
222
223 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
224 {
225         struct stasis_subscription_change *change;
226         if (stasis_message_type(msg) != stasis_subscription_change()) {
227                 return 0;
228         }
229
230         change = stasis_message_data(msg);
231         if (strcmp("Unsubscribe", change->description)) {
232                 return 0;
233         }
234
235         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
236                 return 0;
237         }
238
239         return 1;
240 }
241
242 /*!
243  * \brief Add a subscriber to a topic.
244  * \param topic Topic
245  * \param sub Subscriber
246  * \return 0 on success
247  * \return Non-zero on error
248  */
249 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
250 {
251         struct stasis_subscription **subscribers;
252         SCOPED_AO2LOCK(lock, topic);
253
254         /* Increase list size, if needed */
255         if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
256                 subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
257                 if (!subscribers) {
258                         return -1;
259                 }
260                 topic->subscribers = subscribers;
261                 topic->num_subscribers_max *= 2;
262         }
263
264         /* Don't ref sub here or we'll cause a reference cycle. */
265         topic->subscribers[topic->num_subscribers_current++] = sub;
266         return 0;
267 }
268
269 /*!
270  * \internal
271  * \brief Information needed to dispatch a message to a subscription
272  */
273 struct dispatch {
274         /*! Topic message was published to */
275         struct stasis_topic *topic;
276         /*! The message itself */
277         struct stasis_message *message;
278         /*! Subscription receiving the message */
279         struct stasis_subscription *sub;
280 };
281
282 static void dispatch_dtor(void *data)
283 {
284         struct dispatch *dispatch = data;
285         ao2_cleanup(dispatch->topic);
286         ao2_cleanup(dispatch->message);
287         ao2_cleanup(dispatch->sub);
288 }
289
290 static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
291 {
292         RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
293
294         ast_assert(topic != NULL);
295         ast_assert(message != NULL);
296         ast_assert(sub != NULL);
297
298         dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
299         if (!dispatch) {
300                 return NULL;
301         }
302
303         dispatch->topic = topic;
304         ao2_ref(topic, +1);
305
306         dispatch->message = message;
307         ao2_ref(message, +1);
308
309         dispatch->sub = sub;
310         ao2_ref(sub, +1);
311
312         ao2_ref(dispatch, +1);
313         return dispatch;
314 }
315
316 /*!
317  * \brief Dispatch a message to a subscriber
318  * \param data \ref dispatch object
319  * \return 0
320  */
321 static int dispatch_exec(void *data)
322 {
323         RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
324         RAII_VAR(struct stasis_topic *, sub_topic, NULL, ao2_cleanup);
325
326         /* Since sub->topic doesn't change, no need to lock sub */
327         ast_assert(dispatch->sub->topic != NULL);
328         ao2_ref(dispatch->sub->topic, +1);
329         sub_topic = dispatch->sub->topic;
330
331         dispatch->sub->callback(dispatch->sub->data,
332                                 dispatch->sub,
333                                 sub_topic,
334                                 dispatch->message);
335
336         return 0;
337 }
338
339 void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
340 {
341         struct stasis_subscription **subscribers = NULL;
342         size_t num_subscribers, i;
343
344         ast_assert(topic != NULL);
345         ast_assert(publisher_topic != NULL);
346         ast_assert(message != NULL);
347
348         /* Copy the subscribers, so we don't have to hold the mutex for long */
349         {
350                 SCOPED_AO2LOCK(lock, topic);
351                 num_subscribers = topic->num_subscribers_current;
352                 subscribers = ast_malloc(num_subscribers * sizeof(*subscribers));
353                 if (subscribers) {
354                         for (i = 0; i < num_subscribers; ++i) {
355                                 ao2_ref(topic->subscribers[i], +1);
356                                 subscribers[i] = topic->subscribers[i];
357                         }
358                 }
359         }
360
361         if (!subscribers) {
362                 ast_log(LOG_ERROR, "Dropping message\n");
363                 return;
364         }
365
366         for (i = 0; i < num_subscribers; ++i) {
367                 struct stasis_subscription *sub = subscribers[i];
368
369                 ast_assert(sub != NULL);
370
371                 if (sub->mailbox) {
372                         RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
373
374                         dispatch = dispatch_create(publisher_topic, message, sub);
375                         if (!dispatch) {
376                                 ast_log(LOG_DEBUG, "Dropping dispatch\n");
377                                 break;
378                         }
379
380                         if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
381                                 dispatch = NULL; /* Ownership transferred to mailbox */
382                         }
383                 } else {
384                         /* No mailbox; dispatch directly */
385                         sub->callback(sub->data, sub, sub->topic, message);
386                 }
387         }
388
389         for (i = 0; i < num_subscribers; ++i) {
390                 ao2_cleanup(subscribers[i]);
391         }
392         ast_free(subscribers);
393 }
394
395 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
396 {
397         stasis_forward_message(topic, topic, message);
398 }
399
400 /*! \brief Forwarding subscriber */
401 static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
402 {
403         struct stasis_topic *to_topic = data;
404         stasis_forward_message(to_topic, topic, message);
405
406         if (stasis_subscription_final_message(sub, message)) {
407                 ao2_cleanup(to_topic);
408         }
409 }
410
411 struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
412 {
413         struct stasis_subscription *sub;
414         if (!from_topic || !to_topic) {
415                 return NULL;
416         }
417         /* Subscribe without a mailbox, since we're just forwarding messages */
418         sub = __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
419         if (sub) {
420                 /* hold a ref to to_topic for this forwarding subscription */
421                 ao2_ref(to_topic, +1);
422         }
423         return sub;
424 }
425
426 static void subscription_change_dtor(void *obj)
427 {
428         struct stasis_subscription_change *change = obj;
429         ast_string_field_free_memory(change);
430         ao2_cleanup(change->topic);
431 }
432
433 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
434 {
435         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
436
437         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
438         if (ast_string_field_init(change, 128)) {
439                 return NULL;
440         }
441
442         ast_string_field_set(change, uniqueid, uniqueid);
443         ast_string_field_set(change, description, description);
444         ao2_ref(topic, +1);
445         change->topic = topic;
446
447         ao2_ref(change, +1);
448         return change;
449 }
450
451 struct stasis_message_type *stasis_subscription_change(void)
452 {
453         return __subscription_change_message_type;
454 }
455
456 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
457 {
458         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
459         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
460
461         change = subscription_change_alloc(topic, uniqueid, description);
462
463         if (!change) {
464                 return;
465         }
466
467         msg = stasis_message_create(stasis_subscription_change(), change);
468
469         if (!msg) {
470                 return;
471         }
472
473         stasis_publish(topic, msg);
474 }
475
476 struct topic_pool_entry {
477         struct stasis_subscription *forward;
478         struct stasis_topic *topic;
479 };
480
481 static void topic_pool_entry_dtor(void *obj)
482 {
483         struct topic_pool_entry *entry = obj;
484         entry->forward = stasis_unsubscribe(entry->forward);
485         ao2_cleanup(entry->topic);
486         entry->topic = NULL;
487 }
488
489 static struct topic_pool_entry *topic_pool_entry_alloc(void)
490 {
491         return ao2_alloc(sizeof(struct topic_pool_entry), topic_pool_entry_dtor);
492 }
493
494 struct stasis_topic_pool {
495         struct ao2_container *pool_container;
496         struct stasis_topic *pool_topic;
497 };
498
499 static void topic_pool_dtor(void *obj)
500 {
501         struct stasis_topic_pool *pool = obj;
502         ao2_cleanup(pool->pool_container);
503         pool->pool_container = NULL;
504         ao2_cleanup(pool->pool_topic);
505         pool->pool_topic = NULL;
506 }
507
508 static int topic_pool_entry_hash(const void *obj, const int flags)
509 {
510         const char *topic_name= (flags & OBJ_KEY) ? obj : stasis_topic_name(((struct topic_pool_entry*) obj)->topic);
511         return ast_str_case_hash(topic_name);
512 }
513
514 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
515 {
516         struct topic_pool_entry *opt1 = obj, *opt2 = arg;
517         const char *topic_name = (flags & OBJ_KEY) ? arg : stasis_topic_name(opt2->topic);
518         return strcasecmp(stasis_topic_name(opt1->topic), topic_name) ? 0 : CMP_MATCH | CMP_STOP;
519 }
520
521 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
522 {
523         RAII_VAR(struct stasis_topic_pool *, pool, ao2_alloc(sizeof(*pool), topic_pool_dtor), ao2_cleanup);
524         if (!pool) {
525                 return NULL;
526         }
527         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS, topic_pool_entry_hash, topic_pool_entry_cmp);
528         ao2_ref(pooled_topic, +1);
529         pool->pool_topic = pooled_topic;
530
531         ao2_ref(pool, +1);
532         return pool;
533 }
534
535 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
536 {
537         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
538         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
539         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_KEY | OBJ_NOLOCK);
540
541         if (topic_pool_entry) {
542                 return topic_pool_entry->topic;
543         }
544
545         topic_pool_entry = topic_pool_entry_alloc();
546
547         if (!topic_pool_entry) {
548                 return NULL;
549         }
550
551         topic_pool_entry->topic = stasis_topic_create(topic_name);
552         if (!topic_pool_entry->topic) {
553                 return NULL;
554         }
555
556         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
557         if (!topic_pool_entry->forward) {
558                 return NULL;
559         }
560
561         ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK);
562
563         return topic_pool_entry->topic;
564 }
565
566 /*! \brief Cleanup function */
567 static void stasis_exit(void)
568 {
569         ao2_cleanup(__subscription_change_message_type);
570         __subscription_change_message_type = NULL;
571         ast_threadpool_shutdown(pool);
572         pool = NULL;
573 }
574
575 int stasis_init(void)
576 {
577         int cache_init;
578
579         /* XXX Should this be configurable? */
580         struct ast_threadpool_options opts = {
581                 .version = AST_THREADPOOL_OPTIONS_VERSION,
582                 .idle_timeout = 20,
583                 .auto_increment = 1,
584                 .initial_size = 0,
585                 .max_size = 200
586         };
587
588         ast_register_atexit(stasis_exit);
589
590         if (pool) {
591                 ast_log(LOG_ERROR, "Stasis double-initialized\n");
592                 return -1;
593         }
594
595         pool = ast_threadpool_create("stasis-core", NULL, &opts);
596         if (!pool) {
597                 ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
598                 return -1;
599         }
600
601         cache_init = stasis_cache_init();
602         if (cache_init != 0) {
603                 return -1;
604         }
605
606         __subscription_change_message_type = stasis_message_type_create("stasis_subscription_change");
607         if (!__subscription_change_message_type) {
608                 return -1;
609         }
610
611         return 0;
612 }