Migrate a large number of AMI events over to Stasis-Core
[asterisk/asterisk.git] / main / stasis_channels.c
index 363ceb7..d3c543a 100644 (file)
@@ -38,14 +38,25 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 
 #define NUM_MULTI_CHANNEL_BLOB_BUCKETS 7
 
-/*! \brief Message type for channel snapshot messages */
-static struct stasis_message_type *channel_snapshot_type;
-
-/*! \brief Message type for channel blob messages */
-static struct stasis_message_type *channel_blob_type;
-
-/*! \brief Message type for channel dial messages */
-static struct stasis_message_type *channel_dial_type;
+/*!
+ * @{ \brief Define channel message types.
+ */
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_snapshot_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_dial_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_varset_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_user_event_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_hangup_request_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_dtmf_begin_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_dtmf_end_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_chanspy_start_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_chanspy_stop_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_fax_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_hangup_handler_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_moh_start_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_moh_stop_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_monitor_start_type);
+STASIS_MESSAGE_TYPE_DEFN(ast_channel_monitor_stop_type);
+/*! @} */
 
 /*! \brief Topic for all channels */
 struct stasis_topic *channel_topic_all;
@@ -53,21 +64,6 @@ struct stasis_topic *channel_topic_all;
 /*! \brief Caching topic for all channels */
 struct stasis_caching_topic *channel_topic_all_cached;
 
