vector: Update API to be more flexible.
[asterisk/asterisk.git] / main / stasis.c
index eabdfdc..db95986 100644 (file)
@@ -140,10 +140,10 @@ STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
 struct stasis_topic {
        char *name;
        /*! Variable length array of the subscribers */
-       ast_vector(struct stasis_subscription *) subscribers;
+       ast_vector(, struct stasis_subscription *) subscribers;
 
        /*! Topics forwarding into this topic */
-       ast_vector(struct stasis_topic *) upstream_topics;
+       ast_vector(, struct stasis_topic *) upstream_topics;
 };
 
 /* Forward declarations for the tightly-coupled subscription object */
@@ -152,18 +152,28 @@ static int topic_add_subscription(struct stasis_topic *topic,
 
 static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
 
+/*! \brief Lock two topics. */
+#define topic_lock_both(topic1, topic2) \
+       do { \
+               ao2_lock(topic1); \
+               while (ao2_trylock(topic2)) { \
+                       AO2_DEADLOCK_AVOIDANCE(topic1); \
+               } \
+       } while (0)
+
 static void topic_dtor(void *obj)
 {
        struct stasis_topic *topic = obj;
 
        /* Subscribers hold a reference to topics, so they should all be
         * unsubscribed before we get here. */
-       ast_assert(ast_vector_size(topic->subscribers) == 0);
+       ast_assert(ast_vector_size(&topic->subscribers) == 0);
+
        ast_free(topic->name);
        topic->name = NULL;
 
-       ast_vector_free(topic->subscribers);
-       ast_vector_free(topic->upstream_topics);
+       ast_vector_free(&topic->subscribers);
+       ast_vector_free(&topic->upstream_topics);
 }
 
 struct stasis_topic *stasis_topic_create(const char *name)
@@ -182,8 +192,8 @@ struct stasis_topic *stasis_topic_create(const char *name)
                return NULL;
        }
 
