Merge changes from team/russell/iax2_frame_queue
authorRussell Bryant <russell@russellbryant.com>
Tue, 27 Nov 2007 23:50:25 +0000 (23:50 +0000)
committerRussell Bryant <russell@russellbryant.com>
Tue, 27 Nov 2007 23:50:25 +0000 (23:50 +0000)
This patch is an optimization for chan_iax2.  This module is now heavily
multi-threaded.  However, there is still a good number of globally shared
resources that prevent things from happen asynchronously.  One of those things
was the global IAX frame queue.  This queue was used to hold frames that have
been deferred for transmitting by another thread, and frames that may need to
get retransmitted.

I changed the frame queue to be per-call, since almost all of the frame queue
handling only cares about frames specific to a call number.

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@89887 65c4cc65-6c06-0410-ace0-fbb531ad65f3

channels/chan_iax2.c

index f2cb139..60e5237 100644 (file)
@@ -644,7 +644,7 @@ struct chan_iax2_pvt {
  * on module unload.  This is because all active calls are destroyed, and
  * all frames in this queue will get destroyed as a part of that process.
  */
-static AST_LIST_HEAD_STATIC(frame_queue, iax_frame);
+static AST_LIST_HEAD(, iax_frame) frame_queue[IAX_MAX_CALLS];
 
 /*!
  * This module will get much higher performance when doing a lot of
@@ -2125,13 +2125,12 @@ retry:
                        ast_queue_hangup(owner);
                }
 
-               AST_LIST_LOCK(&frame_queue);
-               AST_LIST_TRAVERSE(&frame_queue, cur, list) {
+               AST_LIST_LOCK(&frame_queue[pvt->callno]);
+               AST_LIST_TRAVERSE(&frame_queue[pvt->callno], cur, list) {
                        /* Cancel any pending transmissions */
-                       if (cur->callno == pvt->callno) 
-                               cur->retries = -1;
+                       cur->retries = -1;
                }
-               AST_LIST_UNLOCK(&frame_queue);
+               AST_LIST_UNLOCK(&frame_queue[pvt->callno]);
 
                if (pvt->reg)
                        pvt->reg->callno = 0;
