Add message dump capability to stasis cache layer
authorKinsey Moore <kmoore@digium.com>
Fri, 8 Mar 2013 16:00:14 +0000 (16:00 +0000)
committerKinsey Moore <kmoore@digium.com>
Fri, 8 Mar 2013 16:00:14 +0000 (16:00 +0000)
The cache dump mechanism allows the developer to retreive multiple
items of a given type (or of all types) from the cache residing in a
stasis caching topic in addition to the existing single-item cache
retreival mechanism.  This also adds to the caching unit tests to
ensure that the new cache dump mechanism is functioning properly.

Review: https://reviewboard.asterisk.org/r/2367/
(issue ASTERISK-21097)

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@382705 65c4cc65-6c06-0410-ace0-fbb531ad65f3

include/asterisk/stasis.h
main/stasis_cache.c
tests/test_stasis.c

index a6ab421..e6d2409 100644 (file)
@@ -480,6 +480,17 @@ struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_top
                                        struct stasis_message_type *type,
                                        const char *id);
 
                                        struct stasis_message_type *type,
                                        const char *id);
 
+/*!
+ * \brief Dump cached items to a subscription
+ * \param caching_topic The topic returned from stasis_caching_topic_create().
+ * \param type Type of message to dump (any type if NULL).
+ * \return ao2_container containing all matches (must be unreffed by caller)
+ * \return NULL on allocation error
+ * \since 12
+ */
+struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic,
+                                       struct stasis_message_type *type);
+
 /*! @} */
 
 /*! @{ */
 /*! @} */
 
 /*! @{ */
index 2f4cf52..160daa6 100644 (file)
@@ -205,6 +205,39 @@ struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_top
        return cached_entry->snapshot;
 }
 
        return cached_entry->snapshot;
 }
 
+struct cache_dump_data {
+       struct ao2_container *cached;
+       struct stasis_message_type *type;
+};
+
+static int cache_dump_cb(void *obj, void *arg, int flags)
+{
+       struct cache_dump_data *cache_dump = arg;
+       struct cache_entry *entry = obj;
+
+       if (!cache_dump->type || entry->type == cache_dump->type) {
+               ao2_link(cache_dump->cached, entry->snapshot);
+       }
+
+       return 0;
+}
+
+struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
+{
+       struct cache_dump_data cache_dump;
+
+       ast_assert(caching_topic->cache != NULL);
+
+       cache_dump.type = type;
+       cache_dump.cached = ao2_container_alloc(1, NULL, NULL);
+       if (!cache_dump.cached) {
+               return NULL;
+       }
+
+       ao2_callback(caching_topic->cache, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_cb, &cache_dump);
+       return cache_dump.cached;
+}
+
 static struct stasis_message_type *__cache_clear_data;
 
 static struct stasis_message_type *cache_clear_data(void)
 static struct stasis_message_type *__cache_clear_data;
 
 static struct stasis_message_type *cache_clear_data(void)
index b052641..3682294 100644 (file)
@@ -564,6 +564,7 @@ AST_TEST_DEFINE(cache)
        RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
        int actual_len;
        struct stasis_cache_update *actual_update;
        RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
        int actual_len;
        struct stasis_cache_update *actual_update;
+       struct ao2_container *cache_dump;
 
        switch (cmd) {
        case TEST_INIT:
 
        switch (cmd) {
        case TEST_INIT:
@@ -599,6 +600,12 @@ AST_TEST_DEFINE(cache)
        actual_len = consumer_wait_for(consumer, 2);
        ast_test_validate(test, 2 == actual_len);
 
        actual_len = consumer_wait_for(consumer, 2);
        ast_test_validate(test, 2 == actual_len);
 
+       /* Dump the cache to ensure that it has the correct number of items in it */
+       cache_dump = stasis_cache_dump(caching_topic, NULL);
+       ast_test_validate(test, 2 == ao2_container_count(cache_dump));
+       ao2_ref(cache_dump, -1);
+       cache_dump = NULL;
+
        /* Check for new snapshot messages */
        ast_test_validate(test, stasis_cache_update() == stasis_message_type(consumer->messages_rxed[0]));
        actual_update = stasis_message_data(consumer->messages_rxed[0]);
        /* Check for new snapshot messages */
        ast_test_validate(test, stasis_cache_update() == stasis_message_type(consumer->messages_rxed[0]));
        actual_update = stasis_message_data(consumer->messages_rxed[0]);
@@ -634,6 +641,12 @@ AST_TEST_DEFINE(cache)
        /* stasis_cache_get returned a ref, so unref test_message2_2 */
        ao2_ref(test_message2_2, -1);
 
        /* stasis_cache_get returned a ref, so unref test_message2_2 */
        ao2_ref(test_message2_2, -1);
 
+       /* Dump the cache to ensure that it has the correct number of items in it */
+       cache_dump = stasis_cache_dump(caching_topic, NULL);
+       ast_test_validate(test, 2 == ao2_container_count(cache_dump));
+       ao2_ref(cache_dump, -1);
+       cache_dump = NULL;
+
        /* Clear snapshot 1 */
        test_message1_clear = stasis_cache_clear_create(cache_type, "1");
        ast_test_validate(test, NULL != test_message1_clear);
        /* Clear snapshot 1 */
        test_message1_clear = stasis_cache_clear_create(cache_type, "1");
        ast_test_validate(test, NULL != test_message1_clear);
@@ -648,6 +661,18 @@ AST_TEST_DEFINE(cache)
        ast_test_validate(test, NULL == actual_update->new_snapshot);
        ast_test_validate(test, NULL == stasis_cache_get(caching_topic, cache_type, "1"));
 
        ast_test_validate(test, NULL == actual_update->new_snapshot);
        ast_test_validate(test, NULL == stasis_cache_get(caching_topic, cache_type, "1"));
 
+       /* Dump the cache to ensure that it has the correct number of items in it */
+       cache_dump = stasis_cache_dump(caching_topic, NULL);
+       ast_test_validate(test, 1 == ao2_container_count(cache_dump));
+       ao2_ref(cache_dump, -1);
+       cache_dump = NULL;
+
+       /* Dump the cache to ensure that it has no subscription change items in it since those aren't cached */
+       cache_dump = stasis_cache_dump(caching_topic, stasis_subscription_change());
+       ast_test_validate(test, 0 == ao2_container_count(cache_dump));
+       ao2_ref(cache_dump, -1);
+       cache_dump = NULL;
+
        return AST_TEST_PASS;
 }
 
        return AST_TEST_PASS;
 }