-       res |= ast_vector_init(topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
-       res |= ast_vector_init(topic->upstream_topics, 0);
+       res |= ast_vector_init(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
+       res |= ast_vector_init(&topic->upstream_topics, 0);
 
        if (res != 0) {
                return NULL;
@@ -280,6 +290,10 @@ struct stasis_subscription *internal_stasis_subscribe(
 {
        RAII_VAR(struct stasis_subscription *, sub, NULL, ao2_cleanup);
 
+       if (!topic) {
+               return NULL;
+       }
+
        sub = ao2_alloc(sizeof(*sub), subscription_dtor);
        if (!sub) {
                return NULL;
@@ -414,8 +428,8 @@ int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
                struct stasis_topic *topic = sub->topic;
                SCOPED_AO2LOCK(lock_topic, topic);
 
-               for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
-                       if (ast_vector_get(topic->subscribers, i) == sub) {
+               for (i = 0; i < ast_vector_size(&topic->subscribers); ++i) {
+                       if (ast_vector_get(&topic->subscribers, i) == sub) {
                                return 1;
                        }
                }
@@ -466,11 +480,11 @@ static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subs
         *
         * If we bumped the refcount here, the owner would have to unsubscribe
         * and cleanup, which is a bit awkward. */
-       ast_vector_append(topic->subscribers, sub);
+       ast_vector_append(&topic->subscribers, sub);
 
-       for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
+       for (idx = 0; idx < ast_vector_size(&topic->upstream_topics); ++idx) {
                topic_add_subscription(
-                       ast_vector_get(topic->upstream_topics, idx), sub);
+                       ast_vector_get(&topic->upstream_topics, idx), sub);
        }
 
        return 0;
@@ -481,12 +495,13 @@ static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_s
        size_t idx;
        SCOPED_AO2LOCK(lock_topic, topic);
 
-       for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
+       for (idx = 0; idx < ast_vector_size(&topic->upstream_topics); ++idx) {
                topic_remove_subscription(
-                       ast_vector_get(topic->upstream_topics, idx), sub);
+                       ast_vector_get(&topic->upstream_topics, idx), sub);
        }
 
-       return ast_vector_remove_elem_unordered(topic->subscribers, sub);
+       return ast_vector_remove_elem_unordered(&topic->subscribers, sub,
+               AST_VECTOR_ELEM_CLEANUP_NOOP);
 }
 
 /*!
@@ -512,7 +527,7 @@ static void dispatch_message(struct stasis_subscription *sub,
                ao2_bump(message);
                if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec, message) != 0) {
                        /* Push failed; ugh. */
-                       ast_log(LOG_DEBUG, "Dropping dispatch\n");
+                       ast_log(LOG_ERROR, "Dropping dispatch\n");
                        ao2_cleanup(message);
                }
        } else {
@@ -521,26 +536,28 @@ static void dispatch_message(struct stasis_subscription *sub,
        }
 }
 
-void stasis_publish(struct stasis_topic *_topic, struct stasis_message *message)
+void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
 {
        size_t i;
-       /* The topic may be unref'ed by the subscription invocation.
-        * Make sure we hold onto a reference while dispatching. */
-       RAII_VAR(struct stasis_topic *, topic, ao2_bump(_topic),
-               ao2_cleanup);
-       SCOPED_AO2LOCK(lock, topic);
 
        ast_assert(topic != NULL);
        ast_assert(message != NULL);
 
-       for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
-               struct stasis_subscription *sub =
-                       ast_vector_get(topic->subscribers, i);
+       /*
+        * The topic may be unref'ed by the subscription invocation.
+        * Make sure we hold onto a reference while dispatching.
+        */
+       ao2_ref(topic, +1);
+       ao2_lock(topic);
+       for (i = 0; i < ast_vector_size(&topic->subscribers); ++i) {
+               struct stasis_subscription *sub = ast_vector_get(&topic->subscribers, i);
 
                ast_assert(sub != NULL);
 
                dispatch_message(sub, message);
        }
+       ao2_unlock(topic);
+       ao2_ref(topic, -1);
 }
 
 /*!
@@ -570,23 +587,26 @@ static void forward_dtor(void *obj)
 
 struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
 {
-       if (forward) {
-               int idx;
+       int idx;
+       struct stasis_topic *from;
+       struct stasis_topic *to;
 
-               struct stasis_topic *from = forward->from_topic;
-               struct stasis_topic *to = forward->to_topic;
+       if (!forward) {
+               return NULL;
+       }
 
-               SCOPED_AO2LOCK(to_lock, to);
+       from = forward->from_topic;
+       to = forward->to_topic;
 
-               ast_vector_remove_elem_unordered(to->upstream_topics, from);
+       topic_lock_both(to, from);
+       ast_vector_remove_elem_unordered(&to->upstream_topics, from,
+               AST_VECTOR_ELEM_CLEANUP_NOOP);
 
-               ao2_lock(from);
-               for (idx = 0; idx < ast_vector_size(to->subscribers); ++idx) {
-                       topic_remove_subscription(
-                               from, ast_vector_get(to->subscribers, idx));
-               }
-               ao2_unlock(from);
+       for (idx = 0; idx < ast_vector_size(&to->subscribers); ++idx) {
+               topic_remove_subscription(from, ast_vector_get(&to->subscribers, idx));
        }
+       ao2_unlock(from);
+       ao2_unlock(to);
 
        ao2_cleanup(forward);
 
@@ -596,6 +616,8 @@ struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
 struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
        struct stasis_topic *to_topic)
 {
+       int res;
+       size_t idx;
        RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
 
        if (!from_topic || !to_topic) {
@@ -610,23 +632,19 @@ struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
        forward->from_topic = ao2_bump(from_topic);
        forward->to_topic = ao2_bump(to_topic);
 
-       {
-               SCOPED_AO2LOCK(lock, to_topic);
-               int res;
-
-               res = ast_vector_append(to_topic->upstream_topics, from_topic);
-               if (res != 0) {
-                       return NULL;
-               }
+       topic_lock_both(to_topic, from_topic);
+       res = ast_vector_append(&to_topic->upstream_topics, from_topic);
+       if (res != 0) {
+               ao2_unlock(from_topic);
+               ao2_unlock(to_topic);
+               return NULL;
+       }
 
-               {
-                       SCOPED_AO2LOCK(lock, from_topic);
-                       size_t idx;
-                       for (idx = 0; idx < ast_vector_size(to_topic->subscribers); ++idx) {
-                               topic_add_subscription(from_topic, ast_vector_get(to_topic->subscribers, idx));
-                       }
-               }
+       for (idx = 0; idx < ast_vector_size(&to_topic->subscribers); ++idx) {
+               topic_add_subscription(from_topic, ast_vector_get(&to_topic->subscribers, idx));
        }
+       ao2_unlock(from_topic);
+       ao2_unlock(to_topic);
 
        return ao2_bump(forward);
 }