@@ -2241,9 +2240,12 @@ static void __attempt_transmit(const void *data)
        /* Do not try again */
        if (freeme) {
                /* Don't attempt delivery, just remove it from the queue */
-               AST_LIST_LOCK(&frame_queue);
-               AST_LIST_REMOVE(&frame_queue, f, list);
-               AST_LIST_UNLOCK(&frame_queue);
+               if (callno) {
+                       /* XXX Note that there should never be a frame without a callno ... */
+                       AST_LIST_LOCK(&frame_queue[callno]);
+                       AST_LIST_REMOVE(&frame_queue[callno], f, list);
+                       AST_LIST_UNLOCK(&frame_queue[callno]);
+               }
                f->retrans = -1;
                /* Free the IAX frame */
                iax2_frame_free(f);
@@ -2507,7 +2509,7 @@ static char *complete_iax2_show_peer(const char *line, const char *word, int pos
 static char *handle_cli_iax2_show_stats(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 {
        struct iax_frame *cur;
-       int cnt = 0, dead = 0, final = 0;
+       int cnt = 0, dead = 0, final = 0, i;
 
        switch (cmd) {
        case CLI_INIT:
@@ -2523,15 +2525,17 @@ static char *handle_cli_iax2_show_stats(struct ast_cli_entry *e, int cmd, struct
        if (a->argc != 3)
                return CLI_SHOWUSAGE;
 
-       AST_LIST_LOCK(&frame_queue);
-       AST_LIST_TRAVERSE(&frame_queue, cur, list) {
-               if (cur->retries < 0)
-                       dead++;
-               if (cur->final)
-                       final++;
-               cnt++;
+       for (i = 0; i < IAX_MAX_CALLS; i++) {
+               AST_LIST_LOCK(&frame_queue[i]);
+               AST_LIST_TRAVERSE(&frame_queue[i], cur, list) {
+                       if (cur->retries < 0)
+                               dead++;
+                       if (cur->final)
+                               final++;
+                       cnt++;
+               }
+               AST_LIST_UNLOCK(&frame_queue[i]);
        }
-       AST_LIST_UNLOCK(&frame_queue);
 
        ast_cli(a->fd, "    IAX Statistics\n");
        ast_cli(a->fd, "---------------------\n");
@@ -2884,9 +2888,9 @@ static int iax2_transmit(struct iax_frame *fr)
        /* By setting this to 0, the network thread will send it for us, and
           queue retransmission if necessary */
        fr->sentyet = 0;
-       AST_LIST_LOCK(&frame_queue);
-       AST_LIST_INSERT_TAIL(&frame_queue, fr, list);
-       AST_LIST_UNLOCK(&frame_queue);
+       AST_LIST_LOCK(&frame_queue[fr->callno]);
+       AST_LIST_INSERT_TAIL(&frame_queue[fr->callno], fr, list);
+       AST_LIST_UNLOCK(&frame_queue[fr->callno]);
        /* Wake up the network and scheduler thread */
        if (netthreadid != AST_PTHREADT_NULL)
                pthread_kill(netthreadid, SIGURG);
@@ -6223,15 +6227,14 @@ static int complete_transfer(int callno, struct iax_ies *ies)
        pvt->lastsent = 0;
        pvt->nextpred = 0;
        pvt->pingtime = DEFAULT_RETRY_TIME;
-       AST_LIST_LOCK(&frame_queue);
-       AST_LIST_TRAVERSE(&frame_queue, cur, list) {
+       AST_LIST_LOCK(&frame_queue[callno]);
+       AST_LIST_TRAVERSE(&frame_queue[callno], cur, list) {
                /* We must cancel any packets that would have been transmitted
                   because now we're talking to someone new.  It's okay, they
                   were transmitted to someone that didn't care anyway. */
-               if (callno == cur->callno) 
-                       cur->retries = -1;
+               cur->retries = -1;
        }
-       AST_LIST_UNLOCK(&frame_queue);
+       AST_LIST_UNLOCK(&frame_queue[callno]);
        return 0; 
 }
 
@@ -6828,8 +6831,8 @@ static void vnak_retransmit(int callno, int last)
 {
        struct iax_frame *f;
 
-       AST_LIST_LOCK(&frame_queue);
-       AST_LIST_TRAVERSE(&frame_queue, f, list) {
+       AST_LIST_LOCK(&frame_queue[callno]);
+       AST_LIST_TRAVERSE(&frame_queue[callno], f, list) {
                /* Send a copy immediately */
                if ((f->callno == callno) && iaxs[f->callno] &&
                        ((unsigned char ) (f->oseqno - last) < 128) &&
@@ -6837,7 +6840,7 @@ static void vnak_retransmit(int callno, int last)
                        send_packet(f);
                }
        }
-       AST_LIST_UNLOCK(&frame_queue);
+       AST_LIST_UNLOCK(&frame_queue[callno]);
 }
 
 static void __iax2_poke_peer_s(const void *data)
@@ -7775,17 +7778,17 @@ static int socket_process(struct iax2_thread *thread)
                                        if (iaxdebug)
                                                ast_debug(1, "Cancelling transmission of packet %d\n", x);
                                        call_to_destroy = 0;
-                                       AST_LIST_LOCK(&frame_queue);
-                                       AST_LIST_TRAVERSE(&frame_queue, cur, list) {
+                                       AST_LIST_LOCK(&frame_queue[fr->callno]);
+                                       AST_LIST_TRAVERSE(&frame_queue[fr->callno], cur, list) {
                                                /* If it's our call, and our timestamp, mark -1 retries */
-                                               if ((fr->callno == cur->callno) && (x == cur->oseqno)) {
+                                               if (x == cur->oseqno) {
                                                        cur->retries = -1;
                                                        /* Destroy call if this is the end */
                                                        if (cur->final)
                                                                call_to_destroy = fr->callno;
                                                }
                                        }
-                                       AST_LIST_UNLOCK(&frame_queue);
+                                       AST_LIST_UNLOCK(&frame_queue[fr->callno]);
                                        if (call_to_destroy) {
                                                if (iaxdebug)
                                                        ast_debug(1, "Really destroying %d, having been acked on final message\n", call_to_destroy);
@@ -7995,13 +7998,13 @@ retryowner:
                        case IAX_COMMAND_TXACC:
                                if (iaxs[fr->callno]->transferring == TRANSFER_BEGIN) {
                                        /* Ack the packet with the given timestamp */
-                                       AST_LIST_LOCK(&frame_queue);
-                                       AST_LIST_TRAVERSE(&frame_queue, cur, list) {
+                                       AST_LIST_LOCK(&frame_queue[fr->callno]);
+                                       AST_LIST_TRAVERSE(&frame_queue[fr->callno], cur, list) {
                                                /* Cancel any outstanding txcnt's */
-                                               if ((fr->callno == cur->callno) && (cur->transfer))
+                                               if (cur->transfer)
                                                        cur->retries = -1;
                                        }
-                                       AST_LIST_UNLOCK(&frame_queue);
+                                       AST_LIST_UNLOCK(&frame_queue[fr->callno]);
                                        memset(&ied1, 0, sizeof(ied1));
                                        iax_ie_append_short(&ied1, IAX_IE_CALLNO, iaxs[fr->callno]->callno);
                                        send_command(iaxs[fr->callno], AST_FRAME_IAX, IAX_COMMAND_TXREADY, 0, ied1.buf, ied1.pos, -1);
@@ -8868,13 +8871,13 @@ retryowner2:
                                break;  
                        case IAX_COMMAND_TXMEDIA:
                                if (iaxs[fr->callno]->transferring == TRANSFER_READY) {
-                                       AST_LIST_LOCK(&frame_queue);
-                                       AST_LIST_TRAVERSE(&frame_queue, cur, list) {
+                                       AST_LIST_LOCK(&frame_queue[fr->callno]);
+                                       AST_LIST_TRAVERSE(&frame_queue[fr->callno], cur, list) {
                                                /* Cancel any outstanding frames and start anew */
-                                               if ((fr->callno == cur->callno) && (cur->transfer))
+                                               if (cur->transfer)
                                                        cur->retries = -1;
                                        }
-                                       AST_LIST_UNLOCK(&frame_queue);
+                                       AST_LIST_UNLOCK(&frame_queue[fr->callno]);
                                        /* Start sending our media to the transfer address, but otherwise leave the call as-is */
                                        iaxs[fr->callno]->transferring = TRANSFER_MEDIAPASS;
                                }
@@ -9551,25 +9554,17 @@ static void *sched_thread(void *ignore)
        return NULL;
 }
 
-static void *network_thread(void *ignore)
+static int transmit_queued_frames(void)
 {
-       /* Our job is simple: Send queued messages, retrying if necessary.  Read frames 
-          from the network, and queue them for delivery to the channels */
-       int res, count, wakeup;
+       int wakeup = -1, count = 0, i;
        struct iax_frame *f;
 
-       if (timingfd > -1)
-               ast_io_add(io, timingfd, timing_read, AST_IO_IN | AST_IO_PRI, NULL);
-       
-       for(;;) {
-               pthread_testcancel();
-
-               /* Go through the queue, sending messages which have not yet been
-                  sent, and scheduling retransmissions if appropriate */
-               AST_LIST_LOCK(&frame_queue);
-               count = 0;
-               wakeup = -1;
-               AST_LIST_TRAVERSE_SAFE_BEGIN(&frame_queue, f, list) {
+       for (i = 0; i < IAX_MAX_CALLS; i++) {
+               /* As an optimization, only lock this frame queue if it is non-empty. */
+               if (AST_LIST_EMPTY(&frame_queue[i]))
+                       continue;
+               AST_LIST_LOCK(&frame_queue[i]);
+               AST_LIST_TRAVERSE_SAFE_BEGIN(&frame_queue[i], f, list) {
                        if (f->sentyet)
                                continue;
                        
@@ -9578,16 +9573,16 @@ static void *network_thread(void *ignore)
                                wakeup = 1;
                                continue;
                        }
-
+       
                        f->sentyet = 1;
-
+       
                        if (iaxs[f->callno]) {
                                send_packet(f);
                                count++;
                        } 
-
+       
                        ast_mutex_unlock(&iaxsl[f->callno]);
-
+       
                        if (f->retries < 0) {
                                /* This is not supposed to be retransmitted */
                                AST_LIST_REMOVE_CURRENT(list);
@@ -9600,11 +9595,33 @@ static void *network_thread(void *ignore)
                        }
                }
                AST_LIST_TRAVERSE_SAFE_END;
-               AST_LIST_UNLOCK(&frame_queue);
+               AST_LIST_UNLOCK(&frame_queue[i]);
+       }
+
+       if (count >= 20)
+               ast_debug(1, "chan_iax2: Sent %d queued outbound frames all at once\n", count);
+
+       return wakeup;
+}
+
+
+static void *network_thread(void *ignore)
+{
+       /* Our job is simple: Send queued messages, retrying if necessary.  Read frames 
+          from the network, and queue them for delivery to the channels */
+       int res, wakeup;
+
+       if (timingfd > -1)
+               ast_io_add(io, timingfd, timing_read, AST_IO_IN | AST_IO_PRI, NULL);
+       
+       for(;;) {
+               pthread_testcancel();
+
+               /* Go through the queue, sending messages which have not yet been
+                  sent, and scheduling retransmissions if appropriate */
+               wakeup = transmit_queued_frames();
 
                pthread_testcancel();
-               if (count >= 20)
-                       ast_debug(1, "chan_iax2: Sent %d queued outbound frames all at once\n", count);
 
                /* Now do the IO, and run scheduled tasks */
                res = ast_io_wait(io, wakeup);
@@ -11455,12 +11472,10 @@ static int __unload_module(void)
        /* Grab the sched lock resource to keep it away from threads about to die */
        /* Cancel the network thread, close the net socket */
        if (netthreadid != AST_PTHREADT_NULL) {
-               AST_LIST_LOCK(&frame_queue);
                ast_mutex_lock(&sched_lock);
                pthread_cancel(netthreadid);
                ast_cond_signal(&sched_cond);
                ast_mutex_unlock(&sched_lock);  /* Release the schedule lock resource */
-               AST_LIST_UNLOCK(&frame_queue);
                pthread_join(netthreadid, NULL);
        }
        if (schedthreadid != AST_PTHREADT_NULL) {
@@ -11509,8 +11524,10 @@ static int __unload_module(void)
        sched_context_destroy(sched);
        reload_firmware(1);
 
-       for (x = 0; x < IAX_MAX_CALLS; x++)
+       for (x = 0; x < IAX_MAX_CALLS; x++) {
                ast_mutex_destroy(&iaxsl[x]);
+               AST_LIST_HEAD_DESTROY(&frame_queue[x]);
+       }
 
        ao2_ref(peers, -1);
        ao2_ref(users, -1);
@@ -11574,8 +11591,10 @@ static int load_module(void)
 
        memset(iaxs, 0, sizeof(iaxs));
 
-       for (x=0;x<IAX_MAX_CALLS;x++)
+       for (x = 0; x < IAX_MAX_CALLS; x++) {
                ast_mutex_init(&iaxsl[x]);
+               AST_LIST_HEAD_INIT(&frame_queue[x]);
+       }
 
        ast_cond_init(&sched_cond, NULL);