Ensure dummy channels get a stasis topic.
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis.h"
36 #include "asterisk/threadpool.h"
37 #include "asterisk/taskprocessor.h"
38 #include "asterisk/utils.h"
39 #include "asterisk/uuid.h"
40
41 /*! Initial size of the subscribers list. */
42 #define INITIAL_SUBSCRIBERS_MAX 4
43
44 /*! Threadpool for dispatching notifications to subscribers */
45 static struct ast_threadpool *pool;
46
47 static struct stasis_message_type *__subscription_change_message_type;
48
49 /*! \private */
50 struct stasis_topic {
51         char *name;
52         /*! Variable length array of the subscribers (raw pointer to avoid cyclic references) */
53         struct stasis_subscription **subscribers;
54         /*! Allocated length of the subscribers array */
55         size_t num_subscribers_max;
56         /*! Current size of the subscribers array */
57         size_t num_subscribers_current;
58 };
59
60 /* Forward declarations for the tightly-coupled subscription object */
61 struct stasis_subscription;
62 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
63
64 static void topic_dtor(void *obj)
65 {
66         struct stasis_topic *topic = obj;
67         ast_free(topic->name);
68         topic->name = NULL;
69         ast_free(topic->subscribers);
70         topic->subscribers = NULL;
71 }
72
73 struct stasis_topic *stasis_topic_create(const char *name)
74 {
75         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
76
77         topic = ao2_alloc(sizeof(*topic), topic_dtor);
78
79         if (!topic) {
80                 return NULL;
81         }
82
83         topic->name = ast_strdup(name);
84         if (!topic->name) {
85                 return NULL;
86         }
87
88         topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
89         topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(topic->subscribers));
90         if (!topic->subscribers) {
91                 return NULL;
92         }
93
94         ao2_ref(topic, +1);
95         return topic;
96 }
97
98 const char *stasis_topic_name(const struct stasis_topic *topic)
99 {
100         return topic->name;
101 }
102
103 /*! \private */
104 struct stasis_subscription {
105         /*! Unique ID for this subscription */
106         char *uniqueid;
107         /*! Topic subscribed to. */
108         struct stasis_topic *topic;
109         /*! Mailbox for processing incoming messages. */
110         struct ast_taskprocessor *mailbox;
111         /*! Callback function for incoming message processing. */
112         stasis_subscription_cb callback;
113         /*! Data pointer to be handed to the callback. */
114         void *data;
115 };
116
117 static void subscription_dtor(void *obj)
118 {
119         struct stasis_subscription *sub = obj;
120         ast_assert(!stasis_subscription_is_subscribed(sub));
121         ast_free(sub->uniqueid);
122         sub->uniqueid = NULL;
123         ao2_cleanup(sub->topic);
124         sub->topic = NULL;
125         ast_taskprocessor_unreference(sub->mailbox);
126         sub->mailbox = NULL;
127 }
128
129 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
130
131 static struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox)
132 {
133         RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
134         RAII_VAR(struct ast_uuid *, id, NULL, ast_free);
135         char uniqueid[AST_UUID_STR_LEN];
136
137         sub = ao2_alloc(sizeof(*sub), subscription_dtor);
138         if (!sub) {
139                 return NULL;
140         }
141
142         id = ast_uuid_generate();
143         if (!id) {
144                 ast_log(LOG_ERROR, "UUID generation failed\n");
145                 return NULL;
146         }
147         ast_uuid_to_str(id, uniqueid, sizeof(uniqueid));
148         if (needs_mailbox) {
149                 sub->mailbox = ast_threadpool_serializer(uniqueid, pool);
150                 if (!sub->mailbox) {
151                         return NULL;
152                 }
153         }
154
155         sub->uniqueid = ast_strdup(uniqueid);
156         ao2_ref(topic, +1);
157         sub->topic = topic;
158         sub->callback = callback;
159         sub->data = data;
160
161         if (topic_add_subscription(topic, sub) != 0) {
162                 return NULL;
163         }
164         send_subscription_change_message(topic, uniqueid, "Subscribe");
165
166         ao2_ref(sub, +1);
167         return sub;
168 }
169
170 struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
171 {
172         return __stasis_subscribe(topic, callback, data, 1);
173 }
174
175 void stasis_unsubscribe(struct stasis_subscription *sub)
176 {
177         if (sub) {
178                 size_t i;
179                 struct stasis_topic *topic = sub->topic;
180                 SCOPED_AO2LOCK(lock_topic, topic);
181
182                 for (i = 0; i < topic->num_subscribers_current; ++i) {
183                         if (topic->subscribers[i] == sub) {
184                                 send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
185                                 /* swap [i] with last entry; remove last entry */
186                                 topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
187                                 /* Unsubscribing unrefs the subscription */
188                                 ao2_cleanup(sub);
189                                 return;
190                         }
191                 }
192
193                 ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
194         }
195 }
196
197 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
198 {
199         if (sub) {
200                 size_t i;
201                 struct stasis_topic *topic = sub->topic;
202                 SCOPED_AO2LOCK(lock_topic, topic);
203
204                 for (i = 0; i < topic->num_subscribers_current; ++i) {
205                         if (topic->subscribers[i] == sub) {
206                                 return 1;
207                         }
208                 }
209         }
210
211         return 0;
212 }
213
214 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
215 {
216         return sub->uniqueid;
217 }
218
219 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
220 {
221         struct stasis_subscription_change *change;
222         if (stasis_message_type(msg) != stasis_subscription_change()) {
223                 return 0;
224         }
225
226         change = stasis_message_data(msg);
227         if (strcmp("Unsubscribe", change->description)) {
228                 return 0;
229         }
230
231         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
232                 return 0;
233         }
234
235         return 1;
236 }
237
238 /*!
239  * \brief Add a subscriber to a topic.
240  * \param topic Topic
241  * \param sub Subscriber
242  * \return 0 on success
243  * \return Non-zero on error
244  */
245 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
246 {
247         struct stasis_subscription **subscribers;
248         SCOPED_AO2LOCK(lock, topic);
249
250         /* Increase list size, if needed */
251         if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
252                 subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
253                 if (!subscribers) {
254                         return -1;
255                 }
256                 topic->subscribers = subscribers;
257                 topic->num_subscribers_max *= 2;
258         }
259
260         /* Don't ref sub here or we'll cause a reference cycle. */
261         topic->subscribers[topic->num_subscribers_current++] = sub;
262         return 0;
263 }
264
265 /*!
266  * \private
267  * \brief Information needed to dispatch a message to a subscription
268  */
269 struct dispatch {
270         /*! Topic message was published to */
271         struct stasis_topic *topic;
272         /*! The message itself */
273         struct stasis_message *message;
274         /*! Subscription receiving the message */
275         struct stasis_subscription *sub;
276 };
277
278 static void dispatch_dtor(void *data)
279 {
280         struct dispatch *dispatch = data;
281         ao2_cleanup(dispatch->topic);
282         ao2_cleanup(dispatch->message);
283         ao2_cleanup(dispatch->sub);
284 }
285
286 static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
287 {
288         RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
289
290         ast_assert(topic != NULL);
291         ast_assert(message != NULL);
292         ast_assert(sub != NULL);
293
294         dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
295         if (!dispatch) {
296                 return NULL;
297         }
298
299         dispatch->topic = topic;
300         ao2_ref(topic, +1);
301
302         dispatch->message = message;
303         ao2_ref(message, +1);
304
305         dispatch->sub = sub;
306         ao2_ref(sub, +1);
307
308         ao2_ref(dispatch, +1);
309         return dispatch;
310 }
311
312 /*!
313  * \brief Dispatch a message to a subscriber
314  * \param data \ref dispatch object
315  * \return 0
316  */
317 static int dispatch_exec(void *data)
318 {
319         RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
320         RAII_VAR(struct stasis_topic *, sub_topic, NULL, ao2_cleanup);
321
322         /* Since sub->topic doesn't change, no need to lock sub */
323         ast_assert(dispatch->sub->topic != NULL);
324         ao2_ref(dispatch->sub->topic, +1);
325         sub_topic = dispatch->sub->topic;
326
327         dispatch->sub->callback(dispatch->sub->data,
328                                 dispatch->sub,
329                                 sub_topic,
330                                 dispatch->message);
331
332         return 0;
333 }
334
335 void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
336 {
337         struct stasis_subscription **subscribers = NULL;
338         size_t num_subscribers, i;
339
340         ast_assert(topic != NULL);
341         ast_assert(publisher_topic != NULL);
342         ast_assert(message != NULL);
343
344         /* Copy the subscribers, so we don't have to hold the mutex for long */
345         {
346                 SCOPED_AO2LOCK(lock, topic);
347                 num_subscribers = topic->num_subscribers_current;
348                 subscribers = ast_malloc(num_subscribers * sizeof(*subscribers));
349                 if (subscribers) {
350                         for (i = 0; i < num_subscribers; ++i) {
351                                 ao2_ref(topic->subscribers[i], +1);
352                                 subscribers[i] = topic->subscribers[i];
353                         }
354                 }
355         }
356
357         if (!subscribers) {
358                 ast_log(LOG_ERROR, "Dropping message\n");
359                 return;
360         }
361
362         for (i = 0; i < num_subscribers; ++i) {
363                 struct stasis_subscription *sub = subscribers[i];
364
365                 ast_assert(sub != NULL);
366
367                 if (sub->mailbox) {
368                         RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
369
370                         dispatch = dispatch_create(publisher_topic, message, sub);
371                         if (!dispatch) {
372                                 ast_log(LOG_DEBUG, "Dropping dispatch\n");
373                                 break;
374                         }
375
376                         if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
377                                 dispatch = NULL; /* Ownership transferred to mailbox */
378                         }
379                 } else {
380                         /* No mailbox; dispatch directly */
381                         sub->callback(sub->data, sub, sub->topic, message);
382                 }
383         }
384
385         for (i = 0; i < num_subscribers; ++i) {
386                 ao2_cleanup(subscribers[i]);
387         }
388         ast_free(subscribers);
389 }
390
391 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
392 {
393         stasis_forward_message(topic, topic, message);
394 }
395
396 /*! \brief Forwarding subscriber */
397 static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
398 {
399         struct stasis_topic *to_topic = data;
400         stasis_forward_message(to_topic, topic, message);
401
402         if (stasis_subscription_final_message(sub, message)) {
403                 ao2_cleanup(to_topic);
404         }
405 }
406
407 struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
408 {
409         struct stasis_subscription *sub;
410         if (!from_topic || !to_topic) {
411                 return NULL;
412         }
413         /* Subscribe without a mailbox, since we're just forwarding messages */
414         sub = __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
415         if (sub) {
416                 /* hold a ref to to_topic for this forwarding subscription */
417                 ao2_ref(to_topic, +1);
418         }
419         return sub;
420 }
421
422 static void subscription_change_dtor(void *obj)
423 {
424         struct stasis_subscription_change *change = obj;
425         ast_string_field_free_memory(change);
426         ao2_cleanup(change->topic);
427 }
428
429 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
430 {
431         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
432
433         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
434         if (ast_string_field_init(change, 128)) {
435                 return NULL;
436         }
437
438         ast_string_field_set(change, uniqueid, uniqueid);
439         ast_string_field_set(change, description, description);
440         ao2_ref(topic, +1);
441         change->topic = topic;
442
443         ao2_ref(change, +1);
444         return change;
445 }
446
447 struct stasis_message_type *stasis_subscription_change(void)
448 {
449         return __subscription_change_message_type;
450 }
451
452 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
453 {
454         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
455         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
456
457         change = subscription_change_alloc(topic, uniqueid, description);
458
459         if (!change) {
460                 return;
461         }
462
463         msg = stasis_message_create(stasis_subscription_change(), change);
464
465         if (!msg) {
466                 return;
467         }
468
469         stasis_publish(topic, msg);
470 }
471
472 /*! \brief Cleanup function */
473 static void stasis_exit(void)
474 {
475         ao2_cleanup(__subscription_change_message_type);
476         __subscription_change_message_type = NULL;
477         ast_threadpool_shutdown(pool);
478         pool = NULL;
479 }
480
481 int stasis_init(void)
482 {
483         int cache_init;
484
485         /* XXX Should this be configurable? */
486         struct ast_threadpool_options opts = {
487                 .version = AST_THREADPOOL_OPTIONS_VERSION,
488                 .idle_timeout = 20,
489                 .auto_increment = 1,
490                 .initial_size = 0,
491                 .max_size = 200
492         };
493
494         ast_register_atexit(stasis_exit);
495
496         if (pool) {
497                 ast_log(LOG_ERROR, "Stasis double-initialized\n");
498                 return -1;
499         }
500
501         pool = ast_threadpool_create("stasis-core", NULL, &opts);
502         if (!pool) {
503                 ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
504                 return -1;
505         }
506
507         cache_init = stasis_cache_init();
508         if (cache_init != 0) {
509                 return -1;
510         }
511
512         __subscription_change_message_type = stasis_message_type_create("stasis_subscription_change");
513         if (!__subscription_change_message_type) {
514                 return -1;
515         }
516
517         return 0;
518 }