Make stasis unsubscription functions return NULL
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis.h"
36 #include "asterisk/threadpool.h"
37 #include "asterisk/taskprocessor.h"
38 #include "asterisk/utils.h"
39 #include "asterisk/uuid.h"
40
41 /*! Initial size of the subscribers list. */
42 #define INITIAL_SUBSCRIBERS_MAX 4
43
44 /*! Threadpool for dispatching notifications to subscribers */
45 static struct ast_threadpool *pool;
46
47 static struct stasis_message_type *__subscription_change_message_type;
48
49 /*! \private */
50 struct stasis_topic {
51         char *name;
52         /*! Variable length array of the subscribers (raw pointer to avoid cyclic references) */
53         struct stasis_subscription **subscribers;
54         /*! Allocated length of the subscribers array */
55         size_t num_subscribers_max;
56         /*! Current size of the subscribers array */
57         size_t num_subscribers_current;
58 };
59
60 /* Forward declarations for the tightly-coupled subscription object */
61 struct stasis_subscription;
62 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
63
64 static void topic_dtor(void *obj)
65 {
66         struct stasis_topic *topic = obj;
67         ast_free(topic->name);
68         topic->name = NULL;
69         ast_free(topic->subscribers);
70         topic->subscribers = NULL;
71 }
72
73 struct stasis_topic *stasis_topic_create(const char *name)
74 {
75         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
76
77         topic = ao2_alloc(sizeof(*topic), topic_dtor);
78
79         if (!topic) {
80                 return NULL;
81         }
82
83         topic->name = ast_strdup(name);
84         if (!topic->name) {
85                 return NULL;
86         }
87
88         topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
89         topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(topic->subscribers));
90         if (!topic->subscribers) {
91                 return NULL;
92         }
93
94         ao2_ref(topic, +1);
95         return topic;
96 }
97
98 const char *stasis_topic_name(const struct stasis_topic *topic)
99 {
100         return topic->name;
101 }
102
103 /*! \private */
104 struct stasis_subscription {
105         /*! Unique ID for this subscription */
106         char *uniqueid;
107         /*! Topic subscribed to. */
108         struct stasis_topic *topic;
109         /*! Mailbox for processing incoming messages. */
110         struct ast_taskprocessor *mailbox;
111         /*! Callback function for incoming message processing. */
112         stasis_subscription_cb callback;
113         /*! Data pointer to be handed to the callback. */
114         void *data;
115 };
116
117 static void subscription_dtor(void *obj)
118 {
119         struct stasis_subscription *sub = obj;
120         ast_assert(!stasis_subscription_is_subscribed(sub));
121         ast_free(sub->uniqueid);
122         sub->uniqueid = NULL;
123         ao2_cleanup(sub->topic);
124         sub->topic = NULL;
125         ast_taskprocessor_unreference(sub->mailbox);
126         sub->mailbox = NULL;
127 }
128
129 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
130
131 static struct stasis_subscription *__stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data, int needs_mailbox)
132 {
133         RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
134         RAII_VAR(struct ast_uuid *, id, NULL, ast_free);
135         char uniqueid[AST_UUID_STR_LEN];
136
137         sub = ao2_alloc(sizeof(*sub), subscription_dtor);
138         if (!sub) {
139                 return NULL;
140         }
141
142         id = ast_uuid_generate();
143         if (!id) {
144                 ast_log(LOG_ERROR, "UUID generation failed\n");
145                 return NULL;
146         }
147         ast_uuid_to_str(id, uniqueid, sizeof(uniqueid));
148         if (needs_mailbox) {
149                 sub->mailbox = ast_threadpool_serializer(uniqueid, pool);
150                 if (!sub->mailbox) {
151                         return NULL;
152                 }
153         }
154
155         sub->uniqueid = ast_strdup(uniqueid);
156         ao2_ref(topic, +1);
157         sub->topic = topic;
158         sub->callback = callback;
159         sub->data = data;
160
161         if (topic_add_subscription(topic, sub) != 0) {
162                 return NULL;
163         }
164         send_subscription_change_message(topic, uniqueid, "Subscribe");
165
166         ao2_ref(sub, +1);
167         return sub;
168 }
169
170 struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
171 {
172         return __stasis_subscribe(topic, callback, data, 1);
173 }
174
175 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
176 {
177         if (sub) {
178                 size_t i;
179                 struct stasis_topic *topic = sub->topic;
180                 SCOPED_AO2LOCK(lock_topic, topic);
181
182                 for (i = 0; i < topic->num_subscribers_current; ++i) {
183                         if (topic->subscribers[i] == sub) {
184                                 send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
185                                 /* swap [i] with last entry; remove last entry */
186                                 topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
187                                 /* Unsubscribing unrefs the subscription */
188                                 ao2_cleanup(sub);
189                                 return NULL;
190                         }
191                 }
192
193                 ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
194         }
195         return NULL;
196 }
197
198 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
199 {
200         if (sub) {
201                 size_t i;
202                 struct stasis_topic *topic = sub->topic;
203                 SCOPED_AO2LOCK(lock_topic, topic);
204
205                 for (i = 0; i < topic->num_subscribers_current; ++i) {
206                         if (topic->subscribers[i] == sub) {
207                                 return 1;
208                         }
209                 }
210         }
211
212         return 0;
213 }
214
215 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
216 {
217         return sub->uniqueid;
218 }
219
220 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
221 {
222         struct stasis_subscription_change *change;
223         if (stasis_message_type(msg) != stasis_subscription_change()) {
224                 return 0;
225         }
226
227         change = stasis_message_data(msg);
228         if (strcmp("Unsubscribe", change->description)) {
229                 return 0;
230         }
231
232         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
233                 return 0;
234         }
235
236         return 1;
237 }
238
239 /*!
240  * \brief Add a subscriber to a topic.
241  * \param topic Topic
242  * \param sub Subscriber
243  * \return 0 on success
244  * \return Non-zero on error
245  */
246 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
247 {
248         struct stasis_subscription **subscribers;
249         SCOPED_AO2LOCK(lock, topic);
250
251         /* Increase list size, if needed */
252         if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
253                 subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
254                 if (!subscribers) {
255                         return -1;
256                 }
257                 topic->subscribers = subscribers;
258                 topic->num_subscribers_max *= 2;
259         }
260
261         /* Don't ref sub here or we'll cause a reference cycle. */
262         topic->subscribers[topic->num_subscribers_current++] = sub;
263         return 0;
264 }
265
266 /*!
267  * \private
268  * \brief Information needed to dispatch a message to a subscription
269  */
270 struct dispatch {
271         /*! Topic message was published to */
272         struct stasis_topic *topic;
273         /*! The message itself */
274         struct stasis_message *message;
275         /*! Subscription receiving the message */
276         struct stasis_subscription *sub;
277 };
278
279 static void dispatch_dtor(void *data)
280 {
281         struct dispatch *dispatch = data;
282         ao2_cleanup(dispatch->topic);
283         ao2_cleanup(dispatch->message);
284         ao2_cleanup(dispatch->sub);
285 }
286
287 static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
288 {
289         RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
290
291         ast_assert(topic != NULL);
292         ast_assert(message != NULL);
293         ast_assert(sub != NULL);
294
295         dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
296         if (!dispatch) {
297                 return NULL;
298         }
299
300         dispatch->topic = topic;
301         ao2_ref(topic, +1);
302
303         dispatch->message = message;
304         ao2_ref(message, +1);
305
306         dispatch->sub = sub;
307         ao2_ref(sub, +1);
308
309         ao2_ref(dispatch, +1);
310         return dispatch;
311 }
312
313 /*!
314  * \brief Dispatch a message to a subscriber
315  * \param data \ref dispatch object
316  * \return 0
317  */
318 static int dispatch_exec(void *data)
319 {
320         RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
321         RAII_VAR(struct stasis_topic *, sub_topic, NULL, ao2_cleanup);
322
323         /* Since sub->topic doesn't change, no need to lock sub */
324         ast_assert(dispatch->sub->topic != NULL);
325         ao2_ref(dispatch->sub->topic, +1);
326         sub_topic = dispatch->sub->topic;
327
328         dispatch->sub->callback(dispatch->sub->data,
329                                 dispatch->sub,
330                                 sub_topic,
331                                 dispatch->message);
332
333         return 0;
334 }
335
336 void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
337 {
338         struct stasis_subscription **subscribers = NULL;
339         size_t num_subscribers, i;
340
341         ast_assert(topic != NULL);
342         ast_assert(publisher_topic != NULL);
343         ast_assert(message != NULL);
344
345         /* Copy the subscribers, so we don't have to hold the mutex for long */
346         {
347                 SCOPED_AO2LOCK(lock, topic);
348                 num_subscribers = topic->num_subscribers_current;
349                 subscribers = ast_malloc(num_subscribers * sizeof(*subscribers));
350                 if (subscribers) {
351                         for (i = 0; i < num_subscribers; ++i) {
352                                 ao2_ref(topic->subscribers[i], +1);
353                                 subscribers[i] = topic->subscribers[i];
354                         }
355                 }
356         }
357
358         if (!subscribers) {
359                 ast_log(LOG_ERROR, "Dropping message\n");
360                 return;
361         }
362
363         for (i = 0; i < num_subscribers; ++i) {
364                 struct stasis_subscription *sub = subscribers[i];
365
366                 ast_assert(sub != NULL);
367
368                 if (sub->mailbox) {
369                         RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
370
371                         dispatch = dispatch_create(publisher_topic, message, sub);
372                         if (!dispatch) {
373                                 ast_log(LOG_DEBUG, "Dropping dispatch\n");
374                                 break;
375                         }
376
377                         if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
378                                 dispatch = NULL; /* Ownership transferred to mailbox */
379                         }
380                 } else {
381                         /* No mailbox; dispatch directly */
382                         sub->callback(sub->data, sub, sub->topic, message);
383                 }
384         }
385
386         for (i = 0; i < num_subscribers; ++i) {
387                 ao2_cleanup(subscribers[i]);
388         }
389         ast_free(subscribers);
390 }
391
392 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
393 {
394         stasis_forward_message(topic, topic, message);
395 }
396
397 /*! \brief Forwarding subscriber */
398 static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
399 {
400         struct stasis_topic *to_topic = data;
401         stasis_forward_message(to_topic, topic, message);
402
403         if (stasis_subscription_final_message(sub, message)) {
404                 ao2_cleanup(to_topic);
405         }
406 }
407
408 struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
409 {
410         struct stasis_subscription *sub;
411         if (!from_topic || !to_topic) {
412                 return NULL;
413         }
414         /* Subscribe without a mailbox, since we're just forwarding messages */
415         sub = __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
416         if (sub) {
417                 /* hold a ref to to_topic for this forwarding subscription */
418                 ao2_ref(to_topic, +1);
419         }
420         return sub;
421 }
422
423 static void subscription_change_dtor(void *obj)
424 {
425         struct stasis_subscription_change *change = obj;
426         ast_string_field_free_memory(change);
427         ao2_cleanup(change->topic);
428 }
429
430 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
431 {
432         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
433
434         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
435         if (ast_string_field_init(change, 128)) {
436                 return NULL;
437         }
438
439         ast_string_field_set(change, uniqueid, uniqueid);
440         ast_string_field_set(change, description, description);
441         ao2_ref(topic, +1);
442         change->topic = topic;
443
444         ao2_ref(change, +1);
445         return change;
446 }
447
448 struct stasis_message_type *stasis_subscription_change(void)
449 {
450         return __subscription_change_message_type;
451 }
452
453 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
454 {
455         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
456         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
457
458         change = subscription_change_alloc(topic, uniqueid, description);
459
460         if (!change) {
461                 return;
462         }
463
464         msg = stasis_message_create(stasis_subscription_change(), change);
465
466         if (!msg) {
467                 return;
468         }
469
470         stasis_publish(topic, msg);
471 }
472
473 /*! \brief Cleanup function */
474 static void stasis_exit(void)
475 {
476         ao2_cleanup(__subscription_change_message_type);
477         __subscription_change_message_type = NULL;
478         ast_threadpool_shutdown(pool);
479         pool = NULL;
480 }
481
482 int stasis_init(void)
483 {
484         int cache_init;
485
486         /* XXX Should this be configurable? */
487         struct ast_threadpool_options opts = {
488                 .version = AST_THREADPOOL_OPTIONS_VERSION,
489                 .idle_timeout = 20,
490                 .auto_increment = 1,
491                 .initial_size = 0,
492                 .max_size = 200
493         };
494
495         ast_register_atexit(stasis_exit);
496
497         if (pool) {
498                 ast_log(LOG_ERROR, "Stasis double-initialized\n");
499                 return -1;
500         }
501
502         pool = ast_threadpool_create("stasis-core", NULL, &opts);
503         if (!pool) {
504                 ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
505                 return -1;
506         }
507
508         cache_init = stasis_cache_init();
509         if (cache_init != 0) {
510                 return -1;
511         }
512
513         __subscription_change_message_type = stasis_message_type_create("stasis_subscription_change");
514         if (!__subscription_change_message_type) {
515                 return -1;
516         }
517
518         return 0;
519 }