Configuration for Stasis threadpool
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis_internal.h"
36 #include "asterisk/stasis.h"
37 #include "asterisk/threadpool.h"
38 #include "asterisk/taskprocessor.h"
39 #include "asterisk/utils.h"
40 #include "asterisk/uuid.h"
41
42 /*! Initial size of the subscribers list. */
43 #define INITIAL_SUBSCRIBERS_MAX 4
44
45 /*! The number of buckets to use for topic pools */
46 #define TOPIC_POOL_BUCKETS 57
47
48 /*! Threadpool for dispatching notifications to subscribers */
49 static struct ast_threadpool *pool;
50
51 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
52
/*! \internal \brief A named topic together with its list of subscribers. */
struct stasis_topic {
	/*! Name of the topic (heap copy; released by topic_dtor()) */
	char *name;
	/*! Growable array of subscriber pointers. The pointers are stored
	 *  without a reference so topic and subscription do not form a cycle. */
	struct stasis_subscription **subscribers;
	/*! Number of slots allocated in the subscribers array */
	size_t num_subscribers_max;
	/*! Number of slots of the subscribers array currently in use */
	size_t num_subscribers_current;
};
63
64 /* Forward declarations for the tightly-coupled subscription object */
65 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
66
67 static void topic_dtor(void *obj)
68 {
69         struct stasis_topic *topic = obj;
70         ast_free(topic->name);
71         topic->name = NULL;
72         ast_free(topic->subscribers);
73         topic->subscribers = NULL;
74 }
75
76 struct stasis_topic *stasis_topic_create(const char *name)
77 {
78         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
79
80         topic = ao2_alloc(sizeof(*topic), topic_dtor);
81
82         if (!topic) {
83                 return NULL;
84         }
85
86         topic->name = ast_strdup(name);
87         if (!topic->name) {
88                 return NULL;
89         }
90
91         topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
92         topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(*topic->subscribers));
93         if (!topic->subscribers) {
94                 return NULL;
95         }
96
97         ao2_ref(topic, +1);
98         return topic;
99 }
100
101 const char *stasis_topic_name(const struct stasis_topic *topic)
102 {
103         return topic->name;
104 }
105
106 /*! \internal */
107 struct stasis_subscription {
108         /*! Unique ID for this subscription */
109         char uniqueid[AST_UUID_STR_LEN];
110         /*! Topic subscribed to. */
111         struct stasis_topic *topic;
112         /*! Mailbox for processing incoming messages. */
113         struct ast_taskprocessor *mailbox;
114         /*! Callback function for incoming message processing. */
115         stasis_subscription_cb callback;
116         /*! Data pointer to be handed to the callback. */
117         void *data;
118
119         /*! Lock for joining with subscription. */
120         ast_mutex_t join_lock;
121         /*! Condition for joining with subscription. */
122         ast_cond_t join_cond;
123         /*! Flag set when final message for sub has been received.
124          *  Be sure join_lock is held before reading/setting. */
125         int final_message_rxed;
126         /*! Flag set when final message for sub has been processed.
127          *  Be sure join_lock is held before reading/setting. */
128         int final_message_processed;
129 };
130
131 static void subscription_dtor(void *obj)
132 {
133         struct stasis_subscription *sub = obj;
134         ast_assert(!stasis_subscription_is_subscribed(sub));
135         ast_assert(stasis_subscription_is_done(sub));
136         ao2_cleanup(sub->topic);
137         sub->topic = NULL;
138         ast_taskprocessor_unreference(sub->mailbox);
139         sub->mailbox = NULL;
140         ast_mutex_destroy(&sub->join_lock);
141         ast_cond_destroy(&sub->join_cond);
142 }
143
144 /*!
145  * \brief Invoke the subscription's callback.
146  * \param sub Subscription to invoke.
147  * \param topic Topic message was published to.
148  * \param message Message to send.
149  */
150 static void subscription_invoke(struct stasis_subscription *sub,
151                                   struct stasis_topic *topic,
152                                   struct stasis_message *message)
153 {
154         /* Notify that the final message has been received */
155         if (stasis_subscription_final_message(sub, message)) {
156                 SCOPED_MUTEX(lock, &sub->join_lock);
157                 sub->final_message_rxed = 1;
158                 ast_cond_signal(&sub->join_cond);
159         }
160
161         /* Since sub is mostly immutable, no need to lock sub */
162         sub->callback(sub->data, sub, topic, message);
163
164         /* Notify that the final message has been processed */
165         if (stasis_subscription_final_message(sub, message)) {
166                 SCOPED_MUTEX(lock, &sub->join_lock);
167                 sub->final_message_processed = 1;
168                 ast_cond_signal(&sub->join_cond);
169         }
170 }
171
172 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
173
174 struct stasis_subscription *internal_stasis_subscribe(
175         struct stasis_topic *topic,
176         stasis_subscription_cb callback,
177         void *data,
178         int needs_mailbox)
179 {
180         RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
181
182         sub = ao2_alloc(sizeof(*sub), subscription_dtor);
183         if (!sub) {
184                 return NULL;
185         }
186
187         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
188
189         if (needs_mailbox) {
190                 sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
191                 if (!sub->mailbox) {
192                         return NULL;
193                 }
194         }
195
196         ao2_ref(topic, +1);
197         sub->topic = topic;
198         sub->callback = callback;
199         sub->data = data;
200         ast_mutex_init(&sub->join_lock);
201         ast_cond_init(&sub->join_cond, NULL);
202
203         if (topic_add_subscription(topic, sub) != 0) {
204                 return NULL;
205         }
206         send_subscription_change_message(topic, sub->uniqueid, "Subscribe");
207
208         ao2_ref(sub, +1);
209         return sub;
210 }
211
212 struct stasis_subscription *stasis_subscribe(
213         struct stasis_topic *topic,
214         stasis_subscription_cb callback,
215         void *data)
216 {
217         return internal_stasis_subscribe(topic, callback, data, 1);
218 }
219
220 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
221 {
222         if (sub) {
223                 size_t i;
224                 struct stasis_topic *topic = sub->topic;
225                 SCOPED_AO2LOCK(lock_topic, topic);
226
227                 for (i = 0; i < topic->num_subscribers_current; ++i) {
228                         if (topic->subscribers[i] == sub) {
229                                 send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
230                                 /* swap [i] with last entry; remove last entry */
231                                 topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
232                                 /* Unsubscribing unrefs the subscription */
233                                 ao2_cleanup(sub);
234                                 return NULL;
235                         }
236                 }
237
238                 ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
239         }
240         return NULL;
241 }
242
243 /*!
244  * \brief Block until the final message has been received on a subscription.
245  *
246  * \param subscription Subscription to wait on.
247  */
248 void stasis_subscription_join(struct stasis_subscription *subscription)
249 {
250         if (subscription) {
251                 SCOPED_MUTEX(lock, &subscription->join_lock);
252                 /* Wait until the processed flag has been set */
253                 while (!subscription->final_message_processed) {
254                         ast_cond_wait(&subscription->join_cond,
255                                 &subscription->join_lock);
256                 }
257         }
258 }
259
260 int stasis_subscription_is_done(struct stasis_subscription *subscription)
261 {
262         if (subscription) {
263                 SCOPED_MUTEX(lock, &subscription->join_lock);
264                 return subscription->final_message_rxed;
265         }
266
267         /* Null subscription is about as done as you can get */
268         return 1;
269 }
270
271 struct stasis_subscription *stasis_unsubscribe_and_join(
272         struct stasis_subscription *subscription)
273 {
274         if (!subscription) {
275                 return NULL;
276         }
277
278         /* Bump refcount to hold it past the unsubscribe */
279         ao2_ref(subscription, +1);
280         stasis_unsubscribe(subscription);
281         stasis_subscription_join(subscription);
282         /* Now decrement the refcount back */
283         ao2_cleanup(subscription);
284         return NULL;
285 }
286
287 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
288 {
289         if (sub) {
290                 size_t i;
291                 struct stasis_topic *topic = sub->topic;
292                 SCOPED_AO2LOCK(lock_topic, topic);
293
294                 for (i = 0; i < topic->num_subscribers_current; ++i) {
295                         if (topic->subscribers[i] == sub) {
296                                 return 1;
297                         }
298                 }
299         }
300
301         return 0;
302 }
303
304 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
305 {
306         return sub->uniqueid;
307 }
308
309 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
310 {
311         struct stasis_subscription_change *change;
312         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
313                 return 0;
314         }
315
316         change = stasis_message_data(msg);
317         if (strcmp("Unsubscribe", change->description)) {
318                 return 0;
319         }
320
321         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
322                 return 0;
323         }
324
325         return 1;
326 }
327
328 /*!
329  * \brief Add a subscriber to a topic.
330  * \param topic Topic
331  * \param sub Subscriber
332  * \return 0 on success
333  * \return Non-zero on error
334  */
335 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
336 {
337         struct stasis_subscription **subscribers;
338         SCOPED_AO2LOCK(lock, topic);
339
340         /* Increase list size, if needed */
341         if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
342                 subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
343                 if (!subscribers) {
344                         return -1;
345                 }
346                 topic->subscribers = subscribers;
347                 topic->num_subscribers_max *= 2;
348         }
349
350         /* Don't ref sub here or we'll cause a reference cycle. */
351         topic->subscribers[topic->num_subscribers_current++] = sub;
352         return 0;
353 }
354
/*!
 * \internal
 * \brief One queued delivery: which message goes to which subscription.
 *
 * All three members are referenced while the dispatch exists and are
 * released by dispatch_dtor().
 */
