Remove required type field from channel blobs
[asterisk/asterisk.git] / res / res_stasis.c
index 3527ada..3d003e4 100644 (file)
@@ -40,6 +40,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/stasis_channels.h"
 #include "asterisk/strings.h"
 
+/*! Time to wait for a frame in the application */
+#define MAX_WAIT_MS 200
+
 /*!
  * \brief Number of buckets for the Stasis application hash table.  Remember to
  * keep it a prime number!
@@ -147,7 +150,67 @@ static void app_send(struct app *app, struct ast_json *message)
        app->handler(app->data, app->name, message);
 }
 
+typedef void* (*stasis_app_command_cb)(struct stasis_app_control *control,
+                                      struct ast_channel *chan,
+                                      void *data);
+
+struct stasis_app_command {
+       ast_mutex_t lock;
+       ast_cond_t condition;
+       stasis_app_command_cb callback;
+       void *data;
+       void *retval;
+       int is_done:1;
+};
+
+static void command_dtor(void *obj)
+{
+       struct stasis_app_command *command = obj;
+       ast_mutex_destroy(&command->lock);
+       ast_cond_destroy(&command->condition);
+}
+
+static struct stasis_app_command *command_create(stasis_app_command_cb callback,
+                                                void *data)
+{
+       RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup);
+
+       command = ao2_alloc(sizeof(*command), command_dtor);
+       if (!command) {
+               return NULL;
+       }
+
+       ast_mutex_init(&command->lock);
+       ast_cond_init(&command->condition, 0);
+       command->callback = callback;
+       command->data = data;
+
+       ao2_ref(command, +1);
+       return command;
+}
+
+static void command_complete(struct stasis_app_command *command, void *retval)
+{
+       SCOPED_MUTEX(lock, &command->lock);
+
+       command->is_done = 1;
+       command->retval = retval;
+       ast_cond_signal(&command->condition);
+}
+
+static void *wait_for_command(struct stasis_app_command *command)
+{
+       SCOPED_MUTEX(lock, &command->lock);
+       while (!command->is_done) {
+               ast_cond_wait(&command->condition, &command->lock);
+       }
+
+       return command->retval;
+}
+
 struct stasis_app_control {
+       /*! Queue of commands to dispatch on the channel */
+       struct ao2_container *command_queue;
        /*!
         * When set, /c app_stasis should exit and continue in the dialplan.
         */
@@ -167,11 +230,24 @@ static struct stasis_app_control *control_create(const char *uniqueid)
                return NULL;
        }
 
+       control->command_queue = ao2_container_alloc_list(0, 0, NULL, NULL);
+
        strncpy(control->channel_id, uniqueid, size - sizeof(*control));
 
        return control;
 }
 
+static void *exec_command(struct stasis_app_control *control,
+                         struct stasis_app_command *command)
+{
+       ao2_lock(control);
+       ao2_ref(command, +1);
+       ao2_link(control->command_queue, command);
+       ao2_unlock(control);
+
+       return wait_for_command(command);
+}
+
 /*! AO2 hash function for \ref stasis_app_control */
 static int control_hash(const void *obj, const int flags)
 {
@@ -199,13 +275,20 @@ static int control_compare(void *lhs, void *rhs, int flags)
 struct stasis_app_control *stasis_app_control_find_by_channel(
        const struct ast_channel *chan)
 {
-       RAII_VAR(struct ao2_container *, controls, NULL, ao2_cleanup);
        if (chan == NULL) {
                return NULL;
        }
 
+       return stasis_app_control_find_by_channel_id(
+               ast_channel_uniqueid(chan));
+}
+
+struct stasis_app_control *stasis_app_control_find_by_channel_id(
+       const char *channel_id)
+{
+       RAII_VAR(struct ao2_container *, controls, NULL, ao2_cleanup);
        controls = app_controls();
-       return ao2_find(controls, ast_channel_uniqueid(chan), OBJ_KEY);
+       return ao2_find(controls, channel_id, OBJ_KEY);
 }
 
 /*!
@@ -219,12 +302,12 @@ struct stasis_app_control *stasis_app_control_find_by_channel(
  */
 static int control_continue_test_and_reset(struct stasis_app_control *control)
 {
-        int r;
-        SCOPED_AO2LOCK(lock, control);
+       int r;
+       SCOPED_AO2LOCK(lock, control);
 
-        r = control->continue_to_dialplan;
-        control->continue_to_dialplan = 0;
-        return r;
+       r = control->continue_to_dialplan;
+       control->continue_to_dialplan = 0;
+       return r;
 }
 
 void stasis_app_control_continue(struct stasis_app_control *control)
@@ -233,6 +316,33 @@ void stasis_app_control_continue(struct stasis_app_control *control)
        control->continue_to_dialplan = 1;
 }
 
