Fix missing ' ' around '='
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis.h"
36 #include "asterisk/threadpool.h"
37 #include "asterisk/taskprocessor.h"
38 #include "asterisk/utils.h"
39 #include "asterisk/uuid.h"
40
41 /*! Initial size of the subscribers list. */
42 #define INITIAL_SUBSCRIBERS_MAX 4
43
44 /*! The number of buckets to use for topic pools */
45 #define TOPIC_POOL_BUCKETS 57
46
47 /*! Threadpool for dispatching notifications to subscribers */
48 static struct ast_threadpool *pool;
49
50 static struct stasis_message_type *__subscription_change_message_type;
51
52 /*! \internal */
53 struct stasis_topic {
54         char *name;
55         /*! Variable length array of the subscribers (raw pointer to avoid cyclic references) */
56         struct stasis_subscription **subscribers;
57         /*! Allocated length of the subscribers array */
58         size_t num_subscribers_max;
59         /*! Current size of the subscribers array */
60         size_t num_subscribers_current;
61 };
62
63 /* Forward declarations for the tightly-coupled subscription object */
64 struct stasis_subscription;
65 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
66
67 static void topic_dtor(void *obj)
68 {
69         struct stasis_topic *topic = obj;
70         ast_free(topic->name);
71         topic->name = NULL;
72         ast_free(topic->subscribers);
73         topic->subscribers = NULL;
74 }
75
76 struct stasis_topic *stasis_topic_create(const char *name)
77 {
78         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
79
80         topic = ao2_alloc(sizeof(*topic), topic_dtor);
81
82         if (!topic) {
83                 return NULL;
84         }
85
86         topic->name = ast_strdup(name);
87         if (!topic->name) {
88                 return NULL;
89         }
90
91         topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
92         topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(*topic->subscribers));
93         if (!topic->subscribers) {
94                 return NULL;
95         }
96
97         ao2_ref(topic, +1);
98         return topic;
99 }
100
101 const char *stasis_topic_name(const struct stasis_topic *topic)
102 {
103         return topic->name;
104 }
105
106 /*! \internal */
107 struct stasis_subscription {
108         /*! Unique ID for this subscription */
109         char *uniqueid;
110         /*! Topic subscribed to. */
111         struct stasis_topic *topic;
112         /*! Mailbox for processing incoming messages. */
113         struct ast_taskprocessor *mailbox;
114         /*! Callback function for incoming message processing. */
115         stasis_subscription_cb callback;
116         /*! Data pointer to be handed to the callback. */
117         void *data;
118 };
119
120 static void subscription_dtor(void *obj)
121 {
122         struct stasis_subscription *sub = obj;
123         ast_assert(!stasis_subscription_is_subscribed(sub));
124         ast_free(sub->uniqueid);
125         sub->uniqueid = NULL;
126         ao2_cleanup(sub->topic);
127         sub->topic = NULL;
128         ast_taskprocessor_unreference(sub->mailbox);
129         sub->mailbox = NULL;
130 }
131
132 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
133
134 struct stasis_subscription *stasis_subscribe(struct stasis_topic *topic, stasis_subscription_cb callback, void *data)
135 {
136         RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
137         RAII_VAR(struct ast_uuid *, id, NULL, ast_free);
138         char uniqueid[AST_UUID_STR_LEN];
139
140         sub = ao2_alloc(sizeof(*sub), subscription_dtor);
141         if (!sub) {
142                 return NULL;
143         }
144
145         id = ast_uuid_generate();
146         if (!id) {
147                 ast_log(LOG_ERROR, "UUID generation failed\n");
148                 return NULL;
149         }
150         ast_uuid_to_str(id, uniqueid, sizeof(uniqueid));
151
152         sub->mailbox = ast_threadpool_serializer(uniqueid, pool);
153         if (!sub->mailbox) {
154                 return NULL;
155         }
156
157         sub->uniqueid = ast_strdup(uniqueid);
158         ao2_ref(topic, +1);
159         sub->topic = topic;
160         sub->callback = callback;
161         sub->data = data;
162
163         if (topic_add_subscription(topic, sub) != 0) {
164                 return NULL;
165         }
166         send_subscription_change_message(topic, uniqueid, "Subscribe");
167
168         ao2_ref(sub, +1);
169         return sub;
170 }
171
172 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
173 {
174         if (sub) {
175                 size_t i;
176                 struct stasis_topic *topic = sub->topic;
177                 SCOPED_AO2LOCK(lock_topic, topic);
178
179                 for (i = 0; i < topic->num_subscribers_current; ++i) {
180                         if (topic->subscribers[i] == sub) {
181                                 send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
182                                 /* swap [i] with last entry; remove last entry */
183                                 topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
184                                 /* Unsubscribing unrefs the subscription */
185                                 ao2_cleanup(sub);
186                                 return NULL;
187                         }
188                 }
189
190                 ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
191         }
192         return NULL;
193 }
194
195 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
196 {
197         if (sub) {
198                 size_t i;
199                 struct stasis_topic *topic = sub->topic;
200                 SCOPED_AO2LOCK(lock_topic, topic);
201
202                 for (i = 0; i < topic->num_subscribers_current; ++i) {
203                         if (topic->subscribers[i] == sub) {
204                                 return 1;
205                         }
206                 }
207         }
208
209         return 0;
210 }
211
212 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
213 {
214         return sub->uniqueid;
215 }
216
217 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
218 {
219         struct stasis_subscription_change *change;
220         if (stasis_message_type(msg) != stasis_subscription_change()) {
221                 return 0;
222         }
223
224         change = stasis_message_data(msg);
225         if (strcmp("Unsubscribe", change->description)) {
226                 return 0;
227         }
228
229         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
230                 return 0;
231         }
232
233         return 1;
234 }
235
236 /*!
237  * \brief Add a subscriber to a topic.
238  * \param topic Topic
239  * \param sub Subscriber
240  * \return 0 on success
241  * \return Non-zero on error
242  */
243 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
244 {
245         struct stasis_subscription **subscribers;
246         SCOPED_AO2LOCK(lock, topic);
247
248         /* Increase list size, if needed */
249         if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
250                 subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
251                 if (!subscribers) {
252                         return -1;
253                 }
254                 topic->subscribers = subscribers;
255                 topic->num_subscribers_max *= 2;
256         }
257
258         /* Don't ref sub here or we'll cause a reference cycle. */
259         topic->subscribers[topic->num_subscribers_current++] = sub;
260         return 0;
261 }
262
263 /*!
264  * \internal
265  * \brief Information needed to dispatch a message to a subscription
266  */
267 struct dispatch {
268         /*! Topic message was published to */
269         struct stasis_topic *topic;
270         /*! The message itself */
271         struct stasis_message *message;
272         /*! Subscription receiving the message */
273         struct stasis_subscription *sub;
274 };
275
276 static void dispatch_dtor(void *data)
277 {
278         struct dispatch *dispatch = data;
279         ao2_cleanup(dispatch->topic);
280         ao2_cleanup(dispatch->message);
281         ao2_cleanup(dispatch->sub);
282 }
283
284 static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
285 {
286         RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
287
288         ast_assert(topic != NULL);
289         ast_assert(message != NULL);
290         ast_assert(sub != NULL);
291
292         dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
293         if (!dispatch) {
294                 return NULL;
295         }
296
297         dispatch->topic = topic;
298         ao2_ref(topic, +1);
299
300         dispatch->message = message;
301         ao2_ref(message, +1);
302
303         dispatch->sub = sub;
304         ao2_ref(sub, +1);
305
306         ao2_ref(dispatch, +1);
307         return dispatch;
308 }
309
310 /*!
311  * \brief Dispatch a message to a subscriber
312  * \param data \ref dispatch object
313  * \return 0
314  */
315 static int dispatch_exec(void *data)
316 {
317         RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
318         RAII_VAR(struct stasis_topic *, sub_topic, NULL, ao2_cleanup);
319
320         /* Since sub->topic doesn't change, no need to lock sub */
321         ast_assert(dispatch->sub->topic != NULL);
322         ao2_ref(dispatch->sub->topic, +1);
323         sub_topic = dispatch->sub->topic;
324
325         dispatch->sub->callback(dispatch->sub->data,
326                                 dispatch->sub,
327                                 sub_topic,
328                                 dispatch->message);
329
330         return 0;
331 }
332
333 void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
334 {
335         size_t i;
336         SCOPED_AO2LOCK(lock, topic);
337
338         ast_assert(topic != NULL);
339         ast_assert(publisher_topic != NULL);
340         ast_assert(message != NULL);
341
342         for (i = 0; i < topic->num_subscribers_current; ++i) {
343                 struct stasis_subscription *sub = topic->subscribers[i];
344                 RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
345
346                 ast_assert(sub != NULL);
347
348                 dispatch = dispatch_create(publisher_topic, message, sub);
349                 if (!dispatch) {
350                         ast_log(LOG_DEBUG, "Dropping dispatch\n");
351                         break;
352                 }
353
354                 if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
355                         dispatch = NULL; /* Ownership transferred to mailbox */
356                 }
357         }
358 }
359
360 void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
361 {
362         stasis_forward_message(topic, topic, message);
363 }
364
365 /*! \brief Forwarding subscriber */
366 static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
367 {
368         struct stasis_topic *to_topic = data;
369         stasis_forward_message(to_topic, topic, message);
370
371         if (stasis_subscription_final_message(sub, message)) {
372                 ao2_cleanup(to_topic);
373         }
374 }
375
376 struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
377 {
378         struct stasis_subscription *sub;
379         if (!from_topic || !to_topic) {
380                 return NULL;
381         }
382
383         sub = stasis_subscribe(from_topic, stasis_forward_cb, to_topic);
384         if (sub) {
385                 /* hold a ref to to_topic for this forwarding subscription */
386                 ao2_ref(to_topic, +1);
387         }
388         return sub;
389 }
390
391 static void subscription_change_dtor(void *obj)
392 {
393         struct stasis_subscription_change *change = obj;
394         ast_string_field_free_memory(change);
395         ao2_cleanup(change->topic);
396 }
397
398 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
399 {
400         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
401
402         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
403         if (ast_string_field_init(change, 128)) {
404                 return NULL;
405         }
406
407         ast_string_field_set(change, uniqueid, uniqueid);
408         ast_string_field_set(change, description, description);
409         ao2_ref(topic, +1);
410         change->topic = topic;
411
412         ao2_ref(change, +1);
413         return change;
414 }
415
416 struct stasis_message_type *stasis_subscription_change(void)
417 {
418         return __subscription_change_message_type;
419 }
420
421 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
422 {
423         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
424         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
425
426         change = subscription_change_alloc(topic, uniqueid, description);
427
428         if (!change) {
429                 return;
430         }
431
432         msg = stasis_message_create(stasis_subscription_change(), change);
433
434         if (!msg) {
435                 return;
436         }
437
438         stasis_publish(topic, msg);
439 }
440
441 struct topic_pool_entry {
442         struct stasis_subscription *forward;
443         struct stasis_topic *topic;
444 };
445
446 static void topic_pool_entry_dtor(void *obj)
447 {
448         struct topic_pool_entry *entry = obj;
449         entry->forward = stasis_unsubscribe(entry->forward);
450         ao2_cleanup(entry->topic);
451         entry->topic = NULL;
452 }
453
454 static struct topic_pool_entry *topic_pool_entry_alloc(void)
455 {
456         return ao2_alloc(sizeof(struct topic_pool_entry), topic_pool_entry_dtor);
457 }
458
459 struct stasis_topic_pool {
460         struct ao2_container *pool_container;
461         struct stasis_topic *pool_topic;
462 };
463
464 static void topic_pool_dtor(void *obj)
465 {
466         struct stasis_topic_pool *pool = obj;
467         ao2_cleanup(pool->pool_container);
468         pool->pool_container = NULL;
469         ao2_cleanup(pool->pool_topic);
470         pool->pool_topic = NULL;
471 }
472
473 static int topic_pool_entry_hash(const void *obj, const int flags)
474 {
475         const char *topic_name = (flags & OBJ_KEY) ? obj : stasis_topic_name(((struct topic_pool_entry*) obj)->topic);
476         return ast_str_case_hash(topic_name);
477 }
478
479 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
480 {
481         struct topic_pool_entry *opt1 = obj, *opt2 = arg;
482         const char *topic_name = (flags & OBJ_KEY) ? arg : stasis_topic_name(opt2->topic);
483         return strcasecmp(stasis_topic_name(opt1->topic), topic_name) ? 0 : CMP_MATCH | CMP_STOP;
484 }
485
486 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
487 {
488         RAII_VAR(struct stasis_topic_pool *, pool, ao2_alloc(sizeof(*pool), topic_pool_dtor), ao2_cleanup);
489         if (!pool) {
490                 return NULL;
491         }
492         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS, topic_pool_entry_hash, topic_pool_entry_cmp);
493         ao2_ref(pooled_topic, +1);
494         pool->pool_topic = pooled_topic;
495
496         ao2_ref(pool, +1);
497         return pool;
498 }
499
500 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
501 {
502         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
503         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
504         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_KEY | OBJ_NOLOCK);
505
506         if (topic_pool_entry) {
507                 return topic_pool_entry->topic;
508         }
509
510         topic_pool_entry = topic_pool_entry_alloc();
511
512         if (!topic_pool_entry) {
513                 return NULL;
514         }
515
516         topic_pool_entry->topic = stasis_topic_create(topic_name);
517         if (!topic_pool_entry->topic) {
518                 return NULL;
519         }
520
521         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
522         if (!topic_pool_entry->forward) {
523                 return NULL;
524         }
525
526         ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK);
527
528         return topic_pool_entry->topic;
529 }
530
531 /*! \brief Cleanup function */
532 static void stasis_exit(void)
533 {
534         ao2_cleanup(__subscription_change_message_type);
535         __subscription_change_message_type = NULL;
536         ast_threadpool_shutdown(pool);
537         pool = NULL;
538 }
539
540 int stasis_init(void)
541 {
542         int cache_init;
543
544         /* XXX Should this be configurable? */
545         struct ast_threadpool_options opts = {
546                 .version = AST_THREADPOOL_OPTIONS_VERSION,
547                 .idle_timeout = 20,
548                 .auto_increment = 1,
549                 .initial_size = 0,
550                 .max_size = 200
551         };
552
553         ast_register_atexit(stasis_exit);
554
555         if (pool) {
556                 ast_log(LOG_ERROR, "Stasis double-initialized\n");
557                 return -1;
558         }
559
560         pool = ast_threadpool_create("stasis-core", NULL, &opts);
561         if (!pool) {
562                 ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
563                 return -1;
564         }
565
566         cache_init = stasis_cache_init();
567         if (cache_init != 0) {
568                 return -1;
569         }
570
571         __subscription_change_message_type = stasis_message_type_create("stasis_subscription_change");
572         if (!__subscription_change_message_type) {
573                 return -1;
574         }
575
576         return 0;
577 }