Refactored the rest of the message types to use the STASIS_MESSAGE_TYPE_*
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis.h"
36 #include "asterisk/threadpool.h"
37 #include "asterisk/taskprocessor.h"
38 #include "asterisk/utils.h"
39 #include "asterisk/uuid.h"
40
41 /*! Initial number of slots in a topic's subscribers array; topic_add_subscription() doubles it when full. */
42 #define INITIAL_SUBSCRIBERS_MAX 4
43
44 /*! The number of buckets to use for the container backing each topic pool */
45 #define TOPIC_POOL_BUCKETS 57
46
47 /*! Threadpool for dispatching notifications to subscribers; created in stasis_init(), shut down in stasis_exit() */
48 static struct ast_threadpool *pool;
49
50 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type); /* initialized in stasis_init(), cleaned up in stasis_exit() */
51
52 /*! \internal \brief A named topic together with the list of subscriptions attached to it. */
53 struct stasis_topic {
54         char *name; /*!< Copy of the topic name made by stasis_topic_create(); freed in topic_dtor() */
55         /*! Variable length array of the subscribers (raw pointer to avoid cyclic references) */
56         struct stasis_subscription **subscribers;
57         /*! Allocated length of the subscribers array */
58         size_t num_subscribers_max;
59         /*! Number of entries of the subscribers array currently in use */
60         size_t num_subscribers_current;
61 };
62
63 /* Forward declaration: __stasis_subscribe() calls this before its definition appears further down */
64 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
65
66 static void topic_dtor(void *obj)
67 {
68         struct stasis_topic *topic = obj;
69         ast_free(topic->name);
70         topic->name = NULL;
71         ast_free(topic->subscribers);
72         topic->subscribers = NULL;
73 }
74
75 struct stasis_topic *stasis_topic_create(const char *name)
76 {
77         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
78
79         topic = ao2_alloc(sizeof(*topic), topic_dtor);
80
81         if (!topic) {
82                 return NULL;
83         }
84
85         topic->name = ast_strdup(name);
86         if (!topic->name) {
87                 return NULL;
88         }
89
90         topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
91         topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(*topic->subscribers));
92         if (!topic->subscribers) {
93                 return NULL;
94         }
95
96         ao2_ref(topic, +1);
97         return topic;
98 }
99
100 const char *stasis_topic_name(const struct stasis_topic *topic)
101 {
102         return topic->name;
103 }
104
105 /*! \internal \brief One subscriber's registration on a topic. */
106 struct stasis_subscription {
107         /*! Unique ID for this subscription (filled in by ast_uuid_generate_str() at creation) */
108         char uniqueid[AST_UUID_STR_LEN];
109         /*! Topic subscribed to. A reference is held; it is released in subscription_dtor(). */
110         struct stasis_topic *topic;
111         /*! Mailbox for processing incoming messages. Only created when a mailbox was requested; stasis_forward_message() invokes the callback directly when it is unset. */
112         struct ast_taskprocessor *mailbox;
113         /*! Callback function for incoming message processing. */
114         stasis_subscription_cb callback;
115         /*! Data pointer to be handed to the callback. */
116         void *data;
117 };
118
119 static void subscription_dtor(void *obj)
120 {
121         struct stasis_subscription *sub = obj;
122         ast_assert(!stasis_subscription_is_subscribed(sub));
123         ao2_cleanup(sub->topic);
124         sub->topic = NULL;
125         ast_taskprocessor_unreference(sub->mailbox);
126         sub->mailbox = NULL;
127 }
128
129 /*!
130  * \brief Invoke the subscription's callback.
131  * \param sub Subscription to invoke.
132  * \param topic Topic message was published to.
133  * \param message Message to send.
134  */
135 static void subscription_invoke(struct stasis_subscription *sub,
136                                   struct stasis_topic *topic,
137                                   struct stasis_message *message)
138 {
139         /* Since sub->topic doesn't change, no need to lock sub */
140         sub->callback(sub->data,
141                       sub,
142                       topic,
143                       message);
144 }
145
146 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
147
148 static struct stasis_subscription *__stasis_subscribe(
149         struct stasis_topic *topic,
150         stasis_subscription_cb callback,
151         void *data,
152         int needs_mailbox)
153 {
154         RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
155
156         sub = ao2_alloc(sizeof(*sub), subscription_dtor);
157         if (!sub) {
158                 return NULL;
159         }
160
161         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
162
163         if (needs_mailbox) {
164                 sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
165                 if (!sub->mailbox) {
166                         return NULL;
167                 }
168         }
169
170         ao2_ref(topic, +1);
171         sub->topic = topic;
172         sub->callback = callback;
173         sub->data = data;
174
175         if (topic_add_subscription(topic, sub) != 0) {
176                 return NULL;
177         }
178         send_subscription_change_message(topic, sub->uniqueid, "Subscribe");
179
180         ao2_ref(sub, +1);
181         return sub;
182 }
183
184 struct stasis_subscription *stasis_subscribe(
185         struct stasis_topic *topic,
186         stasis_subscription_cb callback,
187         void *data)
188 {
189         return __stasis_subscribe(topic, callback, data, 1);
190 }
191
192 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
193 {
194         if (sub) {
195                 size_t i;
196                 struct stasis_topic *topic = sub->topic;
197                 SCOPED_AO2LOCK(lock_topic, topic);
198
199                 for (i = 0; i < topic->num_subscribers_current; ++i) {
200                         if (topic->subscribers[i] == sub) {
201                                 send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
202                                 /* swap [i] with last entry; remove last entry */
203                                 topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
204                                 /* Unsubscribing unrefs the subscription */
205                                 ao2_cleanup(sub);
206                                 return NULL;
207                         }
208                 }
209
210                 ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
211         }
212         return NULL;
213 }
214
215 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
216 {
217         if (sub) {
218                 size_t i;
219                 struct stasis_topic *topic = sub->topic;
220                 SCOPED_AO2LOCK(lock_topic, topic);
221
222                 for (i = 0; i < topic->num_subscribers_current; ++i) {
223                         if (topic->subscribers[i] == sub) {
224                                 return 1;
225                         }
226                 }
227         }
228
229         return 0;
230 }
231
232 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
233 {
234         return sub->uniqueid;
235 }
236
237 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
238 {
239         struct stasis_subscription_change *change;
240         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
241                 return 0;
242         }
243
244         change = stasis_message_data(msg);
245         if (strcmp("Unsubscribe", change->description)) {
246                 return 0;
247         }
248
249         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
250                 return 0;
251         }
252
253         return 1;
254 }
255
256 /*!
257  * \brief Add a subscriber to a topic.
258  * \param topic Topic
259  * \param sub Subscriber
260  * \return 0 on success
261  * \return Non-zero on error
262  */
263 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
264 {
265         struct stasis_subscription **subscribers;
266         SCOPED_AO2LOCK(lock, topic);
267
268         /* Increase list size, if needed */
269         if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
270                 subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
271                 if (!subscribers) {
272                         return -1;
273                 }
274                 topic->subscribers = subscribers;
275                 topic->num_subscribers_max *= 2;
276         }
277
278         /* Don't ref sub here or we'll cause a reference cycle. */
279         topic->subscribers[topic->num_subscribers_current++] = sub;
280         return 0;
281 }
282
283 /*!
284  * \internal
285  * \brief Information needed to dispatch a message to a subscription. dispatch_create() takes a reference on each member; dispatch_dtor() releases them.
286  */
287 struct dispatch {
288         /*! Topic message was published to */
289         struct stasis_topic *topic;
290         /*! The message itself */
291         struct stasis_message *message;
292         /*! Subscription receiving the message */
293         struct stasis_subscription *sub;
294 };
295
296 static void dispatch_dtor(void *data)
297 {
298         struct dispatch *dispatch = data;
299         ao2_cleanup(dispatch->topic);
300         ao2_cleanup(dispatch->message);
301         ao2_cleanup(dispatch->sub);
302 }
303
304 static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
305 {
306         RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
307
308         ast_assert(topic != NULL);
309         ast_assert(message != NULL);
310         ast_assert(sub != NULL);
311
312         dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
313         if (!dispatch) {
314                 return NULL;
315         }
316
317         dispatch->topic = topic;
318         ao2_ref(topic, +1);
319
320         dispatch->message = message;
321         ao2_ref(message, +1);
322
323         dispatch->sub = sub;
324         ao2_ref(sub, +1);
325
326         ao2_ref(dispatch, +1);
327         return dispatch;
328 }
329
330 /*!
331  * \brief Dispatch a message to a subscriber
332  * \param data \ref dispatch object
333  * \return 0
334  */
335 static int dispatch_exec(void *data)
336 {
337         RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
338
339         subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message);
340
341         return 0;
342 }
343
344 void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
345 {
346         size_t i;
347         SCOPED_AO2LOCK(lock, topic);
348
349         ast_assert(topic != NULL);
350         ast_assert(publisher_topic != NULL);
351         ast_assert(message != NULL);
352
353         for (i = 0; i < topic->num_subscribers_current; ++i) {
354                 struct stasis_subscription *sub = topic->subscribers[i];
355
356                 ast_assert(sub != NULL);
357
358                 if (sub->mailbox) {
359                         RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
360
361                         dispatch = dispatch_create(publisher_topic, message, sub);
362                         if (!dispatch) {
363                                 ast_log(LOG_DEBUG, "Dropping dispatch\n");
364                                 break;
365                         }
366
367                         if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
368                                 /* Ownership transferred to mailbox.
369                                  * Don't increment ref, b/c the task processor
370                                  * may have already gotten rid of the object.
371                                  */
372                                 dispatch = NULL;
373                         }
374                 } else {
375                         /* Dispatch directly */
376                         subscription_invoke(sub, publisher_topic, message);
377                 }
378         }
379 }
380
/*!
 * \brief Publish a message to a topic's subscribers.
 *
 * \param topic Topic to publish to; it is also reported as the publisher.
 * \param message Message to publish.
 */
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
{
        struct stasis_topic *publisher_topic = topic;

        stasis_forward_message(topic, publisher_topic, message);
}
385
/*!
 * \brief Subscription callback that forwards messages to another topic.
 *
 * Once the subscription's own final message has been forwarded, the
 * reference held on the destination topic is released.
 *
 * \param data Destination topic.
 * \param sub The forwarding subscription.
 * \param topic Topic the message was published to.
 * \param message Message to forward.
 */
