A simplistic router for stasis_message's.
[asterisk/asterisk.git] / main / stasis_message_router.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis message router implementation.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis_message_router.h"
36
37 #define INITIAL_ROUTES_MAX 8
38
39 /*! \internal */
40 struct stasis_message_route {
41         /*! Message type handle by this route. */
42         struct stasis_message_type *message_type;
43         /*! Callback function for incoming message processing. */
44         stasis_subscription_cb callback;
45         /*! Data pointer to be handed to the callback. */
46         void *data;
47 };
48
49 static void route_dtor(void *obj)
50 {
51         struct stasis_message_route *route = obj;
52
53         ao2_cleanup(route->message_type);
54         route->message_type = NULL;
55 }
56
/*!
 * \internal
 * \brief Router state: an upstream subscription plus a growable route table.
 */
struct stasis_message_router {
        /*! Subscription on the upstream topic that feeds this router. */
        struct stasis_subscription *subscription;
        /*! Dynamically sized array of route pointers. */
        struct stasis_message_route **routes;
        /*! Fallback route used when no entry in \c routes matches. */
        struct stasis_message_route *default_route;
        /*! Number of slots allocated in the \c routes array. */
        size_t num_routes_max;
        /*! Number of slots in the \c routes array currently in use. */
        size_t num_routes_current;
};
70
71 static void router_dtor(void *obj)
72 {
73         struct stasis_message_router *router = obj;
74         size_t i;
75
76         ast_assert(!stasis_subscription_is_subscribed(router->subscription));
77         router->subscription = NULL;
78         for (i = 0; i < router->num_routes_current; ++i) {
79                 ao2_cleanup(router->routes[i]);
80                 router->routes[i] = NULL;
81         }
82         ast_free(router->routes);
83         router->routes = NULL;
84         ao2_cleanup(router->default_route);
85         router->default_route = NULL;
86 }
87
88 static void router_dispatch(void *data,
89                             struct stasis_subscription *sub,
90                             struct stasis_topic *topic,
91                             struct stasis_message *message)
92 {
93         struct stasis_message_router *router = data;
94         RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
95         struct stasis_message_type *type = stasis_message_type(message);
96         size_t i;
97
98         {
99                 SCOPED_AO2LOCK(lock, router);
100
101                 /* We don't expect many message types, so a simple loop should
102                  * be adequate, even if the complexity is O(n). Sorting the list
103                  * would be an easy way to bring that down to O(log(n)). Using a
104                  * hashtable/ao2_container could be even better. Just be sure to
105                  * profile before you optimize!
106                  */
107                 route = router->default_route;
108                 for (i = 0; i < router->num_routes_current; ++i) {
109                         if (router->routes[i]->message_type == type) {
110                                 route = router->routes[i];
111                                 break;
112                         }
113                 }
114
115                 /* Ref the route before leaving the scoped lock */
116                 if (route) {
117                         ao2_ref(route, +1);
118                 }
119         }
120
121         if (route) {
122                 route->callback(route->data, sub, topic, message);
123         }
124
125         if (stasis_subscription_final_message(sub, message)) {
126                 ao2_cleanup(router);
127                 return;
128         }
129
130 }
131
132 struct stasis_message_router *stasis_message_router_create(
133         struct stasis_topic *topic)
134 {
135         RAII_VAR(struct stasis_message_router *, router, NULL, ao2_cleanup);
136
137         router = ao2_alloc(sizeof(*router), router_dtor);
138         if (!router) {
139                 return NULL;
140         }
141
142         router->num_routes_max = INITIAL_ROUTES_MAX;
143         router->routes = ast_calloc(router->num_routes_max,
144                                     sizeof(*router->routes));
145         if (!router->routes) {
146                 return NULL;
147         }
148
149         router->subscription = stasis_subscribe(topic, router_dispatch, router);
150         if (!router->subscription) {
151                 return NULL;
152         }
153
154         ao2_ref(router, +1);
155         return router;
156 }
157
158 void stasis_message_router_unsubscribe(struct stasis_message_router *router)
159 {
160         if (!router) {
161                 return;
162         }
163
164         stasis_unsubscribe(router->subscription);
165 }
166
167 static struct stasis_message_route *route_create(
168         struct stasis_message_type *message_type,
169         stasis_subscription_cb callback,
170         void *data)
171 {
172         RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
173
174         route = ao2_alloc(sizeof(*route), route_dtor);
175         if (!route) {
176                 return NULL;
177         }
178
179         if (message_type) {
180                 ao2_ref(message_type, +1);
181         }
182         route->message_type = message_type;
183         route->callback = callback;
184         route->data = data;
185
186         ao2_ref(route, +1);
187         return route;
188 }
189
190 static int add_route(struct stasis_message_router *router,
191                      struct stasis_message_route *route)
192 {
193         struct stasis_message_route **routes;
194         size_t i;
195         SCOPED_AO2LOCK(lock, router);
196
197         /* Check for route conflicts */
198         for (i = 0; i < router->num_routes_current; ++i) {
199                 if (router->routes[i]->message_type == route->message_type) {
200                         return -1;
201                 }
202         }
203
204         /* Increase list size, if needed */
205         if (router->num_routes_current + 1 > router->num_routes_max) {
206                 routes = realloc(router->routes,
207                                  2 * router->num_routes_max * sizeof(*routes));
208                 if (!routes) {
209                         return -1;
210                 }
211                 router->routes = routes;
212                 router->num_routes_max *= 2;
213         }
214
215
216         ao2_ref(route, +1);
217         router->routes[router->num_routes_current++] = route;
218         return 0;
219 }
220
221 int stasis_message_router_add(struct stasis_message_router *router,
222                               struct stasis_message_type *message_type,
223                               stasis_subscription_cb callback,
224                               void *data)
225 {
226         RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
227
228         route = route_create(message_type, callback, data);
229         if (!route) {
230                 return -1;
231         }
232
233         return add_route(router, route);
234 }
235
236 int stasis_message_router_set_default(struct stasis_message_router *router,
237                                       stasis_subscription_cb callback,
238                                       void *data)
239 {
240         SCOPED_AO2LOCK(lock, router);
241         ao2_cleanup(router->default_route);
242         router->default_route = route_create(NULL, callback, data);
243         return router->default_route ? 0 : -1;
244 }