stasis: No need to keep a stasis type ref in a stasis msg or cache object.
[asterisk/asterisk.git] / main / stasis_cache.c
index c492307..dfb154d 100644 (file)
@@ -29,8 +29,6 @@
 
 #include "asterisk.h"
 
-ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
-
 #include "asterisk/astobj2.h"
 #include "asterisk/hashtab.h"
 #include "asterisk/stasis_internal.h"
@@ -50,6 +48,7 @@ struct stasis_cache {
        snapshot_get_id id_fn;
        cache_aggregate_calc_fn aggregate_calc_fn;
        cache_aggregate_publish_fn aggregate_publish_fn;
+       int registered;
 };
 
 /*! \internal */
@@ -71,6 +70,8 @@ static void stasis_caching_topic_dtor(void *obj)
         * be bad. */
        ast_assert(stasis_subscription_is_done(caching_topic->sub));
 
+       ao2_container_unregister(stasis_topic_name(caching_topic->topic));
+
        ao2_cleanup(caching_topic->sub);
        caching_topic->sub = NULL;
        ao2_cleanup(caching_topic->cache);
@@ -155,7 +156,6 @@ static void cache_entry_dtor(void *obj)
        struct stasis_cache_entry *entry = obj;
        size_t idx;
 
-       ao2_cleanup(entry->key.type);
        entry->key.type = NULL;
        ast_free((char *) entry->key.id);
        entry->key.id = NULL;
@@ -176,7 +176,7 @@ static void cache_entry_dtor(void *obj)
 
 static void cache_entry_compute_hash(struct cache_entry_key *key)
 {
-       key->hash = ast_hashtab_hash_string(stasis_message_type_name(key->type));
+       key->hash = stasis_message_type_hash(key->type);
        key->hash += ast_hashtab_hash_string(key->id);
 }
 
@@ -203,7 +203,16 @@ static struct stasis_cache_entry *cache_entry_create(struct stasis_message_type
                ao2_cleanup(entry);
                return NULL;
        }
-       entry->key.type = ao2_bump(type);
+       /*
+        * Normal ao2 ref counting rules says we should increment the message
+        * type ref here and decrement it in cache_entry_dtor().  However, the
+        * stasis message snapshot is cached here, will always have the same type
+        * as the cache entry, and can legitimately cause the type ref count to
+        * hit the excessive ref count assertion.  Since the cache entry will
+        * always have a snapshot we can get away with not holding a ref here.
+        */
+       ast_assert(type == stasis_message_type(snapshot));
+       entry->key.type = type;
        cache_entry_compute_hash(&entry->key);
 
        is_remote = ast_eid_cmp(&ast_eid_default, stasis_message_eid(snapshot)) ? 1 : 0;
@@ -815,7 +824,31 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
        }
 
        msg_type = stasis_message_type(message);
-       if (stasis_cache_clear_type() == msg_type) {
+
+       if (stasis_subscription_change_type() == msg_type) {
+               struct stasis_subscription_change *change = stasis_message_data(message);
+
+               /*
+                * If this change type is an unsubscribe, we need to find the original
+                * subscribe and remove it from the cache otherwise the cache will
+                * continue to grow unabated.
+                */
+               if (strcmp(change->description, "Unsubscribe") == 0) {
+                       struct stasis_cache_entry *sub;
+
+                       ao2_wrlock(caching_topic->cache->entries);
+                       sub = cache_find(caching_topic->cache->entries, stasis_subscription_change_type(), change->uniqueid);
+                       if (sub) {
+                               cache_remove(caching_topic->cache->entries, sub, stasis_message_eid(message));
+                               ao2_cleanup(sub);
+                       }
+                       ao2_unlock(caching_topic->cache->entries);
+                       ao2_cleanup(caching_topic_needs_unref);
+                       return;
+               }
+               msg_put = message;
+               msg = message;
+       } else if (stasis_cache_clear_type() == msg_type) {
                /* Cache clear event. */
                msg_put = NULL;
                msg = stasis_message_data(message);
@@ -842,7 +875,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
                        }
                        ao2_cleanup(update);
                } else {
-                       ast_log(LOG_ERROR,
+                       ast_debug(1,
                                "Attempting to remove an item from the %s cache that isn't there: %s %s\n",
                                stasis_topic_name(caching_topic->topic),
                                stasis_message_type_name(msg_type), msg_id);
@@ -868,11 +901,21 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
        ao2_cleanup(caching_topic_needs_unref);
 }
 
+static void print_cache_entry(void *v_obj, void *where, ao2_prnt_fn *prnt)
+{
+       struct stasis_cache_entry *entry = v_obj;
+
+       if (!entry) {
+               return;
+       }
+       prnt(where, "Type: %s  ID: %s  Hash: %u", stasis_message_type_name(entry->key.type),
+               entry->key.id, entry->key.hash);
+}
+
 struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
 {
-       RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
-       struct stasis_subscription *sub;
-       RAII_VAR(char *, new_name, NULL, ast_free);
+       struct stasis_caching_topic *caching_topic;
+       char *new_name;
        int ret;
 
        ret = ast_asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic));
@@ -883,29 +926,41 @@ struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *or
        caching_topic = ao2_alloc_options(sizeof(*caching_topic),
                stasis_caching_topic_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
        if (caching_topic == NULL) {
+               ast_free(new_name);
+
                return NULL;
        }
 
        caching_topic->topic = stasis_topic_create(new_name);
        if (caching_topic->topic == NULL) {
+               ao2_ref(caching_topic, -1);
+               ast_free(new_name);
+
                return NULL;
        }
 
        ao2_ref(cache, +1);
        caching_topic->cache = cache;
+       if (!cache->registered) {
+               if (ao2_container_register(new_name, cache->entries, print_cache_entry)) {
+                       ast_log(LOG_ERROR, "Stasis cache container '%p' for '%s' did not register\n",
+                               cache->entries, new_name);
+               } else {
+                       cache->registered = 1;
+               }
+       }
+       ast_free(new_name);
+
+       caching_topic->sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0);
+       if (caching_topic->sub == NULL) {
+               ao2_ref(caching_topic, -1);
 
-       sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0);
-       if (sub == NULL) {
                return NULL;
        }
 
        ao2_ref(original_topic, +1);
        caching_topic->original_topic = original_topic;
 
-       /* This is for the reference contained in the subscription above */
-       ao2_ref(caching_topic, +1);
-       caching_topic->sub = sub;
-
        /* The subscription holds the reference, so no additional ref bump. */
        return caching_topic;
 }
@@ -930,4 +985,3 @@ int stasis_cache_init(void)
 
        return 0;
 }
-