-struct stasis_message_type *ast_channel_dial_type(void)
-{
-       return channel_dial_type;
-}
-
-struct stasis_message_type *ast_channel_blob_type(void)
-{
-       return channel_blob_type;
-}
-
-struct stasis_message_type *ast_channel_snapshot_type(void)
-{
-       return channel_snapshot_type;
-}
-
 struct stasis_topic *ast_channel_topic_all(void)
 {
        return channel_topic_all;
@@ -146,6 +142,7 @@ struct ast_channel_snapshot *ast_channel_snapshot_create(struct ast_channel *cha
                S_COR(ast_channel_connected(chan)->id.name.valid, ast_channel_connected(chan)->id.name.str, ""));
        ast_string_field_set(snapshot, connected_number,
                S_COR(ast_channel_connected(chan)->id.number.valid, ast_channel_connected(chan)->id.number.str, ""));
+       ast_string_field_set(snapshot, language, ast_channel_language(chan));
 
        snapshot->creationtime = ast_channel_creationtime(chan);
        snapshot->state = ast_channel_state(chan);
@@ -186,8 +183,7 @@ void ast_channel_publish_dial(struct ast_channel *caller, struct ast_channel *pe
        struct ast_channel_snapshot *peer_snapshot;
 
        ast_assert(peer != NULL);
-       blob = ast_json_pack("{s: s, s: s, s: s}",
-                            "type", "dial",
+       blob = ast_json_pack("{s: s, s: s}",
                             "dialstatus", S_OR(dialstatus, ""),
                             "dialstring", S_OR(dialstring, ""));
        if (!blob) {
@@ -220,19 +216,16 @@ void ast_channel_publish_dial(struct ast_channel *caller, struct ast_channel *pe
        publish_message_for_channel_topics(msg, caller);
 }
 
-struct stasis_message *ast_channel_blob_create(struct ast_channel *chan,
-                                              struct ast_json *blob)
+static struct stasis_message *create_channel_blob_message(struct ast_channel_snapshot *snapshot,
+               struct stasis_message_type *type,
+               struct ast_json *blob)
+
 {
-       RAII_VAR(struct ast_channel_blob *, obj, NULL, ao2_cleanup);
        RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
-       struct ast_json *type;
-
-       ast_assert(blob != NULL);
+       RAII_VAR(struct ast_channel_blob *, obj, NULL, ao2_cleanup);
 
-       type = ast_json_object_get(blob, "type");
-       if (type == NULL) {
-               ast_log(LOG_ERROR, "Invalid ast_channel_blob; missing type field\n");
-               return NULL;
+       if (blob == NULL) {
+               blob = ast_json_null();
        }
 
        obj = ao2_alloc(sizeof(*obj), channel_blob_dtor);
@@ -240,16 +233,13 @@ struct stasis_message *ast_channel_blob_create(struct ast_channel *chan,
                return NULL;
        }
 
-       if (chan) {
-               obj->snapshot = ast_channel_snapshot_create(chan);
-               if (obj->snapshot == NULL) {
-                       return NULL;
-               }
+       if (snapshot) {
+               obj->snapshot = snapshot;
+               ao2_ref(obj->snapshot, +1);
        }
-
        obj->blob = ast_json_ref(blob);
 
-       msg = stasis_message_create(ast_channel_blob_type(), obj);
+       msg = stasis_message_create(type, obj);
        if (!msg) {
                return NULL;
        }
@@ -258,13 +248,27 @@ struct stasis_message *ast_channel_blob_create(struct ast_channel *chan,
        return msg;
 }
 
-const char *ast_channel_blob_json_type(struct ast_channel_blob *obj)
+struct stasis_message *ast_channel_cached_blob_create(struct ast_channel *chan,
+                                              struct stasis_message_type *type,
+                                              struct ast_json *blob)
 {
-       if (obj == NULL) {
-               return NULL;
+       RAII_VAR(struct ast_channel_snapshot *, snapshot,
+                       ast_channel_snapshot_get_latest(ast_channel_uniqueid(chan)),
+                       ao2_cleanup);
+
+       return create_channel_blob_message(snapshot, type, blob);
+}
+
+struct stasis_message *ast_channel_blob_create(struct ast_channel *chan,
+       struct stasis_message_type *type, struct ast_json *blob)
+{
+       RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
+
+       if (chan) {
+               snapshot = ast_channel_snapshot_create(chan);
        }
 
-       return ast_json_string_get(ast_json_object_get(obj->blob, "type"));
+       return create_channel_blob_message(snapshot, type, blob);
 }
 
 /*! \brief A channel snapshot wrapper object used in \ref ast_multi_channel_blob objects */
@@ -319,7 +323,6 @@ struct ast_multi_channel_blob *ast_multi_channel_blob_create(struct ast_json *bl
        RAII_VAR(struct ast_multi_channel_blob *, obj,
                        ao2_alloc(sizeof(*obj), multi_channel_blob_dtor),
                        ao2_cleanup);
-       struct ast_json *type;
 
        ast_assert(blob != NULL);
 
@@ -327,12 +330,6 @@ struct ast_multi_channel_blob *ast_multi_channel_blob_create(struct ast_json *bl
                return NULL;
        }
 
-       type = ast_json_object_get(blob, "type");
-       if (type == NULL) {
-               ast_log(LOG_ERROR, "Invalid ast_multi_channel_blob; missing type field\n");
-               return NULL;
-       }
-
        obj->channel_snapshots = ao2_container_alloc(NUM_MULTI_CHANNEL_BLOB_BUCKETS,
                        channel_role_hash_cb, channel_role_single_cmp_cb);
        if (!obj->channel_snapshots) {
@@ -345,6 +342,28 @@ struct ast_multi_channel_blob *ast_multi_channel_blob_create(struct ast_json *bl
        return obj;
 }
 
+struct ast_channel_snapshot *ast_channel_snapshot_get_latest(const char *uniqueid)
+{
+       RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+       struct ast_channel_snapshot *snapshot;
+
+       ast_assert(!ast_strlen_zero(uniqueid));
+
+       message = stasis_cache_get(ast_channel_topic_all_cached(),
+                       ast_channel_snapshot_type(),
+                       uniqueid);
+       if (!message) {
+               return NULL;
+       }
+
+       snapshot = stasis_message_data(message);
+       if (!snapshot) {
+               return NULL;
+       }
+       ao2_ref(snapshot, +1);
+       return snapshot;
+}
+
 static void channel_role_snapshot_dtor(void *obj)
 {
        struct channel_role_snapshot *role_snapshot = obj;
@@ -423,13 +442,23 @@ struct ast_json *ast_multi_channel_blob_get_json(struct ast_multi_channel_blob *
        return obj->blob;
 }
 
-const char *ast_multi_channel_blob_get_type(struct ast_multi_channel_blob *obj)
+void ast_channel_publish_snapshot(struct ast_channel *chan)
 {
-       if (!obj) {
-               return NULL;
+       RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+
+       snapshot = ast_channel_snapshot_create(chan);
+       if (!snapshot) {
+               return;
+       }
+
+       message = stasis_message_create(ast_channel_snapshot_type(), snapshot);
+       if (!message) {
+               return;
        }
 
-       return ast_json_string_get(ast_json_object_get(obj->blob, "type"));
+       ast_assert(ast_channel_topic(chan) != NULL);
+       stasis_publish(ast_channel_topic(chan), message);
 }
 
 void ast_channel_publish_varset(struct ast_channel *chan, const char *name, const char *value)
@@ -440,8 +469,7 @@ void ast_channel_publish_varset(struct ast_channel *chan, const char *name, cons
        ast_assert(name != NULL);
        ast_assert(value != NULL);
 
-       blob = ast_json_pack("{s: s, s: s, s: s}",
-                            "type", "varset",
+       blob = ast_json_pack("{s: s, s: s}",
                             "variable", name,
                             "value", value);
        if (!blob) {
@@ -449,7 +477,8 @@ void ast_channel_publish_varset(struct ast_channel *chan, const char *name, cons
                return;
        }
 
-       msg = ast_channel_blob_create(chan, ast_json_ref(blob));
+       msg = ast_channel_blob_create(chan, ast_channel_varset_type(),
+               ast_json_ref(blob));
 
        if (!msg) {
                return;
@@ -458,24 +487,131 @@ void ast_channel_publish_varset(struct ast_channel *chan, const char *name, cons
        publish_message_for_channel_topics(msg, chan);
 }
 
+void ast_publish_channel_state(struct ast_channel *chan)
+{
+       RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+
+       ast_assert(chan != NULL);
+       if (!chan) {
+               return;
+       }
+
+       snapshot = ast_channel_snapshot_create(chan);
+       if (!snapshot) {
+               return;
+       }
+
+       message = stasis_message_create(ast_channel_snapshot_type(), snapshot);
+       if (!message) {
+               return;
+       }
+
+       ast_assert(ast_channel_topic(chan) != NULL);
+       stasis_publish(ast_channel_topic(chan), message);
+}
+
+struct ast_json *ast_channel_snapshot_to_json(const struct ast_channel_snapshot *snapshot)
+{
+       RAII_VAR(struct ast_json *, json_chan, NULL, ast_json_unref);
+
+       if (snapshot == NULL) {
+               return NULL;
+       }
+
+       json_chan = ast_json_pack("{ s: s, s: s, s: s, s: s, s: s, s: s, s: s,"
+                                 "  s: s, s: s, s: s, s: s, s: o, s: o, s: o,"
+                                 "  s: o"
+                                 "}",
+                                 "name", snapshot->name,
+                                 "state", ast_state2str(snapshot->state),
+                                 "accountcode", snapshot->accountcode,
+                                 "peeraccount", snapshot->peeraccount,
+                                 "userfield", snapshot->userfield,
+                                 "uniqueid", snapshot->uniqueid,
+                                 "linkedid", snapshot->linkedid,
+                                 "parkinglot", snapshot->parkinglot,
+                                 "hangupsource", snapshot->hangupsource,
+                                 "appl", snapshot->appl,
+                                 "data", snapshot->data,
+                                 "dialplan", ast_json_dialplan_cep(snapshot->context, snapshot->exten, snapshot->priority),
+                                 "caller", ast_json_name_number(snapshot->caller_name, snapshot->caller_number),
+                                 "connected", ast_json_name_number(snapshot->connected_name, snapshot->connected_number),
+                                 "creationtime", ast_json_timeval(snapshot->creationtime, NULL));
+
+       return ast_json_ref(json_chan);
+}
+
+int ast_channel_snapshot_cep_equal(
+       const struct ast_channel_snapshot *old_snapshot,
+       const struct ast_channel_snapshot *new_snapshot)
+{
+       ast_assert(old_snapshot != NULL);
+       ast_assert(new_snapshot != NULL);
+
+       /* We actually get some snapshots with CEP set, but before the
+        * application is set. Since empty application is invalid, we treat
+        * setting the application from nothing as a CEP change.
+        */
+       if (ast_strlen_zero(old_snapshot->appl) &&
+           !ast_strlen_zero(new_snapshot->appl)) {
+               return 0;
+       }
+
+       return old_snapshot->priority == new_snapshot->priority &&
+               strcmp(old_snapshot->context, new_snapshot->context) == 0 &&
+               strcmp(old_snapshot->exten, new_snapshot->exten) == 0;
+}
+
+int ast_channel_snapshot_caller_id_equal(
+       const struct ast_channel_snapshot *old_snapshot,
+       const struct ast_channel_snapshot *new_snapshot)
+{
+       ast_assert(old_snapshot != NULL);
+       ast_assert(new_snapshot != NULL);
+       return strcmp(old_snapshot->caller_number, new_snapshot->caller_number) == 0 &&
+               strcmp(old_snapshot->caller_name, new_snapshot->caller_name) == 0;
+}
+
 void ast_stasis_channels_shutdown(void)
 {
-       ao2_cleanup(channel_snapshot_type);
-       channel_snapshot_type = NULL;
-       ao2_cleanup(channel_blob_type);
-       channel_blob_type = NULL;
-       ao2_cleanup(channel_dial_type);
-       channel_dial_type = NULL;
+       channel_topic_all_cached = stasis_caching_unsubscribe_and_join(channel_topic_all_cached);
        ao2_cleanup(channel_topic_all);
        channel_topic_all = NULL;
-       channel_topic_all_cached = stasis_caching_unsubscribe(channel_topic_all_cached);
+       STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_snapshot_type);
+       STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dial_type);
+       STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_varset_type);
+       STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_user_event_type);
+       STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hangup_request_type);
+       STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_begin_type);
+       STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_dtmf_end_type);
+       STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_chanspy_start_type);
+       STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_chanspy_stop_type);
+       STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_fax_type);
+       STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_hangup_handler_type);
+       STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_moh_start_type);
+       STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_moh_stop_type);
+       STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_monitor_start_type);
+       STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_monitor_stop_type);
 }
 
 void ast_stasis_channels_init(void)
 {
-       channel_snapshot_type = stasis_message_type_create("ast_channel_snapshot");
-       channel_blob_type = stasis_message_type_create("ast_channel_blob");
-       channel_dial_type = stasis_message_type_create("ast_channel_dial");
+       STASIS_MESSAGE_TYPE_INIT(ast_channel_snapshot_type);
+       STASIS_MESSAGE_TYPE_INIT(ast_channel_dial_type);
+       STASIS_MESSAGE_TYPE_INIT(ast_channel_varset_type);
+       STASIS_MESSAGE_TYPE_INIT(ast_channel_user_event_type);
+       STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_request_type);
+       STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_begin_type);
+       STASIS_MESSAGE_TYPE_INIT(ast_channel_dtmf_end_type);
+       STASIS_MESSAGE_TYPE_INIT(ast_channel_chanspy_start_type);
+       STASIS_MESSAGE_TYPE_INIT(ast_channel_chanspy_stop_type);
+       STASIS_MESSAGE_TYPE_INIT(ast_channel_fax_type);
+       STASIS_MESSAGE_TYPE_INIT(ast_channel_hangup_handler_type);
+       STASIS_MESSAGE_TYPE_INIT(ast_channel_moh_start_type);
+       STASIS_MESSAGE_TYPE_INIT(ast_channel_moh_stop_type);
+       STASIS_MESSAGE_TYPE_INIT(ast_channel_monitor_start_type);
+       STASIS_MESSAGE_TYPE_INIT(ast_channel_monitor_stop_type);
        channel_topic_all = stasis_topic_create("ast_channel_topic_all");
        channel_topic_all_cached = stasis_caching_topic_create(channel_topic_all, channel_snapshot_get_id);
 }