stasis_state: Create internal stasis_state_proxy object.
authorCorey Farrell <git@cfware.com>
Fri, 20 Sep 2019 14:08:02 +0000 (10:08 -0400)
committerCorey Farrell <git@cfware.com>
Tue, 24 Sep 2019 19:33:33 +0000 (14:33 -0500)
This improves the way which stasis_state reference counting works.
Since manager->states holds onto the proxy object instead of the real
object this allows stasis_state objects to be freed when appropriate
without use of a special state_remove function.  Additionally each
distinct eid associated with the state holds a reference to the state to
prevent early release and potentially allow easier debug of leaks.

Change-Id: I400e0db4b9afa3d5cb4ac7dad60907897e73f9a9

main/stasis_state.c

index aa00f9a..b85462f 100644 (file)
 
 /*!
  * \internal
+ * \brief Used to link a stasis_state to it's manager
+ */
+struct stasis_state_proxy {
+       AO2_WEAKPROXY();
+       /*! The manager that owns and handles this state */
+       struct stasis_state_manager *manager;
+       /*! A unique id for this state object. */
+       char id[0];
+};
+
+/*!
+ * \internal
  * \brief Associates a stasis topic to its last known published message
  *
  * This object's lifetime is tracked by the number of publishers and subscribers to it.
 struct stasis_state {
        /*! The number of state subscribers */
        unsigned int num_subscribers;
-       /*! The manager that owns and handles this state */
+       /*!
+        * \brief The manager that owns and handles this state
+        * \note This reference is owned by stasis_state_proxy
+        */
        struct stasis_state_manager *manager;
        /*! Forwarding information, i.e. this topic to manager's topic */
        struct stasis_forward *forward;
@@ -52,11 +67,11 @@ struct stasis_state {
         */
        AST_VECTOR(, struct ast_eid) eids;
        /*! A unique id for this state object. */
-       char id[0];
+       char *id;
 };
 
-AO2_STRING_FIELD_HASH_FN(stasis_state, id);
-AO2_STRING_FIELD_CMP_FN(stasis_state, id);
+AO2_STRING_FIELD_HASH_FN(stasis_state_proxy, id);
+AO2_STRING_FIELD_CMP_FN(stasis_state_proxy, id);
 
 /*! The number of buckets to use for managed states */
 #define STATE_BUCKETS 57
@@ -112,17 +127,28 @@ static void state_dtor(void *obj)
        state->topic = NULL;
        ao2_cleanup(state->msg);
        state->msg = NULL;
-       ao2_cleanup(state->manager);
-       state->manager = NULL;
 
        /* All eids should have been removed */
        ast_assert(AST_VECTOR_SIZE(&state->eids) == 0);
        AST_VECTOR_FREE(&state->eids);
 }
 
