Merge "func_jitterbuffer: Add audio/video sync support."
[asterisk/asterisk.git] / main / stasis_message_router.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis message router implementation.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 #include "asterisk/astobj2.h"
33 #include "asterisk/stasis_message_router.h"
34 #include "asterisk/vector.h"
35
36 /*! \internal */
37 struct stasis_message_route {
38         /*! Message type handle by this route. */
39         struct stasis_message_type *message_type;
40         /*! Callback function for incoming message processing. */
41         stasis_subscription_cb callback;
42         /*! Data pointer to be handed to the callback. */
43         void *data;
44 };
45
46 AST_VECTOR(route_table, struct stasis_message_route);
47
48 static struct stasis_message_route *route_table_find(struct route_table *table,
49         struct stasis_message_type *message_type)
50 {
51         size_t idx;
52         struct stasis_message_route *route;
53
54         /* While a linear search for routes may seem very inefficient, most
55          * route tables have six routes or less. For such small data, it's
56          * hard to beat a linear search. If we start having larger route
57          * tables, then we can look into containers with more efficient
58          * lookups.
59          */
60         for (idx = 0; idx < AST_VECTOR_SIZE(table); ++idx) {
61                 route = AST_VECTOR_GET_ADDR(table, idx);
62                 if (route->message_type == message_type) {
63                         return route;
64                 }
65         }
66
67         return NULL;
68 }
69
70 /*!
71  * \brief route_table comparator for AST_VECTOR_REMOVE_CMP_UNORDERED()
72  *
73  * \param elem Element to compare against
74  * \param value Value to compare with the vector element.
75  *
76  * \return 0 if element does not match.
77  * \return Non-zero if element matches.
78  */
79 #define ROUTE_TABLE_ELEM_CMP(elem, value) ((elem).message_type == (value))
80
81 /*!
82  * \brief route_table vector element cleanup.
83  *
84  * \param elem Element to cleanup
85  *
86  * \return Nothing
87  */
88 #define ROUTE_TABLE_ELEM_CLEANUP(elem)  ao2_cleanup((elem).message_type)
89
90 static int route_table_remove(struct route_table *table,
91         struct stasis_message_type *message_type)
92 {
93         return AST_VECTOR_REMOVE_CMP_UNORDERED(table, message_type, ROUTE_TABLE_ELEM_CMP,
94                 ROUTE_TABLE_ELEM_CLEANUP);
95 }
96
97 static int route_table_add(struct route_table *table,
98         struct stasis_message_type *message_type,
99         stasis_subscription_cb callback, void *data)
100 {
101         struct stasis_message_route route;
102         int res;
103
104         ast_assert(callback != NULL);
105         ast_assert(route_table_find(table, message_type) == NULL);
106
107         route.message_type = ao2_bump(message_type);
108         route.callback = callback;
109         route.data = data;
110
111         res = AST_VECTOR_APPEND(table, route);
112         if (res) {
113                 ROUTE_TABLE_ELEM_CLEANUP(route);
114         }
115         return res;
116 }
117
118 static void route_table_dtor(struct route_table *table)
119 {
120         size_t idx;
121         struct stasis_message_route *route;
122
123         for (idx = 0; idx < AST_VECTOR_SIZE(table); ++idx) {
124                 route = AST_VECTOR_GET_ADDR(table, idx);
125                 ROUTE_TABLE_ELEM_CLEANUP(*route);
126         }
127         AST_VECTOR_FREE(table);
128 }
129
130 /*! \internal */
131 struct stasis_message_router {
132         /*! Subscription to the upstream topic */
133         struct stasis_subscription *subscription;
134         /*! Subscribed routes */
135         struct route_table routes;
136         /*! Subscribed routes for \ref stasis_cache_update messages */
137         struct route_table cache_routes;
138         /*! Route of last resort */
139         struct stasis_message_route default_route;
140 };
141
142 static void router_dtor(void *obj)
143 {
144         struct stasis_message_router *router = obj;
145
146         ast_assert(!stasis_subscription_is_subscribed(router->subscription));
147         ast_assert(stasis_subscription_is_done(router->subscription));
148
149         router->subscription = NULL;
150
151         route_table_dtor(&router->routes);
152         route_table_dtor(&router->cache_routes);
153 }
154
155 static int find_route(
156         struct stasis_message_router *router,
157         struct stasis_message *message,
158         struct stasis_message_route *route_out)
159 {
160         struct stasis_message_route *route = NULL;
161         struct stasis_message_type *type = stasis_message_type(message);
162         SCOPED_AO2LOCK(lock, router);
163
164         ast_assert(route_out != NULL);
165
166         if (type == stasis_cache_update_type()) {
167                 /* Find a cache route */
168                 struct stasis_cache_update *update =
169                         stasis_message_data(message);
170                 route = route_table_find(&router->cache_routes, update->type);
171         }
172
173         if (route == NULL) {
174                 /* Find a regular route */
175                 route = route_table_find(&router->routes, type);
176         }
177
178         if (route == NULL && router->default_route.callback) {
179                 /* Maybe the default route, then? */
180                 route = &router->default_route;
181         }
182
183         if (!route) {
184                 return -1;
185         }
186
187         *route_out = *route;
188         return 0;
189 }
190
191 static void router_dispatch(void *data,
192                             struct stasis_subscription *sub,
193                             struct stasis_message *message)
194 {
195         struct stasis_message_router *router = data;
196         struct stasis_message_route route;
197
198         if (find_route(router, message, &route) == 0) {
199                 route.callback(route.data, sub, message);
200         }
201
202         if (stasis_subscription_final_message(sub, message)) {
203                 ao2_cleanup(router);
204         }
205 }
206
207 static struct stasis_message_router *stasis_message_router_create_internal(
208         struct stasis_topic *topic, int use_thread_pool, const char *file, int lineno,
209         const char *func)
210 {
211         int res;
212         struct stasis_message_router *router;
213
214         router = ao2_t_alloc(sizeof(*router), router_dtor, stasis_topic_name(topic));
215         if (!router) {
216                 return NULL;
217         }
218
219         res = 0;
220         res |= AST_VECTOR_INIT(&router->routes, 0);
221         res |= AST_VECTOR_INIT(&router->cache_routes, 0);
222         if (res) {
223                 ao2_ref(router, -1);
224
225                 return NULL;
226         }
227
228         if (use_thread_pool) {
229                 router->subscription = __stasis_subscribe_pool(topic, router_dispatch, router, file, lineno, func);
230         } else {
231                 router->subscription = __stasis_subscribe(topic, router_dispatch, router, file, lineno, func);
232         }
233
234         if (!router->subscription) {
235                 ao2_ref(router, -1);
236
237                 return NULL;
238         }
239
240         /* We need to receive subscription change messages so we know when our subscription goes away */
241         stasis_subscription_accept_message_type(router->subscription, stasis_subscription_change_type());
242
243         return router;
244 }
245
246 struct stasis_message_router *__stasis_message_router_create(
247         struct stasis_topic *topic, const char *file, int lineno, const char *func)
248 {
249         return stasis_message_router_create_internal(topic, 0, file, lineno, func);
250 }
251
252 struct stasis_message_router *__stasis_message_router_create_pool(
253         struct stasis_topic *topic, const char *file, int lineno, const char *func)
254 {
255         return stasis_message_router_create_internal(topic, 1, file, lineno, func);
256 }
257
258 void stasis_message_router_unsubscribe(struct stasis_message_router *router)
259 {
260         if (!router) {
261                 return;
262         }
263
264         ao2_lock(router);
265         router->subscription = stasis_unsubscribe(router->subscription);
266         ao2_unlock(router);
267 }
268
269 void stasis_message_router_unsubscribe_and_join(
270         struct stasis_message_router *router)
271 {
272         if (!router) {
273                 return;
274         }
275         stasis_unsubscribe_and_join(router->subscription);
276 }
277
278 int stasis_message_router_is_done(struct stasis_message_router *router)
279 {
280         if (!router) {
281                 /* Null router is about as done as you can get */
282                 return 1;
283         }
284
285         return stasis_subscription_is_done(router->subscription);
286 }
287
288 void stasis_message_router_publish_sync(struct stasis_message_router *router,
289         struct stasis_message *message)
290 {
291         ast_assert(router != NULL);
292
293         ao2_bump(router);
294         stasis_publish_sync(router->subscription, message);
295         ao2_cleanup(router);
296 }
297
298 int stasis_message_router_set_congestion_limits(struct stasis_message_router *router,
299         long low_water, long high_water)
300 {
301         int res = -1;
302
303         if (router) {
304                 res = stasis_subscription_set_congestion_limits(router->subscription,
305                         low_water, high_water);
306         }
307         return res;
308 }
309
310 int stasis_message_router_add(struct stasis_message_router *router,
311         struct stasis_message_type *message_type,
312         stasis_subscription_cb callback, void *data)
313 {
314         int res;
315
316         ast_assert(router != NULL);
317
318         if (!message_type) {
319                 /* Cannot route to NULL type. */
320                 return -1;
321         }
322         ao2_lock(router);
323         res = route_table_add(&router->routes, message_type, callback, data);
324         if (!res) {
325                 stasis_subscription_accept_message_type(router->subscription, message_type);
326                 /* Until a specific message type was added we would already drop the message, so being
327                  * selective now doesn't harm us. If we have a default route then we are already forced
328                  * to filter nothing and messages will come in regardless.
329                  */
330                 stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
331         }
332         ao2_unlock(router);
333         return res;
334 }
335
336 int stasis_message_router_add_cache_update(struct stasis_message_router *router,
337         struct stasis_message_type *message_type,
338         stasis_subscription_cb callback, void *data)
339 {
340         int res;
341
342         ast_assert(router != NULL);
343
344         if (!message_type) {
345                 /* Cannot cache a route to NULL type. */
346                 return -1;
347         }
348         ao2_lock(router);
349         res = route_table_add(&router->cache_routes, message_type, callback, data);
350         if (!res) {
351                 stasis_subscription_accept_message_type(router->subscription, stasis_cache_update_type());
352                 stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
353         }
354         ao2_unlock(router);
355         return res;
356 }
357
358 void stasis_message_router_remove(struct stasis_message_router *router,
359         struct stasis_message_type *message_type)
360 {
361         ast_assert(router != NULL);
362
363         if (!message_type) {
364                 /* Cannot remove a NULL type. */
365                 return;
366         }
367         ao2_lock(router);
368         route_table_remove(&router->routes, message_type);
369         ao2_unlock(router);
370 }
371
372 void stasis_message_router_remove_cache_update(
373         struct stasis_message_router *router,
374         struct stasis_message_type *message_type)
375 {
376         ast_assert(router != NULL);
377
378         if (!message_type) {
379                 /* Cannot remove a NULL type. */
380                 return;
381         }
382         ao2_lock(router);
383         route_table_remove(&router->cache_routes, message_type);
384         ao2_unlock(router);
385 }
386
387 int stasis_message_router_set_default(struct stasis_message_router *router,
388         stasis_subscription_cb callback,
389         void *data)
390 {
391         stasis_message_router_set_formatters_default(router, callback, data, STASIS_SUBSCRIPTION_FORMATTER_NONE);
392
393         /* While this implementation can never fail, it used to be able to */
394         return 0;
395 }
396
397 void stasis_message_router_set_formatters_default(struct stasis_message_router *router,
398         stasis_subscription_cb callback,
399         void *data,
400         enum stasis_subscription_message_formatters formatters)
401 {
402         ast_assert(router != NULL);
403         ast_assert(callback != NULL);
404
405         stasis_subscription_accept_formatters(router->subscription, formatters);
406
407         ao2_lock(router);
408         router->default_route.callback = callback;
409         router->default_route.data = data;
410         ao2_unlock(router);
411
412         if (formatters == STASIS_SUBSCRIPTION_FORMATTER_NONE) {
413                 /* Formatters govern what messages the default callback get, so it is only if none is
414                  * specified that we accept all messages regardless.
415                  */
416                 stasis_subscription_set_filter(router->subscription, STASIS_SUBSCRIPTION_FILTER_FORCED_NONE);
417         }
418 }
419
420 void stasis_message_router_accept_formatters(struct stasis_message_router *router,
421         enum stasis_subscription_message_formatters formatters)
422 {
423         ast_assert(router != NULL);
424
425         stasis_subscription_accept_formatters(router->subscription, formatters);
426
427         return;
428 }