Extreme IAX2 trunking performance improvements
authorMark Spencer <markster@digium.com>
Tue, 27 Apr 2004 15:18:55 +0000 (15:18 +0000)
committerMark Spencer <markster@digium.com>
Tue, 27 Apr 2004 15:18:55 +0000 (15:18 +0000)
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@2783 65c4cc65-6c06-0410-ace0-fbb531ad65f3

channels/chan_iax2.c

index 30ebb5e..9b97481 100755 (executable)
@@ -230,10 +230,6 @@ struct iax2_peer {
        int delme;                                              /* I need to be deleted */
        int temponly;                                   /* I'm only a temp */
        int trunk;                                              /* Treat as an IAX trunking */
-       struct timeval txtrunktime;             /* Transmit trunktime */
-       struct timeval rxtrunktime;             /* Receive trunktime */
-       struct timeval lasttxtime;              /* Last transmitted trunktime */
-       unsigned int lastsent;                  /* Last sent time */
 
        /* Qualification */
        int callno;                                     /* Call number of POKE request */
@@ -246,6 +242,28 @@ struct iax2_peer {
        int notransfer;
 };
 
+#define IAX2_TRUNK_PREFACE (sizeof(struct iax_frame) + sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr))
+
+static struct iax2_trunk_peer {
+       ast_mutex_t lock;
+       struct sockaddr_in addr;
+       struct timeval txtrunktime;             /* Transmit trunktime */
+       struct timeval rxtrunktime;             /* Receive trunktime */
+       struct timeval lasttxtime;              /* Last transmitted trunktime */
+       struct timeval trunkact;                /* Last trunk activity */
+       unsigned int lastsent;                  /* Last sent time */
+       /* Trunk data and length */
+       unsigned char *trunkdata;
+       unsigned int trunkdatalen;
+       unsigned int trunkdataalloc;
+       struct iax2_trunk_peer *next;
+       int trunkerror;
+       int calls;
+       int firstcallno;
+} *tpeers = NULL;
+
+static ast_mutex_t tpeerlock = AST_MUTEX_INITIALIZER;
+
 struct iax_firmware {
        struct iax_firmware *next;
        int fd;
@@ -290,7 +308,8 @@ static struct iax2_registry *registrations;
 #define MAX_RETRY_TIME  10000
 #define MAX_JITTER_BUFFER 50
 
-#define MAX_TRUNKDATA  640             /* 40ms, uncompressed linear */
+#define DEFAULT_TRUNKDATA      640 * 10                /* 40ms, uncompressed linear * 10 channels */
+#define MAX_TRUNKDATA          640 * 200               /* 40ms, uncompressed linear * 200 channels */
 
 /* If we have more than this much excess real jitter buffer, srhink it. */
 static int max_jitter_buffer = MAX_JITTER_BUFFER;
@@ -426,10 +445,6 @@ struct chan_iax2_pvt {
        int amaflags;
        /* This is part of a trunk interface */
        int trunk;
-       /* Trunk data and length */
-       unsigned char trunkdata[MAX_TRUNKDATA];
-       unsigned int trunkdatalen;
-       int trunkerror;
        struct iax2_dpcache *dpentries;
        int notransfer;         /* do we want native bridging */
 };
@@ -2530,39 +2545,38 @@ static struct ast_channel *ast_iax2_new(struct chan_iax2_pvt *i, int state, int
        return tmp;
 }
 
-static unsigned int calc_txpeerstamp(struct iax2_peer *peer, int sampms)
+static unsigned int calc_txpeerstamp(struct iax2_trunk_peer *tpeer, int sampms, struct timeval *tv)
 {
-       struct timeval tv;
        long int mssincetx;
        long int ms, pred;
 
-       gettimeofday(&tv, NULL);
-       mssincetx = (tv.tv_sec - peer->lasttxtime.tv_sec) * 1000 + (tv.tv_usec - peer->lasttxtime.tv_usec) / 1000;
+       tpeer->trunkact = *tv;
+       mssincetx = (tv->tv_sec - tpeer->lasttxtime.tv_sec) * 1000 + (tv->tv_usec - tpeer->lasttxtime.tv_usec) / 1000;
        if (mssincetx > 5000) {
                /* If it's been at least 5 seconds since the last time we transmitted on this trunk, reset our timers */
-               peer->txtrunktime.tv_sec = tv.tv_sec;
-               peer->txtrunktime.tv_usec = tv.tv_usec;
-               peer->lastsent = 999999;
+               tpeer->txtrunktime.tv_sec = tv->tv_sec;
+               tpeer->txtrunktime.tv_usec = tv->tv_usec;
+               tpeer->lastsent = 999999;
        }
        /* Update last transmit time now */
-       peer->lasttxtime.tv_sec = tv.tv_sec;
-       peer->lasttxtime.tv_usec = tv.tv_usec;
+       tpeer->lasttxtime.tv_sec = tv->tv_sec;
+       tpeer->lasttxtime.tv_usec = tv->tv_usec;
        
        /* Calculate ms offset */
-       ms = (tv.tv_sec - peer->txtrunktime.tv_sec) * 1000 + (tv.tv_usec - peer->txtrunktime.tv_usec) / 1000;
+       ms = (tv->tv_sec - tpeer->txtrunktime.tv_sec) * 1000 + (tv->tv_usec - tpeer->txtrunktime.tv_usec) / 1000;
        /* Predict from last value */
-       pred = peer->lastsent + sampms;
+       pred = tpeer->lastsent + sampms;
        if (abs(ms - pred) < 640)
                ms = pred;
        
        /* We never send the same timestamp twice, so fudge a little if we must */
-       if (ms == peer->lastsent)
-               ms = peer->lastsent + 1;
-       peer->lastsent = ms;
+       if (ms == tpeer->lastsent)
+               ms = tpeer->lastsent + 1;
+       tpeer->lastsent = ms;
        return ms;
 }
 
-static unsigned int fix_peerts(struct iax2_peer *peer, int callno, unsigned int ts)
+static unsigned int fix_peerts(struct iax2_trunk_peer *peer, int callno, unsigned int ts)
 {
        long ms;        /* NOT unsigned */
        if (!iaxs[callno]->rxcore.tv_sec && !iaxs[callno]->rxcore.tv_usec) {
@@ -2677,6 +2691,85 @@ static unsigned int calc_rxstamp(struct chan_iax2_pvt *p)
        return ms;
 }
 
+struct iax2_trunk_peer *find_tpeer(struct sockaddr_in *sin)
+{
+       struct iax2_trunk_peer *tpeer;
+       /* Finds and locks trunk peer */
+       ast_mutex_lock(&tpeerlock);
+       tpeer = tpeers;
+       while(tpeer) {
+               /* We don't lock here because tpeer->addr *never* changes */
+               if (!inaddrcmp(&tpeer->addr, sin)) {
+                       ast_mutex_lock(&tpeer->lock);
+                       break;
+               }
+               tpeer = tpeer->next;
+       }
+       if (!tpeer) {
+               tpeer = malloc(sizeof(struct iax2_trunk_peer));
+               if (tpeer) {
+                       memset(tpeer, 0, sizeof(struct iax2_trunk_peer));
+                       ast_mutex_init(&tpeer->lock);
+                       tpeer->lastsent = 9999;
+                       memcpy(&tpeer->addr, sin, sizeof(tpeer->addr));
+                       gettimeofday(&tpeer->trunkact, NULL);
+                       ast_mutex_lock(&tpeer->lock);
+                       tpeer->next = tpeers;
+                       tpeers = tpeer;
+                       ast_log(LOG_DEBUG, "Created trunk peer for '%s:%d'\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port));
+               }
+       }
+       ast_mutex_unlock(&tpeerlock);
+       return tpeer;
+}
+
+static int iax2_trunk_queue(struct chan_iax2_pvt *pvt, struct ast_frame *f)
+{
+       struct iax2_trunk_peer *tpeer;
+       void *tmp, *ptr;
+       struct ast_iax2_meta_trunk_entry *met;
+       tpeer = find_tpeer(&pvt->addr);
+       if (tpeer) {
+               if (tpeer->trunkdatalen + f->datalen + 4 >= tpeer->trunkdataalloc) {
+                       /* Need to reallocate space */
+                       if (tpeer->trunkdataalloc < MAX_TRUNKDATA) {
+                               tmp = realloc(tpeer->trunkdata, tpeer->trunkdataalloc + DEFAULT_TRUNKDATA + IAX2_TRUNK_PREFACE);
+                               if (tmp) {
+                                       tpeer->trunkdataalloc += DEFAULT_TRUNKDATA;
+                                       tpeer->trunkdata = tmp;
+                                       ast_log(LOG_DEBUG, "Expanded trunk '%s:%d' to %d bytes\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port), tpeer->trunkdataalloc);
+                               } else {
+                                       ast_log(LOG_WARNING, "Insufficient memory to expand trunk data to %s:%d\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port));
+                                       ast_mutex_unlock(&tpeer->lock);
+                                       return -1;
+                               }
+                       } else {
+                               ast_log(LOG_WARNING, "Maximum trunk data space exceeded to %s:%d\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port));
+                               ast_mutex_unlock(&tpeer->lock);
+                               return -1;
+                       }
+               }
+               
+               /* Append to meta frame */
+               ptr = tpeer->trunkdata + IAX2_TRUNK_PREFACE + tpeer->trunkdatalen;
+               met = (struct ast_iax2_meta_trunk_entry *)ptr;
+               /* Store call number and length in meta header */
+               met->callno = htons(pvt->callno);
+               met->len = htons(f->datalen);
+               /* Advance pointers/decrease length past trunk entry header */
+               ptr += sizeof(struct ast_iax2_meta_trunk_entry);
+               tpeer->trunkdatalen += sizeof(struct ast_iax2_meta_trunk_entry);
+               /* Copy actual trunk data */
+               memcpy(ptr, f->data, f->datalen);
+               tpeer->trunkdatalen += f->datalen;
+               if (!tpeer->firstcallno)
+                       tpeer->firstcallno = pvt->callno;
+               tpeer->calls++;
+               ast_mutex_unlock(&tpeer->lock);
+       }
+       return 0;
+}
+
 static int iax2_send(struct chan_iax2_pvt *pvt, struct ast_frame *f, unsigned int ts, int seqno, int now, int transfer, int final)
 {
        /* Queue a packet for delivery on a given private structure.  Use "ts" for
@@ -2794,17 +2887,7 @@ static int iax2_send(struct chan_iax2_pvt *pvt, struct ast_frame *f, unsigned in
                        res = iax2_transmit(fr);
        } else {
                if (pvt->trunk) {
-                       /* Queue for transmission in a meta frame */
-                       if ((sizeof(pvt->trunkdata) - pvt->trunkdatalen) >= fr->af.datalen) {
-                               memcpy(pvt->trunkdata + pvt->trunkdatalen, fr->af.data, fr->af.datalen);
-                               pvt->trunkdatalen += fr->af.datalen;
-                               res = 0;
-                               pvt->trunkerror = 0;
-                       } else {
-                               if (!pvt->trunkerror)
-                                       ast_log(LOG_WARNING, "Out of trunk data space on call number %d, dropping\n", pvt->callno);
-                               pvt->trunkerror = 1;
-                       }
+                       iax2_trunk_queue(pvt, &fr->af);
                        res = 0;
                } else if (fr->af.frametype == AST_FRAME_VIDEO) {
                        /* Video frame have no sequence number */
@@ -4192,96 +4275,70 @@ static int iax2_poke_peer_s(void *data)
        return 0;
 }
 
-static int send_trunk(struct iax2_peer *peer)
+static int send_trunk(struct iax2_trunk_peer *tpeer, struct timeval *now)
 {
-       int x;
-       int calls = 0;
        int res = 0;
-       int firstcall = 0;
-       unsigned char buf[65536 + sizeof(struct iax_frame)], *ptr;
-       int len = 65536;
        struct iax_frame *fr;
        struct ast_iax2_meta_hdr *meta;
        struct ast_iax2_meta_trunk_hdr *mth;
-       struct ast_iax2_meta_trunk_entry *met;
+       int calls = 0;
        
        /* Point to frame */
-       fr = (struct iax_frame *)buf;
+       fr = (struct iax_frame *)tpeer->trunkdata;
        /* Point to meta data */
        meta = (struct ast_iax2_meta_hdr *)fr->afdata;
        mth = (struct ast_iax2_meta_trunk_hdr *)meta->data;
-       /* Point past meta data for first meta trunk entry */
-       ptr = fr->afdata + sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr);
-       len -= sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr);
-       
-       /* Search through trunked calls for a match with this peer */
-       for (x=TRUNK_CALL_START;x<maxtrunkcall; x++) {
-               ast_mutex_lock(&iaxsl[x]);
-#if 0
-               if (iaxtrunkdebug)
-                       ast_verbose("Call %d is at %s:%d (%d)\n", x, inet_ntoa(iaxs[x]->addr.sin_addr), ntohs(iaxs[x]->addr.sin_port), iaxs[x]->addr.sin_family);
-#endif
-               if (iaxs[x] && iaxs[x]->trunk && iaxs[x]->trunkdatalen && !inaddrcmp(&iaxs[x]->addr, &peer->addr)) {
-                       if (iaxtrunkdebug)
-                               ast_verbose(" -- Sending call %d via trunk to %s:%d\n", x, inet_ntoa(iaxs[x]->addr.sin_addr), ntohs(iaxs[x]->addr.sin_port));
-                       if (len >= iaxs[x]->trunkdatalen + sizeof(struct ast_iax2_meta_trunk_entry)) {
-                               met = (struct ast_iax2_meta_trunk_entry *)ptr;
-                               /* Store call number and length in meta header */
-                               met->callno = htons(x);
-                               met->len = htons(iaxs[x]->trunkdatalen);
-                               /* Advance pointers/decrease length past trunk entry header */
-                               ptr += sizeof(struct ast_iax2_meta_trunk_entry);
-                               len -= sizeof(struct ast_iax2_meta_trunk_entry);
-                               /* Copy actual trunk data */
-                               memcpy(ptr, iaxs[x]->trunkdata, iaxs[x]->trunkdatalen);
-                               /* Advance pointeres/decrease length for actual data */
-                               ptr += iaxs[x]->trunkdatalen;
-                               len -= iaxs[x]->trunkdatalen;
-                       } else 
-                               ast_log(LOG_WARNING, "Out of space in frame for trunking call %d\n", x);
-                       iaxs[x]->trunkdatalen = 0;
-                       calls++;
-                       if (!firstcall)
-                               firstcall = x;
-               }
-               ast_mutex_unlock(&iaxsl[x]);
-       }
-       if (calls) {
+       if (tpeer->trunkdatalen) {
                /* We're actually sending a frame, so fill the meta trunk header and meta header */
                meta->zeros = 0;
                meta->metacmd = IAX_META_TRUNK;
                meta->cmddata = 0;
-               mth->ts = htonl(calc_txpeerstamp(peer, trunkfreq));
+               mth->ts = htonl(calc_txpeerstamp(tpeer, trunkfreq, now));
                /* And the rest of the ast_iax2 header */
                fr->direction = DIRECTION_OUTGRESS;
                fr->retrans = -1;
                fr->transfer = 0;
                /* Any appropriate call will do */
-               fr->callno = firstcall;
+               fr->callno = tpeer->firstcallno;
                fr->data = fr->afdata;
-               fr->datalen = 65536 - len;
+               fr->datalen = tpeer->trunkdatalen + sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr);
 #if 0
                ast_log(LOG_DEBUG, "Trunking %d calls in %d bytes, ts=%d\n", calls, fr->datalen, ntohl(mth->ts));
 #endif         
                res = send_packet(fr);
+               calls = tpeer->calls;
+               /* Reset transmit trunk side data */
+               tpeer->trunkdatalen = 0;
+               tpeer->calls = 0;
+               tpeer->firstcallno = 0;
        }
        if (res < 0)
                return res;
        return calls;
 }
 
+static inline int iax2_trunk_expired(struct iax2_trunk_peer *tpeer, struct timeval *now)
+{
+       /* Drop when trunk is about 5 seconds idle */
+       if (now->tv_sec > tpeer->trunkact.tv_sec + 5) 
+               return 1;
+       return 0;
+}
+
 static int timing_read(int *id, int fd, short events, void *cbdata)
 {
        char buf[1024];
        int res;
-       struct iax2_peer *peer;
+       struct iax2_trunk_peer *tpeer, *prev = NULL, *drop=NULL;
        int processed = 0;
        int totalcalls = 0;
 #ifdef ZT_TIMERACK
        int x = 1;
 #endif
+       struct timeval now;
        if (iaxtrunkdebug)
                ast_verbose("Beginning trunk processing\n");
+       gettimeofday(&now, NULL);
        if (events & AST_IO_PRI) {
 #ifdef ZT_TIMERACK
                /* Great, this is a timing interface, just call the ioctl */
@@ -4299,20 +4356,43 @@ static int timing_read(int *id, int fd, short events, void *cbdata)
                }
        }
        /* For each peer that supports trunking... */
-       ast_mutex_lock(&peerl.lock);
-       peer = peerl.peers;
-       while(peer) {
-               if (peer->trunk) {
-                       processed++;
-                       res = send_trunk(peer);
+       ast_mutex_lock(&tpeerlock);
+       tpeer = tpeers;
+       while(tpeer) {
+               processed++;
+               ast_mutex_lock(&tpeer->lock);
+               /* We can drop a single tpeer per pass.  That makes all this logic
+                  substantially easier */
+               if (!drop && iax2_trunk_expired(tpeer, &now)) {
+                       /* Take it out of the list, but don't free it yet, because it
+                          could be in use */
+                       if (prev)
+                               prev->next = tpeer->next;
+                       else
+                               tpeers = tpeer->next;
+                       drop = tpeer;
+               } else {
+                       res = send_trunk(tpeer, &now);
                        if (iaxtrunkdebug)
-                               ast_verbose("Processed trunk peer '%s' (%s:%d) with %d call(s)\n", peer->name, inet_ntoa(peer->addr.sin_addr), ntohs(peer->addr.sin_port), res);
-                       totalcalls += res;      
-                       res = 0;
-               }
-               peer = peer->next;
+                               ast_verbose("Processed trunk peer (%s:%d) with %d call(s)\n", inet_ntoa(tpeer->addr.sin_addr), ntohs(tpeer->addr.sin_port), res);
+               }               
+               totalcalls += res;      
+               res = 0;
+               ast_mutex_unlock(&tpeer->lock);
+               prev = tpeer;
+               tpeer = tpeer->next;
+       }
+       ast_mutex_unlock(&tpeerlock);
+       if (drop) {
+               ast_mutex_lock(&drop->lock);
+               /* Once we have this lock, we're sure nobody else is using it or could use it once we release it, 
+                  because by the time they could get tpeerlock, we've already grabbed it */
+               ast_log(LOG_DEBUG, "Dropping unused iax2 trunk peer '%s:%d'\n", inet_ntoa(drop->addr.sin_addr), ntohs(drop->addr.sin_port));
+               free(drop->trunkdata);
+               ast_mutex_unlock(&drop->lock);
+               free(drop);
+               
        }
-       ast_mutex_unlock(&peerl.lock);
        if (iaxtrunkdebug)
                ast_verbose("Ending trunk processing with %d peers and %d calls processed\n", processed, totalcalls);
        iaxtrunkdebug =0;
@@ -4466,6 +4546,7 @@ static int iax_park(struct ast_channel *chan1, struct ast_channel *chan2)
        return -1;
 }
 
+
 static int socket_read(int *id, int fd, short events, void *cbdata)
 {
        struct sockaddr_in sin;
@@ -4488,6 +4569,7 @@ static int socket_read(int *id, int fd, short events, void *cbdata)
        struct ast_channel *c;
        struct iax2_dpcache *dp;
        struct iax2_peer *peer;
+       struct iax2_trunk_peer *tpeer;
        struct iax_ies ies;
        struct iax_ie_data ied0, ied1;
        int format;
@@ -4526,21 +4608,16 @@ static int socket_read(int *id, int fd, short events, void *cbdata)
                        ts = ntohl(mth->ts);
                        res -= (sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr));
                        ptr = mth->data;
-                       ast_mutex_lock(&peerl.lock);
-                       peer = peerl.peers;
-                       while(peer) {
-                               if (!inaddrcmp(&peer->addr, &sin))
-                                       break;
-                               peer = peer->next;
-                       }
-                       ast_mutex_unlock(&peerl.lock);
-                       if (!peer) {
+                       tpeer = find_tpeer(&sin);
+                       if (!tpeer) {
                                ast_log(LOG_WARNING, "Unable to accept trunked packet from '%s:%d': No matching peer\n", inet_ntoa(sin.sin_addr), ntohs(sin.sin_port));
                                return 1;
                        }
-                       if (!ts || (!peer->rxtrunktime.tv_sec && !peer->rxtrunktime.tv_usec)) {
-                               gettimeofday(&peer->rxtrunktime, NULL);
-                       }
+                       if (!ts || (!tpeer->rxtrunktime.tv_sec && !tpeer->rxtrunktime.tv_usec)) {
+                               gettimeofday(&tpeer->rxtrunktime, NULL);
+                               tpeer->trunkact = tpeer->rxtrunktime;
+                       } else
+                               gettimeofday(&tpeer->trunkact, NULL);
                        while(res >= sizeof(struct ast_iax2_meta_trunk_entry)) {
                                /* Process channels */
                                mte = (struct ast_iax2_meta_trunk_entry *)ptr;
@@ -4566,7 +4643,7 @@ static int socket_read(int *id, int fd, short events, void *cbdata)
                                                                        f.data = ptr;
                                                                else
                                                                        f.data = NULL;
-                                                               fr.ts = fix_peerts(peer, fr.callno, ts);
+                                                               fr.ts = fix_peerts(tpeer, fr.callno, ts);
                                                                /* Don't pass any packets until we're started */
                                                                if ((iaxs[fr.callno]->state & IAX_STATE_STARTED)) {
                                                                        /* Common things */
@@ -4602,6 +4679,7 @@ static int socket_read(int *id, int fd, short events, void *cbdata)
                                ptr += len;
                                res -= len;
                        }
+                       ast_mutex_unlock(&tpeer->lock);
                        
                }
                return 1;
@@ -5746,7 +5824,6 @@ static struct iax2_peer *build_peer(char *name, struct ast_variable *v)
                memset(peer, 0, sizeof(struct iax2_peer));
                peer->expire = -1;
                peer->pokeexpire = -1;
-               peer->lastsent = 999999;
        }
        if (peer) {
                if (!found) {