static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
{
        struct stasis_topic *destination = data;
        int is_final;

        stasis_forward_message(destination, topic, message);

        is_final = stasis_subscription_final_message(sub, message);
        if (!is_final) {
                return;
        }

        ao2_cleanup(destination);
}
396
397 struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
398 {
399         struct stasis_subscription *sub;
400         if (!from_topic || !to_topic) {
401                 return NULL;
402         }
403
404         /* Forwarding subscriptions should dispatch directly instead of having a
405          * mailbox. Otherwise, messages forwarded to the same topic from
406          * different topics may get reordered. Which is bad.
407          */
408         sub = __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
409         if (sub) {
410                 /* hold a ref to to_topic for this forwarding subscription */
411                 ao2_ref(to_topic, +1);
412         }
413         return sub;
414 }
415
416 static void subscription_change_dtor(void *obj)
417 {
418         struct stasis_subscription_change *change = obj;
419         ast_string_field_free_memory(change);
420         ao2_cleanup(change->topic);
421 }
422
423 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
424 {
425         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
426
427         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
428         if (ast_string_field_init(change, 128)) {
429                 return NULL;
430         }
431
432         ast_string_field_set(change, uniqueid, uniqueid);
433         ast_string_field_set(change, description, description);
434         ao2_ref(topic, +1);
435         change->topic = topic;
436
437         ao2_ref(change, +1);
438         return change;
439 }
440
441 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
442 {
443         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
444         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
445
446         change = subscription_change_alloc(topic, uniqueid, description);
447
448         if (!change) {
449                 return;
450         }
451
452         msg = stasis_message_create(stasis_subscription_change_type(), change);
453
454         if (!msg) {
455                 return;
456         }
457
458         stasis_publish(topic, msg);
459 }
460
461 struct topic_pool_entry { /* One pooled topic plus the subscription forwarding it to the pool's topic */
462         struct stasis_subscription *forward; /*!< Forwarding subscription made in stasis_topic_pool_get_topic(); unsubscribed in topic_pool_entry_dtor() */
463         struct stasis_topic *topic; /*!< The pooled topic; released in topic_pool_entry_dtor() */
464 };
465
466 static void topic_pool_entry_dtor(void *obj)
467 {
468         struct topic_pool_entry *entry = obj;
469         entry->forward = stasis_unsubscribe(entry->forward);
470         ao2_cleanup(entry->topic);
471         entry->topic = NULL;
472 }
473
474 static struct topic_pool_entry *topic_pool_entry_alloc(void)
475 {
476         return ao2_alloc(sizeof(struct topic_pool_entry), topic_pool_entry_dtor);
477 }
478
479 struct stasis_topic_pool { /* A set of named topics that all forward to one common topic */
480         struct ao2_container *pool_container; /*!< Container of topic_pool_entry objects, looked up by topic name */
481         struct stasis_topic *pool_topic; /*!< Topic that every pooled topic forwards to; reference held */
482 };
483
484 static void topic_pool_dtor(void *obj)
485 {
486         struct stasis_topic_pool *pool = obj;
487         ao2_cleanup(pool->pool_container);
488         pool->pool_container = NULL;
489         ao2_cleanup(pool->pool_topic);
490         pool->pool_topic = NULL;
491 }
492
493 static int topic_pool_entry_hash(const void *obj, const int flags)
494 {
495         const char *topic_name = (flags & OBJ_KEY) ? obj : stasis_topic_name(((struct topic_pool_entry*) obj)->topic);
496         return ast_str_case_hash(topic_name);
497 }
498
499 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
500 {
501         struct topic_pool_entry *opt1 = obj, *opt2 = arg;
502         const char *topic_name = (flags & OBJ_KEY) ? arg : stasis_topic_name(opt2->topic);
503         return strcasecmp(stasis_topic_name(opt1->topic), topic_name) ? 0 : CMP_MATCH | CMP_STOP;
504 }
505
506 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
507 {
508         RAII_VAR(struct stasis_topic_pool *, pool, ao2_alloc(sizeof(*pool), topic_pool_dtor), ao2_cleanup);
509         if (!pool) {
510                 return NULL;
511         }
512         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS, topic_pool_entry_hash, topic_pool_entry_cmp);
513         ao2_ref(pooled_topic, +1);
514         pool->pool_topic = pooled_topic;
515
516         ao2_ref(pool, +1);
517         return pool;
518 }
519
520 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
521 {
522         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
523         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
524         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_KEY | OBJ_NOLOCK);
525
526         if (topic_pool_entry) {
527                 return topic_pool_entry->topic;
528         }
529
530         topic_pool_entry = topic_pool_entry_alloc();
531
532         if (!topic_pool_entry) {
533                 return NULL;
534         }
535
536         topic_pool_entry->topic = stasis_topic_create(topic_name);
537         if (!topic_pool_entry->topic) {
538                 return NULL;
539         }
540
541         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
542         if (!topic_pool_entry->forward) {
543                 return NULL;
544         }
545
546         ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK);
547
548         return topic_pool_entry->topic;
549 }
550
551 /*! \brief Cleanup function */
552 static void stasis_exit(void)
553 {
554         ast_threadpool_shutdown(pool);
555         pool = NULL;
556         STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
557 }
558
559 int stasis_init(void)
560 {
561         int cache_init;
562
563         /* XXX Should this be configurable? */
564         struct ast_threadpool_options opts = {
565                 .version = AST_THREADPOOL_OPTIONS_VERSION,
566                 .idle_timeout = 20,
567                 .auto_increment = 1,
568                 .initial_size = 0,
569                 .max_size = 200
570         };
571
572         ast_register_atexit(stasis_exit);
573
574         if (pool) {
575                 ast_log(LOG_ERROR, "Stasis double-initialized\n");
576                 return -1;
577         }
578
579         pool = ast_threadpool_create("stasis-core", NULL, &opts);
580         if (!pool) {
581                 ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
582                 return -1;
583         }
584
585         cache_init = stasis_cache_init();
586         if (cache_init != 0) {
587                 return -1;
588         }
589
590         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
591                 return -1;
592         }
593
594         return 0;
595 }