2 * Asterisk -- An open source telephony toolkit.
4 * Copyright (C) 2013, Digium, Inc.
6 * David M. Lee, II <dlee@digium.com>
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
21 * \brief Stasis message router implementation.
23 * \author David M. Lee, II <dlee@digium.com>
27 <support_level>core</support_level>
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis_message_router.h"
36 #include "asterisk/vector.h"
39 struct stasis_message_route {
40 /*! Message type handle by this route. */
41 struct stasis_message_type *message_type;
42 /*! Callback function for incoming message processing. */
43 stasis_subscription_cb callback;
44 /*! Data pointer to be handed to the callback. */
48 AST_VECTOR(route_table, struct stasis_message_route);
50 static struct stasis_message_route *route_table_find(struct route_table *table,
51 struct stasis_message_type *message_type)
54 struct stasis_message_route *route;
56 /* While a linear search for routes may seem very inefficient, most
57 * route tables have six routes or less. For such small data, it's
58 * hard to beat a linear search. If we start having larger route
59 * tables, then we can look into containers with more efficient
62 for (idx = 0; idx < AST_VECTOR_SIZE(table); ++idx) {
63 route = AST_VECTOR_GET_ADDR(table, idx);
64 if (route->message_type == message_type) {
73 * \brief route_table comparator for AST_VECTOR_REMOVE_CMP_UNORDERED()
75 * \param elem Element to compare against
76 * \param value Value to compare with the vector element.
78 * \return 0 if element does not match.
79 * \return Non-zero if element matches.
81 #define ROUTE_TABLE_ELEM_CMP(elem, value) ((elem).message_type == (value))
84 * \brief route_table vector element cleanup.
86 * \param elem Element to cleanup
90 #define ROUTE_TABLE_ELEM_CLEANUP(elem) ao2_cleanup((elem).message_type)
92 static int route_table_remove(struct route_table *table,
93 struct stasis_message_type *message_type)
95 return AST_VECTOR_REMOVE_CMP_UNORDERED(table, message_type, ROUTE_TABLE_ELEM_CMP,
96 ROUTE_TABLE_ELEM_CLEANUP);
99 static int route_table_add(struct route_table *table,
100 struct stasis_message_type *message_type,
101 stasis_subscription_cb callback, void *data)
103 struct stasis_message_route route;
106 ast_assert(callback != NULL);
107 ast_assert(route_table_find(table, message_type) == NULL);
109 route.message_type = ao2_bump(message_type);
110 route.callback = callback;
113 res = AST_VECTOR_APPEND(table, route);
115 ROUTE_TABLE_ELEM_CLEANUP(route);
120 static void route_table_dtor(struct route_table *table)
123 struct stasis_message_route *route;
125 for (idx = 0; idx < AST_VECTOR_SIZE(table); ++idx) {
126 route = AST_VECTOR_GET_ADDR(table, idx);
127 ROUTE_TABLE_ELEM_CLEANUP(*route);
129 AST_VECTOR_FREE(table);
133 struct stasis_message_router {
134 /*! Subscription to the upstream topic */
135 struct stasis_subscription *subscription;
136 /*! Subscribed routes */
137 struct route_table routes;
138 /*! Subscribed routes for \ref stasis_cache_update messages */
139 struct route_table cache_routes;
140 /*! Route of last resort */
141 struct stasis_message_route default_route;
144 static void router_dtor(void *obj)
146 struct stasis_message_router *router = obj;
148 ast_assert(!stasis_subscription_is_subscribed(router->subscription));
149 ast_assert(stasis_subscription_is_done(router->subscription));
151 router->subscription = NULL;
153 route_table_dtor(&router->routes);
154 route_table_dtor(&router->cache_routes);
157 static int find_route(
158 struct stasis_message_router *router,
159 struct stasis_message *message,
160 struct stasis_message_route *route_out)
162 struct stasis_message_route *route = NULL;
163 struct stasis_message_type *type = stasis_message_type(message);
164 SCOPED_AO2LOCK(lock, router);
166 ast_assert(route_out != NULL);
168 if (type == stasis_cache_update_type()) {
169 /* Find a cache route */
170 struct stasis_cache_update *update =
171 stasis_message_data(message);
172 route = route_table_find(&router->cache_routes, update->type);
176 /* Find a regular route */
177 route = route_table_find(&router->routes, type);
180 if (route == NULL && router->default_route.callback) {
181 /* Maybe the default route, then? */
182 route = &router->default_route;
193 static void router_dispatch(void *data,
194 struct stasis_subscription *sub,
195 struct stasis_message *message)
197 struct stasis_message_router *router = data;
198 struct stasis_message_route route;
200 if (find_route(router, message, &route) == 0) {
201 route.callback(route.data, sub, message);
204 if (stasis_subscription_final_message(sub, message)) {
209 struct stasis_message_router *stasis_message_router_create(
210 struct stasis_topic *topic)
213 RAII_VAR(struct stasis_message_router *, router, NULL, ao2_cleanup);
215 router = ao2_alloc(sizeof(*router), router_dtor);
221 res |= AST_VECTOR_INIT(&router->routes, 0);
222 res |= AST_VECTOR_INIT(&router->cache_routes, 0);
227 router->subscription = stasis_subscribe(topic, router_dispatch, router);
228 if (!router->subscription) {
236 void stasis_message_router_unsubscribe(struct stasis_message_router *router)
242 stasis_unsubscribe(router->subscription);
245 void stasis_message_router_unsubscribe_and_join(
246 struct stasis_message_router *router)
251 stasis_unsubscribe_and_join(router->subscription);
254 int stasis_message_router_is_done(struct stasis_message_router *router)
257 /* Null router is about as done as you can get */
261 return stasis_subscription_is_done(router->subscription);
264 void stasis_message_router_publish_sync(struct stasis_message_router *router,
265 struct stasis_message *message)
267 ast_assert(router != NULL);
270 stasis_publish_sync(router->subscription, message);
274 int stasis_message_router_add(struct stasis_message_router *router,
275 struct stasis_message_type *message_type,
276 stasis_subscription_cb callback, void *data)
280 ast_assert(router != NULL);
283 /* Cannot route to NULL type. */
287 res = route_table_add(&router->routes, message_type, callback, data);
292 int stasis_message_router_add_cache_update(struct stasis_message_router *router,
293 struct stasis_message_type *message_type,
294 stasis_subscription_cb callback, void *data)
298 ast_assert(router != NULL);
301 /* Cannot cache a route to NULL type. */
305 res = route_table_add(&router->cache_routes, message_type, callback, data);
310 void stasis_message_router_remove(struct stasis_message_router *router,
311 struct stasis_message_type *message_type)
313 ast_assert(router != NULL);
316 /* Cannot remove a NULL type. */
320 route_table_remove(&router->routes, message_type);
324 void stasis_message_router_remove_cache_update(
325 struct stasis_message_router *router,
326 struct stasis_message_type *message_type)
328 ast_assert(router != NULL);
331 /* Cannot remove a NULL type. */
335 route_table_remove(&router->cache_routes, message_type);
339 int stasis_message_router_set_default(struct stasis_message_router *router,
340 stasis_subscription_cb callback,
343 ast_assert(router != NULL);
344 ast_assert(callback != NULL);
347 router->default_route.callback = callback;
348 router->default_route.data = data;
350 /* While this implementation can never fail, it used to be able to */