Avoid unnecessary cleanups during immediate shutdown
[asterisk/asterisk.git] / main / stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message Bus API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis.h"
36 #include "asterisk/threadpool.h"
37 #include "asterisk/taskprocessor.h"
38 #include "asterisk/utils.h"
39 #include "asterisk/uuid.h"
40
41 /*! Initial size of the subscribers list. */
42 #define INITIAL_SUBSCRIBERS_MAX 4
43
44 /*! The number of buckets to use for topic pools */
45 #define TOPIC_POOL_BUCKETS 57
46
47 /*! Threadpool for dispatching notifications to subscribers */
48 static struct ast_threadpool *pool;
49
50 STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
51
52 /*! \internal \brief A named topic together with the list of subscriptions attached to it. */
53 struct stasis_topic {
54         char *name; /*!< Topic name; duplicated in stasis_topic_create() and freed in topic_dtor() */
55         /*! Variable length array of the subscribers (raw pointers that hold no reference, to avoid cyclic references) */
56         struct stasis_subscription **subscribers;
57         /*! Allocated length of the subscribers array, counted in entries */
58         size_t num_subscribers_max;
59         /*! Number of entries of the subscribers array currently in use */
60         size_t num_subscribers_current;
61 };
62
63 /* Forward declarations for the tightly-coupled subscription object */
64 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
65
66 static void topic_dtor(void *obj)
67 {
68         struct stasis_topic *topic = obj;
69         ast_free(topic->name);
70         topic->name = NULL;
71         ast_free(topic->subscribers);
72         topic->subscribers = NULL;
73 }
74
75 struct stasis_topic *stasis_topic_create(const char *name)
76 {
77         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
78
79         topic = ao2_alloc(sizeof(*topic), topic_dtor);
80
81         if (!topic) {
82                 return NULL;
83         }
84
85         topic->name = ast_strdup(name);
86         if (!topic->name) {
87                 return NULL;
88         }
89
90         topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
91         topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(*topic->subscribers));
92         if (!topic->subscribers) {
93                 return NULL;
94         }
95
96         ao2_ref(topic, +1);
97         return topic;
98 }
99
100 const char *stasis_topic_name(const struct stasis_topic *topic)
101 {
102         return topic->name;
103 }
104
105 /*! \internal \brief One subscriber's attachment to a topic. */
106 struct stasis_subscription {
107         /*! Unique ID for this subscription (filled in by ast_uuid_generate_str() at creation) */
108         char uniqueid[AST_UUID_STR_LEN];
109         /*! Topic subscribed to. The subscription holds a reference, released in subscription_dtor(). */
110         struct stasis_topic *topic;
111         /*! Mailbox for processing incoming messages. NULL when created without a mailbox; such subscriptions are invoked directly by the publisher. */
112         struct ast_taskprocessor *mailbox;
113         /*! Callback function for incoming message processing. */
114         stasis_subscription_cb callback;
115         /*! Data pointer to be handed to the callback. */
116         void *data;
117
118         /*! Lock for joining with subscription; protects the two final_message flags below. */
119         ast_mutex_t join_lock;
120         /*! Condition for joining with subscription; signalled when either flag below is set. */
121         ast_cond_t join_cond;
122         /*! Flag set when final message for sub has been received.
123          *  Be sure join_lock is held before reading/setting. */
124         int final_message_rxed;
125         /*! Flag set when final message for sub has been processed.
126          *  Be sure join_lock is held before reading/setting. */
127         int final_message_processed;
128 };
129
130 static void subscription_dtor(void *obj)
131 {
132         struct stasis_subscription *sub = obj;
133         ast_assert(!stasis_subscription_is_subscribed(sub));
134         ast_assert(stasis_subscription_is_done(sub));
135         ao2_cleanup(sub->topic);
136         sub->topic = NULL;
137         ast_taskprocessor_unreference(sub->mailbox);
138         sub->mailbox = NULL;
139         ast_mutex_destroy(&sub->join_lock);
140         ast_cond_destroy(&sub->join_cond);
141 }
142
143 /*!
144  * \brief Invoke the subscription's callback.
145  * \param sub Subscription to invoke.
146  * \param topic Topic message was published to.
147  * \param message Message to send.
148  */
149 static void subscription_invoke(struct stasis_subscription *sub,
150                                   struct stasis_topic *topic,
151                                   struct stasis_message *message)
152 {
153         /* Notify that the final message has been received */
154         if (stasis_subscription_final_message(sub, message)) {
155                 SCOPED_MUTEX(lock, &sub->join_lock);
156                 sub->final_message_rxed = 1;
157                 ast_cond_signal(&sub->join_cond);
158         }
159
160         /* Since sub is mostly immutable, no need to lock sub */
161         sub->callback(sub->data, sub, topic, message);
162
163         /* Notify that the final message has been processed */
164         if (stasis_subscription_final_message(sub, message)) {
165                 SCOPED_MUTEX(lock, &sub->join_lock);
166                 sub->final_message_processed = 1;
167                 ast_cond_signal(&sub->join_cond);
168         }
169 }
170
171 static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
172
173 static struct stasis_subscription *__stasis_subscribe(
174         struct stasis_topic *topic,
175         stasis_subscription_cb callback,
176         void *data,
177         int needs_mailbox)
178 {
179         RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
180
181         sub = ao2_alloc(sizeof(*sub), subscription_dtor);
182         if (!sub) {
183                 return NULL;
184         }
185
186         ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
187
188         if (needs_mailbox) {
189                 sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
190                 if (!sub->mailbox) {
191                         return NULL;
192                 }
193         }
194
195         ao2_ref(topic, +1);
196         sub->topic = topic;
197         sub->callback = callback;
198         sub->data = data;
199         ast_mutex_init(&sub->join_lock);
200         ast_cond_init(&sub->join_cond, NULL);
201
202         if (topic_add_subscription(topic, sub) != 0) {
203                 return NULL;
204         }
205         send_subscription_change_message(topic, sub->uniqueid, "Subscribe");
206
207         ao2_ref(sub, +1);
208         return sub;
209 }
210
211 struct stasis_subscription *stasis_subscribe(
212         struct stasis_topic *topic,
213         stasis_subscription_cb callback,
214         void *data)
215 {
216         return __stasis_subscribe(topic, callback, data, 1);
217 }
218
219 struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
220 {
221         if (sub) {
222                 size_t i;
223                 struct stasis_topic *topic = sub->topic;
224                 SCOPED_AO2LOCK(lock_topic, topic);
225
226                 for (i = 0; i < topic->num_subscribers_current; ++i) {
227                         if (topic->subscribers[i] == sub) {
228                                 send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
229                                 /* swap [i] with last entry; remove last entry */
230                                 topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
231                                 /* Unsubscribing unrefs the subscription */
232                                 ao2_cleanup(sub);
233                                 return NULL;
234                         }
235                 }
236
237                 ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
238         }
239         return NULL;
240 }
241
242 /*!
243  * \brief Block until the final message has been received on a subscription.
244  *
245  * \param subscription Subscription to wait on.
246  */
247 void stasis_subscription_join(struct stasis_subscription *subscription)
248 {
249         if (subscription) {
250                 SCOPED_MUTEX(lock, &subscription->join_lock);
251                 /* Wait until the processed flag has been set */
252                 while (!subscription->final_message_processed) {
253                         ast_cond_wait(&subscription->join_cond,
254                                 &subscription->join_lock);
255                 }
256         }
257 }
258
259 int stasis_subscription_is_done(struct stasis_subscription *subscription)
260 {
261         if (subscription) {
262                 SCOPED_MUTEX(lock, &subscription->join_lock);
263                 return subscription->final_message_rxed;
264         }
265
266         /* Null subscription is about as done as you can get */
267         return 1;
268 }
269
270 struct stasis_subscription *stasis_unsubscribe_and_join(
271         struct stasis_subscription *subscription)
272 {
273         if (!subscription) {
274                 return NULL;
275         }
276
277         /* Bump refcount to hold it past the unsubscribe */
278         ao2_ref(subscription, +1);
279         stasis_unsubscribe(subscription);
280         stasis_subscription_join(subscription);
281         /* Now decrement the refcount back */
282         ao2_cleanup(subscription);
283         return NULL;
284 }
285
286 int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
287 {
288         if (sub) {
289                 size_t i;
290                 struct stasis_topic *topic = sub->topic;
291                 SCOPED_AO2LOCK(lock_topic, topic);
292
293                 for (i = 0; i < topic->num_subscribers_current; ++i) {
294                         if (topic->subscribers[i] == sub) {
295                                 return 1;
296                         }
297                 }
298         }
299
300         return 0;
301 }
302
303 const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
304 {
305         return sub->uniqueid;
306 }
307
308 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
309 {
310         struct stasis_subscription_change *change;
311         if (stasis_message_type(msg) != stasis_subscription_change_type()) {
312                 return 0;
313         }
314
315         change = stasis_message_data(msg);
316         if (strcmp("Unsubscribe", change->description)) {
317                 return 0;
318         }
319
320         if (strcmp(stasis_subscription_uniqueid(sub), change->uniqueid)) {
321                 return 0;
322         }
323
324         return 1;
325 }
326
327 /*!
328  * \brief Add a subscriber to a topic.
329  * \param topic Topic
330  * \param sub Subscriber
331  * \return 0 on success
332  * \return Non-zero on error
333  */
334 static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
335 {
336         struct stasis_subscription **subscribers;
337         SCOPED_AO2LOCK(lock, topic);
338
339         /* Increase list size, if needed */
340         if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
341                 subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
342                 if (!subscribers) {
343                         return -1;
344                 }
345                 topic->subscribers = subscribers;
346                 topic->num_subscribers_max *= 2;
347         }
348
349         /* Don't ref sub here or we'll cause a reference cycle. */
350         topic->subscribers[topic->num_subscribers_current++] = sub;
351         return 0;
352 }
353
354 /*!
355  * \internal
356  * \brief Information needed to dispatch a message to a subscription.
 *
 * dispatch_create() takes a reference on each member and dispatch_dtor()
 * releases them.
357  */
358 struct dispatch {
359         /*! Topic message was published to (the publisher topic passed to dispatch_create()) */
360         struct stasis_topic *topic;
361         /*! The message itself */
362         struct stasis_message *message;
363         /*! Subscription receiving the message */
364         struct stasis_subscription *sub;
365 };
366
367 static void dispatch_dtor(void *data)
368 {
369         struct dispatch *dispatch = data;
370         ao2_cleanup(dispatch->topic);
371         ao2_cleanup(dispatch->message);
372         ao2_cleanup(dispatch->sub);
373 }
374
375 static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
376 {
377         RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
378
379         ast_assert(topic != NULL);
380         ast_assert(message != NULL);
381         ast_assert(sub != NULL);
382
383         dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
384         if (!dispatch) {
385                 return NULL;
386         }
387
388         dispatch->topic = topic;
389         ao2_ref(topic, +1);
390
391         dispatch->message = message;
392         ao2_ref(message, +1);
393
394         dispatch->sub = sub;
395         ao2_ref(sub, +1);
396
397         ao2_ref(dispatch, +1);
398         return dispatch;
399 }
400
401 /*!
402  * \brief Dispatch a message to a subscriber
403  * \param data \ref dispatch object
404  * \return 0
405  */
406 static int dispatch_exec(void *data)
407 {
408         RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
409
410         subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message);
411
412         return 0;
413 }
414
415 void stasis_forward_message(struct stasis_topic *topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
416 {
417         size_t i;
418         SCOPED_AO2LOCK(lock, topic);
419
420         ast_assert(topic != NULL);
421         ast_assert(publisher_topic != NULL);
422         ast_assert(message != NULL);
423
424         for (i = 0; i < topic->num_subscribers_current; ++i) {
425                 struct stasis_subscription *sub = topic->subscribers[i];
426
427                 ast_assert(sub != NULL);
428
429                 if (sub->mailbox) {
430                         RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
431
432                         dispatch = dispatch_create(publisher_topic, message, sub);
433                         if (!dispatch) {
434                                 ast_log(LOG_DEBUG, "Dropping dispatch\n");
435                                 break;
436                         }
437
438                         if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
439                                 /* Ownership transferred to mailbox.
440                                  * Don't increment ref, b/c the task processor
441                                  * may have already gotten rid of the object.
442                                  */
443                                 dispatch = NULL;
444                         }
445                 } else {
446                         /* Dispatch directly */
447                         subscription_invoke(sub, publisher_topic, message);
448                 }
449         }
450 }
451
/*!
 * \brief Publish a message to a topic's subscribers.
 *
 * \param topic Topic to publish to; it is also reported as the publisher.
 * \param message Message to publish.
 */
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
{
	/* Publishing is forwarding where source and destination coincide */
	struct stasis_topic *publisher = topic;

	stasis_forward_message(topic, publisher, message);
}
456
/*!
 * \brief Forwarding subscriber
 *
 * Re-delivers every message to the destination topic carried in \a data.
 * When the forwarding subscription's own final message arrives, the
 * reference held on the destination topic is released.
 *
 * \param data Destination topic.
 * \param sub The forwarding subscription.
 * \param topic Topic the message was published to.
 * \param message Message being forwarded.
 */
