BuildSystem: Remove unused variables.
[asterisk/asterisk.git] / main / stasis_message_router.c
index 26d2f2c..41d426b 100644 (file)
 
 #include "asterisk.h"
 
-ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
-
 #include "asterisk/astobj2.h"
 #include "asterisk/stasis_message_router.h"
-
-/*! Number of hash buckets for the route table. Keep it prime! */
-#define ROUTE_TABLE_BUCKETS 7
+#include "asterisk/vector.h"
 
 /*! \internal */
 struct stasis_message_route {
@@ -47,29 +43,88 @@ struct stasis_message_route {
        void *data;
 };
 
-static void route_dtor(void *obj)
+AST_VECTOR(route_table, struct stasis_message_route);
+
+static struct stasis_message_route *route_table_find(struct route_table *table,
+       struct stasis_message_type *message_type)
 {
-       struct stasis_message_route *route = obj;
+       size_t idx;
+       struct stasis_message_route *route;
+
+       /* While a linear search for routes may seem very inefficient, most
+        * route tables have six routes or less. For such small data, it's
+        * hard to beat a linear search. If we start having larger route
+        * tables, then we can look into containers with more efficient
+        * lookups.
+        */
+       for (idx = 0; idx < AST_VECTOR_SIZE(table); ++idx) {
+               route = AST_VECTOR_GET_ADDR(table, idx);
+               if (route->message_type == message_type) {
+                       return route;
+               }
+       }
+
+       return NULL;
+}
+
+/*!
+ * \brief route_table comparator for AST_VECTOR_REMOVE_CMP_UNORDERED()
+ *
+ * \param elem Element to compare against
+ * \param value Value to compare with the vector element.
+ *
+ * \return 0 if element does not match.
+ * \return Non-zero if element matches.
+ */
+#define ROUTE_TABLE_ELEM_CMP(elem, value) ((elem).message_type == (value))
 
-       ao2_cleanup(route->message_type);
-       route->message_type = NULL;
+/*!
+ * \brief route_table vector element cleanup.
+ *
+ * \param elem Element to cleanup
+ *
+ * \return Nothing
+ */
+#define ROUTE_TABLE_ELEM_CLEANUP(elem)  ao2_cleanup((elem).message_type)
+
+static int route_table_remove(struct route_table *table,
+       struct stasis_message_type *message_type)
+{
+       return AST_VECTOR_REMOVE_CMP_UNORDERED(table, message_type, ROUTE_TABLE_ELEM_CMP,
+               ROUTE_TABLE_ELEM_CLEANUP);
 }
 
-static int route_hash(const void *obj, const int flags)
+static int route_table_add(struct route_table *table,
+       struct stasis_message_type *message_type,
+       stasis_subscription_cb callback, void *data)
 {
-       const struct stasis_message_route *route = obj;
-       const struct stasis_message_type *message_type = (flags & OBJ_KEY) ? obj : route->message_type;
+       struct stasis_message_route route;
+       int res;
+
+       ast_assert(callback != NULL);
+       ast_assert(route_table_find(table, message_type) == NULL);
 
-       return ast_str_hash(stasis_message_type_name(message_type));
+       route.message_type = ao2_bump(message_type);
+       route.callback = callback;
+       route.data = data;
+
+       res = AST_VECTOR_APPEND(table, route);
+       if (res) {
+               ROUTE_TABLE_ELEM_CLEANUP(route);
+       }
+       return res;
 }
 
-static int route_cmp(void *obj, void *arg, int flags)
+static void route_table_dtor(struct route_table *table)
 {
-       const struct stasis_message_route *left = obj;
-       const struct stasis_message_route *right = arg;
-       const struct stasis_message_type *message_type = (flags & OBJ_KEY) ? arg : right->message_type;
+       size_t idx;
+       struct stasis_message_route *route;
 
-       return (left->message_type == message_type) ? CMP_MATCH | CMP_STOP : 0;
+       for (idx = 0; idx < AST_VECTOR_SIZE(table); ++idx) {
+               route = AST_VECTOR_GET_ADDR(table, idx);
+               ROUTE_TABLE_ELEM_CLEANUP(*route);
+       }
+       AST_VECTOR_FREE(table);
 }
 
 /*! \internal */
@@ -77,11 +132,11 @@ struct stasis_message_router {
        /*! Subscription to the upstream topic */
        struct stasis_subscription *subscription;
        /*! Subscribed routes */
-       struct ao2_container *routes;
-       /*! Subscribed routes for \ref stasi_cache_update messages */
-       struct ao2_container *cache_routes;
+       struct route_table routes;
+       /*! Subscribed routes for \ref stasis_cache_update messages */
+       struct route_table cache_routes;
        /*! Route of last resort */
-       struct stasis_message_route *default_route;
+       struct stasis_message_route default_route;
 };
 
 static void router_dtor(void *obj)
@@ -90,111 +145,120 @@ static void router_dtor(void *obj)
 
        ast_assert(!stasis_subscription_is_subscribed(router->subscription));
        ast_assert(stasis_subscription_is_done(router->subscription));
-       router->subscription = NULL;
-
-       ao2_cleanup(router->routes);
-       router->routes = NULL;
 
-       ao2_cleanup(router->cache_routes);
-       router->cache_routes = NULL;
+       router->subscription = NULL;
 
-       ao2_cleanup(router->default_route);
-       router->default_route = NULL;
+       route_table_dtor(&router->routes);
+       route_table_dtor(&router->cache_routes);
 }
 
