Merge "manager_channels.c: Fix allocation failure crash."
authorzuul <zuul@gerrit.asterisk.org>
Tue, 26 Apr 2016 03:00:51 +0000 (22:00 -0500)
committerGerrit Code Review <gerrit2@gerrit.digium.api>
Tue, 26 Apr 2016 03:00:51 +0000 (22:00 -0500)
apps/app_queue.c
configs/samples/sip.conf.sample
funcs/func_odbc.c
include/asterisk/bridge_channel_internal.h
main/bridge.c
main/bridge_channel.c
res/res_agi.c
res/stasis/control.c
res/stasis/control.h
res/stasis/stasis_bridge.c

index 40aed2f..927c496 100644 (file)
@@ -1554,7 +1554,6 @@ struct member {
        struct call_queue *lastqueue;        /*!< Last queue we received a call */
        unsigned int dead:1;                 /*!< Used to detect members deleted in realtime */
        unsigned int delme:1;                /*!< Flag to delete entry on reload */
-       unsigned int call_pending:1;         /*!< TRUE if the Q is attempting to place a call to the member. */
        char rt_uniqueid[80];                /*!< Unique id of realtime member entry */
        unsigned int ringinuse:1;            /*!< Flag to ring queue members even if their status is 'inuse' */
 };
@@ -2289,6 +2288,70 @@ static int get_member_status(struct call_queue *q, int max_penalty, int min_pena
        return -1;
 }
 
+/*
+ * A "pool" of member objects that calls are currently pending on. If an
+ * agent is a member of multiple queues it's possible for that agent to be
+ * called by each of the queues at the same time. This happens because device
+ * state is slow to notify the queue app of one of it's member's being rung.
+ * This "pool" allows us to track which members are currently being rung while
+ * we wait on the device state change.
+ */
+static struct ao2_container *pending_members;
+#define MAX_CALL_ATTEMPT_BUCKETS 353
+
+static int pending_members_hash(const void *obj, const int flags)
+{
+       const struct member *object;
+       const char *key;
+
+       switch (flags & OBJ_SEARCH_MASK) {
+       case OBJ_SEARCH_KEY:
+               key = obj;
+               break;
+       case OBJ_SEARCH_OBJECT:
+               object = obj;
+               key = object->interface;
+               break;
+       default:
+               ast_assert(0);
+               return 0;
+       }
+       return ast_str_case_hash(key);
+}
+
+static int pending_members_cmp(void *obj, void *arg, int flags)
+{
+       const struct member *object_left = obj;
+       const struct member *object_right = arg;
+       const char *right_key = arg;
+       int cmp;
+
+       switch (flags & OBJ_SEARCH_MASK) {
+       case OBJ_SEARCH_OBJECT:
+               right_key = object_right->interface;
+               /* Fall through */
+       case OBJ_SEARCH_KEY:
+               cmp = strcasecmp(object_left->interface, right_key);
+               break;
+       case OBJ_SEARCH_PARTIAL_KEY:
+               /* Not supported by container. */
+               ast_assert(0);
+               return 0;
+       default:
+               cmp = 0;
+               break;
+       }
+       if (cmp) {
+               return 0;
+       }
+       return CMP_MATCH;
+}
+
+static void pending_members_remove(struct member *mem)
+{
+       ao2_find(pending_members, mem, OBJ_POINTER | OBJ_NODATA | OBJ_UNLINK);
+}
+
 /*! \brief set a member's status based on device state of that member's state_interface.
  *
  * Lock interface list find sc, iterate through each queues queue_member list for member to
@@ -2298,6 +2361,9 @@ static void update_status(struct call_queue *q, struct member *m, const int stat
 {
        m->status = status;
 
+       /* Whatever the status is clear the member from the pending members pool */
+       pending_members_remove(m);
+
        queue_publish_member_blob(queue_member_status_type(), queue_member_blob_create(q, m));
 }
 
@@ -3157,6 +3223,7 @@ static void member_add_to_queue(struct call_queue *queue, struct member *mem)
  */
 static void member_remove_from_queue(struct call_queue *queue, struct member *mem)
 {
+       pending_members_remove(mem);
        ao2_lock(queue->members);
        ast_devstate_changed(QUEUE_UNKNOWN_PAUSED_DEVSTATE, AST_DEVSTATE_CACHABLE, "Queue:%s_pause_%s", queue->name, mem->interface);
        queue_member_follower_removal(queue, mem);
@@ -4135,41 +4202,6 @@ static int member_status_available(int status)
 
 /*!
  * \internal
- * \brief Clear the member call pending flag.
- *
- * \param mem Queue member.
- *
- * \return Nothing
- */
-static void member_call_pending_clear(struct member *mem)
-{
-       ao2_lock(mem);
-       mem->call_pending = 0;
-       ao2_unlock(mem);
-}
-
-/*!
- * \internal
- * \brief Set the member call pending flag.
- *
- * \param mem Queue member.
- *
- * \retval non-zero if call pending flag was already set.
- */
-static int member_call_pending_set(struct member *mem)
-{
-       int old_pending;
-
-       ao2_lock(mem);
-       old_pending = mem->call_pending;
-       mem->call_pending = 1;
-       ao2_unlock(mem);
-
-       return old_pending;
-}
-
-/*!
- * \internal
  * \brief Determine if can ring a queue entry.
  *
  * \param qe Queue entry to check.
@@ -4210,13 +4242,32 @@ static int can_ring_entry(struct queue_ent *qe, struct callattempt *call)
        }
 
        if (!call->member->ringinuse) {
-               if (member_call_pending_set(call->member)) {
-                       ast_debug(1, "%s has another call pending, can't receive call\n",
-                               call->interface);
+               struct member *mem;
+
+               ao2_lock(pending_members);
+
+               mem = ao2_find(pending_members, call->member,
+                                 OBJ_SEARCH_OBJECT | OBJ_NOLOCK);
+               if (mem) {
+                       /*
+                        * If found that means this member is currently being attempted
+                        * from another calling thread, so stop trying from this thread
+                        */
+                       ast_debug(1, "%s has another call trying, can't receive call\n",
+                                 call->interface);
+                       ao2_ref(mem, -1);
+                       ao2_unlock(pending_members);
                        return 0;
                }
 
                /*
+                * If not found add it to the container so another queue
+                * won't attempt to call this member at the same time.
+                */
+               ao2_link(pending_members, call->member);
+               ao2_unlock(pending_members);
+
+               /*
                 * The queue member is available.  Get current status to be sure
                 * because the device state and extension state callbacks may
                 * not have updated the status yet.
@@ -4224,7 +4275,7 @@ static int can_ring_entry(struct queue_ent *qe, struct callattempt *call)
                if (!member_status_available(get_queue_member_status(call->member))) {
                        ast_debug(1, "%s actually not available, can't receive call\n",
                                call->interface);
-                       member_call_pending_clear(call->member);
+                       pending_members_remove(call->member);
                        return 0;
                }
        }
@@ -4261,7 +4312,6 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies
                ++*busies;
                return 0;
        }
-       ast_assert(tmp->member->ringinuse || tmp->member->call_pending);
 
        ast_copy_string(tech, tmp->interface, sizeof(tech));
        if ((location = strchr(tech, '/'))) {
@@ -4278,7 +4328,7 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies
                qe->linpos++;
                ao2_unlock(qe->parent);
 
-               member_call_pending_clear(tmp->member);
+               pending_members_remove(tmp->member);
 
                publish_dial_end_event(qe->chan, tmp, NULL, "BUSY");
                tmp->stillgoing = 0;
@@ -4349,7 +4399,7 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies
                /* Again, keep going even if there's an error */
                ast_verb(3, "Couldn't call %s\n", tmp->interface);
                do_hang(tmp);
