Add uuid wrapper API call ast_uuid_generate_str().
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis.h"
36 #include "asterisk/threadpool.h"
37 #include "asterisk/taskprocessor.h"
38 #include "asterisk/utils.h"
39 #include "asterisk/uuid.h"
40
/*!
 * Initial size of the subscribers list.
 *
 * stasis_topic_create() allocates this many slots; topic_add_subscription()
 * doubles the capacity whenever the array is full.
 */
#define INITIAL_SUBSCRIBERS_MAX 4

/*! The number of buckets to use for topic pools */
#define TOPIC_POOL_BUCKETS 57

/*!
 * Threadpool for dispatching notifications to subscribers.
 *
 * Created by stasis_init() and shut down by stasis_exit(); every
 * subscription's mailbox is a serializer built on this pool.
 */
static struct ast_threadpool *pool;

/*!
 * Message type used for subscription change notifications.
 *
 * Created by stasis_init(), released by stasis_exit(), and exposed through
 * stasis_subscription_change_type().
 */
static struct stasis_message_type *__subscription_change_message_type;
51
/*!
 * \internal
 * \brief A named topic together with the subscriptions attached to it.
 *
 * Instances are allocated by stasis_topic_create() and torn down by
 * topic_dtor(). The subscribers array is read and modified while holding
 * the topic's own ao2 lock.
 */
struct stasis_topic {
	/*! Name of the topic (heap copy made at creation, freed by topic_dtor()) */
	char *name;
	/*! Variable length array of the subscribers (raw pointer to avoid cyclic references) */
	struct stasis_subscription **subscribers;
	/*! Allocated length of the subscribers array */
	size_t num_subscribers_max;
	/*! Current size of the subscribers array */
	size_t num_subscribers_current;
};
62
63 /* Forward declarations for the tightly-coupled subscription object */
64 struct stasis_subscription;
65 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
66
67 static void topic_dtor(void *obj)
68 {
69         struct stasis_topic *topic = obj;
70         ast_free(topic->name);
71         topic->name = NULL;
72         ast_free(topic->subscribers);
73         topic->subscribers = NULL;
74 }
75
76 struct stasis_topic *stasis_topic_create(const char *name)
77 {
78         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
79
80         topic = ao2_alloc(sizeof(*topic), topic_dtor);
81
82         if (!topic) {
83                 return NULL;
84         }
85
86         topic->name = ast_strdup(name);
87         if (!topic->name) {
88                 return NULL;
89         }
90
91         topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
92         topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(*topic->subscribers));
93         if (!topic->subscribers) {
94                 return NULL;
95         }
96
97         ao2_ref(topic, +1);
98         return topic;
99 }
100
101 const char *stasis_topic_name(const struct stasis_topic *topic)
102 {
103         return topic->name;
104 }
105
/*!
 * \internal
 * \brief A single subscriber's attachment to a topic.
 *
 * Built by stasis_subscribe() and destroyed by subscription_dtor(). The
 * subscription holds a reference to its topic; the topic only stores a raw
 * pointer back to the subscription.
 */
