BuildSystem: Remove unused variables.
[asterisk/asterisk.git] / main / stasis_message_router.c
index a9e4584..41d426b 100644 (file)
@@ -29,8 +29,6 @@
 
 #include "asterisk.h"
 
-ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
-
 #include "asterisk/astobj2.h"
 #include "asterisk/stasis_message_router.h"
 #include "asterisk/vector.h"
@@ -210,7 +208,7 @@ static struct stasis_message_router *stasis_message_router_create_internal(
        struct stasis_topic *topic, int use_thread_pool)
 {
        int res;
-       RAII_VAR(struct stasis_message_router *, router, NULL, ao2_cleanup);
+       struct stasis_message_router *router;
 
        router = ao2_t_alloc(sizeof(*router), router_dtor, stasis_topic_name(topic));
        if (!router) {
@@ -221,6 +219,8 @@ static struct stasis_message_router *stasis_message_router_create_internal(
        res |= AST_VECTOR_INIT(&router->routes, 0);
        res |= AST_VECTOR_INIT(&router->cache_routes, 0);
        if (res) {
+               ao2_ref(router, -1);
+
                return NULL;
        }
 
@@ -230,10 +230,11 @@ static struct stasis_message_router *stasis_message_router_create_internal(
                router->subscription = stasis_subscribe(topic, router_dispatch, router);
        }
        if (!router->subscription) {
+               ao2_ref(router, -1);
+
                return NULL;
        }
 
-       ao2_ref(router, +1);
        return router;
 }
 
@@ -255,7 +256,9 @@ void stasis_message_router_unsubscribe(struct stasis_message_router *router)
                return;
        }
 
-       stasis_unsubscribe(router->subscription);
+       ao2_lock(router);
+       router->subscription = stasis_unsubscribe(router->subscription);
+       ao2_unlock(router);
 }
 
 void stasis_message_router_unsubscribe_and_join(
@@ -287,6 +290,18 @@ void stasis_message_router_publish_sync(struct stasis_message_router *router,
        ao2_cleanup(router);
 }
 
+int stasis_message_router_set_congestion_limits(struct stasis_message_router *router,
+       long low_water, long high_water)
+{
+       int res = -1;
+
+       if (router) {
+               res = stasis_subscription_set_congestion_limits(router->subscription,
+                       low_water, high_water);
+       }
+       return res;
+}
+
 int stasis_message_router_add(struct stasis_message_router *router,
        struct stasis_message_type *message_type,
        stasis_subscription_cb callback, void *data)