-               member_call_pending_clear(tmp->member);
+               pending_members_remove(tmp->member);
                ++*busies;
                return 0;
        }
@@ -4369,7 +4419,6 @@ static int ring_entry(struct queue_ent *qe, struct callattempt *tmp, int *busies
 
        ast_verb(3, "Called %s\n", tmp->interface);
 
-       member_call_pending_clear(tmp->member);
        return 1;
 }
 
@@ -9599,7 +9648,7 @@ static int manager_queues_summary(struct mansession *s, const struct message *m)
                ao2_lock(q);
 
                /* List queue properties */
-               if (ast_strlen_zero(queuefilter) || !strcmp(q->name, queuefilter)) {
+               if (ast_strlen_zero(queuefilter) || !strcasecmp(q->name, queuefilter)) {
                        /* Reset the necessary local variables if no queuefilter is set*/
                        qmemcount = 0;
                        qmemavail = 0;
@@ -9677,7 +9726,7 @@ static int manager_queues_status(struct mansession *s, const struct message *m)
                ao2_lock(q);
 
                /* List queue properties */
-               if (ast_strlen_zero(queuefilter) || !strcmp(q->name, queuefilter)) {
+               if (ast_strlen_zero(queuefilter) || !strcasecmp(q->name, queuefilter)) {
                        sl = ((q->callscompleted > 0) ? 100 * ((float)q->callscompletedinsl / (float)q->callscompleted) : 0);
                        astman_append(s, "Event: QueueParams\r\n"
                                "Queue: %s\r\n"
@@ -10934,6 +10983,7 @@ static int unload_module(void)
        ast_extension_state_del(0, extension_state_cb);
 
        ast_unload_realtime("queue_members");
+       ao2_cleanup(pending_members);
        ao2_cleanup(queues);
        queues = NULL;
        return 0;
@@ -10962,6 +11012,13 @@ static int load_module(void)
                return AST_MODULE_LOAD_DECLINE;
        }
 
+       pending_members = ao2_container_alloc(
+               MAX_CALL_ATTEMPT_BUCKETS, pending_members_hash, pending_members_cmp);
+       if (!pending_members) {
+               unload_module();
+               return AST_MODULE_LOAD_DECLINE;
+       }
+
        use_weight = 0;
 
        if (reload_handler(0, &mask, NULL)) {
index d89a2a1..8f28e26 100644 (file)
@@ -1509,7 +1509,6 @@ srvlookup=yes                   ; Enable DNS SRV lookups on outbound calls
 ;allow=ulaw
 ;allow=alaw
 ;mailbox=1234@default,1233@default ; Subscribe to status of multiple mailboxes
-;registertrying=yes              ; Send a 100 Trying when the device registers.
 
 ;[snom]
 ;type=friend                     ; Friends place calls and receive calls
index 23930ed..ca15d70 100644 (file)
@@ -137,6 +137,163 @@ struct odbc_datastore {
        char names[0];
 };
 
+/* \brief Data source name
+ *
+ * This holds data that pertains to a DSN
+ */
+struct dsn {
+       /*! A connection to the database */
+       struct odbc_obj *connection;
+       /*! The name of the DSN as defined in res_odbc.conf */
+       char name[0];
+};
+
+#define DSN_BUCKETS 37
+
+struct ao2_container *dsns;
+
+static int dsn_hash(const void *obj, const int flags)
+{
+       const struct dsn *object;
+       const char *key;
+
+       switch (flags & OBJ_SEARCH_MASK) {
+       case OBJ_SEARCH_KEY:
+               key = obj;
+               break;
+       case OBJ_SEARCH_OBJECT:
+               object = obj;
+               key = object->name;
+               break;
+       default:
+               ast_assert(0);
+               return 0;
+       }
+       return ast_str_hash(key);
+}
+
+static int dsn_cmp(void *obj, void *arg, int flags)
+{
+       const struct dsn *object_left = obj;
+       const struct dsn *object_right = arg;
+       const char *right_key = arg;
+       int cmp;
+
+       switch (flags & OBJ_SEARCH_MASK) {
+       case OBJ_SEARCH_OBJECT:
+               right_key = object_right->name;
+               /* Fall through */
+       case OBJ_SEARCH_KEY:
+               cmp = strcmp(object_left->name, right_key);
+               break;
+       case OBJ_SEARCH_PARTIAL_KEY:
+               cmp = strncmp(object_left->name, right_key, strlen(right_key));
+               break;
+       default:
+               cmp = 0;
+               break;
+       }
+
+       if (cmp) {
+               return 0;
+       }
+
+       return CMP_MATCH;
+}
+
+static void dsn_destructor(void *obj)
+{
+       struct dsn *dsn = obj;
+
+       if (dsn->connection) {
+               ast_odbc_release_obj(dsn->connection);
+       }
+}
+
+/*!
+ * \brief Create a DSN and connect to the database
+ *
+ * \param name The name of the DSN as found in res_odbc.conf
+ * \retval NULL Fail
+ * \retval non-NULL The newly-created structure
+ */
+static struct dsn *create_dsn(const char *name)
+{
+       struct dsn *dsn;
+
+       dsn = ao2_alloc(sizeof(*dsn) + strlen(name) + 1, dsn_destructor);
+       if (!dsn) {
+               return NULL;
+       }
+
+       /* Safe */
+       strcpy(dsn->name, name);
+
+       dsn->connection = ast_odbc_request_obj(name, 0);
+       if (!dsn->connection) {
+               ao2_ref(dsn, -1);
+               return NULL;
+       }
+
+       if (!ao2_link_flags(dsns, dsn, OBJ_NOLOCK)) {
+               ao2_ref(dsn, -1);
+               return NULL;
+       }
+
+       return dsn;
+}
+
+/*!
+ * \brief Retrieve a DSN, or create it if it does not exist.
+ *
+ * The created DSN is returned locked. This should be inconsequential
+ * to callers in most cases.
+ *
+ * When finished with the returned structure, the caller must call
+ * \ref release_dsn
+ *
+ * \param name Name of the DSN as found in res_odbc.conf
+ * \retval NULL Unable to retrieve or create the DSN
+ * \retval non-NULL The retrieved/created locked DSN
+ */
+static struct dsn *get_dsn(const char *name)
+{
+       struct dsn *dsn;
+
+       ao2_lock(dsns);
+       dsn = ao2_find(dsns, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
+       if (!dsn) {
+               dsn = create_dsn(name);
+       }
+       ao2_unlock(dsns);
+
+       if (!dsn) {
+               return NULL;
+       }
+
+       ao2_lock(dsn->connection);
+
+       return dsn;
+}
+
+/*!
+ * \brief Unlock and unreference a DSN
+ *
+ * \param dsn The dsn to unlock and unreference
+ * \return NULL
+ */
+static void *release_dsn(struct dsn *dsn)
+{
+       if (!dsn) {
+               return NULL;
+       }
+
+       ao2_unlock(dsn->connection);
+       ao2_ref(dsn, -1);
+
+       return NULL;
+}
+
 static AST_RWLIST_HEAD_STATIC(queries, acf_odbc_query);
 
 static int resultcount = 0;
@@ -214,7 +371,7 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co
        struct odbc_obj *obj = NULL;
        struct acf_odbc_query *query;
        char *t, varname[15];
-       int i, dsn, bogus_chan = 0;
+       int i, dsn_num, bogus_chan = 0;
        int transactional = 0;
        AST_DECLARE_APP_ARGS(values,
                AST_APP_ARG(field)[100];
@@ -227,6 +384,7 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co
        struct ast_str *buf = ast_str_thread_get(&sql_buf, 16);
        struct ast_str *insertbuf = ast_str_thread_get(&sql2_buf, 16);
        const char *status = "FAILURE";
+       struct dsn *dsn = NULL;
 
        if (!buf || !insertbuf) {
                return -1;
@@ -324,17 +482,21 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co
         * to multiple DSNs.  We MUST have a single handle all the way through the
         * transaction, or else we CANNOT enforce atomicity.
         */
-       for (dsn = 0; dsn < 5; dsn++) {
-               if (!ast_strlen_zero(query->writehandle[dsn])) {
+       for (dsn_num = 0; dsn_num < 5; dsn_num++) {
+               if (!ast_strlen_zero(query->writehandle[dsn_num])) {
                        if (transactional) {
                                /* This can only happen second time through or greater. */
                                ast_log(LOG_WARNING, "Transactions do not work well with multiple DSNs for 'writehandle'\n");
                        }
 
-                       if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn]))) {
+                       if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn_num]))) {
                                transactional = 1;
                        } else {
-                               obj = ast_odbc_request_obj(query->writehandle[dsn], 0);
+                               dsn = get_dsn(query->writehandle[dsn_num]);
+                               if (!dsn) {
+                                       continue;
+                               }
+                               obj = dsn->connection;
                                transactional = 0;
                        }
 
@@ -342,10 +504,7 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co
                                break;
                        }
 
-                       if (obj && !transactional) {
-                               ast_odbc_release_obj(obj);
-                               obj = NULL;
-                       }
+                       dsn = release_dsn(dsn);
                }
        }
 
@@ -358,25 +517,25 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co
                        status = "SUCCESS";
 
                } else if (query->sql_insert) {
-                       if (obj && !transactional) {
-                               ast_odbc_release_obj(obj);
-                               obj = NULL;
-                       }
+                       dsn = release_dsn(dsn);
 
-                       for (transactional = 0, dsn = 0; dsn < 5; dsn++) {
-                               if (!ast_strlen_zero(query->writehandle[dsn])) {
+                       for (transactional = 0, dsn_num = 0; dsn_num < 5; dsn_num++) {
+                               if (!ast_strlen_zero(query->writehandle[dsn_num])) {
                                        if (transactional) {
                                                /* This can only happen second time through or greater. */
                                                ast_log(LOG_WARNING, "Transactions do not work well with multiple DSNs for 'writehandle'\n");
                                        } else if (obj) {
-                                               ast_odbc_release_obj(obj);
-                                               obj = NULL;
+                                               dsn = release_dsn(dsn);
                                        }
 
-                                       if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn]))) {
+                                       if ((obj = ast_odbc_retrieve_transaction_obj(chan, query->writehandle[dsn_num]))) {
                                                transactional = 1;
                                        } else {
-                                               obj = ast_odbc_request_obj(query->writehandle[dsn], 0);
+                                               dsn = get_dsn(query->writehandle[dsn_num]);
+                                               if (!dsn) {
+                                                       continue;
+                                               }
+                                               obj = dsn->connection;
                                                transactional = 0;
                                        }
                                        if (obj) {
@@ -406,10 +565,7 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co
                pbx_builtin_setvar_helper(chan, "ODBCSTATUS", status);
        }
 
-       if (obj && !transactional) {
-               ast_odbc_release_obj(obj);
-               obj = NULL;
-       }
+       dsn = release_dsn(dsn);
 
        if (!bogus_chan) {
                ast_autoservice_stop(chan);
@@ -420,11 +576,10 @@ static int acf_odbc_write(struct ast_channel *chan, const char *cmd, char *s, co
 
 static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, char *buf, size_t len)
 {
-       struct odbc_obj *obj = NULL;
        struct acf_odbc_query *query;
        char varname[15], rowcount[12] = "-1";
        struct ast_str *colnames = ast_str_thread_get(&colnames_buf, 16);
-       int res, x, y, buflen = 0, escapecommas, rowlimit = 1, multirow = 0, dsn, bogus_chan = 0;
+       int res, x, y, buflen = 0, escapecommas, rowlimit = 1, multirow = 0, dsn_num, bogus_chan = 0;
        AST_DECLARE_APP_ARGS(args,
                AST_APP_ARG(field)[100];
        );
@@ -436,6 +591,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
        struct odbc_datastore_row *row = NULL;
        struct ast_str *sql = ast_str_thread_get(&sql_buf, 16);
        const char *status = "FAILURE";
+       struct dsn *dsn = NULL;
 
        if (!sql || !colnames) {
                if (chan) {
@@ -523,28 +679,23 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
        }
        AST_RWLIST_UNLOCK(&queries);
 
-       for (dsn = 0; dsn < 5; dsn++) {
-               if (!ast_strlen_zero(query->readhandle[dsn])) {
-                       obj = ast_odbc_request_obj(query->readhandle[dsn], 0);
-                       if (obj) {
-                               stmt = ast_odbc_direct_execute(obj, generic_execute, ast_str_buffer(sql));
+       for (dsn_num = 0; dsn_num < 5; dsn_num++) {
+               if (!ast_strlen_zero(query->readhandle[dsn_num])) {
+                       dsn = get_dsn(query->readhandle[dsn_num]);
+                       if (!dsn) {
+                               continue;
                        }
+                       stmt = ast_odbc_direct_execute(dsn->connection, generic_execute, ast_str_buffer(sql));
                }
                if (stmt) {
                        break;
                }
-               if (obj) {
-                       ast_odbc_release_obj(obj);
-                       obj = NULL;
-               }
+               dsn = release_dsn(dsn);
        }
 
        if (!stmt) {
                ast_log(LOG_ERROR, "Unable to execute query [%s]\n", ast_str_buffer(sql));
-               if (obj) {
-                       ast_odbc_release_obj(obj);
-                       obj = NULL;
-               }
+               dsn = release_dsn(dsn);
                if (!bogus_chan) {
                        pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
                        ast_autoservice_stop(chan);
@@ -558,8 +709,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
                ast_log(LOG_WARNING, "SQL Column Count error!\n[%s]\n\n", ast_str_buffer(sql));
                SQLCloseCursor(stmt);
                SQLFreeHandle (SQL_HANDLE_STMT, stmt);
-               ast_odbc_release_obj(obj);
-               obj = NULL;
+               dsn = release_dsn(dsn);
                if (!bogus_chan) {
                        pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
                        ast_autoservice_stop(chan);
@@ -583,8 +733,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
                }
                SQLCloseCursor(stmt);
                SQLFreeHandle(SQL_HANDLE_STMT, stmt);
-               ast_odbc_release_obj(obj);
-               obj = NULL;
+               dsn = release_dsn(dsn);
                if (!bogus_chan) {
                        pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
                        pbx_builtin_setvar_helper(chan, "ODBCSTATUS", status);
@@ -607,8 +756,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
                                odbc_datastore_free(resultset);
                                SQLCloseCursor(stmt);
                                SQLFreeHandle(SQL_HANDLE_STMT, stmt);
-                               ast_odbc_release_obj(obj);
-                               obj = NULL;
+                               dsn = release_dsn(dsn);
                                if (!bogus_chan) {
                                        pbx_builtin_setvar_helper(chan, "ODBCSTATUS", "MEMERROR");
                                        ast_autoservice_stop(chan);
@@ -640,8 +788,7 @@ static int acf_odbc_read(struct ast_channel *chan, const char *cmd, char *s, cha
                                                odbc_datastore_free(resultset);
                                                SQLCloseCursor(stmt);
                                                SQLFreeHandle(SQL_HANDLE_STMT, stmt);
-                                               ast_odbc_release_obj(obj);
-                                               obj = NULL;
+                                               dsn = release_dsn(dsn);
                                                if (!bogus_chan) {
                                                        pbx_builtin_setvar_helper(chan, "ODBCROWS", rowcount);
                                                        pbx_builtin_setvar_helper(chan, "ODBCSTATUS", "MEMERROR");
@@ -750,8 +897,7 @@ end_acf_read:
                                odbc_datastore_free(resultset);
                                SQLCloseCursor(stmt);
                                SQLFreeHandle(SQL_HANDLE_STMT, stmt);
-                               ast_odbc_release_obj(obj);
-                               obj = NULL;
+                               dsn = release_dsn(dsn);
                                pbx_builtin_setvar_helper(chan, "ODBCSTATUS", "MEMERROR");
                                ast_autoservice_stop(chan);
                                return -1;
@@ -764,8 +910,7 @@ end_acf_read:
        }
        SQLCloseCursor(stmt);
        SQLFreeHandle(SQL_HANDLE_STMT, stmt);
-       ast_odbc_release_obj(obj);
-       obj = NULL;
+       dsn = release_dsn(dsn);
        if (resultset && !multirow) {
                /* Fetch the first resultset */
                if (!acf_fetch(chan, "", buf, buf, len)) {
@@ -1192,8 +1337,8 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args
 
        if (a->argc == 5 && !strcmp(a->argv[4], "exec")) {
                /* Execute the query */
-               struct odbc_obj *obj = NULL;
-               int dsn, executed = 0;
+               struct dsn *dsn = NULL;
+               int dsn_num, executed = 0;
                SQLHSTMT stmt;
                int rows = 0, res, x;
                SQLSMALLINT colcount = 0, collength;
@@ -1207,19 +1352,18 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args
                        return CLI_SUCCESS;
                }
 
-               for (dsn = 0; dsn < 5; dsn++) {
-                       if (ast_strlen_zero(query->readhandle[dsn])) {
+               for (dsn_num = 0; dsn_num < 5; dsn_num++) {
+                       if (ast_strlen_zero(query->readhandle[dsn_num])) {
                                continue;
                        }
-                       ast_debug(1, "Found handle %s\n", query->readhandle[dsn]);
-                       if (!(obj = ast_odbc_request_obj(query->readhandle[dsn], 0))) {
+                       dsn = get_dsn(query->readhandle[dsn_num]);
+                       if (!dsn) {
                                continue;
                        }
+                       ast_debug(1, "Found handle %s\n", query->readhandle[dsn_num]);
 
-                       ast_debug(1, "Got obj\n");
-                       if (!(stmt = ast_odbc_direct_execute(obj, generic_execute, ast_str_buffer(sql)))) {
-                               ast_odbc_release_obj(obj);
-                               obj = NULL;
+                       if (!(stmt = ast_odbc_direct_execute(dsn->connection, generic_execute, ast_str_buffer(sql)))) {
+                               dsn = release_dsn(dsn);
                                continue;
                        }
 
@@ -1230,8 +1374,7 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args
                                ast_cli(a->fd, "SQL Column Count error!\n[%s]\n\n", ast_str_buffer(sql));
                                SQLCloseCursor(stmt);
                                SQLFreeHandle (SQL_HANDLE_STMT, stmt);
-                               ast_odbc_release_obj(obj);
-                               obj = NULL;
+                               dsn = release_dsn(dsn);
                                AST_RWLIST_UNLOCK(&queries);
                                return CLI_SUCCESS;
                        }
@@ -1240,10 +1383,9 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args
                        if ((res != SQL_SUCCESS) && (res != SQL_SUCCESS_WITH_INFO)) {
                                SQLCloseCursor(stmt);
                                SQLFreeHandle(SQL_HANDLE_STMT, stmt);
-                               ast_odbc_release_obj(obj);
-                               obj = NULL;
+                               dsn = release_dsn(dsn);
                                if (res == SQL_NO_DATA) {
-                                       ast_cli(a->fd, "Returned %d rows.  Query executed on handle %d:%s [%s]\n", rows, dsn, query->readhandle[dsn], ast_str_buffer(sql));
+                                       ast_cli(a->fd, "Returned %d rows.  Query executed on handle %d:%s [%s]\n", rows, dsn_num, query->readhandle[dsn_num], ast_str_buffer(sql));
                                        break;
                                } else {
                                        ast_cli(a->fd, "Error %d in FETCH [%s]\n", res, ast_str_buffer(sql));
@@ -1270,8 +1412,7 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args
                                                ast_cli(a->fd, "SQL Get Data error %d!\n[%s]\n\n", res, ast_str_buffer(sql));
                                                SQLCloseCursor(stmt);
                                                SQLFreeHandle(SQL_HANDLE_STMT, stmt);
-                                               ast_odbc_release_obj(obj);
-                                               obj = NULL;
+                                               dsn = release_dsn(dsn);
                                                AST_RWLIST_UNLOCK(&queries);
                                                return CLI_SUCCESS;
                                        }
@@ -1289,15 +1430,11 @@ static char *cli_odbc_read(struct ast_cli_entry *e, int cmd, struct ast_cli_args
                        }
                        SQLCloseCursor(stmt);
                        SQLFreeHandle(SQL_HANDLE_STMT, stmt);
-                       ast_odbc_release_obj(obj);
-                       obj = NULL;
-                       ast_cli(a->fd, "Returned %d row%s.  Query executed on handle %d [%s]\n", rows, rows == 1 ? "" : "s", dsn, query->readhandle[dsn]);
+                       dsn = release_dsn(dsn);
+                       ast_cli(a->fd, "Returned %d row%s.  Query executed on handle %d [%s]\n", rows, rows == 1 ? "" : "s", dsn_num, query->readhandle[dsn_num]);
                        break;
                }
-               if (obj) {
-                       ast_odbc_release_obj(obj);
-                       obj = NULL;
-               }
+               dsn = release_dsn(dsn);
 
                if (!executed) {
                        ast_cli(a->fd, "Failed to execute query. [%s]\n", ast_str_buffer(sql));
@@ -1420,30 +1557,29 @@ static char *cli_odbc_write(struct ast_cli_entry *e, int cmd, struct ast_cli_arg
 
        if (a->argc == 6 && !strcmp(a->argv[5], "exec")) {
                /* Execute the query */
-               struct odbc_obj *obj = NULL;
-               int dsn, executed = 0;
+               struct dsn *dsn;
+               int dsn_num, executed = 0;
                SQLHSTMT stmt;
                SQLLEN rows = -1;
 
-               for (dsn = 0; dsn < 5; dsn++) {
-                       if (ast_strlen_zero(query->writehandle[dsn])) {
+               for (dsn_num = 0; dsn_num < 5; dsn_num++) {
+                       if (ast_strlen_zero(query->writehandle[dsn_num])) {
                                continue;
                        }
-                       if (!(obj = ast_odbc_request_obj(query->writehandle[dsn], 0))) {
+                       dsn = get_dsn(query->writehandle[dsn_num]);
+                       if (!dsn) {
                                continue;
                        }
-                       if (!(stmt = ast_odbc_direct_execute(obj, generic_execute, ast_str_buffer(sql)))) {
-                               ast_odbc_release_obj(obj);
-                               obj = NULL;
+                       if (!(stmt = ast_odbc_direct_execute(dsn->connection, generic_execute, ast_str_buffer(sql)))) {
+                               dsn = release_dsn(dsn);
                                continue;
                        }
 
                        SQLRowCount(stmt, &rows);
                        SQLCloseCursor(stmt);
                        SQLFreeHandle(SQL_HANDLE_STMT, stmt);
-                       ast_odbc_release_obj(obj);
-                       obj = NULL;
-                       ast_cli(a->fd, "Affected %d rows.  Query executed on handle %d [%s]\n", (int)rows, dsn, query->writehandle[dsn]);
+                       dsn = release_dsn(dsn);
+                       ast_cli(a->fd, "Affected %d rows.  Query executed on handle %d [%s]\n", (int)rows, dsn_num, query->writehandle[dsn_num]);
                        executed = 1;
                        break;
                }
@@ -1470,6 +1606,11 @@ static int load_module(void)
        char *catg;
        struct ast_flags config_flags = { 0 };
 
+       dsns = ao2_container_alloc(DSN_BUCKETS, dsn_hash, dsn_cmp);
+       if (!dsns) {
+               return AST_MODULE_LOAD_DECLINE;
+       }
+
        res |= ast_custom_function_register(&fetch_function);
        res |= ast_register_application_xml(app_odbcfinish, exec_odbcfinish);
        AST_RWLIST_WRLOCK(&queries);
@@ -1478,6 +1619,7 @@ static int load_module(void)
        if (!cfg || cfg == CONFIG_STATUS_FILEINVALID) {
                ast_log(LOG_NOTICE, "Unable to load config for func_odbc: %s\n", config);
                AST_RWLIST_UNLOCK(&queries);
+               ao2_ref(dsns, -1);
                return AST_MODULE_LOAD_DECLINE;
        }
 
@@ -1531,6 +1673,8 @@ static int unload_module(void)
        AST_RWLIST_WRLOCK(&queries);
 
        AST_RWLIST_UNLOCK(&queries);
+
+       ao2_ref(dsns, -1);
        return res;
 }
 
index 7f7d5a8..fb8e781 100644 (file)
@@ -151,47 +151,20 @@ int bridge_channel_internal_push_full(struct ast_bridge_channel *bridge_channel,
 void bridge_channel_internal_pull(struct ast_bridge_channel *bridge_channel);
 
 /*!
- * \brief Internal bridge channel wait condition and associated result.
- */
-struct bridge_channel_internal_cond {
-       /*! Lock for the data structure */
-       ast_mutex_t lock;
-       /*! Wait condition */
-       ast_cond_t cond;
-       /*! Wait until done */
-       int done;
-       /*! The bridge channel */
-       struct ast_bridge_channel *bridge_channel;
-};
-
-/*!
- * \internal
- * \brief Wait for the expected signal.
- * \since 13.5.0
- *
- * \param cond the wait object
- *
- * \return Nothing
- */
-void bridge_channel_internal_wait(struct bridge_channel_internal_cond *cond);
-
-/*!
- * \internal
- * \brief Signal the condition wait.
- * \since 13.5.0
+ * \brief Signal imparting threads to wake up.
+ * \since 13.9.0
  *
- * \param cond the wait object
+ * \param chan Channel imparted that we need to signal.
  *
  * \return Nothing
  */
-void bridge_channel_internal_signal(struct bridge_channel_internal_cond *cond);
+void bridge_channel_impart_signal(struct ast_channel *chan);
 
 /*!
  * \internal
  * \brief Join the bridge_channel to the bridge (blocking)
  *
  * \param bridge_channel The Channel in the bridge
- * \param cond data used for signaling
  *
  * \note The bridge_channel->swap holds a channel reference for the swap
  * channel going into the bridging system.  The ref ensures that the swap
@@ -206,8 +179,7 @@ void bridge_channel_internal_signal(struct bridge_channel_internal_cond *cond);
  * \retval 0 bridge channel successfully joined the bridge
  * \retval -1 bridge channel failed to join the bridge
  */
-int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
-                                struct bridge_channel_internal_cond *cond);
+int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel);
 
 /*!
  * \internal
index 64c750b..ce6b960 100644 (file)
@@ -1483,6 +1483,150 @@ void ast_bridge_notify_masquerade(struct ast_channel *chan)
        ao2_ref(bridge_channel, -1);
 }
 
+/*!
+ * \brief Internal bridge impart wait condition and associated conditional.
+ */
+struct bridge_channel_impart_cond {
+       AST_LIST_ENTRY(bridge_channel_impart_cond) node;
+       /*! Lock for the data structure */
+       ast_mutex_t lock;
+       /*! Wait condition */
+       ast_cond_t cond;
+       /*! Wait until done */
+       int done;
+};
+
+AST_LIST_HEAD_NOLOCK(bridge_channel_impart_ds_head, bridge_channel_impart_cond);
+
+/*!
+ * \internal
+ * \brief Signal imparting threads to wake up.
+ * \since 13.9.0
+ *
+ * \param ds_head List of imparting threads to wake up.
+ *
+ * \return Nothing
+ */
+static void bridge_channel_impart_ds_head_signal(struct bridge_channel_impart_ds_head *ds_head)
+{
+       if (ds_head) {
+               struct bridge_channel_impart_cond *cond;
+
+               while ((cond = AST_LIST_REMOVE_HEAD(ds_head, node))) {
+                       ast_mutex_lock(&cond->lock);
+                       cond->done = 1;
+                       ast_cond_signal(&cond->cond);
+                       ast_mutex_unlock(&cond->lock);
+               }
+       }
+}
+
+static void bridge_channel_impart_ds_head_dtor(void *doomed)
+{
+       bridge_channel_impart_ds_head_signal(doomed);
+       ast_free(doomed);
+}
+
+/*!
+ * \internal
+ * \brief Fixup the bridge impart datastore.
+ * \since 13.9.0
+ *
+ * \param data Bridge impart datastore data to fixup from old_chan.
+ * \param old_chan The datastore is moving from this channel.
+ * \param new_chan The datastore is moving to this channel.
+ *
+ * \return Nothing
+ */
+static void bridge_channel_impart_ds_head_fixup(void *data, struct ast_channel *old_chan, struct ast_channel *new_chan)
+{
+       /*
+        * Signal any waiting impart threads.  The masquerade is going to kill
+        * old_chan and we don't need to be waiting on new_chan.
+        */
+       bridge_channel_impart_ds_head_signal(data);
+}
+
+static const struct ast_datastore_info bridge_channel_impart_ds_info = {
+       .type = "bridge-impart-ds",
+       .destroy = bridge_channel_impart_ds_head_dtor,
+       .chan_fixup = bridge_channel_impart_ds_head_fixup,
+};
+
+/*!
+ * \internal
+ * \brief Add impart wait datastore conditional to channel.
+ * \since 13.9.0
+ *
+ * \param chan Channel to add the impart wait conditional.
+ * \param cond Imparting conditional to add.
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.
+ */
+static int bridge_channel_impart_add(struct ast_channel *chan, struct bridge_channel_impart_cond *cond)
+{
+       struct ast_datastore *datastore;
+       struct bridge_channel_impart_ds_head *ds_head;
+
+       ast_channel_lock(chan);
+
+       datastore = ast_channel_datastore_find(chan, &bridge_channel_impart_ds_info, NULL);
+       if (!datastore) {
+               datastore = ast_datastore_alloc(&bridge_channel_impart_ds_info, NULL);
+               if (!datastore) {
+                       ast_channel_unlock(chan);
+                       return -1;
+               }
+               ds_head = ast_calloc(1, sizeof(*ds_head));
+               if (!ds_head) {
+                       ast_channel_unlock(chan);
+                       ast_datastore_free(datastore);
+                       return -1;
+               }
+               datastore->data = ds_head;
+               ast_channel_datastore_add(chan, datastore);
+       } else {
+               ds_head = datastore->data;
+               ast_assert(ds_head != NULL);
+       }
+
+       AST_LIST_INSERT_TAIL(ds_head, cond, node);
+
+       ast_channel_unlock(chan);
+       return 0;
+}
+
+void bridge_channel_impart_signal(struct ast_channel *chan)
+{
+       struct ast_datastore *datastore;
+
+       ast_channel_lock(chan);
+       datastore = ast_channel_datastore_find(chan, &bridge_channel_impart_ds_info, NULL);
+       if (datastore) {
+               bridge_channel_impart_ds_head_signal(datastore->data);
+       }
+       ast_channel_unlock(chan);
+}
+
+/*!
+ * \internal
+ * \brief Block imparting channel thread until signaled.
+ * \since 13.9.0
+ *
+ * \param cond Imparting conditional to wait for.
+ *
+ * \return Nothing
+ */
+static void bridge_channel_impart_wait(struct bridge_channel_impart_cond *cond)
+{
+       ast_mutex_lock(&cond->lock);
+       while (!cond->done) {
+               ast_cond_wait(&cond->cond, &cond->lock);
+       }
+       ast_mutex_unlock(&cond->lock);
+}
+
 /*
  * XXX ASTERISK-21271 make ast_bridge_join() require features to be allocated just like ast_bridge_impart() and not expect the struct back.
  *
@@ -1551,7 +1695,7 @@ int ast_bridge_join(struct ast_bridge *bridge,
        }
 
        if (!res) {
-               res = bridge_channel_internal_join(bridge_channel, NULL);
+               res = bridge_channel_internal_join(bridge_channel);
        }
 
        /* Cleanup all the data in the bridge channel after it leaves the bridge. */
@@ -1568,6 +1712,7 @@ int ast_bridge_join(struct ast_bridge *bridge,
 
 join_exit:;
        ast_bridge_run_after_callback(chan);
+       bridge_channel_impart_signal(chan);
        if (!(ast_channel_softhangup_internal_flag(chan) & AST_SOFTHANGUP_ASYNCGOTO)
                && !ast_bridge_setup_after_goto(chan)) {
                /* Claim the after bridge goto is an async goto destination. */
@@ -1581,14 +1726,13 @@ join_exit:;
 /*! \brief Thread responsible for imparted bridged channels to be departed */
 static void *bridge_channel_depart_thread(void *data)
 {
-       struct bridge_channel_internal_cond *cond = data;
-       struct ast_bridge_channel *bridge_channel = cond->bridge_channel;
+       struct ast_bridge_channel *bridge_channel = data;
 
        if (bridge_channel->callid) {
                ast_callid_threadassoc_add(bridge_channel->callid);
        }
 
-       bridge_channel_internal_join(bridge_channel, cond);
+       bridge_channel_internal_join(bridge_channel);
 
        /*
         * cleanup
@@ -1601,6 +1745,8 @@ static void *bridge_channel_depart_thread(void *data)
        bridge_channel->features = NULL;
 
        ast_bridge_discard_after_callback(bridge_channel->chan, AST_BRIDGE_AFTER_CB_REASON_DEPART);
+       /* If join failed there will be impart threads waiting. */
+       bridge_channel_impart_signal(bridge_channel->chan);
        ast_bridge_discard_after_goto(bridge_channel->chan);
 
        return NULL;
@@ -1609,15 +1755,14 @@ static void *bridge_channel_depart_thread(void *data)
 /*! \brief Thread responsible for independent imparted bridged channels */
 static void *bridge_channel_ind_thread(void *data)
 {
-       struct bridge_channel_internal_cond *cond = data;
-       struct ast_bridge_channel *bridge_channel = cond->bridge_channel;
+       struct ast_bridge_channel *bridge_channel = data;
        struct ast_channel *chan;
 
        if (bridge_channel->callid) {
                ast_callid_threadassoc_add(bridge_channel->callid);
        }
 
-       bridge_channel_internal_join(bridge_channel, cond);
+       bridge_channel_internal_join(bridge_channel);
        chan = bridge_channel->chan;
 
        /* cleanup */
@@ -1634,15 +1779,18 @@ static void *bridge_channel_ind_thread(void *data)
        ao2_ref(bridge_channel, -1);
 
        ast_bridge_run_after_callback(chan);
+       /* If join failed there will be impart threads waiting. */
+       bridge_channel_impart_signal(chan);
        ast_bridge_run_after_goto(chan);
        return NULL;
 }
 
-int ast_bridge_impart(struct ast_bridge *bridge,
+static int bridge_impart_internal(struct ast_bridge *bridge,
        struct ast_channel *chan,
        struct ast_channel *swap,
        struct ast_bridge_features *features,
-       enum ast_bridge_impart_flags flags)
+       enum ast_bridge_impart_flags flags,
+       struct bridge_channel_impart_cond *cond)
 {
        int res = 0;
        struct ast_bridge_channel *bridge_channel;
@@ -1701,27 +1849,20 @@ int ast_bridge_impart(struct ast_bridge *bridge,
 
        /* Actually create the thread that will handle the channel */
        if (!res) {
-               struct bridge_channel_internal_cond cond = {
-                       .done = 0,
-                       .bridge_channel = bridge_channel
-               };
-               ast_mutex_init(&cond.lock);
-               ast_cond_init(&cond.cond, NULL);
-
+               res = bridge_channel_impart_add(chan, cond);
+       }
+       if (!res) {
                if ((flags & AST_BRIDGE_IMPART_CHAN_MASK) == AST_BRIDGE_IMPART_CHAN_INDEPENDENT) {
                        res = ast_pthread_create_detached(&bridge_channel->thread, NULL,
-                               bridge_channel_ind_thread, &cond);
+                               bridge_channel_ind_thread, bridge_channel);
                } else {
                        res = ast_pthread_create(&bridge_channel->thread, NULL,
-                               bridge_channel_depart_thread, &cond);
+                               bridge_channel_depart_thread, bridge_channel);
                }
 
                if (!res) {
-                       bridge_channel_internal_wait(&cond);
+                       bridge_channel_impart_wait(cond);
                }
-
-               ast_cond_destroy(&cond.cond);
-               ast_mutex_destroy(&cond.lock);
        }
 
        if (res) {
@@ -1742,6 +1883,32 @@ int ast_bridge_impart(struct ast_bridge *bridge,
        return 0;
 }
 
+int ast_bridge_impart(struct ast_bridge *bridge,
+       struct ast_channel *chan,
+       struct ast_channel *swap,
+       struct ast_bridge_features *features,
+       enum ast_bridge_impart_flags flags)
+{
+       struct bridge_channel_impart_cond cond = {
+               .done = 0,
+       };
+       int res;
+
+       ast_mutex_init(&cond.lock);
+       ast_cond_init(&cond.cond, NULL);
+
+       res = bridge_impart_internal(bridge, chan, swap, features, flags, &cond);
+       if (res) {
+               /* Impart failed.  Signal any other waiting impart threads */
+               bridge_channel_impart_signal(chan);
+       }
+
+       ast_cond_destroy(&cond.cond);
+       ast_mutex_destroy(&cond.lock);
+
+       return res;
+}
+
 int ast_bridge_depart(struct ast_channel *chan)
 {
        struct ast_bridge_channel *bridge_channel;
index 66f26ee..db4ecfe 100644 (file)
@@ -2637,27 +2637,7 @@ static void bridge_channel_event_join_leave(struct ast_bridge_channel *bridge_ch
        ao2_iterator_destroy(&iter);
 }
 
-void bridge_channel_internal_wait(struct bridge_channel_internal_cond *cond)
-{
-       ast_mutex_lock(&cond->lock);
-       while (!cond->done) {
-               ast_cond_wait(&cond->cond, &cond->lock);
-       }
-       ast_mutex_unlock(&cond->lock);
-}
-
-void bridge_channel_internal_signal(struct bridge_channel_internal_cond *cond)
-{
-       if (cond) {
-               ast_mutex_lock(&cond->lock);
-               cond->done = 1;
-               ast_cond_signal(&cond->cond);
-               ast_mutex_unlock(&cond->lock);
-       }
-}
-
-int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
-                                struct bridge_channel_internal_cond *cond)
+int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel)
 {
        int res = 0;
        struct ast_bridge_features *channel_features;
@@ -2687,7 +2667,6 @@ int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
                        bridge_channel->bridge->uniqueid,
                        bridge_channel,
                        ast_channel_name(bridge_channel->chan));
-               bridge_channel_internal_signal(cond);
                return -1;
        }
        ast_channel_internal_bridge_set(bridge_channel->chan, bridge_channel->bridge);
@@ -2722,8 +2701,6 @@ int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
        }
        bridge_reconfigured(bridge_channel->bridge, !bridge_channel->inhibit_colp);
 
-       bridge_channel_internal_signal(cond);
-
        if (bridge_channel->state == BRIDGE_CHANNEL_STATE_WAIT) {
                /*
                 * Indicate a source change since this channel is entering the
@@ -2735,6 +2712,7 @@ int bridge_channel_internal_join(struct ast_bridge_channel *bridge_channel,
                        ast_indicate(bridge_channel->chan, AST_CONTROL_SRCCHANGE);
                }
 
+               bridge_channel_impart_signal(bridge_channel->chan);
                ast_bridge_unlock(bridge_channel->bridge);
 
                /* Must release any swap ref after unlocking the bridge. */
index f6ce749..e8249e2 100644 (file)
@@ -3736,6 +3736,24 @@ static enum agi_result agi_handle_command(struct ast_channel *chan, AGI *agi, ch
 
        return AGI_RESULT_SUCCESS;
 }
+
+AST_LIST_HEAD_NOLOCK(deferred_frames, ast_frame);
+
+static void queue_deferred_frames(struct deferred_frames *deferred_frames,
+       struct ast_channel *chan)
+{
+       struct ast_frame *f;
+
+       if (!AST_LIST_EMPTY(deferred_frames)) {
+               ast_channel_lock(chan);
+               while ((f = AST_LIST_REMOVE_HEAD(deferred_frames, frame_list))) {
+                       ast_queue_frame_head(chan, f);
+                       ast_frfree(f);
+               }
+               ast_channel_unlock(chan);
+       }
+}
+
 static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi, int pid, int *status, int dead, int argc, char *argv[])
 {
        struct ast_channel *c;
@@ -3754,6 +3772,9 @@ static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi
        const char *sighup_str;
        const char *exit_on_hangup_str;
        int exit_on_hangup;
+       struct deferred_frames deferred_frames;
+
+       AST_LIST_HEAD_INIT_NOLOCK(&deferred_frames);
 
        ast_channel_lock(chan);
        sighup_str = pbx_builtin_getvar_helper(chan, "AGISIGHUP");
@@ -3815,8 +3836,20 @@ static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi
                                        /* Write, ignoring errors */
                                        if (write(agi->audio, f->data.ptr, f->datalen) < 0) {
                                        }
+                                       ast_frfree(f);
+                               } else if (ast_is_deferrable_frame(f)) {
+                                       struct ast_frame *dup_f;
+
+                                       if ((dup_f = ast_frisolate(f))) {
+                                               AST_LIST_INSERT_HEAD(&deferred_frames, dup_f, frame_list);
+                                       }
+
+                                       if (dup_f != f) {
+                                               ast_frfree(f);
+                                       }
+                               } else {
+                                       ast_frfree(f);
                                }
-                               ast_frfree(f);
                        }
                } else if (outfd > -1) {
                        size_t len = sizeof(buf);
@@ -3864,6 +3897,8 @@ static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi
                                buf[buflen - 1] = '\0';
                        }
 
+                       queue_deferred_frames(&deferred_frames, chan);
+
                        if (agidebug)
                                ast_verbose("<%s>AGI Rx << %s\n", ast_channel_name(chan), buf);
                        cmd_status = agi_handle_command(chan, agi, buf, dead);
@@ -3885,6 +3920,9 @@ static enum agi_result run_agi(struct ast_channel *chan, char *request, AGI *agi
                        }
                }
        }
+
+       queue_deferred_frames(&deferred_frames, chan);
+
        if (agi->speech) {
                ast_speech_destroy(agi->speech);
        }
index 3c5b750..aa6866a 100644 (file)
@@ -903,11 +903,8 @@ static void bridge_after_cb_failed(enum ast_bridge_after_cb_reason reason,
                ast_bridge_after_cb_reason_string(reason));
 }
 
-int control_add_channel_to_bridge(
-       struct stasis_app_control *control,
-       struct ast_channel *chan, void *data)
+int control_swap_channel_in_bridge(struct stasis_app_control *control, struct ast_bridge *bridge, struct ast_channel *chan, struct ast_channel *swap)
 {
-       struct ast_bridge *bridge = data;
        int res;
 
        if (!control || !bridge) {
@@ -960,7 +957,7 @@ int control_add_channel_to_bridge(
 
                res = ast_bridge_impart(bridge,
                        chan,
-                       NULL, /* swap channel */
+                       swap,
                        NULL, /* features */
                        AST_BRIDGE_IMPART_CHAN_DEPARTABLE);
                if (res != 0) {
@@ -976,6 +973,11 @@ int control_add_channel_to_bridge(
        return 0;
 }
 
+int control_add_channel_to_bridge(struct stasis_app_control *control, struct ast_channel *chan, void *data)
+{
+       return control_swap_channel_in_bridge(control, data, chan, NULL);
+}
+
 int stasis_app_control_add_channel_to_bridge(
        struct stasis_app_control *control, struct ast_bridge *bridge)
 {
index 1d37a49..868a809 100644 (file)
@@ -111,12 +111,20 @@ struct stasis_app *control_app(struct stasis_app_control *control);
  * \brief Command callback for adding a channel to a bridge
  *
  * \param control The control for chan
- * \param channel The channel on which commands should be executed
- * \param bridge Data to be passed to the callback
+ * \param chan The channel on which commands should be executed
+ * \param data Bridge to be passed to the callback
+ */
+int control_add_channel_to_bridge(struct stasis_app_control *control, struct ast_channel *chan, void *data);
+
+/*!
+ * \brief Command for swapping a channel in a bridge
+ *
+ * \param control The control for chan
+ * \param chan The channel on which commands should be executed
+ * \param bridge Bridge to be passed to the callback
+ * \param swap Channel to swap with when joining the bridge
  */
-int control_add_channel_to_bridge(
-       struct stasis_app_control *control,
-       struct ast_channel *chan, void *obj);
+int control_swap_channel_in_bridge(struct stasis_app_control *control, struct ast_bridge *bridge, struct ast_channel *chan, struct ast_channel *swap);
 
 /*!
  * \brief Stop playing silence to a channel right now.
index bfd2872..aa21ec2 100644 (file)
@@ -76,24 +76,54 @@ static void bridge_stasis_run_cb(struct ast_channel *chan, void *data)
        pbx_exec(chan, app_stasis, app_name);
 }
 
-static int add_channel_to_bridge(
+struct defer_bridge_add_obj {
+       /*! Bridge to join (has ref) */
+       struct ast_bridge *bridge;
+       /*!
+        * \brief Channel to swap with in the bridge. (has ref)
+        *
+        * \note NULL if not swapping with a channel.
+        */
+       struct ast_channel *swap;
+};
+
+static void defer_bridge_add_dtor(void *obj)
+{
+       struct defer_bridge_add_obj *defer = obj;
+
+       ao2_cleanup(defer->bridge);
+       ast_channel_cleanup(defer->swap);
+}
+
+static int defer_bridge_add(
        struct stasis_app_control *control,
        struct ast_channel *chan, void *obj)
 {
-       struct ast_bridge *bridge = obj;
-       int res;
+       struct defer_bridge_add_obj *defer = obj;
 
-       res = control_add_channel_to_bridge(control,
-               chan, bridge);
-       return res;
+       return control_swap_channel_in_bridge(control, defer->bridge, chan, defer->swap);
 }
 
 static void bridge_stasis_queue_join_action(struct ast_bridge *self,
-       struct ast_bridge_channel *bridge_channel)
+       struct ast_bridge_channel *bridge_channel, struct ast_bridge_channel *swap)
 {
+       struct defer_bridge_add_obj *defer;
+
+       defer = ao2_alloc_options(sizeof(*defer), defer_bridge_add_dtor,
+               AO2_ALLOC_OPT_LOCK_NOLOCK);
+       if (!defer) {
+               return;
+       }
+       ao2_ref(self, +1);
+       defer->bridge = self;
+       if (swap) {
+               ast_channel_ref(swap->chan);
+               defer->swap = swap->chan;
+       }
+
        ast_channel_lock(bridge_channel->chan);
-       command_prestart_queue_command(bridge_channel->chan, add_channel_to_bridge,
-               ao2_bump(self), __ao2_cleanup);
+       command_prestart_queue_command(bridge_channel->chan, defer_bridge_add,
+               defer, __ao2_cleanup);
        ast_channel_unlock(bridge_channel->chan);
 }
 
@@ -179,11 +209,7 @@ static int bridge_stasis_push(struct ast_bridge *self, struct ast_bridge_channel
                        return -1;
                }
 
-               bridge_stasis_queue_join_action(self, bridge_channel);
-               if (swap) {
-                       /* nudge the swap channel out of the bridge */
-                       ast_bridge_channel_leave_bridge(swap, BRIDGE_CHANNEL_STATE_END_NO_DISSOLVE, 0);
-               }
+               bridge_stasis_queue_join_action(self, bridge_channel, swap);
 
                /* Return -1 so the push fails and the after-bridge callback gets called
                 * This keeps the bridging framework from putting the channel into the bridge