struct stasis_subscription {
	/*! Unique ID for this subscription */
	char uniqueid[AST_UUID_STR_LEN];
	/*! Topic subscribed to. */
	struct stasis_topic *topic;
	/*! Mailbox for processing incoming messages. */
	struct ast_taskprocessor *mailbox;
	/*! Callback function for incoming message processing. */
	stasis_subscription_cb callback;
	/*! Data pointer to be handed to the callback. */
	void *data;
};
119
120 static void subscription_dtor(void *obj)
121 {
122         struct stasis_subscription *sub = obj;
123         ast_assert(!stasis_subscription_is_subscribed(sub));
124         ao2_cleanup(sub->topic);
125         sub->topic = NULL;
126         ast_taskprocessor_unreference(sub->mailbox);
127         sub->mailbox = NULL;
128 }
129
130 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
131
132 struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
133 {
134         RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
135
136         sub = ao2_alloc(sizeof(*sub), subscription_dtor);
137         if (!sub) {
138                 return NULL;
139         }
140
141         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
142
143         sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
144         if (!sub->mailbox) {
145                 return NULL;
146         }
147
148         ao2_ref(topic, +1);
149         sub->topic = topic;
150         sub->callback = callback;
151         sub->data = data;
152
153         if (topic_add_subscription(topic, sub) != 0) {
154                 return NULL;
155         }
156         send_subscription_change_message(topic, sub->uniqueid, "Subscribe");
157
158         ao2_ref(sub, +1);
159         return sub;
160 }
161
162 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
163 {
164         if (sub) {
165                 size_t i;
166                 struct stasis_topic *topic = sub->topic;
167                 SCOPED_AO2LOCK(lock_topic, topic);
168
169                 for (i = 0; i < topic->num_subscribers_current; ++i) {
170                         if (topic->subscribers[i] == sub) {
171                                 send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
172                                 /* swap [i] with last entry; remove last entry */
173                                 topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
174                                 /* Unsubscribing unrefs the subscription */
175                                 ao2_cleanup(sub);
176                                 return NULL;
177                         }
178                 }
179
180                 ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
181         }
182         return NULL;
183 }
184
185 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
186 {
187         if (sub) {
188                 size_t i;
189                 struct stasis_topic *topic = sub->topic;
190                 SCOPED_AO2LOCK(lock_topic, topic);
191
192                 for (i = 0; i < topic->num_subscribers_current; ++i) {
193                         if (topic->subscribers[i] == sub) {
194                                 return 1;
195                         }
196                 }
197         }
198
199         return 0;
200 }
201
202 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
203 {
204         return sub->uniqueid;
205 }
206
207 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
208 {
209         struct stasis_subscription_change *change;
210         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
211                 return 0;
212         }
213
214         change = stasis_message_data(msg);
215         if (strcmp("Unsubscribe", change->description)) {
216                 return 0;
217         }
218
219         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
220                 return 0;
221         }
222
223         return 1;
224 }
225
226 /*!
227  * \brief Add a subscriber to a topic.
228  * \param topic Topic
229  * \param sub Subscriber
230  * \return 0 on success
231  * \return Non-zero on error
232  */
233 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
234 {
235         struct stasis_subscription **subscribers;
236         SCOPED_AO2LOCK(lock, topic);
237
238         /* Increase list size, if needed */
239         if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
240                 subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
241                 if (!subscribers) {
242                         return -1;
243                 }
244                 topic->subscribers = subscribers;
245                 topic->num_subscribers_max *= 2;
246         }
247
248         /* Don't ref sub here or we'll cause a reference cycle. */
249         topic->subscribers[topic->num_subscribers_current++] = sub;
250         return 0;
251 }
252
/*!
 * \internal
 * \brief Information needed to dispatch a message to a subscription
 *
 * Built by dispatch_create(), which takes a reference on each member, and
 * consumed by dispatch_exec(). dispatch_dtor() releases the references.
 */