static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
{
	struct stasis_topic *destination = data;

	stasis_forward_message(destination, topic, message);

	if (!stasis_subscription_final_message(sub, message)) {
		return;
	}

	/* Last message for this forward: give back the destination ref */
	ao2_cleanup(destination);
}
467
468 struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
469 {
470         struct stasis_subscription *sub;
471         if (!from_topic || !to_topic) {
472                 return NULL;
473         }
474
475         /* Forwarding subscriptions should dispatch directly instead of having a
476          * mailbox. Otherwise, messages forwarded to the same topic from
477          * different topics may get reordered. Which is bad.
478          */
479         sub = __stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
480         if (sub) {
481                 /* hold a ref to to_topic for this forwarding subscription */
482                 ao2_ref(to_topic, +1);
483         }
484         return sub;
485 }
486
487 static void subscription_change_dtor(void *obj)
488 {
489         struct stasis_subscription_change *change = obj;
490         ast_string_field_free_memory(change);
491         ao2_cleanup(change->topic);
492 }
493
494 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
495 {
496         RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
497
498         change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
499         if (ast_string_field_init(change, 128)) {
500                 return NULL;
501         }
502
503         ast_string_field_set(change, uniqueid, uniqueid);
504         ast_string_field_set(change, description, description);
505         ao2_ref(topic, +1);
506         change->topic = topic;
507
508         ao2_ref(change, +1);
509         return change;
510 }
511
/*!
 * \internal
 * \brief Publish a subscription change message on a topic.
 *
 * Nothing is published if the payload or the message cannot be allocated.
 *
 * \param topic Topic to publish on.
 * \param uniqueid Unique ID of the affected subscription.
 * \param description Text describing the change.
 */
