ARI: Add ability to raise arbitrary User Events
[asterisk/asterisk.git] / main / stasis_message_router.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis message router implementation.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis_message_router.h"
36 #include "asterisk/vector.h"
37
38 /*! \internal */
39 struct stasis_message_route {
40         /*! Message type handle by this route. */
41         struct stasis_message_type *message_type;
42         /*! Callback function for incoming message processing. */
43         stasis_subscription_cb callback;
44         /*! Data pointer to be handed to the callback. */
45         void *data;
46 };
47
48 AST_VECTOR(route_table, struct stasis_message_route);
49
50 static struct stasis_message_route *route_table_find(struct route_table *table,
51         struct stasis_message_type *message_type)
52 {
53         size_t idx;
54         struct stasis_message_route *route;
55
56         /* While a linear search for routes may seem very inefficient, most
57          * route tables have six routes or less. For such small data, it's
58          * hard to beat a linear search. If we start having larger route
59          * tables, then we can look into containers with more efficient
60          * lookups.
61          */
62         for (idx = 0; idx < AST_VECTOR_SIZE(table); ++idx) {
63                 route = AST_VECTOR_GET_ADDR(table, idx);
64                 if (route->message_type == message_type) {
65                         return route;
66                 }
67         }
68
69         return NULL;
70 }
71
/*!
 * \brief Match predicate handed to AST_VECTOR_REMOVE_CMP_UNORDERED().
 *
 * \param entry Vector element (a struct stasis_message_route) being tested.
 * \param wanted Message type pointer being searched for.
 *
 * \return Non-zero if the element's message type is \a wanted.
 * \return 0 otherwise.
 */
#define ROUTE_TABLE_ELEM_CMP(entry, wanted) ((wanted) == (entry).message_type)
82
/*!
 * \brief Clean up a route_table vector element.
 *
 * Passes the element's message type to ao2_cleanup().
 *
 * \param entry Element (a struct stasis_message_route) to clean up.
 *
 * \return Nothing
 */