+static void state_proxy_dtor(void *obj) {
+       struct stasis_state_proxy *proxy = obj;
+
+       ao2_cleanup(proxy->manager);
+}
+
+static void state_proxy_sub_cb(void *obj, void *data)
+{
+       struct stasis_state_proxy *proxy = obj;
+
+       ao2_unlink(proxy->manager->states, proxy);
+}
+
 /*!
  * \internal
- * \brief Allocate a stasis state object.
+ * \brief Allocate a stasis state object and add it to the manager.
  *
  * Create and initialize a state structure. It's required that either a state
  * topic, or an id is specified. If a state topic is not given then one will be
@@ -134,37 +160,48 @@ static void state_dtor(void *obj)
  *
  * \return A stasis_state object or NULL
  * \return NULL on error
+ *
+ * \pre manager->states must be locked.
+ * \pre manager->states does not contain an object matching key \a id.
  */
 static struct stasis_state *state_alloc(struct stasis_state_manager *manager,
-       struct stasis_topic *state_topic, const char *id)
+       struct stasis_topic *state_topic, const char *id,
+       const char *file, int line, const char *func)
 {
-       struct stasis_state *state;
+       struct stasis_state_proxy *proxy = NULL;
+       struct stasis_state *state = NULL;
+
+       if (!id) {
+               /* If not given an id, then a state topic is required */
+               ast_assert(state_topic != NULL);
+
+               /* Get the id we'll key off of from the state topic */
+               id = state_id_by_topic(manager->all_topic, state_topic);
+       }
+
+       state = __ao2_alloc(sizeof(*state), state_dtor, AO2_ALLOC_OPT_LOCK_MUTEX, id, file, line, func);
+       if (!state) {
+               goto error_return;
+       }
 
        if (!state_topic) {
                char *name;
 
-               /* If not given a state topic, then an id is required */
-               ast_assert(id != NULL);
-
                /*
                 * To provide further detail and to ensure that the topic is unique within the
                 * scope of the system we prefix it with the manager's topic name, which should
                 * itself already be unique.
                 */
                if (ast_asprintf(&name, "%s/%s", stasis_topic_name(manager->all_topic), id) < 0) {
-                       ast_log(LOG_ERROR, "Unable to create state topic name '%s/%s'\n",
-                                       stasis_topic_name(manager->all_topic), id);
-                       return NULL;
+                       goto error_return;
                }
 
-               state_topic = stasis_topic_create(name);
+               state->topic = stasis_topic_create(name);
 
-               if (!state_topic) {
-                       ast_log(LOG_ERROR, "Unable to create state topic '%s'\n", name);
-                       ast_free(name);
-                       return NULL;
-               }
                ast_free(name);
+               if (!state->topic) {
+                       goto error_return;
+               }
        } else {
                /*
                 * Since the state topic was passed in, go ahead and bump its reference.
@@ -172,87 +209,57 @@ static struct stasis_state *state_alloc(struct stasis_state_manager *manager,
                 * state allocation error.
                 */
                ao2_ref(state_topic, +1);
+               state->topic = state_topic;
        }
 
-       if (!id) {
-               /* If not given an id, then a state topic is required */
-               ast_assert(state_topic != NULL);
-
-               /* Get the id we'll key off of from the state topic */
-               id = state_id_by_topic(manager->all_topic, state_topic);
+       proxy = ao2_t_weakproxy_alloc(sizeof(*proxy) + strlen(id) + 1, state_proxy_dtor, id);
+       if (!proxy) {
+               goto error_return;
        }
 
-       state = ao2_alloc(sizeof(*state) + strlen(id) + 1, state_dtor);
-       if (!state) {
-               ast_log(LOG_ERROR, "Unable to allocate state '%s' in manager '%s'\n",
-                               id, stasis_topic_name(manager->all_topic));
-               ao2_ref(state_topic, -1);
-               return NULL;
-       }
+       strcpy(proxy->id, id); /* Safe */
 
-       strcpy(state->id, id); /* Safe */
-       state->topic = state_topic; /* ref already bumped above */
+       state->id = proxy->id;
+       proxy->manager = ao2_bump(manager);
+       state->manager = proxy->manager; /* state->manager is owned by the proxy */
 
        state->forward = stasis_forward_all(state->topic, manager->all_topic);
        if (!state->forward) {
-               ast_log(LOG_ERROR, "Unable to add state '%s' forward in manager '%s'\n",
-                               id, stasis_topic_name(manager->all_topic));
-               ao2_ref(state, -1);
-               return NULL;
+               goto error_return;
        }
 
        if (AST_VECTOR_INIT(&state->eids, 2)) {
-               ast_log(LOG_ERROR, "Unable to initialize eids for state '%s' in manager '%s'\n",
-                               id, stasis_topic_name(manager->all_topic));
-               ao2_ref(state, -1);
-               return NULL;
+               goto error_return;
        }
 
-       state->manager = ao2_bump(manager);
-
-       return state;
-}
-
-/*!
- * \internal
- * \brief Create a state object, and add it to the manager.
- *
- * \note Locking on the states container is specifically not done here, thus
- * appropriate locks should be applied prior to this function being called.
- *
- * \param manager The manager to be added to
- * \param state_topic A state topic to be managed (if NULL id is required)
- * \param id The unique id for the state (if NULL state_topic is required)
- *
- * \return The added state object
- * \return NULL on error
- */
-static struct stasis_state *state_add(struct stasis_state_manager *manager,
-       struct stasis_topic *state_topic, const char *id)
-{
-       struct stasis_state *state = state_alloc(manager, state_topic, id);
+       if (ao2_t_weakproxy_set_object(proxy, state, OBJ_NOLOCK, "weakproxy link")) {
+               goto error_return;
+       }
 
-       if (!state) {
-               return NULL;
+       if (ao2_weakproxy_subscribe(proxy, state_proxy_sub_cb, NULL, OBJ_NOLOCK)) {
+               goto error_return;
        }
 
-       if (!ao2_link_flags(manager->states, state, OBJ_NOLOCK)) {
-               ast_log(LOG_ERROR, "Unable to add state '%s' to manager '%s'\n",
-                               state->id ? state->id : "", stasis_topic_name(manager->all_topic));
-               ao2_ref(state, -1);
-               return NULL;
+       if (!ao2_link_flags(manager->states, proxy, OBJ_NOLOCK)) {
+               goto error_return;
        }
 
+       ao2_ref(proxy, -1);
+
        return state;
+
+error_return:
+       ast_log(LOG_ERROR, "Unable to allocate state '%s' in manager '%s'\n",
+                       id, stasis_topic_name(manager->all_topic));
+       ao2_cleanup(state);
+       ao2_cleanup(proxy);
+       return NULL;
 }
 
 /*!
  * \internal
  * \brief Find a state by id, or create one if not found and add it to the manager.
  *
- * \note Locking on the states container is specifically not done here, thus
- * appropriate locks should be applied prior to this function being called.
- *
  * \param manager The manager to be added to
  * \param state_topic A state topic to be managed (if NULL id is required)
  * \param id The unique id for the state (if NULL state_topic is required)
@@ -260,18 +267,26 @@ static struct stasis_state *state_add(struct stasis_state_manager *manager,
  * \return The added state object
  * \return NULL on error
  */
-static struct stasis_state *state_find_or_add(struct stasis_state_manager *manager,
-       struct stasis_topic *state_topic, const char *id)
+#define state_find_or_add(mgr, top, id) __state_find_or_add(mgr, top, id, __FILE__, __LINE__, __PRETTY_FUNCTION__)
+static struct stasis_state *__state_find_or_add(struct stasis_state_manager *manager,
+       struct stasis_topic *state_topic, const char *id,
+       const char *file, int line, const char *func)
 {
        struct stasis_state *state;
 
+       ao2_lock(manager->states);
        if (ast_strlen_zero(id)) {
                id = state_id_by_topic(manager->all_topic, state_topic);
        }
 
-       state = ao2_find(manager->states, id, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+       state = ao2_weakproxy_find(manager->states, id, OBJ_SEARCH_KEY | OBJ_NOLOCK, "");
+       if (!state) {
+               state = state_alloc(manager, state_topic, id, file, line, func);
+       }
+
+       ao2_unlock(manager->states);
 
-       return state ? state : state_add(manager, state_topic, id);
+       return state;
 }
 
 static void state_manager_dtor(void *obj)
@@ -317,7 +332,7 @@ struct stasis_state_manager *stasis_state_manager_create(const char *topic_name)
        }
 
        manager->states = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
-               STATE_BUCKETS, stasis_state_hash_fn, NULL, stasis_state_cmp_fn);
+               STATE_BUCKETS, stasis_state_proxy_hash_fn, NULL, stasis_state_proxy_cmp_fn);
        if (!manager->states) {
                ao2_ref(manager, -1);
                return NULL;
@@ -356,10 +371,7 @@ struct stasis_topic *stasis_state_topic(struct stasis_state_manager *manager, co
        struct stasis_topic *topic;
        struct stasis_state *state;
 
-       ao2_lock(manager->states);
        state = state_find_or_add(manager, NULL, id);
-       ao2_unlock(manager->states);
-
        if (!state) {
                return NULL;
        }
@@ -369,53 +381,6 @@ struct stasis_topic *stasis_state_topic(struct stasis_state_manager *manager, co
        return topic;
 }
 
-/*!
- * \internal
- * \brief Remove a state from the stasis manager
- *
- * State should only be removed from the manager under the following conditions:
- *
- *   There are no more subscribers to it
- *   There are no more explicit publishers publishing to it
- *   There are no more implicit publishers publishing to it
- *
- * Subscribers and explicit publishers hold a reference to the state object itself, so
- * once a state's reference count drops to 2 (1 for the manager, 1 passed in) then we
- * know there are no more subscribers or explicit publishers. Implicit publishers are
- * tracked by eids, so once that container is empty no more implicit publishers exist
- * for the state either. Only then can a state be removed.
- *
- * \param state The state to remove
- */
-static void state_remove(struct stasis_state *state)
-{
-       ao2_lock(state);
-
-       /*
-        * The manager's state container also needs to be locked here prior to checking
-        * the state's reference count, and potentially removing since we don't want its
-        * count to possibly increase between the check and unlinking.
-        */
-       ao2_lock(state->manager->states);
-
-       /*
-        * If there are only 2 references left then it's the one owned by the manager,
-        * and the one passed in to this function. However, before removing it from the
-        * manager we need to also check that no eid is associated with the given state.
-        * If an eid still remains then this means that an implicit publisher is still
-        * publishing to this state.
-        */
-       if (ao2_ref(state, 0) == 2 && AST_VECTOR_SIZE(&state->eids) == 0) {
-               ao2_unlink_flags(state->manager->states, state, 0);
-       }
-
-       ao2_unlock(state->manager->states);
-       ao2_unlock(state);
-
-       /* Now it's safe to remove the reference that is held on the given state */
-       ao2_ref(state, -1);
-}
-
 struct stasis_state_subscriber {
        /*! The stasis state subscribed to */
        struct stasis_state *state;
@@ -441,7 +406,7 @@ static void subscriber_dtor(void *obj)
        --sub->state->num_subscribers;
        ao2_unlock(sub->state);
 
-       state_remove(sub->state);
+       ao2_ref(sub->state, -1);
 }
 
 struct stasis_state_subscriber *stasis_state_add_subscriber(
@@ -457,14 +422,11 @@ struct stasis_state_subscriber *stasis_state_add_subscriber(
                return NULL;
        }
 
-       ao2_lock(manager->states);
        sub->state = state_find_or_add(manager, NULL, id);
        if (!sub->state) {
-               ao2_unlock(manager->states);
                ao2_ref(sub, -1);
                return NULL;
        }
-       ao2_unlock(manager->states);
 
        ao2_lock(sub->state);
        ++sub->state->num_subscribers;
@@ -563,7 +525,7 @@ static void publisher_dtor(void *obj)
 {
        struct stasis_state_publisher *pub = obj;
 
-       state_remove(pub->state);
+       ao2_ref(pub->state, -1);
 }
 
 struct stasis_state_publisher *stasis_state_add_publisher(
@@ -578,14 +540,11 @@ struct stasis_state_publisher *stasis_state_add_publisher(
                return NULL;
        }
 
-       ao2_lock(manager->states);
        pub->state = state_find_or_add(manager, NULL, id);
        if (!pub->state) {
-               ao2_unlock(manager->states);
                ao2_ref(pub, -1);
                return NULL;
        }
-       ao2_unlock(manager->states);
 
        return pub;
 }
@@ -639,7 +598,10 @@ static void state_find_or_add_eid(struct stasis_state *state, const struct ast_e
        }
 
        if (i == AST_VECTOR_SIZE(&state->eids)) {
-               AST_VECTOR_APPEND(&state->eids, *eid);
+               if (!AST_VECTOR_APPEND(&state->eids, *eid)) {
+                       /* This ensures state cannot be freed if it has any eids */
+                       ao2_ref(state, +1);
+               }
        }
 }
 
@@ -666,6 +628,8 @@ static void state_find_and_remove_eid(struct stasis_state *state, const struct a
        for (i = 0; i < AST_VECTOR_SIZE(&state->eids); ++i) {
                if (!ast_eid_cmp(AST_VECTOR_GET_ADDR(&state->eids, i), eid)) {
                        AST_VECTOR_REMOVE_UNORDERED(&state->eids, i);
+                       /* Balance the reference from state_find_or_add_eid */
+                       ao2_ref(state, -1);
                        return;
                }
        }
@@ -676,10 +640,7 @@ void stasis_state_publish_by_id(struct stasis_state_manager *manager, const char
 {
        struct stasis_state *state;
 
-       ao2_lock(manager->states);
        state = state_find_or_add(manager, NULL, id);
-       ao2_unlock(manager->states);
-
        if (!state) {
                return;
        }
@@ -697,7 +658,7 @@ void stasis_state_publish_by_id(struct stasis_state_manager *manager, const char
 void stasis_state_remove_publish_by_id(struct stasis_state_manager *manager,
        const char *id, const struct ast_eid *eid, struct stasis_message *msg)
 {
-       struct stasis_state *state = ao2_find(manager->states, id, OBJ_SEARCH_KEY);
+       struct stasis_state *state = ao2_weakproxy_find(manager->states, id, OBJ_SEARCH_KEY, "");
 
        if (!state) {
                /*
@@ -721,7 +682,7 @@ void stasis_state_remove_publish_by_id(struct stasis_state_manager *manager,
        state_find_and_remove_eid(state, eid);
        ao2_unlock(state);
 
-       state_remove(state);
+       ao2_ref(state, -1);
 }
 
 int stasis_state_add_observer(struct stasis_state_manager *manager,
@@ -744,10 +705,8 @@ void stasis_state_remove_observer(struct stasis_state_manager *manager,
        AST_VECTOR_RW_UNLOCK(&manager->observers);
 }
 
-static int handle_stasis_state(void *obj, void *arg, void *data, int flags)
+static int handle_stasis_state(struct stasis_state *state, on_stasis_state handler, void *data)
 {
-       struct stasis_state *state = obj;
-       on_stasis_state handler = arg;
        struct stasis_message *msg;
        int res;
 
@@ -764,24 +723,41 @@ static int handle_stasis_state(void *obj, void *arg, void *data, int flags)
        return res;
 }
 
+static int handle_stasis_state_proxy(void *obj, void *arg, void *data, int flags)
+{
+       struct stasis_state *state = ao2_weakproxy_get_object(obj, 0);
+
+       if (state) {
+               int res;
+               res = handle_stasis_state(state, arg, data);
+               ao2_ref(state, -1);
+               return res;
+       }
+
+       return 0;
+}
+
 void stasis_state_callback_all(struct stasis_state_manager *manager, on_stasis_state handler,
        void *data)
 {
        ast_assert(handler != NULL);
 
        ao2_callback_data(manager->states, OBJ_MULTIPLE | OBJ_NODATA,
-               handle_stasis_state, handler, data);
+               handle_stasis_state_proxy, handler, data);
 }
 
 static int handle_stasis_state_subscribed(void *obj, void *arg, void *data, int flags)
 {
-       struct stasis_state *state = obj;
+       struct stasis_state *state = ao2_weakproxy_get_object(obj, 0);
+       int res = 0;
 
-       if (state->num_subscribers) {
-               return handle_stasis_state(obj, arg, data, flags);
+       if (state && state->num_subscribers) {
+               res = handle_stasis_state(state, arg, data);
        }
 
-       return 0;
+       ao2_cleanup(state);
+
+       return res;
 }
 
 void stasis_state_callback_subscribed(struct stasis_state_manager *manager, on_stasis_state handler,