Refactor CEL bridge events on top of Stasis-Core
[asterisk/asterisk.git] / main / cel.c
index 5c4435f..a5e19f1 100644 (file)
@@ -60,6 +60,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/stasis_channels.h"
 #include "asterisk/stasis_bridging.h"
 #include "asterisk/bridging.h"
+#include "asterisk/parking.h"
 
 /*** DOCUMENTATION
        <configInfo name="cel" language="en_US">
@@ -96,6 +97,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
                                                <enum name="BRIDGE_START"/>
                                                <enum name="BRIDGE_END"/>
                                                <enum name="BRIDGE_UPDATE"/>
+                                               <enum name="BRIDGE_TO_CONF"/>
                                                <enum name="CONF_START"/>
                                                <enum name="CONF_END"/>
                                                <enum name="PARK_START"/>
@@ -133,6 +135,9 @@ static struct stasis_subscription *cel_channel_forwarder;
 /*! Subscription for forwarding the channel caching topic */
 static struct stasis_subscription *cel_bridge_forwarder;
 
+/*! Subscription for forwarding the parking topic */
+static struct stasis_subscription *cel_parking_forwarder;
+
 /*! Container for primary channel/bridge ID listing for 2 party bridges */
 static struct ao2_container *bridge_primaries;
 
@@ -301,6 +306,7 @@ static const char * const cel_event_types[CEL_MAX_EVENT_IDS] = {
        [AST_CEL_BRIDGE_START]     = "BRIDGE_START",
        [AST_CEL_BRIDGE_END]       = "BRIDGE_END",
        [AST_CEL_BRIDGE_UPDATE]    = "BRIDGE_UPDATE",
+       [AST_CEL_BRIDGE_TO_CONF]   = "BRIDGE_TO_CONF",
        [AST_CEL_CONF_START]       = "CONF_START",
        [AST_CEL_CONF_END]         = "CONF_END",
        [AST_CEL_PARK_START]       = "PARK_START",
@@ -321,36 +327,41 @@ static const char * const cel_event_types[CEL_MAX_EVENT_IDS] = {
 
 struct bridge_assoc {
        AST_DECLARE_STRING_FIELDS(
-               AST_STRING_FIELD(channel_id);   /*!< UniqueID of the primary/dialing channel */
-               AST_STRING_FIELD(bridge_id);    /*!< UniqueID of the bridge */
-               AST_STRING_FIELD(secondary_id); /*!< UniqueID of the secondary/dialed channel */
+               AST_STRING_FIELD(bridge_id);           /*!< UniqueID of the bridge */
+               AST_STRING_FIELD(secondary_name);      /*!< UniqueID of the secondary/dialed channel */
        );
+       struct ast_channel_snapshot *primary_snapshot; /*!< The snapshot for the initiating channel in the bridge */
+       int track_as_conf;                             /*!< Whether this bridge will be treated like a conference in CEL terms */
 };
 
 static void bridge_assoc_dtor(void *obj)
 {
        struct bridge_assoc *assoc = obj;
        ast_string_field_free_memory(assoc);
+       ao2_cleanup(assoc->primary_snapshot);
+       assoc->primary_snapshot = NULL;
 }
 
-static struct bridge_assoc *bridge_assoc_alloc(const char *channel_id, const char *bridge_id, const char *secondary_id)
+static struct bridge_assoc *bridge_assoc_alloc(struct ast_channel_snapshot *primary, const char *bridge_id, const char *secondary_name)
 {
        RAII_VAR(struct bridge_assoc *, assoc, ao2_alloc(sizeof(*assoc), bridge_assoc_dtor), ao2_cleanup);
-       if (!assoc || ast_string_field_init(assoc, 64)) {
+       if (!primary || !assoc || ast_string_field_init(assoc, 64)) {
                return NULL;
        }
 
-       ast_string_field_set(assoc, channel_id, channel_id);
        ast_string_field_set(assoc, bridge_id, bridge_id);
-       ast_string_field_set(assoc, secondary_id, secondary_id);
+       ast_string_field_set(assoc, secondary_name, secondary_name);
+
+       assoc->primary_snapshot = primary;
+       ao2_ref(primary, +1);
 
        ao2_ref(assoc, +1);
        return assoc;
 }
 
-static int add_bridge_primary(const char *channel_id, const char *bridge_id, const char *secondary_id)
+static int add_bridge_primary(struct ast_channel_snapshot *primary, const char *bridge_id, const char *secondary_name)
 {
-       RAII_VAR(struct bridge_assoc *, assoc, bridge_assoc_alloc(channel_id, bridge_id, secondary_id), ao2_cleanup);
+       RAII_VAR(struct bridge_assoc *, assoc, bridge_assoc_alloc(primary, bridge_id, secondary_name), ao2_cleanup);
        if (!assoc) {
                return -1;
        }
@@ -370,7 +381,7 @@ static int bridge_assoc_hash(const void *obj, int flags)
        const struct bridge_assoc *assoc = obj;
        const char *uniqueid = obj;
        if (!(flags & OBJ_KEY)) {
-               uniqueid = assoc->channel_id;
+               uniqueid = assoc->primary_snapshot->uniqueid;
        }
 
        return ast_str_hash(uniqueid);
@@ -380,9 +391,9 @@ static int bridge_assoc_hash(const void *obj, int flags)
 static int bridge_assoc_cmp(void *obj, void *arg, int flags)
 {
        struct bridge_assoc *assoc1 = obj, *assoc2 = arg;
-       const char *assoc2_id = arg, *assoc1_id = assoc1->channel_id;
+       const char *assoc2_id = arg, *assoc1_id = assoc1->primary_snapshot->uniqueid;
        if (!(flags & OBJ_KEY)) {
-               assoc2_id = assoc2->channel_id;
+               assoc2_id = assoc2->primary_snapshot->uniqueid;
        }
 
        return !strcmp(assoc1_id, assoc2_id) ? CMP_MATCH | CMP_STOP : 0;
@@ -645,12 +656,12 @@ static int cel_track_app(const char *const_app)
 
 static int report_event_snapshot(struct ast_channel_snapshot *snapshot,
                enum ast_cel_event_type event_type, const char *userdefevname,
-               const char *extra)
+               const char *extra, const char *peer2_name)
 {
        struct timeval eventtime;
        struct ast_event *ev;
        char *linkedid = ast_strdupa(snapshot->linkedid);
-       char *peer_name = "";
+       const char *peer_name = peer2_name;
        RAII_VAR(struct bridge_assoc *, assoc, NULL, ao2_cleanup);
        RAII_VAR(struct cel_config *, cfg, ao2_global_obj_ref(cel_configs), ao2_cleanup);
 
@@ -662,12 +673,10 @@ static int report_event_snapshot(struct ast_channel_snapshot *snapshot,
                return 0;
        }
 
-       assoc = ao2_find(bridge_primaries, snapshot->uniqueid, OBJ_KEY);
-       if (assoc) {
-               RAII_VAR(struct ast_channel_snapshot *, bridged_snapshot, NULL, ao2_cleanup);
-               bridged_snapshot = ast_channel_snapshot_get_latest(assoc->secondary_id);
-               if (bridged_snapshot) {
-                       peer_name = ast_strdupa(bridged_snapshot->name);
+       if (ast_strlen_zero(peer_name)) {
+               assoc = ao2_find(bridge_primaries, snapshot->uniqueid, OBJ_KEY);
+               if (assoc) {
+                       peer_name = assoc->secondary_name;
                }
        }
 
@@ -712,7 +721,7 @@ static int report_event_snapshot(struct ast_channel_snapshot *snapshot,
                AST_EVENT_IE_CEL_LINKEDID, AST_EVENT_IE_PLTYPE_STR, snapshot->linkedid,
                AST_EVENT_IE_CEL_USERFIELD, AST_EVENT_IE_PLTYPE_STR, snapshot->userfield,
                AST_EVENT_IE_CEL_EXTRA, AST_EVENT_IE_PLTYPE_STR, S_OR(extra, ""),
-               AST_EVENT_IE_CEL_PEER, AST_EVENT_IE_PLTYPE_STR, peer_name,
+               AST_EVENT_IE_CEL_PEER, AST_EVENT_IE_PLTYPE_STR, S_OR(peer_name, ""),
                AST_EVENT_IE_END);
 
        if (ev && ast_event_queue(ev)) {
@@ -744,7 +753,7 @@ static void check_retire_linkedid(struct ast_channel_snapshot *snapshot)
         * before unreffing the channel we have a refcount of 3, we're done. Unlink and report. */
        if (ao2_ref(lid, -1) == 3) {
                ast_str_container_remove(linkedids, lid);
-               report_event_snapshot(snapshot, AST_CEL_LINKEDID_END, NULL, NULL);
+               report_event_snapshot(snapshot, AST_CEL_LINKEDID_END, NULL, NULL, NULL);
        }
        ao2_ref(lid, -1);
 }
@@ -1083,13 +1092,13 @@ static void cel_channel_state_change(
        int is_hungup, was_hungup;
 
        if (!new_snapshot) {
-               report_event_snapshot(old_snapshot, AST_CEL_CHANNEL_END, NULL, NULL);
+               report_event_snapshot(old_snapshot, AST_CEL_CHANNEL_END, NULL, NULL, NULL);
                check_retire_linkedid(old_snapshot);
                return;
        }
 
        if (!old_snapshot) {
-               report_event_snapshot(new_snapshot, AST_CEL_CHANNEL_START, NULL, NULL);
+               report_event_snapshot(new_snapshot, AST_CEL_CHANNEL_START, NULL, NULL, NULL);
                return;
        }
 
@@ -1107,12 +1116,12 @@ static void cel_channel_state_change(
                        new_snapshot->hangupcause,
                        new_snapshot->hangupsource,
                        dialstatus);
-               report_event_snapshot(new_snapshot, AST_CEL_HANGUP, NULL, ast_str_buffer(extra_str));
+               report_event_snapshot(new_snapshot, AST_CEL_HANGUP, NULL, ast_str_buffer(extra_str), NULL);
                return;
        }
 
        if (old_snapshot->state != new_snapshot->state && new_snapshot->state == AST_STATE_UP) {
-               report_event_snapshot(new_snapshot, AST_CEL_ANSWER, NULL, NULL);
+               report_event_snapshot(new_snapshot, AST_CEL_ANSWER, NULL, NULL, NULL);
                return;
        }
 }
@@ -1141,12 +1150,12 @@ static void cel_channel_app_change(
 
        /* old snapshot has an application, end it */
        if (old_snapshot && !ast_strlen_zero(old_snapshot->appl)) {
-               report_event_snapshot(old_snapshot, AST_CEL_APP_END, NULL, NULL);
+               report_event_snapshot(old_snapshot, AST_CEL_APP_END, NULL, NULL, NULL);
        }
 
        /* new snapshot has an application, start it */
        if (new_snapshot && !ast_strlen_zero(new_snapshot->appl)) {
-               report_event_snapshot(new_snapshot, AST_CEL_APP_START, NULL, NULL);
+               report_event_snapshot(new_snapshot, AST_CEL_APP_START, NULL, NULL, NULL);
        }
 }
 
@@ -1156,6 +1165,47 @@ cel_channel_snapshot_monitor cel_channel_monitors[] = {
        cel_channel_linkedid_change,
 };
 
+static void update_bridge_primary(struct ast_channel_snapshot *snapshot)
+{
+       RAII_VAR(struct bridge_assoc *, assoc, NULL, ao2_cleanup);
+
+       if (!snapshot) {
+               return;
+       }
+
+       assoc = ao2_find(bridge_primaries, snapshot->uniqueid, OBJ_KEY);
+       if (!assoc) {
+               return;
+       }
+
+       ao2_cleanup(assoc->primary_snapshot);
+       ao2_ref(snapshot, +1);
+       assoc->primary_snapshot = snapshot;
+}
+
+static int bridge_match_cb(void *obj, void *arg, int flags)
+{
+       struct bridge_assoc *assoc = obj;
+       char *bridge_id = arg;
+       ast_assert(flags & OBJ_KEY);
+       if (!strcmp(bridge_id, assoc->bridge_id)) {
+               return CMP_MATCH;
+       }
+       return 0;
+}
+
+static struct bridge_assoc *find_bridge_primary_by_bridge_id(const char *bridge_id)
+{
+       char *dup_id = ast_strdupa(bridge_id);
+       return ao2_callback(bridge_primaries, OBJ_KEY, bridge_match_cb, dup_id);
+}
+
+static void clear_bridge_primary(const char *bridge_id)
+{
+       char *dup_id = ast_strdupa(bridge_id);
+       ao2_callback(bridge_primaries, OBJ_KEY | OBJ_NODATA | OBJ_UNLINK | OBJ_MULTIPLE, bridge_match_cb, dup_id);
+}
+
 static void cel_snapshot_update_cb(void *data, struct stasis_subscription *sub,
        struct stasis_topic *topic,
        struct stasis_message *message)
@@ -1169,9 +1219,77 @@ static void cel_snapshot_update_cb(void *data, struct stasis_subscription *sub,
                old_snapshot = stasis_message_data(update->old_snapshot);
                new_snapshot = stasis_message_data(update->new_snapshot);
 
+               update_bridge_primary(new_snapshot);
+
                for (i = 0; i < ARRAY_LEN(cel_channel_monitors); ++i) {
                        cel_channel_monitors[i](old_snapshot, new_snapshot);
                }
+       } else if (ast_bridge_snapshot_type() == update->type) {
+               RAII_VAR(struct bridge_assoc *, assoc, NULL, ao2_cleanup);
+               struct ast_bridge_snapshot *old_snapshot;
+               struct ast_bridge_snapshot *new_snapshot;
+
+               update = stasis_message_data(message);
+
+               old_snapshot = stasis_message_data(update->old_snapshot);
+               new_snapshot = stasis_message_data(update->new_snapshot);
+
+               if (!old_snapshot) {
+                       return;
+               }
+
+               if (!new_snapshot) {
+                       clear_bridge_primary(old_snapshot->uniqueid);
+                       return;
+               }
+
+               if (old_snapshot->capabilities == new_snapshot->capabilities) {
+                       return;
+               }
+
+               /* handle 1:1/native -> multimix */
+               if ((old_snapshot->capabilities & (AST_BRIDGE_CAPABILITY_1TO1MIX | AST_BRIDGE_CAPABILITY_NATIVE))
+                       && (new_snapshot->capabilities & AST_BRIDGE_CAPABILITY_MULTIMIX)) {
+                       assoc = find_bridge_primary_by_bridge_id(new_snapshot->uniqueid);
+
+                       /* this bridge will no longer be treated like a bridge, so mark the bridge_assoc as such */
+                       assoc->track_as_conf = 1;
+                       report_event_snapshot(assoc->primary_snapshot, AST_CEL_BRIDGE_TO_CONF, NULL, NULL, assoc->secondary_name);
+                       return;
+               }
+
+               /* handle multimix -> 1:1/native */
+               if ((old_snapshot->capabilities & AST_BRIDGE_CAPABILITY_MULTIMIX)
+                       && (new_snapshot->capabilities & (AST_BRIDGE_CAPABILITY_1TO1MIX | AST_BRIDGE_CAPABILITY_NATIVE))) {
+                       struct ao2_iterator i;
+                       RAII_VAR(char *, channel_id, NULL, ao2_cleanup);
+                       RAII_VAR(struct ast_channel_snapshot *, chan_snapshot, NULL, ao2_cleanup);
+
+                       assoc = find_bridge_primary_by_bridge_id(new_snapshot->uniqueid);
+                       if (assoc) {
+                               assoc->track_as_conf = 1;
+                               return;
+                       }
+
+                       /* get the first item in the container */
+                       i = ao2_iterator_init(new_snapshot->channels, 0);
+                       while ((channel_id = ao2_iterator_next(&i))) {
+                               break;
+                       }
+                       ao2_iterator_destroy(&i);
+
+                       /* create a bridge_assoc for this bridge and mark it as being tracked appropriately */
+                       chan_snapshot = ast_channel_snapshot_get_latest(channel_id);
+                       ast_assert(chan_snapshot != NULL);
+                       assoc = bridge_assoc_alloc(chan_snapshot, new_snapshot->uniqueid, chan_snapshot->name);
+                       if (!assoc) {
+                               return;
+                       }
+                       assoc->track_as_conf = 1;
+
+                       ao2_link(bridge_primaries, assoc);
+                       return;
+               }
        }
 }
 
@@ -1185,9 +1303,16 @@ static void cel_bridge_enter_cb(
        struct ast_channel_snapshot *chan_snapshot = blob->channel;
 
        if (snapshot->capabilities & (AST_BRIDGE_CAPABILITY_1TO1MIX | AST_BRIDGE_CAPABILITY_NATIVE)) {
+               RAII_VAR(struct bridge_assoc *, assoc, find_bridge_primary_by_bridge_id(snapshot->uniqueid), ao2_cleanup);
+               if (assoc && assoc->track_as_conf) {
+                       report_event_snapshot(chan_snapshot, AST_CEL_CONF_ENTER, NULL, NULL, NULL);
+                       return;
+               }
+
                if (ao2_container_count(snapshot->channels) == 2) {
                        struct ao2_iterator i;
                        RAII_VAR(char *, channel_id, NULL, ao2_cleanup);
+                       RAII_VAR(struct ast_channel_snapshot *, latest_primary, NULL, ao2_cleanup);
 
                        /* get the name of the channel in the container we don't already know the name of */
                        i = ao2_iterator_init(snapshot->channels, 0);
@@ -1200,8 +1325,12 @@ static void cel_bridge_enter_cb(
                        }
                        ao2_iterator_destroy(&i);
 
-                       add_bridge_primary(channel_id, snapshot->uniqueid, chan_snapshot->uniqueid);
+                       latest_primary = ast_channel_snapshot_get_latest(channel_id);
+                       add_bridge_primary(latest_primary, snapshot->uniqueid, chan_snapshot->name);
+                       report_event_snapshot(latest_primary, AST_CEL_BRIDGE_START, NULL, NULL, chan_snapshot->name);
                }
+       } else if (snapshot->capabilities & AST_BRIDGE_CAPABILITY_MULTIMIX) {
+               report_event_snapshot(chan_snapshot, AST_CEL_CONF_ENTER, NULL, NULL, NULL);
        }
 }
 
@@ -1214,29 +1343,55 @@ static void cel_bridge_leave_cb(
        struct ast_bridge_snapshot *snapshot = blob->bridge;
        struct ast_channel_snapshot *chan_snapshot = blob->channel;
 
-       if ((snapshot->capabilities | AST_BRIDGE_CAPABILITY_1TO1MIX)
-               || (snapshot->capabilities | AST_BRIDGE_CAPABILITY_NATIVE)) {
-               if (ao2_container_count(snapshot->channels) == 1) {
-                       RAII_VAR(struct bridge_assoc *, ao2_primary, ao2_find(bridge_primaries, chan_snapshot->uniqueid, OBJ_KEY), ao2_cleanup);
-                       RAII_VAR(char *, channel_id_in_bridge, NULL, ao2_cleanup);
-                       const char *primary;
-                       struct ao2_iterator i;
+       if (snapshot->capabilities & (AST_BRIDGE_CAPABILITY_1TO1MIX | AST_BRIDGE_CAPABILITY_NATIVE)) {
+               RAII_VAR(struct bridge_assoc *, assoc,
+                       find_bridge_primary_by_bridge_id(snapshot->uniqueid),
+                       ao2_cleanup);
 
-                       /* get the only item in the container */
-                       i = ao2_iterator_init(snapshot->channels, 0);
-                       while ((channel_id_in_bridge = ao2_iterator_next(&i))) {
-                               break;
-                       }
-                       ao2_iterator_destroy(&i);
+               if (!assoc) {
+                       return;
+               }
 
-                       if (ao2_primary) {
-                               primary = chan_snapshot->uniqueid;
-                       } else {
-                               primary = channel_id_in_bridge;
-                       }
+               if (assoc->track_as_conf) {
+                       report_event_snapshot(chan_snapshot, AST_CEL_CONF_EXIT, NULL, NULL, NULL);
+                       return;
+               }
 
-                       remove_bridge_primary(primary);
+               if (ao2_container_count(snapshot->channels) == 1) {
+                       report_event_snapshot(assoc->primary_snapshot, AST_CEL_BRIDGE_END, NULL, NULL, assoc->secondary_name);
+                       remove_bridge_primary(assoc->primary_snapshot->uniqueid);
+                       return;
                }
+       } else if (snapshot->capabilities & AST_BRIDGE_CAPABILITY_MULTIMIX) {
+               report_event_snapshot(chan_snapshot, AST_CEL_CONF_EXIT, NULL, NULL, NULL);
+       }
+}
+
+static void cel_parking_cb(
+       void *data, struct stasis_subscription *sub,
+       struct stasis_topic *topic,
+       struct stasis_message *message)
+{
+       struct ast_parked_call_payload *parked_payload = stasis_message_data(message);
+
+       switch (parked_payload->event_type) {
+       case PARKED_CALL:
+               report_event_snapshot(parked_payload->parkee, AST_CEL_PARK_START, NULL,
+                       parked_payload->parkinglot,
+                       S_COR(parked_payload->parker, parked_payload->parker->name, NULL));
+               break;
+       case PARKED_CALL_TIMEOUT:
+               report_event_snapshot(parked_payload->parkee, AST_CEL_PARK_END, NULL, "ParkedCallTimeOut", NULL);
+               break;
+       case PARKED_CALL_GIVEUP:
+               report_event_snapshot(parked_payload->parkee, AST_CEL_PARK_END, NULL, "ParkedCallGiveUp", NULL);
+               break;
+       case PARKED_CALL_UNPARKED:
+               report_event_snapshot(parked_payload->parkee, AST_CEL_PARK_END, NULL, "ParkedCallUnparked", NULL);
+               break;
+       case PARKED_CALL_FAILED:
+               report_event_snapshot(parked_payload->parkee, AST_CEL_PARK_END, NULL, "ParkedCallFailed", NULL);
+               break;
        }
 }
 
@@ -1270,6 +1425,7 @@ static void ast_cel_engine_term(void)
        cel_state_topic = NULL;
        cel_channel_forwarder = stasis_unsubscribe_and_join(cel_channel_forwarder);
        cel_bridge_forwarder = stasis_unsubscribe_and_join(cel_bridge_forwarder);
+       cel_parking_forwarder = stasis_unsubscribe_and_join(cel_parking_forwarder);
        ao2_cleanup(linkedids);
        linkedids = NULL;
        ast_cli_unregister(&cli_status);
@@ -1305,6 +1461,11 @@ int ast_cel_engine_init(void)
                return -1;
        }
 
+       bridge_primaries = ao2_container_alloc(BRIDGE_PRIMARY_BUCKETS, bridge_assoc_hash, bridge_assoc_cmp);
+       if (!bridge_primaries) {
+               return -1;
+       }
+
        cel_state_topic = stasis_topic_create("cel_state_topic");
        if (!cel_state_topic) {
                return -1;
@@ -1324,6 +1485,13 @@ int ast_cel_engine_init(void)
                return -1;
        }
 
+       cel_parking_forwarder = stasis_forward_all(
+               ast_parking_topic(),
+               cel_state_topic);
+       if (!cel_parking_forwarder) {
+               return -1;
+       }
+
        cel_state_router = stasis_message_router_create(cel_state_topic);
        if (!cel_state_router) {
                return -1;
@@ -1339,19 +1507,6 @@ int ast_cel_engine_init(void)
                cel_dial_cb,
                NULL);
 
-       /* If somehow we failed to add any routes, just shut down the whole
-        * thing and fail it.
-        */
-       if (ret) {
-               ast_cel_engine_term();
-               return -1;
-       }
-
-       bridge_primaries = ao2_container_alloc(BRIDGE_PRIMARY_BUCKETS, bridge_assoc_hash, bridge_assoc_cmp);
-       if (!bridge_primaries) {
-               return -1;
-       }
-
        ret |= stasis_message_router_add(cel_state_router,
                ast_channel_entered_bridge_type(),
                cel_bridge_enter_cb,
@@ -1362,6 +1517,11 @@ int ast_cel_engine_init(void)
                cel_bridge_leave_cb,
                NULL);
 
+       ret |= stasis_message_router_add(cel_state_router,
+               ast_parked_call_type(),
+               cel_parking_cb,
+               NULL);
+
        /* If somehow we failed to add any routes, just shut down the whole
         * thing and fail it.
         */
@@ -1370,7 +1530,7 @@ int ast_cel_engine_init(void)
                return -1;
        }
 
-       ast_register_atexit(ast_cel_engine_term);
+       ast_register_cleanup(ast_cel_engine_term);
 
        return 0;
 }