+static int OK = 0;
+static int FAIL = -1;
+
+static void *__app_control_answer(struct stasis_app_control *control,
+                                 struct ast_channel *chan, void *data)
+{
+       ast_debug(3, "%s: Answering", control->channel_id);
+       return __ast_answer(chan, 0, 1) == 0 ? &OK : &FAIL;
+}
+
+int stasis_app_control_answer(struct stasis_app_control *control)
+{
+       RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup);
+       int *retval;
+
+       ast_debug(3, "%s: Sending answer command\n", control->channel_id);
+
+       command = command_create(__app_control_answer, NULL);
+       retval = exec_command(control, command);
+
+       if (*retval != 0) {
+               ast_log(LOG_WARNING, "Failed to answer channel");
+       }
+
+       return *retval;
+}
+
 static struct ast_json *app_event_create(
        const char *event_name,
        const struct ast_channel_snapshot *snapshot,
@@ -360,19 +470,17 @@ static void dtmf_handler(struct app *app, struct ast_channel_blob *obj)
        app_send(app, msg);
 }
 
-static void blob_handler(struct app *app, struct ast_channel_blob *blob)
-{
-       /* To simplify events, we'll only generate on DTMF end */
-       if (strcmp(ast_channel_blob_json_type(blob), "dtmf_end") == 0) {
-               dtmf_handler(app, blob);
-       }
-}
-
 static void sub_handler(void *data, struct stasis_subscription *sub,
                        struct stasis_topic *topic,
                        struct stasis_message *message)
 {
        struct app *app = data;
+
+       if (stasis_subscription_final_message(sub, message)) {
+               ao2_cleanup(data);
+               return;
+       }
+
        if (ast_channel_snapshot_type() == stasis_message_type(message)) {
                RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
                struct ast_channel_snapshot *snapshot =
@@ -383,13 +491,12 @@ static void sub_handler(void *data, struct stasis_subscription *sub,
                        return;
                }
                app_send(app, msg);
-       } else if (ast_channel_blob_type() == stasis_message_type(message)) {
+       } else if (ast_channel_dtmf_end_type() == stasis_message_type(message)) {
+               /* To simplify events, we'll only generate on DTMF end */
                struct ast_channel_blob *blob = stasis_message_data(message);
-               blob_handler(app, blob);
-       }
-       if (stasis_subscription_final_message(sub, message)) {
-               ao2_cleanup(data);
+               dtmf_handler(app, blob);
        }
+
 }
 
 /*!
@@ -410,6 +517,26 @@ static void control_unlink(struct stasis_app_control *control)
        ao2_cleanup(control);
 }
 
+static void dispatch_commands(struct stasis_app_control *control,
+                             struct ast_channel *chan)
+{
+       struct ao2_iterator i;
+       void *obj;
+
+       SCOPED_AO2LOCK(lock, control);
+
+       i = ao2_iterator_init(control->command_queue, AO2_ITERATOR_UNLINK);
+
+       while ((obj = ao2_iterator_next(&i))) {
+               RAII_VAR(struct stasis_app_command *, command, obj, ao2_cleanup);
+               void *retval = command->callback(control, chan, command->data);
+               command_complete(command, retval);
+       }
+
+       ao2_iterator_destroy(&i);
+}
+
+
 /*! /brief Stasis dialplan application callback */
 int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
                    char *argv[])
@@ -458,8 +585,38 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
                return res;
        }
 
-       while (!hungup && !control_continue_test_and_reset(control) && ast_waitfor(chan, -1) > -1) {
-               RAII_VAR(struct ast_frame *, f, ast_read(chan), ast_frame_dtor);
+       while (1) {
+               RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
+               int r;
+
+               if (hungup) {
+                       ast_debug(3, "%s: Hangup\n",
+                                 ast_channel_uniqueid(chan));
+                       break;
+               }
+
+               if (control_continue_test_and_reset(control)) {
+                       ast_debug(3, "%s: Continue\n",
+                                 ast_channel_uniqueid(chan));
+                       break;
+               }
+
+               r = ast_waitfor(chan, MAX_WAIT_MS);
+
+               if (r < 0) {
+                       ast_debug(3, "%s: Poll error\n",
+                                 ast_channel_uniqueid(chan));
+                       break;
+               }
+
+               dispatch_commands(control, chan);
+
+               if (r == 0) {
+                       /* Timeout */
+                       continue;
+               }
+
+               f = ast_read(chan);
                if (!f) {
                        ast_debug(3, "%s: No more frames. Must be done, I guess.\n", ast_channel_uniqueid(chan));
                        break;
@@ -468,8 +625,6 @@ int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
                switch (f->frametype) {
                case AST_FRAME_CONTROL:
                        if (f->subclass.integer == AST_CONTROL_HANGUP) {
-                               ast_debug(3, "%s: Received hangup\n",
-                                         ast_channel_uniqueid(chan));
                                hungup = 1;
                        }
                        break;