stasis.c: Misc code cleanups.
authorRichard Mudgett <rmudgett@digium.com>
Fri, 28 Feb 2014 23:31:58 +0000 (23:31 +0000)
committerRichard Mudgett <rmudgett@digium.com>
Fri, 28 Feb 2014 23:31:58 +0000 (23:31 +0000)
* Remove some unnecessary RAII_VAR() usage.

* Made the struct stasis_subscription ao2 object use the ao2 lock instead
of a redundant join_lock in the struct for ast_cond_wait().

* Removed locks on some ao2 objects that don't need the lock.

* Made the topic pool entries container use the ao2 template functions.

* Add some missing allocation failure checks.

* Add missing cleanup in off nominal path of dispatch_message().
........

Merged revisions 409270 from http://svn.asterisk.org/svn/asterisk/branches/12

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@409271 65c4cc65-6c06-0410-ace0-fbb531ad65f3

main/stasis.c

index 0a5db2f..4d05f18 100644 (file)
@@ -178,28 +178,22 @@ static void topic_dtor(void *obj)
 
 struct stasis_topic *stasis_topic_create(const char *name)
 {
-       RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
+       struct stasis_topic *topic;
        int res = 0;
 
        topic = ao2_alloc(sizeof(*topic), topic_dtor);
-
        if (!topic) {
                return NULL;
        }
 
        topic->name = ast_strdup(name);
-       if (!topic->name) {
-               return NULL;
-       }
-
        res |= AST_VECTOR_INIT(&topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
        res |= AST_VECTOR_INIT(&topic->upstream_topics, 0);
-
-       if (res != 0) {
+       if (!topic->name || res) {
+               ao2_cleanup(topic);
                return NULL;
        }
 
-       ao2_ref(topic, +1);
        return topic;
 }
 
@@ -221,8 +215,6 @@ struct stasis_subscription {
        /*! Data pointer to be handed to the callback. */
        void *data;
 
-       /*! Lock for completion flags \c final_message_{rxed,processed}. */
-       ast_mutex_t join_lock;
        /*! Condition for joining with subscription. */
        ast_cond_t join_cond;
        /*! Flag set when final message for sub has been received.
@@ -248,7 +240,6 @@ static void subscription_dtor(void *obj)
        sub->topic = NULL;
        ast_taskprocessor_unreference(sub->mailbox);
        sub->mailbox = NULL;
-       ast_mutex_destroy(&sub->join_lock);
        ast_cond_destroy(&sub->join_cond);
 }
 
@@ -263,7 +254,8 @@ static void subscription_invoke(struct stasis_subscription *sub,
 {
        /* Notify that the final message has been received */
        if (stasis_subscription_final_message(sub, message)) {
-               SCOPED_MUTEX(lock, &sub->join_lock);
+               SCOPED_AO2LOCK(lock, sub);
+
                sub->final_message_rxed = 1;
                ast_cond_signal(&sub->join_cond);
        }
@@ -273,7 +265,8 @@ static void subscription_invoke(struct stasis_subscription *sub,
 
        /* Notify that the final message has been processed */
        if (stasis_subscription_final_message(sub, message)) {
-               SCOPED_MUTEX(lock, &sub->join_lock);
+               SCOPED_AO2LOCK(lock, sub);
+
                sub->final_message_processed = 1;
                ast_cond_signal(&sub->join_cond);
        }
@@ -294,6 +287,7 @@ struct stasis_subscription *internal_stasis_subscribe(
                return NULL;
        }
 
+       /* The ao2 lock is used for join_cond. */
        sub = ao2_alloc(sizeof(*sub), subscription_dtor);
        if (!sub) {
                return NULL;
@@ -323,7 +317,6 @@ struct stasis_subscription *internal_stasis_subscribe(
        sub->topic = topic;
        sub->callback = callback;
        sub->data = data;
-       ast_mutex_init(&sub->join_lock);
        ast_cond_init(&sub->join_cond, NULL);
 
        if (topic_add_subscription(topic, sub) != 0) {
@@ -385,11 +378,12 @@ struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
 void stasis_subscription_join(struct stasis_subscription *subscription)
 {
        if (subscription) {
-               SCOPED_MUTEX(lock, &subscription->join_lock);
+               SCOPED_AO2LOCK(lock, subscription);
+
                /* Wait until the processed flag has been set */
                while (!subscription->final_message_processed) {
                        ast_cond_wait(&subscription->join_cond,
-                               &subscription->join_lock);
+                               ao2_object_get_lockaddr(subscription));
                }
        }
 }
@@ -397,7 +391,8 @@ void stasis_subscription_join(struct stasis_subscription *subscription)
 int stasis_subscription_is_done(struct stasis_subscription *subscription)
 {
        if (subscription) {
-               SCOPED_MUTEX(lock, &subscription->join_lock);
+               SCOPED_AO2LOCK(lock, subscription);
+
                return subscription->final_message_rxed;
        }
 
@@ -446,6 +441,7 @@ const char *stasis_subscription_uniqueid(const struct stasis_subscription *sub)
 int stasis_subscription_final_message(struct stasis_subscription *sub, struct stasis_message *msg)
 {
        struct stasis_subscription_change *change;
+
        if (stasis_message_type(msg) != stasis_subscription_change_type()) {
                return 0;
        }
@@ -575,9 +571,7 @@ static void dispatch_message(struct stasis_subscription *sub,
         */
        ao2_bump(message);
        if (!synchronous) {
-               if (ast_taskprocessor_push_local(sub->mailbox,
-                                                    dispatch_exec_async,
-                                                    message) != 0) {
+               if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_async, message)) {
                        /* Push failed; ugh. */
                        ast_log(LOG_ERROR, "Dropping async dispatch\n");
                        ao2_cleanup(message);
@@ -590,12 +584,12 @@ static void dispatch_message(struct stasis_subscription *sub,
                std.complete = 0;
                std.task_data = message;
 
-               if (ast_taskprocessor_push_local(sub->mailbox,
-                                                    dispatch_exec_sync,
-                                                    &std)) {
+               if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec_sync, &std)) {
                        /* Push failed; ugh. */
                        ast_log(LOG_ERROR, "Dropping sync dispatch\n");
                        ao2_cleanup(message);
+                       ast_mutex_destroy(&std.lock);
+                       ast_cond_destroy(&std.cond);
                        return;
                }
 
@@ -718,7 +712,7 @@ struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
                return NULL;
        }
 
-       forward = ao2_alloc(sizeof(*forward), forward_dtor);
+       forward = ao2_alloc_options(sizeof(*forward), forward_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
        if (!forward) {
                return NULL;
        }
@@ -746,16 +740,18 @@ struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
 static void subscription_change_dtor(void *obj)
 {
        struct stasis_subscription_change *change = obj;
+
        ast_string_field_free_memory(change);
        ao2_cleanup(change->topic);
 }
 
 static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
 {
-       RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
+       struct stasis_subscription_change *change;
 
        change = ao2_alloc(sizeof(struct stasis_subscription_change), subscription_change_dtor);
-       if (ast_string_field_init(change, 128)) {
+       if (!change || ast_string_field_init(change, 128)) {
+               ao2_cleanup(change);
                return NULL;
        }
 
@@ -764,51 +760,50 @@ static struct stasis_subscription_change *subscription_change_alloc(struct stasi
        ao2_ref(topic, +1);
        change->topic = topic;
 
-       ao2_ref(change, +1);
        return change;
 }
 
 static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
 {
-       RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
-       RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+       struct stasis_subscription_change *change;
+       struct stasis_message *msg;
 
        /* This assumes that we have already unsubscribed */
        ast_assert(stasis_subscription_is_subscribed(sub));
 
        change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
-
        if (!change) {
                return;
        }
 
        msg = stasis_message_create(stasis_subscription_change_type(), change);
-
        if (!msg) {
+               ao2_cleanup(change);
                return;
        }
 
        stasis_publish(topic, msg);
+       ao2_cleanup(msg);
+       ao2_cleanup(change);
 }
 
 static void send_subscription_unsubscribe(struct stasis_topic *topic,
        struct stasis_subscription *sub)
 {
-       RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
-       RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+       struct stasis_subscription_change *change;
+       struct stasis_message *msg;
 
        /* This assumes that we have already unsubscribed */
        ast_assert(!stasis_subscription_is_subscribed(sub));
 
        change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
-
        if (!change) {
                return;
        }
 
        msg = stasis_message_create(stasis_subscription_change_type(), change);
-
        if (!msg) {
+               ao2_cleanup(change);
                return;
        }
 
@@ -816,6 +811,9 @@ static void send_subscription_unsubscribe(struct stasis_topic *topic,
 
        /* Now we have to dispatch to the subscription itself */
        dispatch_message(sub, msg, 0);
+
+       ao2_cleanup(msg);
+       ao2_cleanup(change);
 }
 
 struct topic_pool_entry {
@@ -826,6 +824,7 @@ struct topic_pool_entry {
 static void topic_pool_entry_dtor(void *obj)
 {
        struct topic_pool_entry *entry = obj;
+
        entry->forward = stasis_forward_cancel(entry->forward);
        ao2_cleanup(entry->topic);
        entry->topic = NULL;
@@ -833,7 +832,8 @@ static void topic_pool_entry_dtor(void *obj)
 
 static struct topic_pool_entry *topic_pool_entry_alloc(void)
 {
-       return ao2_alloc(sizeof(struct topic_pool_entry), topic_pool_entry_dtor);
+       return ao2_alloc_options(sizeof(struct topic_pool_entry), topic_pool_entry_dtor,
+               AO2_ALLOC_OPT_LOCK_NOLOCK);
 }
 
 struct stasis_topic_pool {
@@ -844,6 +844,7 @@ struct stasis_topic_pool {
 static void topic_pool_dtor(void *obj)
 {
        struct stasis_topic_pool *pool = obj;
+
        ao2_cleanup(pool->pool_container);
        pool->pool_container = NULL;
        ao2_cleanup(pool->pool_topic);
@@ -852,28 +853,80 @@ static void topic_pool_dtor(void *obj)
 
 static int topic_pool_entry_hash(const void *obj, const int flags)
 {
-       const char *topic_name = (flags & OBJ_KEY) ? obj : stasis_topic_name(((struct topic_pool_entry*) obj)->topic);
-       return ast_str_case_hash(topic_name);
+       const struct topic_pool_entry *object;
+       const char *key;
+
+       switch (flags & OBJ_SEARCH_MASK) {
+       case OBJ_SEARCH_KEY:
+               key = obj;
+               break;
+       case OBJ_SEARCH_OBJECT:
+               object = obj;
+               key = stasis_topic_name(object->topic);
+               break;
+       default:
+               /* Hash can only work on something with a full key. */
+               ast_assert(0);
+               return 0;
+       }
+       return ast_str_case_hash(key);
 }
 
 static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
 {
-       struct topic_pool_entry *opt1 = obj, *opt2 = arg;
-       const char *topic_name = (flags & OBJ_KEY) ? arg : stasis_topic_name(opt2->topic);
-       return strcasecmp(stasis_topic_name(opt1->topic), topic_name) ? 0 : CMP_MATCH | CMP_STOP;
+       const struct topic_pool_entry *object_left = obj;
+       const struct topic_pool_entry *object_right = arg;
+       const char *right_key = arg;
+       int cmp;
+
+       switch (flags & OBJ_SEARCH_MASK) {
+       case OBJ_SEARCH_OBJECT:
+               right_key = stasis_topic_name(object_right->topic);
+               /* Fall through */
+       case OBJ_SEARCH_KEY:
+               cmp = strcasecmp(stasis_topic_name(object_left->topic), right_key);
+               break;
+       case OBJ_SEARCH_PARTIAL_KEY:
+               /* Not supported by container */
+               ast_assert(0);
+               cmp = -1;
+               break;
+       default:
+               /*
+                * What arg points to is specific to this traversal callback
+                * and has no special meaning to astobj2.
+                */
+               cmp = 0;
+               break;
+       }
+       if (cmp) {
+               return 0;
+       }
+       /*
+        * At this point the traversal callback is identical to a sorted
+        * container.
+        */
+       return CMP_MATCH;
 }
 
 struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
 {
-       RAII_VAR(struct stasis_topic_pool *, pool, ao2_alloc(sizeof(*pool), topic_pool_dtor), ao2_cleanup);
+       struct stasis_topic_pool *pool;
+
+       pool = ao2_alloc_options(sizeof(*pool), topic_pool_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
        if (!pool) {
                return NULL;
        }
-       pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS, topic_pool_entry_hash, topic_pool_entry_cmp);
+
+       pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS,
+               topic_pool_entry_hash, topic_pool_entry_cmp);
+       if (!pool->pool_container) {
+               ao2_cleanup(pool);
+               return NULL;
+       }
        ao2_ref(pooled_topic, +1);
        pool->pool_topic = pooled_topic;
 
-       ao2_ref(pool, +1);
        return pool;
 }
 
@@ -881,14 +934,13 @@ struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool,
 {
        RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
        SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
-       topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_KEY | OBJ_NOLOCK);
 
+       topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
        if (topic_pool_entry) {
                return topic_pool_entry->topic;
        }
 
        topic_pool_entry = topic_pool_entry_alloc();
-
        if (!topic_pool_entry) {
                return NULL;
        }
@@ -903,7 +955,9 @@ struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool,
                return NULL;
        }
 
-       ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK);
+       if (!ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK)) {
+               return NULL;
+       }
 
        return topic_pool_entry->topic;
 }