-static struct stasis_message_route *find_route(
+static int find_route(
        struct stasis_message_router *router,
-       struct stasis_message *message)
+       struct stasis_message *message,
+       struct stasis_message_route *route_out)
 {
-       RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
+       struct stasis_message_route *route = NULL;
        struct stasis_message_type *type = stasis_message_type(message);
        SCOPED_AO2LOCK(lock, router);
 
+       ast_assert(route_out != NULL);
+
        if (type == stasis_cache_update_type()) {
                /* Find a cache route */
                struct stasis_cache_update *update =
                        stasis_message_data(message);
-               route = ao2_find(router->cache_routes, update->type, OBJ_KEY);
+               route = route_table_find(&router->cache_routes, update->type);
        }
 
        if (route == NULL) {
                /* Find a regular route */
-               route = ao2_find(router->routes, type, OBJ_KEY);
+               route = route_table_find(&router->routes, type);
        }
 
-       if (route == NULL) {
+       if (route == NULL && router->default_route.callback) {
                /* Maybe the default route, then? */
-               if ((route = router->default_route)) {
-                       ao2_ref(route, +1);
-               }
+               route = &router->default_route;
        }
 
-       if (route == NULL) {
-               return NULL;
+       if (!route) {
+               return -1;
        }
 
-       ao2_ref(route, +1);
-       return route;
+       *route_out = *route;
+       return 0;
 }
 
 static void router_dispatch(void *data,
                            struct stasis_subscription *sub,
-                           struct stasis_topic *topic,
                            struct stasis_message *message)
 {
        struct stasis_message_router *router = data;
-       RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
+       struct stasis_message_route route;
 
-       route = find_route(router, message);
-
-       if (route) {
-               route->callback(route->data, sub, topic, message);
+       if (find_route(router, message, &route) == 0) {
+               route.callback(route.data, sub, message);
        }
 
-
        if (stasis_subscription_final_message(sub, message)) {
                ao2_cleanup(router);
        }
 }
 
-struct stasis_message_router *stasis_message_router_create(
-       struct stasis_topic *topic)
+static struct stasis_message_router *stasis_message_router_create_internal(
+       struct stasis_topic *topic, int use_thread_pool)
 {
-       RAII_VAR(struct stasis_message_router *, router, NULL, ao2_cleanup);
+       int res;
+       struct stasis_message_router *router;
 
-       router = ao2_alloc(sizeof(*router), router_dtor);
+       router = ao2_t_alloc(sizeof(*router), router_dtor, stasis_topic_name(topic));
        if (!router) {
                return NULL;
        }
 
-       router->routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS, route_hash,
-               route_cmp);
-       if (!router->routes) {
-               return NULL;
-       }
+       res = 0;
+       res |= AST_VECTOR_INIT(&router->routes, 0);
+       res |= AST_VECTOR_INIT(&router->cache_routes, 0);
+       if (res) {
+               ao2_ref(router, -1);
 
-       router->cache_routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS,
-               route_hash, route_cmp);
-       if (!router->cache_routes) {
                return NULL;
        }
 
-       router->subscription = stasis_subscribe(topic, router_dispatch, router);
+       if (use_thread_pool) {
+               router->subscription = stasis_subscribe_pool(topic, router_dispatch, router);
+       } else {
+               router->subscription = stasis_subscribe(topic, router_dispatch, router);
+       }
        if (!router->subscription) {
+               ao2_ref(router, -1);
+
                return NULL;
        }
 
-       ao2_ref(router, +1);
        return router;
 }
 
+struct stasis_message_router *stasis_message_router_create(
+       struct stasis_topic *topic)
+{
+       return stasis_message_router_create_internal(topic, 0);
+}
+
+struct stasis_message_router *stasis_message_router_create_pool(
+       struct stasis_topic *topic)
+{
+       return stasis_message_router_create_internal(topic, 1);
+}
+
 void stasis_message_router_unsubscribe(struct stasis_message_router *router)
 {
        if (!router) {
                return;
        }
 
-       stasis_unsubscribe(router->subscription);
+       ao2_lock(router);
+       router->subscription = stasis_unsubscribe(router->subscription);
+       ao2_unlock(router);
 }
 
 void stasis_message_router_unsubscribe_and_join(
@@ -216,118 +280,104 @@ int stasis_message_router_is_done(struct stasis_message_router *router)
        return stasis_subscription_is_done(router->subscription);
 }
 
-
-static struct stasis_message_route *route_create(
-       struct stasis_message_type *message_type,
-       stasis_subscription_cb callback,
-       void *data)
+void stasis_message_router_publish_sync(struct stasis_message_router *router,
+       struct stasis_message *message)
 {
-       RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
-
-       route = ao2_alloc(sizeof(*route), route_dtor);
-       if (!route) {
-               return NULL;
-       }
-
-       if (message_type) {
-               ao2_ref(message_type, +1);
-       }
-       route->message_type = message_type;
-       route->callback = callback;
-       route->data = data;
+       ast_assert(router != NULL);
 
-       ao2_ref(route, +1);
-       return route;
+       ao2_bump(router);
+       stasis_publish_sync(router->subscription, message);
+       ao2_cleanup(router);
 }
 
-static int add_route(struct stasis_message_router *router,
-                    struct stasis_message_route *route)
+int stasis_message_router_set_congestion_limits(struct stasis_message_router *router,
+       long low_water, long high_water)
 {
-       RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
-       SCOPED_AO2LOCK(lock, router);
-
-       existing_route = ao2_find(router->routes, route->message_type, OBJ_KEY);
+       int res = -1;
 
-       if (existing_route) {
-               ast_log(LOG_ERROR, "Cannot add route; route exists\n");
-               return -1;
+       if (router) {
+               res = stasis_subscription_set_congestion_limits(router->subscription,
+                       low_water, high_water);
        }
-
-       ao2_link(router->routes, route);
-       return 0;
-}
-
-static int add_cache_route(struct stasis_message_router *router,
-                    struct stasis_message_route *route)
-{
-       RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
-       SCOPED_AO2LOCK(lock, router);
-
-       existing_route = ao2_find(router->cache_routes, route->message_type,
-               OBJ_KEY);
-
-       if (existing_route) {
-               ast_log(LOG_ERROR, "Cannot add route; route exists\n");
-               return -1;
-       }
-
-       ao2_link(router->cache_routes, route);
-       return 0;
+       return res;
 }
 
 int stasis_message_router_add(struct stasis_message_router *router,
        struct stasis_message_type *message_type,
        stasis_subscription_cb callback, void *data)
 {
-       RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
+       int res;
 
-       route = route_create(message_type, callback, data);
-       if (!route) {
+       ast_assert(router != NULL);
+
+       if (!message_type) {
+               /* Cannot route to NULL type. */
                return -1;
        }
-
-       return add_route(router, route);
+       ao2_lock(router);
+       res = route_table_add(&router->routes, message_type, callback, data);
+       ao2_unlock(router);
+       return res;
 }
 
 int stasis_message_router_add_cache_update(struct stasis_message_router *router,
        struct stasis_message_type *message_type,
        stasis_subscription_cb callback, void *data)
 {
-       RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
+       int res;
 
-       route = route_create(message_type, callback, data);
-       if (!route) {
+       ast_assert(router != NULL);
+
+       if (!message_type) {
+               /* Cannot cache a route to NULL type. */
                return -1;
        }
-
-       return add_cache_route(router, route);
+       ao2_lock(router);
+       res = route_table_add(&router->cache_routes, message_type, callback, data);
+       ao2_unlock(router);
+       return res;
 }
 
 void stasis_message_router_remove(struct stasis_message_router *router,
        struct stasis_message_type *message_type)
 {
-       SCOPED_AO2LOCK(lock, router);
+       ast_assert(router != NULL);
 
-       ao2_find(router->routes, message_type,
-               OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
+       if (!message_type) {
+               /* Cannot remove a NULL type. */
+               return;
+       }
+       ao2_lock(router);
+       route_table_remove(&router->routes, message_type);
+       ao2_unlock(router);
 }
 
 void stasis_message_router_remove_cache_update(
        struct stasis_message_router *router,
        struct stasis_message_type *message_type)
 {
-       SCOPED_AO2LOCK(lock, router);
+       ast_assert(router != NULL);
 
-       ao2_find(router->cache_routes, message_type,
-               OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
+       if (!message_type) {
+               /* Cannot remove a NULL type. */
+               return;
+       }
+       ao2_lock(router);
+       route_table_remove(&router->cache_routes, message_type);
+       ao2_unlock(router);
 }
 
 int stasis_message_router_set_default(struct stasis_message_router *router,
-                                     stasis_subscription_cb callback,
-                                     void *data)
+       stasis_subscription_cb callback,
+       void *data)
 {
-       SCOPED_AO2LOCK(lock, router);
-       ao2_cleanup(router->default_route);
-       router->default_route = route_create(NULL, callback, data);
-       return router->default_route ? 0 : -1;
+       ast_assert(router != NULL);
+       ast_assert(callback != NULL);
+
+       ao2_lock(router);
+       router->default_route.callback = callback;
+       router->default_route.data = data;
+       ao2_unlock(router);
+       /* While this implementation can never fail, it used to be able to */
+       return 0;
 }