struct dispatch {
	/*! Topic the message was published to */
	struct stasis_topic *topic;
	/*! Message being delivered */
	struct stasis_message *message;
	/*! Subscription that will receive the message */
	struct stasis_subscription *sub;
};
367
368 static void dispatch_dtor(void *data)
369 {
370         struct dispatch *dispatch = data;
371         ao2_cleanup(dispatch->topic);
372         ao2_cleanup(dispatch->message);
373         ao2_cleanup(dispatch->sub);
374 }
375
376 static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
377 {
378         RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
379
380         ast_assert(topic != NULL);
381         ast_assert(message != NULL);
382         ast_assert(sub != NULL);
383
384         dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
385         if (!dispatch) {
386                 return NULL;
387         }
388
389         dispatch->topic = topic;
390         ao2_ref(topic, +1);
391
392         dispatch->message = message;
393         ao2_ref(message, +1);
394
395         dispatch->sub = sub;
396         ao2_ref(sub, +1);
397
398         ao2_ref(dispatch, +1);
399         return dispatch;
400 }
401
402 /*!
403  * \brief Dispatch a message to a subscriber
404  * \param data \ref dispatch object
405  * \return 0
406  */
407 static int dispatch_exec(void *data)
408 {
409         RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
410
411         subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message);
412
413         return 0;
414 }
415
416 void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
417 {
418         size_t i;
419         SCOPED_AO2LOCK(lock, topic);
420
421         ast_assert(topic != NULL);
422         ast_assert(publisher_topic != NULL);
423         ast_assert(message != NULL);
424
425         for (i = 0; i < topic->num_subscribers_current; ++i) {
426                 struct stasis_subscription *sub = topic->subscribers[i];
427
428                 ast_assert(sub != NULL);
429
430                 if (sub->mailbox) {
431                         RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
432
433                         dispatch = dispatch_create(publisher_topic, message, sub);
434                         if (!dispatch) {
435                                 ast_log(LOG_DEBUG, "Dropping dispatch\n");
436                                 break;
437                         }
438
439                         if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
440                                 /* Ownership transferred to mailbox.
441                                  * Don't increment ref, b/c the task processor
442                                  * may have already gotten rid of the object.
443                                  */
444                                 dispatch = NULL;
445                         }
446                 } else {
447                         /* Dispatch directly */
448                         subscription_invoke(sub, publisher_topic, message);
449                 }
450         }
451 }
452
/*!
 * \brief Publish a message to a topic.
 *
 * Same as forwarding with the topic acting as its own publisher.
 */
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
{
	struct stasis_topic *publisher_topic = topic;

	stasis_forward_message(topic, publisher_topic, message);
}
457
/*!
 * \brief Subscription callback that relays every message to another topic.
 *
 * \a data is the destination topic. Once the forwarding subscription's
 * own final message has been relayed, the reference held on the
 * destination topic is released.
 */