static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
{
	struct stasis_subscription_change *payload;
	struct stasis_message *wrapped;

	payload = subscription_change_alloc(topic, uniqueid, description);
	if (!payload) {
		return;
	}

	wrapped = stasis_message_create(stasis_subscription_change_type(), payload);
	if (wrapped) {
		stasis_publish(topic, wrapped);
		ao2_cleanup(wrapped);
	}

	/* Our payload reference is dropped last, after the message */
	ao2_cleanup(payload);
}
531
532 struct topic_pool_entry { /* One named topic stored in a topic pool's container */
533         struct stasis_subscription *forward; /*!< Forwarding subscription from \c topic to the pool's topic; unsubscribed in topic_pool_entry_dtor() */
534         struct stasis_topic *topic; /*!< The pooled topic itself; its reference is released in topic_pool_entry_dtor() */
535 };
536
537 static void topic_pool_entry_dtor(void *obj)
538 {
539         struct topic_pool_entry *entry = obj;
540         entry->forward = stasis_unsubscribe(entry->forward);
541         ao2_cleanup(entry->topic);
542         entry->topic = NULL;
543 }
544
545 static struct topic_pool_entry *topic_pool_entry_alloc(void)
546 {
547         return ao2_alloc(sizeof(struct topic_pool_entry), topic_pool_entry_dtor);
548 }
549
550 struct stasis_topic_pool { /* A set of named topics that all forward to one shared topic */
551         struct ao2_container *pool_container; /*!< Container of topic_pool_entry objects, hashed and compared by topic name */
552         struct stasis_topic *pool_topic; /*!< Topic every pooled topic forwards to; referenced by the pool */
553 };
554
555 static void topic_pool_dtor(void *obj)
556 {
557         struct stasis_topic_pool *pool = obj;
558         ao2_cleanup(pool->pool_container);
559         pool->pool_container = NULL;
560         ao2_cleanup(pool->pool_topic);
561         pool->pool_topic = NULL;
562 }
563
564 static int topic_pool_entry_hash(const void *obj, const int flags)
565 {
566         const char *topic_name = (flags & OBJ_KEY) ? obj : stasis_topic_name(((struct topic_pool_entry*) obj)->topic);
567         return ast_str_case_hash(topic_name);
568 }
569
570 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
571 {
572         struct topic_pool_entry *opt1 = obj, *opt2 = arg;
573         const char *topic_name = (flags & OBJ_KEY) ? arg : stasis_topic_name(opt2->topic);
574         return strcasecmp(stasis_topic_name(opt1->topic), topic_name) ? 0 : CMP_MATCH | CMP_STOP;
575 }
576
577 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
578 {
579         RAII_VAR(struct stasis_topic_pool *, pool, ao2_alloc(sizeof(*pool), topic_pool_dtor), ao2_cleanup);
580         if (!pool) {
581                 return NULL;
582         }
583         pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS, topic_pool_entry_hash, topic_pool_entry_cmp);
584         ao2_ref(pooled_topic, +1);
585         pool->pool_topic = pooled_topic;
586
587         ao2_ref(pool, +1);
588         return pool;
589 }
590
591 struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
592 {
593         RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
594         SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
595         topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_KEY | OBJ_NOLOCK);
596
597         if (topic_pool_entry) {
598                 return topic_pool_entry->topic;
599         }
600
601         topic_pool_entry = topic_pool_entry_alloc();
602
603         if (!topic_pool_entry) {
604                 return NULL;
605         }
606
607         topic_pool_entry->topic = stasis_topic_create(topic_name);
608         if (!topic_pool_entry->topic) {
609                 return NULL;
610         }
611
612         topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
613         if (!topic_pool_entry->forward) {
614                 return NULL;
615         }
616
617         ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK);
618
619         return topic_pool_entry->topic;
620 }
621
622 void stasis_log_bad_type_access(const char *name)
623 {
624         ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
625 }
626
627 /*! \brief Cleanup function */
628 static void stasis_exit(void)
629 {
630         ast_threadpool_shutdown(pool);
631         pool = NULL;
632 }
633
634 /*! \brief Cleanup function for graceful shutdowns */
635 static void stasis_cleanup(void)
636 {
637         STASIS_MESSAGE_TYPE_CLEANUP(stasis_subscription_change_type);
638 }
639
640 int stasis_init(void)
641 {
642         int cache_init;
643
644         /* XXX Should this be configurable? */
645         struct ast_threadpool_options opts = {
646                 .version = AST_THREADPOOL_OPTIONS_VERSION,
647                 .idle_timeout = 20,
648                 .auto_increment = 1,
649                 .initial_size = 0,
650                 .max_size = 200
651         };
652
653         /* Be sure the types are cleaned up after the message bus */
654         ast_register_cleanup(stasis_cleanup);
655         ast_register_atexit(stasis_exit);
656
657         if (pool) {
658                 ast_log(LOG_ERROR, "Stasis double-initialized\n");
659                 return -1;
660         }
661
662         pool = ast_threadpool_create("stasis-core", NULL, &opts);
663         if (!pool) {
664                 ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
665                 return -1;
666         }
667
668         cache_init = stasis_cache_init();
669         if (cache_init != 0) {
670                 return -1;
671         }
672
673         if (STASIS_MESSAGE_TYPE_INIT(stasis_subscription_change_type) != 0) {
674                 return -1;
675         }
676
677         return 0;
678 }