This patch adds a new message bus API to Asterisk.
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis.h"
36 #include "asterisk/threadpool.h"
37 #include "asterisk/taskprocessor.h"
38 #include "asterisk/utils.h"
39 #include "asterisk/uuid.h"
40
41 /*! Initial size of the subscribers list. */
42 #define INITIAL_SUBSCRIBERS_MAX 4
43
44 /*! Threadpool for dispatching notifications to subscribers */
45 static struct ast_threadpool *pool;
46
47 static struct stasis_message_type *__subscription_change_message_type;
48
49 /*! \private */
50 struct stasis_topic {
51         char *name;
52         /*! Variable length array of the subscribers (raw pointer to avoid cyclic references) */
53         struct stasis_subscription **subscribers;
54         /*! Allocated length of the subscribers array */
55         size_t num_subscribers_max;
56         /*! Current size of the subscribers array */
57         size_t num_subscribers_current;
58 };
59
60 /* Forward declarations for the tightly-coupled subscription object */
61 struct stasis_subscription;
62 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
63
64 static void topic_dtor(void *obj)
65 {
66         struct stasis_topic *topic = obj;
67         ast_free(topic->name);
68         topic->name = NULL;
69         ast_free(topic->subscribers);
70         topic->subscribers = NULL;
71 }
72
73 struct stasis_topic *stasis_topic_create(const char *name)
74 {
75         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
76
77         topic = ao2_alloc(sizeof(*topic), topic_dtor);
78
79         if (!topic) {
80                 return NULL;
81         }
82
83         topic->name = ast_strdup(name);
84         if (!topic->name) {
85                 return NULL;
86         }
87
88         topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
89         topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(topic->subscribers));
90         if (!topic->subscribers) {
91                 return NULL;
92         }
93
94         ao2_ref(topic, +1);
95         return topic;
96 }
97
98 const char *stasis_topic_name(const struct stasis_topic *topic)
99 {
100         return topic->name;
101 }
102
103 /*! \private */
104 struct stasis_subscription {
105         /*! Unique ID for this subscription */
106         char *uniqueid;
107         /*! Topic subscribed to. */
108         struct stasis_topic *topic;
109         /*! Mailbox for processing incoming messages. */
110         struct ast_taskprocessor *mailbox;
111         /*! Callback function for incoming message processing. */
112         stasis_subscription_cb callback;
113         /*! Data pointer to be handed to the callback. */
114         void *data;
115 };
116
117 static void subscription_dtor(void *obj)
118 {
119         struct stasis_subscription *sub = obj;
120         ast_assert(!stasis_subscription_is_subscribed(sub));
121         ast_free(sub->uniqueid);
122         sub->uniqueid = NULL;
123         ao2_cleanup(sub->topic);
124         sub->topic = NULL;
125         ast_taskprocessor_unreference(sub->mailbox);
126         sub->mailbox = NULL;
127 }
128
129 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
130
131 static struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox)
132 {
133         RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
134         RAII_VAR(struct ast_uuid *, id, NULL, ast_free);
135         char uniqueid[AST_UUID_STR_LEN];
136
137         sub = ao2_alloc(sizeof(*sub), subscription_dtor);
138         if (!sub) {
139                 return NULL;
140         }
141
142         id = ast_uuid_generate();
143         if (!id) {
144                 ast_log(LOG_ERROR, "UUID generation failed\n");
145                 return NULL;
146         }
147         ast_uuid_to_str(id, uniqueid, sizeof(uniqueid));
148         if (needs_mailbox) {
149                 sub->mailbox = ast_threadpool_serializer(uniqueid, pool);
150                 if (!sub->mailbox) {
151                         return NULL;
152                 }
153         }
154
155         sub->uniqueid = ast_strdup(uniqueid);
156         ao2_ref(topic, +1);
157         sub->topic = topic;
158         sub->callback = callback;
159         sub->data = data;
160
161         if (topic_add_subscription(topic, sub) != 0) {
162                 return NULL;
163         }
164         send_subscription_change_message(topic, uniqueid, "Subscribe");
165
166         ao2_ref(sub, +1);
167         return sub;
168 }
169
170 struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
171 {
172         return __stasis_subscribe(topic, callback, data, 1);
173 }
174
175 void stasis_unsubscribe(struct stasis_subscription *sub)
176 {
177         if (sub) {
178                 size_t i;
179                 struct stasis_topic *topic = sub->topic;
180                 SCOPED_AO2LOCK(lock_topic, topic);
181
182                 for (i = 0; i < topic->num_subscribers_current; ++i) {
183                         if (topic->subscribers[i] == sub) {
184                                 send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
185                                 /* swap [i] with last entry; remove last entry */
186                                 topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
187                                 /* Unsubscribing unrefs the subscription */
188                                 ao2_cleanup(sub);
189                                 return;
190                         }
191                 }
192
193                 ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
194         }
195 }
196
197 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
198 {
199         if (sub) {
200                 size_t i;
201                 struct stasis_topic *topic = sub->topic;
202                 SCOPED_AO2LOCK(lock_topic, topic);
203
204                 for (i = 0; i < topic->num_subscribers_current; ++i) {
205                         if (topic->subscribers[i] == sub) {
206                                 return 1;
207                         }
208                 }
209         }
210
211         return 0;
212 }
213
214 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
215 {
216         return sub->uniqueid;
217 }
218
219 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
220 {
221         struct stasis_subscription_change *change;
222         if (stasis_message_type(msg) != stasis_subscription_change()) {
223                 return 0;
224         }
225
226         change = stasis_message_data(msg);
227         if (strcmp("Unsubscribe", change->description)) {
228                 return 0;
229         }
230
231         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
232                 return 0;
233         }
234
235         return 1;
236 }
237
238 /*!
239  * \brief Add a subscriber to a topic.
240  * \param topic Topic
241  * \param sub Subscriber
242  * \return 0 on success
243  * \return Non-zero on error
244  */
245 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
246 {
247         struct stasis_subscription **subscribers;
248         SCOPED_AO2LOCK(lock, topic);
249
250         /* Increase list size, if needed */
251         if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
252                 subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
253                 if (!subscribers) {
254                         return -1;
255                 }
256                 topic->subscribers = subscribers;
257                 topic->num_subscribers_max *= 2;
258         }
259
260         /* Don't ref sub here or we'll cause a reference cycle. */
261         topic->subscribers[topic->num_subscribers_current++] = sub;
262         return 0;
263 }
264
265 /*!
266  * \private
267  * \brief Information needed to dispatch a message to a subscription
268  */
269 struct dispatch {
270         /*! Topic message was published to */
271         struct stasis_topic *topic;
272         /*! The message itself */
273         struct stasis_message *message;
274         /*! Subscription receiving the message */
275         struct stasis_subscription *sub;
276 };
277
278 static void dispatch_dtor(void *data)
279 {
280         struct dispatch *dispatch = data;
281         ao2_cleanup(dispatch->topic);
282         ao2_cleanup(dispatch->message);
283         ao2_cleanup(dispatch->sub);
284 }
285
286 static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
287 {
288         RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
289
290         ast_assert(topic != NULL);
291         ast_assert(message != NULL);
292         ast_assert(sub != NULL);
293
294         dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
295         if (!dispatch) {
296                 return NULL;
297         }
298
299         dispatch->topic = topic;
300         ao2_ref(topic, +1);
301
302         dispatch->message = message;
303         ao2_ref(message, +1);
304
305         dispatch->sub = sub;
306         ao2_ref(sub, +1);
307
308         ao2_ref(dispatch, +1);
309         return dispatch;
310 }
311
312 /*!
313  * \brief Dispatch a message to a subscriber
314  * \param data \ref dispatch object
315  * \return 0
316  */
317 static int dispatch_exec(void *data)
318 {
319         RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
320         RAII_VAR(struct stasis_topic *, sub_topic, NULL, ao2_cleanup);
321
322         /* Since sub->topic doesn't change, no need to lock sub */
323         ast_assert(dispatch->sub->topic != NULL);
324         ao2_ref(dispatch->sub->topic, +1);
325         sub_topic = dispatch->sub->topic;
326
327         dispatch->sub->callback(dispatch->sub->data,
328                                 dispatch->sub,
329                                 sub_topic,
330                                 dispatch->message);
331
332         return 0;
333 }
334
335 void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
336 {
337         struct stasis_subscription **subscribers = NULL;
338         size_t num_subscribers, i;
339
340         /* Copy the subscribers, so we don't have to hold the mutex for long */
341         {
342                 SCOPED_AO2LOCK(lock, topic);
343                 num_subscribers = topic->num_subscribers_current;
344                 subscribers = ast_malloc(num_subscribers * sizeof(*subscribers));
345                 if (subscribers) {
346                         for (i = 0; i < num_subscribers; ++i) {
347                                 ao2_ref(topic->subscribers[i], +1);
348                                 subscribers[i] = topic->subscribers[i];
349                         }
350                 }
351         }
352
353         if (!subscribers) {
354                 ast_log(LOG_ERROR, "Dropping message\n");
355                 return;
356         }
357
358         for (i = 0; i < num_subscribers; ++i) {
359                 struct stasis_subscription *sub = subscribers[i];
360
361                 ast_assert(sub != NULL);
362
363                 if (sub->mailbox) {
364                         RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
365
366                         dispatch = dispatch_create(publisher_topic, message, sub);
367                         if (!dispatch) {
368                                 ast_log(LOG_DEBUG, "Dropping dispatch\n");
369                                 break;
370                         }
371
372                         if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
373                                 dispatch = NULL; /* Ownership transferred to mailbox */
374                         }
375                 } else {
376                         /* No mailbox; dispatch directly */
377                         sub->callback(sub->data, sub, sub->topic, message);
378                 }
379         }
380
381         for (i = 0; i < num_subscribers; ++i) {
382                 ao2_cleanup(subscribers[i]);
383         }
384         ast_free(subscribers);
385 }
386
387 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
388 {
389         stasis_forward_message(topic, topic, message);
390 }
391
392 /*! \brief Forwarding subscriber */
393 static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
394 {
395         struct stasis_topic *to_topic = data;
396         stasis_forward_message(to_topic, topic, message);
397
398         if (stasis_subscription_final_message(sub, message)) {
399                 ao2_cleanup(to_topic);
400         }
401 }
402
403 struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
404 {
405         struct stasis_subscription *sub;
406         if (!from_topic || !to_topic) {
407                 return NULL;
408         }
409         /* Subscribe without a mailbox, since we're just forwarding messages */
410         sub = __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
411         if (sub) {
412                 /* hold a ref to to_topic for this forwarding subscription */
413                 ao2_ref(to_topic, +1);
414         }
415         return sub;
416 }
417
418 static void subscription_change_dtor(void *obj)
419 {
420         struct stasis_subscription_change *change = obj;
421         ast_string_field_free_memory(change);
422         ao2_cleanup(change->topic);
423 }
424
425 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
426 {
427         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
428
429         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
430         if (ast_string_field_init(change, 128)) {
431                 return NULL;
432         }
433
434         ast_string_field_set(change, uniqueid, uniqueid);
435         ast_string_field_set(change, description, description);
436         ao2_ref(topic, +1);
437         change->topic = topic;
438
439         ao2_ref(change, +1);
440         return change;
441 }
442
443 struct stasis_message_type *stasis_subscription_change(void)
444 {
445         return __subscription_change_message_type;
446 }
447
448 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
449 {
450         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
451         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
452
453         change = subscription_change_alloc(topic, uniqueid, description);
454
455         if (!change) {
456                 return;
457         }
458
459         msg = stasis_message_create(stasis_subscription_change(), change);
460
461         if (!msg) {
462                 return;
463         }
464
465         stasis_publish(topic, msg);
466 }
467
468 /*! \brief Cleanup function */
469 static void stasis_exit(void)
470 {
471         ao2_cleanup(__subscription_change_message_type);
472         __subscription_change_message_type = NULL;
473         ast_threadpool_shutdown(pool);
474         pool = NULL;
475 }
476
477 int stasis_init(void)
478 {
479         int cache_init;
480
481         /* XXX Should this be configurable? */
482         struct ast_threadpool_options opts = {
483                 .version = AST_THREADPOOL_OPTIONS_VERSION,
484                 .idle_timeout = 20,
485                 .auto_increment = 1,
486                 .initial_size = 0,
487                 .max_size = 200
488         };
489
490         ast_register_atexit(stasis_exit);
491
492         if (pool) {
493                 ast_log(LOG_ERROR, "Stasis double-initialized\n");
494                 return -1;
495         }
496
497         pool = ast_threadpool_create("stasis-core", NULL, &opts);
498         if (!pool) {
499                 ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
500                 return -1;
501         }
502
503         cache_init = stasis_cache_init();
504         if (cache_init != 0) {
505                 return -1;
506         }
507
508         __subscription_change_message_type = stasis_message_type_create("stasis_subscription_change");
509         if (!__subscription_change_message_type) {
510                 return -1;
511         }
512
513         return 0;
514 }