struct dispatch {
	/*! Topic message was published to */
	struct stasis_topic *topic;
	/*! The message itself */
	struct stasis_message *message;
	/*! Subscription receiving the message */
	struct stasis_subscription *sub;
};
265
266 static void dispatch_dtor(void *data)
267 {
268         struct dispatch *dispatch = data;
269         ao2_cleanup(dispatch->topic);
270         ao2_cleanup(dispatch->message);
271         ao2_cleanup(dispatch->sub);
272 }
273
274 static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
275 {
276         RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
277
278         ast_assert(topic != NULL);
279         ast_assert(message != NULL);
280         ast_assert(sub != NULL);
281
282         dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
283         if (!dispatch) {
284                 return NULL;
285         }
286
287         dispatch->topic = topic;
288         ao2_ref(topic, +1);
289
290         dispatch->message = message;
291         ao2_ref(message, +1);
292
293         dispatch->sub = sub;
294         ao2_ref(sub, +1);
295
296         ao2_ref(dispatch, +1);
297         return dispatch;
298 }
299
300 /*!
301  * \brief Dispatch a message to a subscriber
302  * \param data \ref dispatch object
303  * \return 0
304  */
305 static int dispatch_exec(void *data)
306 {
307         RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
308         RAII_VAR(struct stasis_topic *, sub_topic, NULL, ao2_cleanup);
309
310         /* Since sub->topic doesn't change, no need to lock sub */
311         ast_assert(dispatch->sub->topic != NULL);
312         ao2_ref(dispatch->sub->topic, +1);
313         sub_topic = dispatch->sub->topic;
314
315         dispatch->sub->callback(dispatch->sub->data,
316                                 dispatch->sub,
317                                 sub_topic,
318                                 dispatch->message);
319
320         return 0;
321 }
322
323 void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
324 {
325         size_t i;
326         SCOPED_AO2LOCK(lock, topic);
327
328         ast_assert(topic != NULL);
329         ast_assert(publisher_topic != NULL);
330         ast_assert(message != NULL);
331
332         for (i = 0; i < topic->num_subscribers_current; ++i) {
333                 struct stasis_subscription *sub = topic->subscribers[i];
334                 RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
335
336                 ast_assert(sub != NULL);
337
338                 dispatch = dispatch_create(publisher_topic, message, sub);
339                 if (!dispatch) {
340                         ast_log(LOG_DEBUG, "Dropping dispatch\n");
341                         break;
342                 }
343
344                 if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
345                         dispatch = NULL; /* Ownership transferred to mailbox */
346                 }
347         }
348 }
349
/*!
 * \brief Publish a message to a topic's subscribers.
 *
 * Equivalent to forwarding the message to \a topic with \a topic itself
 * recorded as the publishing topic.
 *
 * \param topic Topic to publish to.
 * \param message Message to publish.
 */
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
{
	struct stasis_topic *publisher = topic;

	stasis_forward_message(topic, publisher, message);
}
354
/*!
 * \brief Forwarding subscriber
 *
 * Relays every received message to the destination topic. When the message
 * is the final one for this subscription, the destination topic reference
 * taken by stasis_forward_all() is released.
 *
 * \param data Destination topic.
 * \param sub The forwarding subscription.
 * \param topic Topic passed through as the publisher of the message.
 * \param message Message being forwarded.
 */
static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
{
	struct stasis_topic *destination = data;

	stasis_forward_message(destination, topic, message);

	if (!stasis_subscription_final_message(sub, message)) {
		return;
	}

	ao2_cleanup(destination);
}
365
366 struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
367 {
368         struct stasis_subscription *sub;
369         if (!from_topic || !to_topic) {
370                 return NULL;
371         }
372
373         sub = stasis_subscribe(from_topic, stasis_forward_cb, to_topic);
374         if (sub) {
375                 /* hold a ref to to_topic for this forwarding subscription */
376                 ao2_ref(to_topic, +1);
377         }
378         return sub;
379 }
380
381 static void subscription_change_dtor(void *obj)
382 {
383         struct stasis_subscription_change *change = obj;
384         ast_string_field_free_memory(change);
385         ao2_cleanup(change->topic);
386 }
387
388 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
389 {
390         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
391
392         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
393         if (ast_string_field_init(change, 128)) {
394                 return NULL;
395         }
396
397         ast_string_field_set(change, uniqueid, uniqueid);
398         ast_string_field_set(change, description, description);
399         ao2_ref(topic, +1);
400         change->topic = topic;
401
402         ao2_ref(change, +1);
403         return change;
404 }
405
406 struct stasis_message_type *stasis_subscription_change_type(void)
407 {
408         return __subscription_change_message_type;
409 }
410
/*!
 * \internal
 * \brief Publish a subscription change message on a topic.
 *
 * Failures to build the payload or the message are silent; nothing is
 * published in that case.
 *
 * \param topic Topic to publish the change on.
 * \param uniqueid Unique ID of the affected subscription.
 * \param description Description of the change.
 */
static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
{
	struct stasis_subscription_change *notice;
	struct stasis_message *wrapped;

	notice = subscription_change_alloc(topic, uniqueid, description);
	if (!notice) {
		return;
	}

	wrapped = stasis_message_create(stasis_subscription_change_type(), notice);
	if (wrapped) {
		stasis_publish(topic, wrapped);
	}

	/* Local references are no longer needed once publishing has been attempted. */
	ao2_cleanup(wrapped);
	ao2_cleanup(notice);
}
430
/*!
 * \internal
 * \brief One pooled topic plus the subscription forwarding it to the pool topic.
 *
 * Entries are created by stasis_topic_pool_get_topic() and torn down by
 * topic_pool_entry_dtor().
 */
