Add support for a realtime sorcery module.
[asterisk/asterisk.git] / main / stasis_message_router.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis message router implementation.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis_message_router.h"
36
37 #define INITIAL_ROUTES_MAX 8
38
39 /*! \internal */
40 struct stasis_message_route {
41         /*! Message type handle by this route. */
42         struct stasis_message_type *message_type;
43         /*! Callback function for incoming message processing. */
44         stasis_subscription_cb callback;
45         /*! Data pointer to be handed to the callback. */
46         void *data;
47 };
48
49 static void route_dtor(void *obj)
50 {
51         struct stasis_message_route *route = obj;
52
53         ao2_cleanup(route->message_type);
54         route->message_type = NULL;
55 }
56
57 /*! \internal */
58 struct stasis_message_router {
59         /*! Subscription to the upstream topic */
60         struct stasis_subscription *subscription;
61         /*! Variable length array of the routes */
62         struct stasis_message_route **routes;
63         /*! Route of last resort */
64         struct stasis_message_route *default_route;
65         /*! Allocated length of the routes array */
66         size_t num_routes_max;
67         /*! Current size of the routes array */
68         size_t num_routes_current;
69 };
70
71 static void router_dtor(void *obj)
72 {
73         struct stasis_message_router *router = obj;
74         size_t i;
75
76         ast_assert(!stasis_subscription_is_subscribed(router->subscription));
77         router->subscription = NULL;
78         for (i = 0; i < router->num_routes_current; ++i) {
79                 ao2_cleanup(router->routes[i]);
80                 router->routes[i] = NULL;
81         }
82         ast_free(router->routes);
83         router->routes = NULL;
84         ao2_cleanup(router->default_route);
85         router->default_route = NULL;
86 }
87
88 static void router_dispatch(void *data,
89                             struct stasis_subscription *sub,
90                             struct stasis_topic *topic,
91                             struct stasis_message *message)
92 {
93         struct stasis_message_router *router = data;
94         RAII_VAR(struct stasis_message_router *, router_needs_cleanup, NULL, ao2_cleanup);
95         RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
96         struct stasis_message_type *type = stasis_message_type(message);
97         size_t i;
98
99         {
100                 SCOPED_AO2LOCK(lock, router);
101
102                 /* We don't expect many message types, so a simple loop should
103                  * be adequate, even if the complexity is O(n). Sorting the list
104                  * would be an easy way to bring that down to O(log(n)). Using a
105                  * hashtable/ao2_container could be even better. Just be sure to
106                  * profile before you optimize!
107                  */
108                 route = router->default_route;
109                 for (i = 0; i < router->num_routes_current; ++i) {
110                         if (router->routes[i]->message_type == type) {
111                                 route = router->routes[i];
112                                 break;
113                         }
114                 }
115
116                 /* Ref the route before leaving the scoped lock */
117                 if (route) {
118                         ao2_ref(route, +1);
119                 }
120         }
121
122         if (route) {
123                 route->callback(route->data, sub, topic, message);
124         }
125
126         if (stasis_subscription_final_message(sub, message)) {
127                 router_needs_cleanup = router;
128                 return;
129         }
130
131 }
132
133 struct stasis_message_router *stasis_message_router_create(
134         struct stasis_topic *topic)
135 {
136         RAII_VAR(struct stasis_message_router *, router, NULL, ao2_cleanup);
137
138         router = ao2_alloc(sizeof(*router), router_dtor);
139         if (!router) {
140                 return NULL;
141         }
142
143         router->num_routes_max = INITIAL_ROUTES_MAX;
144         router->routes = ast_calloc(router->num_routes_max,
145                                     sizeof(*router->routes));
146         if (!router->routes) {
147                 return NULL;
148         }
149
150         router->subscription = stasis_subscribe(topic, router_dispatch, router);
151         if (!router->subscription) {
152                 return NULL;
153         }
154
155         ao2_ref(router, +1);
156         return router;
157 }
158
159 void stasis_message_router_unsubscribe(struct stasis_message_router *router)
160 {
161         if (!router) {
162                 return;
163         }
164
165         stasis_unsubscribe(router->subscription);
166 }
167
168 static struct stasis_message_route *route_create(
169         struct stasis_message_type *message_type,
170         stasis_subscription_cb callback,
171         void *data)
172 {
173         RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
174
175         route = ao2_alloc(sizeof(*route), route_dtor);
176         if (!route) {
177                 return NULL;
178         }
179
180         if (message_type) {
181                 ao2_ref(message_type, +1);
182         }
183         route->message_type = message_type;
184         route->callback = callback;
185         route->data = data;
186
187         ao2_ref(route, +1);
188         return route;
189 }
190
191 static int add_route(struct stasis_message_router *router,
192                      struct stasis_message_route *route)
193 {
194         struct stasis_message_route **routes;
195         size_t i;
196         SCOPED_AO2LOCK(lock, router);
197
198         /* Check for route conflicts */
199         for (i = 0; i < router->num_routes_current; ++i) {
200                 if (router->routes[i]->message_type == route->message_type) {
201                         return -1;
202                 }
203         }
204
205         /* Increase list size, if needed */
206         if (router->num_routes_current + 1 > router->num_routes_max) {
207                 routes = realloc(router->routes,
208                                  2 * router->num_routes_max * sizeof(*routes));
209                 if (!routes) {
210                         return -1;
211                 }
212                 router->routes = routes;
213                 router->num_routes_max *= 2;
214         }
215
216
217         ao2_ref(route, +1);
218         router->routes[router->num_routes_current++] = route;
219         return 0;
220 }
221
222 int stasis_message_router_add(struct stasis_message_router *router,
223                               struct stasis_message_type *message_type,
224                               stasis_subscription_cb callback,
225                               void *data)
226 {
227         RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
228
229         route = route_create(message_type, callback, data);
230         if (!route) {
231                 return -1;
232         }
233
234         return add_route(router, route);
235 }
236
237 int stasis_message_router_set_default(struct stasis_message_router *router,
238                                       stasis_subscription_cb callback,
239                                       void *data)
240 {
241         SCOPED_AO2LOCK(lock, router);
242         ao2_cleanup(router->default_route);
243         router->default_route = route_create(NULL, callback, data);
244         return router->default_route ? 0 : -1;
245 }