static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
{
	struct stasis_topic *destination = data;

	stasis_forward_message(destination, topic, message);

	if (!stasis_subscription_final_message(sub, message)) {
		return;
	}

	ao2_cleanup(destination);
}
468
469 struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
470 {
471         struct stasis_subscription *sub;
472         if (!from_topic || !to_topic) {
473                 return NULL;
474         }
475
476         /* Forwarding subscriptions should dispatch directly instead of having a
477          * mailbox. Otherwise, messages forwarded to the same topic from
478          * different topics may get reordered. Which is bad.
479          */
480         sub = internal_stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
481         if (sub) {
482                 /* hold a ref to to_topic for this forwarding subscription */
483                 ao2_ref(to_topic, +1);
484         }
485         return sub;
486 }
487
488 static void subscription_change_dtor(void *obj)
489 {
490         struct stasis_subscription_change *change = obj;
491         ast_string_field_free_memory(change);
492         ao2_cleanup(change->topic);
493 }
494
495 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
496 {
497         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
498
499         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
500         if (ast_string_field_init(change, 128)) {
501                 return NULL;
502         }
503
504         ast_string_field_set(change, uniqueid, uniqueid);
505         ast_string_field_set(change, description, description);
506         ao2_ref(topic, +1);
507         change->topic = topic;
508
509         ao2_ref(change, +1);
510         return change;
511 }
512
513 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
514 {
515         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
516         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
517
518         change = subscription_change_alloc(topic, uniqueid, description);
519
520         if (!change) {
521                 return;
522         }
523
524         msg = stasis_message_create(stasis_subscription_change_type(), change);
525
526         if (!msg) {
527                 return;
528         }
529
530         stasis_publish(topic, msg);
531 }
532
/*! \brief One pooled topic plus the subscription forwarding it to the pool topic. */
struct topic_pool_entry {
	/*! Forwarding subscription; unsubscribed by topic_pool_entry_dtor() */
	struct stasis_subscription *forward;
	/*! The pooled topic itself (referenced) */
	struct stasis_topic *topic;
};
537
538 static void topic_pool_entry_dtor(void *obj)
539 {
540         struct topic_pool_entry *entry = obj;
541         entry->forward = stasis_unsubscribe(entry->forward);
542         ao2_cleanup(entry->topic);
543         entry->topic = NULL;
544 }
545
546 static struct topic_pool_entry *topic_pool_entry_alloc(void)
547 {
548         return ao2_alloc(sizeof(struct topic_pool_entry), topic_pool_entry_dtor);
549 }
550
/*! \brief A set of named topics that all forward to one common topic. */
struct stasis_topic_pool {
	/*! Container of topic_pool_entry objects, keyed by topic name */
	struct ao2_container *pool_container;
	/*! Topic that every pooled topic forwards to (referenced) */
	struct stasis_topic *pool_topic;
};
555
556 static void topic_pool_dtor(void *obj)
557 {
558         struct stasis_topic_pool *pool = obj;
559         ao2_cleanup(pool->pool_container);
560         pool->pool_container = NULL;
561         ao2_cleanup(pool->pool_topic);
562         pool->pool_topic = NULL;
563 }
564
565 static int topic_pool_entry_hash(const void *obj, const int flags)
566 {
567         const char *topic_name = (flags & OBJ_KEY) ? obj : stasis_topic_name(((struct topic_pool_entry*) obj)->topic);
568         return ast_str_case_hash(topic_name);
569 }
570
571 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
572 {
573         struct topic_pool_entry *opt1 = obj, *opt2 = arg;
574         const char *topic_name = (flags & OBJ_KEY) ? arg : stasis_topic_name(opt2->topic);
575         return strcasecmp(stasis_topic_name(opt1->topic), topic_name) ? 0 : CMP_MATCH | CMP_STOP;
576 }
577
578 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
579 {
580         RAII_VAR(struct stasis_topic_pool *, pool, ao2_alloc(sizeof(*pool), topic_pool_dtor), ao2_cleanup);
581         if (!pool) {
582                 return NULL;
583         }
584         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS, topic_pool_entry_hash, topic_pool_entry_cmp);
585         ao2_ref(pooled_topic, +1);
586         pool->pool_topic = pooled_topic;
587
588         ao2_ref(pool, +1);
589         return pool;
590 }
591
592 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
593 {
594         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
595         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
596         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_KEY | OBJ_NOLOCK);
597
598         if (topic_pool_entry) {
599                 return topic_pool_entry->topic;
600         }
601
602         topic_pool_entry = topic_pool_entry_alloc();
603
604         if (!topic_pool_entry) {
605                 return NULL;
606         }
607
608         topic_pool_entry->topic = stasis_topic_create(topic_name);
609         if (!topic_pool_entry->topic) {
610                 return NULL;
611         }
612
613         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
614         if (!topic_pool_entry->forward) {
615                 return NULL;
616         }
617
618         ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK);
619
620         return topic_pool_entry->topic;
621 }
622
623 void stasis_log_bad_type_access(const char *name)
624 {
625         ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
626 }
627
628 /*! \brief Cleanup function */
629 static void stasis_exit(void)
630 {
631         ast_threadpool_shutdown(pool);
632         pool = NULL;
633 }
634
635 /*! \brief Cleanup function for graceful shutdowns */
636 static void stasis_cleanup(void)
637 {
638         STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
639 }
640
641 int stasis_init(void)
642 {
643         int cache_init;
644
645         struct ast_threadpool_options opts;
646
647         /* Be sure the types are cleaned up after the message bus */
648         ast_register_cleanup(stasis_cleanup);
649         ast_register_atexit(stasis_exit);
650
651         if (stasis_config_init() != 0) {
652                 ast_log(LOG_ERROR, "Stasis configuration failed\n");
653                 return -1;
654         }
655
656         if (pool) {
657                 ast_log(LOG_ERROR, "Stasis double-initialized\n");
658                 return -1;
659         }
660
661         stasis_config_get_threadpool_options(&opts);
662         ast_debug(3, "Creating Stasis threadpool: initial_size = %d, max_size = %d, idle_timeout_secs = %d\n",
663                 opts.initial_size, opts.max_size, opts.idle_timeout);
664         pool = ast_threadpool_create("stasis-core", NULL, &opts);
665         if (!pool) {
666                 ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
667                 return -1;
668         }
669
670         cache_init = stasis_cache_init();
671         if (cache_init != 0) {
672                 return -1;
673         }
674
675         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
676                 return -1;
677         }
678
679         return 0;
680 }