struct topic_pool_entry {
	/*! Forwarding subscription from \c topic to the pool's topic */
	struct stasis_subscription *forward;
	/*! The pooled topic itself; its name is the container key */
	struct stasis_topic *topic;
};
435
436 static void topic_pool_entry_dtor(void *obj)
437 {
438         struct topic_pool_entry *entry = obj;
439         entry->forward = stasis_unsubscribe(entry->forward);
440         ao2_cleanup(entry->topic);
441         entry->topic = NULL;
442 }
443
444 static struct topic_pool_entry *topic_pool_entry_alloc(void)
445 {
446         return ao2_alloc(sizeof(struct topic_pool_entry), topic_pool_entry_dtor);
447 }
448
/*!
 * \brief A collection of named topics that all forward to one pool topic.
 *
 * Created by stasis_topic_pool_create(); destroyed by topic_pool_dtor().
 */
struct stasis_topic_pool {
	/*! Container of \ref topic_pool_entry objects, keyed by topic name */
	struct ao2_container *pool_container;
	/*! Topic that every pooled topic forwards its messages to */
	struct stasis_topic *pool_topic;
};
453
454 static void topic_pool_dtor(void *obj)
455 {
456         struct stasis_topic_pool *pool = obj;
457         ao2_cleanup(pool->pool_container);
458         pool->pool_container = NULL;
459         ao2_cleanup(pool->pool_topic);
460         pool->pool_topic = NULL;
461 }
462
463 static int topic_pool_entry_hash(const void *obj, const int flags)
464 {
465         const char *topic_name = (flags & OBJ_KEY) ? obj : stasis_topic_name(((struct topic_pool_entry*) obj)->topic);
466         return ast_str_case_hash(topic_name);
467 }
468
469 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
470 {
471         struct topic_pool_entry *opt1 = obj, *opt2 = arg;
472         const char *topic_name = (flags & OBJ_KEY) ? arg : stasis_topic_name(opt2->topic);
473         return strcasecmp(stasis_topic_name(opt1->topic), topic_name) ? 0 : CMP_MATCH | CMP_STOP;
474 }
475
476 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
477 {
478         RAII_VAR(struct stasis_topic_pool *, pool, ao2_alloc(sizeof(*pool), topic_pool_dtor), ao2_cleanup);
479         if (!pool) {
480                 return NULL;
481         }
482         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS, topic_pool_entry_hash, topic_pool_entry_cmp);
483         ao2_ref(pooled_topic, +1);
484         pool->pool_topic = pooled_topic;
485
486         ao2_ref(pool, +1);
487         return pool;
488 }
489
490 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
491 {
492         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
493         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
494         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_KEY | OBJ_NOLOCK);
495
496         if (topic_pool_entry) {
497                 return topic_pool_entry->topic;
498         }
499
500         topic_pool_entry = topic_pool_entry_alloc();
501
502         if (!topic_pool_entry) {
503                 return NULL;
504         }
505
506         topic_pool_entry->topic = stasis_topic_create(topic_name);
507         if (!topic_pool_entry->topic) {
508                 return NULL;
509         }
510
511         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
512         if (!topic_pool_entry->forward) {
513                 return NULL;
514         }
515
516         ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK);
517
518         return topic_pool_entry->topic;
519 }
520
521 /*! \brief Cleanup function */
522 static void stasis_exit(void)
523 {
524         ao2_cleanup(__subscription_change_message_type);
525         __subscription_change_message_type = NULL;
526         ast_threadpool_shutdown(pool);
527         pool = NULL;
528 }
529
530 int stasis_init(void)
531 {
532         int cache_init;
533
534         /* XXX Should this be configurable? */
535         struct ast_threadpool_options opts = {
536                 .version = AST_THREADPOOL_OPTIONS_VERSION,
537                 .idle_timeout = 20,
538                 .auto_increment = 1,
539                 .initial_size = 0,
540                 .max_size = 200
541         };
542
543         ast_register_atexit(stasis_exit);
544
545         if (pool) {
546                 ast_log(LOG_ERROR, "Stasis double-initialized\n");
547                 return -1;
548         }
549
550         pool = ast_threadpool_create("stasis-core", NULL, &opts);
551         if (!pool) {
552                 ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
553                 return -1;
554         }
555
556         cache_init = stasis_cache_init();
557         if (cache_init != 0) {
558                 return -1;
559         }
560
561         __subscription_change_message_type = stasis_message_type_create("stasis_subscription_change");
562         if (!__subscription_change_message_type) {
563                 return -1;
564         }
565
566         return 0;
567 }