Migrate a large number of AMI events over to Stasis-Core
[asterisk/asterisk.git] / main / app.c
index 3001450..9fa501f 100644 (file)
@@ -68,6 +68,8 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/module.h"
 #include "asterisk/astobj2.h"
 #include "asterisk/stasis.h"
+#include "asterisk/stasis_channels.h"
+#include "asterisk/json.h"
 
 #define MWI_TOPIC_BUCKETS 57
 
@@ -82,11 +84,22 @@ struct zombie {
 
 static AST_LIST_HEAD_STATIC(zombies, zombie);
 
+/*
+ * @{ \brief Define \ref stasis topic objects for MWI
+ */
 static struct stasis_topic *mwi_topic_all;
 static struct stasis_caching_topic *mwi_topic_cached;
 static struct stasis_topic_pool *mwi_topic_pool;
+/* @} */
+
+/*
+ * @{ \brief Define \ref stasis message types for MWI
+ */
+STASIS_MESSAGE_TYPE_DEFN(ast_mwi_state_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_mwi_vm_app_type);
+/* @} */
+
 
-STASIS_MESSAGE_TYPE_DEFN(stasis_mwi_state_type);
 
 static void *shaun_of_the_dead(void *data)
 {
@@ -2657,61 +2670,95 @@ int ast_app_parse_timelen(const char *timestr, int *result, enum ast_timelen uni
 
 static void mwi_state_dtor(void *obj)
 {
-       struct stasis_mwi_state *mwi_state = obj;
+       struct ast_mwi_state *mwi_state = obj;
        ast_string_field_free_memory(mwi_state);
+       ao2_cleanup(mwi_state->snapshot);
+       mwi_state->snapshot = NULL;
 }
 
-struct stasis_topic *stasis_mwi_topic_all(void)
+struct stasis_topic *ast_mwi_topic_all(void)
 {
        return mwi_topic_all;
 }
 
-struct stasis_caching_topic *stasis_mwi_topic_cached(void)
+struct stasis_caching_topic *ast_mwi_topic_cached(void)
 {
        return mwi_topic_cached;
 }
 
-struct stasis_topic *stasis_mwi_topic(const char *uniqueid)
+struct stasis_topic *ast_mwi_topic(const char *uniqueid)
 {
        return stasis_topic_pool_get_topic(mwi_topic_pool, uniqueid);
 }
 
-int stasis_publish_mwi_state_full(
+struct ast_mwi_state *ast_mwi_create(const char *mailbox, const char *context)
+{
+       RAII_VAR(struct ast_mwi_state *, mwi_state, NULL, ao2_cleanup);
+       struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID);
+
+       ast_assert(!ast_strlen_zero(mailbox));
+       ast_assert(!ast_strlen_zero(context));
+
+       mwi_state = ao2_alloc(sizeof(*mwi_state), mwi_state_dtor);
+       if (!mwi_state) {
+               return NULL;
+       }
+
+       if (ast_string_field_init(mwi_state, 256)) {
+               return NULL;
+       }
+       ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context);
+       ast_string_field_set(mwi_state, uniqueid, ast_str_buffer(uniqueid));
+       ast_string_field_set(mwi_state, mailbox, mailbox);
+       ast_string_field_set(mwi_state, context, context);
+
+       ao2_ref(mwi_state, +1);
+       return mwi_state;
+}
+
+
+int ast_publish_mwi_state_full(
                        const char *mailbox,
                        const char *context,
                        int new_msgs,
                        int old_msgs,
+                       const char *channel_id,
                        struct ast_eid *eid)
 {
-       RAII_VAR(struct stasis_mwi_state *, mwi_state, NULL, ao2_cleanup);
+       RAII_VAR(struct ast_mwi_state *, mwi_state, NULL, ao2_cleanup);
        RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
        struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID);
        struct stasis_topic *mailbox_specific_topic;
 
-       ast_assert(!ast_strlen_zero(mailbox));
-       ast_assert(!ast_strlen_zero(context));
-
-       ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context);
-
-       mwi_state = ao2_alloc(sizeof(*mwi_state), mwi_state_dtor);
-       if (ast_string_field_init(mwi_state, 256)) {
+       mwi_state = ast_mwi_create(mailbox, context);
+       if (!mwi_state) {
                return -1;
        }
 
-       ast_string_field_set(mwi_state, uniqueid, ast_str_buffer(uniqueid));
-       ast_string_field_set(mwi_state, mailbox, mailbox);
-       ast_string_field_set(mwi_state, context, context);
        mwi_state->new_msgs = new_msgs;
        mwi_state->old_msgs = old_msgs;
+
+       if (!ast_strlen_zero(channel_id)) {
+               RAII_VAR(struct stasis_message *, chan_message,
+                       stasis_cache_get(ast_channel_topic_all_cached(),
+                                       ast_channel_snapshot_type(),
+                                       channel_id),
+                       ao2_cleanup);
+               if (chan_message) {
+                       mwi_state->snapshot = stasis_message_data(chan_message);
+                       ao2_ref(mwi_state->snapshot, +1);
+               }
+       }
+
        if (eid) {
                mwi_state->eid = *eid;
        } else {
                ast_set_default_eid(&mwi_state->eid);
        }
 
-       message = stasis_message_create(stasis_mwi_state_type(), mwi_state);
+       message = stasis_message_create(ast_mwi_state_type(), mwi_state);
 
-       mailbox_specific_topic = stasis_mwi_topic(ast_str_buffer(uniqueid));
+       mailbox_specific_topic = ast_mwi_topic(ast_str_buffer(uniqueid));
        if (!mailbox_specific_topic) {
                return -1;
        }
@@ -2723,8 +2770,8 @@ int stasis_publish_mwi_state_full(
 
 static const char *mwi_state_get_id(struct stasis_message *message)
 {
-       if (stasis_mwi_state_type() == stasis_message_type(message)) {
-               struct stasis_mwi_state *mwi_state = stasis_message_data(message);
+       if (ast_mwi_state_type() == stasis_message_type(message)) {
+               struct ast_mwi_state *mwi_state = stasis_message_data(message);
                return mwi_state->uniqueid;
        } else if (stasis_subscription_change_type() == stasis_message_type(message)) {
                struct stasis_subscription_change *change = stasis_message_data(message);
@@ -2734,19 +2781,58 @@ static const char *mwi_state_get_id(struct stasis_message *message)
        return NULL;
 }
 
+static void mwi_blob_dtor(void *obj)
+{
+       struct ast_mwi_blob *mwi_blob = obj;
+
+       ao2_cleanup(mwi_blob->mwi_state);
+       ast_json_unref(mwi_blob->blob);
+}
+
+struct stasis_message *ast_mwi_blob_create(struct ast_mwi_state *mwi_state,
+                                              struct stasis_message_type *message_type,
+                                              struct ast_json *blob)
+{
+       RAII_VAR(struct ast_mwi_blob *, obj, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+
+       ast_assert(blob != NULL);
+
+       obj = ao2_alloc(sizeof(*obj), mwi_blob_dtor);
+       if (!obj) {
+               return NULL;
+       }
+
+       obj->mwi_state = mwi_state;
+       ao2_ref(obj->mwi_state, +1);
+       obj->blob = ast_json_ref(blob);
+
+       msg = stasis_message_create(message_type, obj);
+       if (!msg) {
+               return NULL;
+       }
+
+       ao2_ref(msg, +1);
+       return msg;
+}
+
 static void app_exit(void)
 {
        ao2_cleanup(mwi_topic_all);
        mwi_topic_all = NULL;
        mwi_topic_cached = stasis_caching_unsubscribe_and_join(mwi_topic_cached);
-       STASIS_MESSAGE_TYPE_CLEANUP(stasis_mwi_state_type);
+       STASIS_MESSAGE_TYPE_CLEANUP(ast_mwi_state_type);
+       STASIS_MESSAGE_TYPE_CLEANUP(ast_mwi_vm_app_type);
        ao2_cleanup(mwi_topic_pool);
        mwi_topic_pool = NULL;
 }
 
 int app_init(void)
 {
-       if (STASIS_MESSAGE_TYPE_INIT(stasis_mwi_state_type) != 0) {
+       if (STASIS_MESSAGE_TYPE_INIT(ast_mwi_state_type) != 0) {
+               return -1;
+       }
+       if (STASIS_MESSAGE_TYPE_INIT(ast_mwi_vm_app_type) != 0) {
                return -1;
        }
        mwi_topic_all = stasis_topic_create("stasis_mwi_topic");