#define ROUTE_TABLE_ELEM_CLEANUP(entry)  ao2_cleanup((entry).message_type)
91
92 static int route_table_remove(struct route_table *table,
93         struct stasis_message_type *message_type)
94 {
95         return AST_VECTOR_REMOVE_CMP_UNORDERED(table, message_type, ROUTE_TABLE_ELEM_CMP,
96                 ROUTE_TABLE_ELEM_CLEANUP);
97 }
98
99 static int route_table_add(struct route_table *table,
100         struct stasis_message_type *message_type,
101         stasis_subscription_cb callback, void *data)
102 {
103         struct stasis_message_route route;
104         int res;
105
106         ast_assert(callback != NULL);
107         ast_assert(route_table_find(table, message_type) == NULL);
108
109         route.message_type = ao2_bump(message_type);
110         route.callback = callback;
111         route.data = data;
112
113         res = AST_VECTOR_APPEND(table, route);
114         if (res) {
115                 ROUTE_TABLE_ELEM_CLEANUP(route);
116         }
117         return res;
118 }
119
120 static void route_table_dtor(struct route_table *table)
121 {
122         size_t idx;
123         struct stasis_message_route *route;
124
125         for (idx = 0; idx < AST_VECTOR_SIZE(table); ++idx) {
126                 route = AST_VECTOR_GET_ADDR(table, idx);
127                 ROUTE_TABLE_ELEM_CLEANUP(*route);
128         }
129         AST_VECTOR_FREE(table);
130 }
131
132 /*! \internal */
133 struct stasis_message_router {
134         /*! Subscription to the upstream topic */
135         struct stasis_subscription *subscription;
136         /*! Subscribed routes */
137         struct route_table routes;
138         /*! Subscribed routes for \ref stasis_cache_update messages */
139         struct route_table cache_routes;
140         /*! Route of last resort */
141         struct stasis_message_route default_route;
142 };
143
144 static void router_dtor(void *obj)
145 {
146         struct stasis_message_router *router = obj;
147
148         ast_assert(!stasis_subscription_is_subscribed(router->subscription));
149         ast_assert(stasis_subscription_is_done(router->subscription));
150
151         router->subscription = NULL;
152
153         route_table_dtor(&router->routes);
154         route_table_dtor(&router->cache_routes);
155 }
156
157 static int find_route(
158         struct stasis_message_router *router,
159         struct stasis_message *message,
160         struct stasis_message_route *route_out)
161 {
162         struct stasis_message_route *route = NULL;
163         struct stasis_message_type *type = stasis_message_type(message);
164         SCOPED_AO2LOCK(lock, router);
165
166         ast_assert(route_out != NULL);
167
168         if (type == stasis_cache_update_type()) {
169                 /* Find a cache route */
170                 struct stasis_cache_update *update =
171                         stasis_message_data(message);
172                 route = route_table_find(&router->cache_routes, update->type);
173         }
174
175         if (route == NULL) {
176                 /* Find a regular route */
177                 route = route_table_find(&router->routes, type);
178         }
179
180         if (route == NULL && router->default_route.callback) {
181                 /* Maybe the default route, then? */
182                 route = &router->default_route;
183         }
184
185         if (!route) {
186                 return -1;
187         }
188
189         *route_out = *route;
190         return 0;
191 }
192
193 static void router_dispatch(void *data,
194                             struct stasis_subscription *sub,
195                             struct stasis_message *message)
196 {
197         struct stasis_message_router *router = data;
198         struct stasis_message_route route;
199
200         if (find_route(router, message, &route) == 0) {
201                 route.callback(route.data, sub, message);
202         }
203
204         if (stasis_subscription_final_message(sub, message)) {
205                 ao2_cleanup(router);
206         }
207 }
208
209 struct stasis_message_router *stasis_message_router_create(
210         struct stasis_topic *topic)
211 {
212         int res;
213         RAII_VAR(struct stasis_message_router *, router, NULL, ao2_cleanup);
214
215         router = ao2_alloc(sizeof(*router), router_dtor);
216         if (!router) {
217                 return NULL;
218         }
219
220         res = 0;
221         res |= AST_VECTOR_INIT(&router->routes, 0);
222         res |= AST_VECTOR_INIT(&router->cache_routes, 0);
223         if (res) {
224                 return NULL;
225         }
226
227         router->subscription = stasis_subscribe(topic, router_dispatch, router);
228         if (!router->subscription) {
229                 return NULL;
230         }
231
232         ao2_ref(router, +1);
233         return router;
234 }
235
236 void stasis_message_router_unsubscribe(struct stasis_message_router *router)
237 {
238         if (!router) {
239                 return;
240         }
241
242         stasis_unsubscribe(router->subscription);
243 }
244
245 void stasis_message_router_unsubscribe_and_join(
246         struct stasis_message_router *router)
247 {
248         if (!router) {
249                 return;
250         }
251         stasis_unsubscribe_and_join(router->subscription);
252 }
253
254 int stasis_message_router_is_done(struct stasis_message_router *router)
255 {
256         if (!router) {
257                 /* Null router is about as done as you can get */
258                 return 1;
259         }
260
261         return stasis_subscription_is_done(router->subscription);
262 }
263
264 void stasis_message_router_publish_sync(struct stasis_message_router *router,
265         struct stasis_message *message)
266 {
267         ast_assert(router != NULL);
268
269         ao2_bump(router);
270         stasis_publish_sync(router->subscription, message);
271         ao2_cleanup(router);
272 }
273
274 int stasis_message_router_add(struct stasis_message_router *router,
275         struct stasis_message_type *message_type,
276         stasis_subscription_cb callback, void *data)
277 {
278         int res;
279
280         ast_assert(router != NULL);
281
282         if (!message_type) {
283                 /* Cannot route to NULL type. */
284                 return -1;
285         }
286         ao2_lock(router);
287         res = route_table_add(&router->routes, message_type, callback, data);
288         ao2_unlock(router);
289         return res;
290 }
291
292 int stasis_message_router_add_cache_update(struct stasis_message_router *router,
293         struct stasis_message_type *message_type,
294         stasis_subscription_cb callback, void *data)
295 {
296         int res;
297
298         ast_assert(router != NULL);
299
300         if (!message_type) {
301                 /* Cannot cache a route to NULL type. */
302                 return -1;
303         }
304         ao2_lock(router);
305         res = route_table_add(&router->cache_routes, message_type, callback, data);
306         ao2_unlock(router);
307         return res;
308 }
309
310 void stasis_message_router_remove(struct stasis_message_router *router,
311         struct stasis_message_type *message_type)
312 {
313         ast_assert(router != NULL);
314
315         if (!message_type) {
316                 /* Cannot remove a NULL type. */
317                 return;
318         }
319         ao2_lock(router);
320         route_table_remove(&router->routes, message_type);
321         ao2_unlock(router);
322 }
323
324 void stasis_message_router_remove_cache_update(
325         struct stasis_message_router *router,
326         struct stasis_message_type *message_type)
327 {
328         ast_assert(router != NULL);
329
330         if (!message_type) {
331                 /* Cannot remove a NULL type. */
332                 return;
333         }
334         ao2_lock(router);
335         route_table_remove(&router->cache_routes, message_type);
336         ao2_unlock(router);
337 }
338
339 int stasis_message_router_set_default(struct stasis_message_router *router,
340         stasis_subscription_cb callback,
341         void *data)
342 {
343         ast_assert(router != NULL);
344         ast_assert(callback != NULL);
345
346         ao2_lock(router);
347         router->default_route.callback = callback;
348         router->default_route.data = data;
349         ao2_unlock(router);
350         /* While this implementation can never fail, it used to be able to */
351         return 0;
352 }