Make IAX2 multithreaded
authorMark Spencer <markster@digium.com>
Sun, 26 Feb 2006 20:27:14 +0000 (20:27 +0000)
committerMark Spencer <markster@digium.com>
Sun, 26 Feb 2006 20:27:14 +0000 (20:27 +0000)
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@11192 65c4cc65-6c06-0410-ace0-fbb531ad65f3

channels/chan_iax2.c
configs/iax.conf.sample

index f6a1977..e5f52ca 100644 (file)
@@ -100,6 +100,10 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
  * otherwise, use the old jitterbuffer */
 #define NEWJB
 
+/* Define SCHED_MULTITHREADED to run the scheduler in a special
+   multithreaded mode. */
+#define SCHED_MULTITHREADED
+
 #ifdef NEWJB
 #include "../jitterbuf.h"
 #endif
@@ -124,6 +128,7 @@ static int nochecksums = 0;
 #define PTR_TO_CALLNO(a) ((unsigned short)(unsigned long)(a))
 #define CALLNO_TO_PTR(a) ((void *)(unsigned long)(a))
 
+#define DEFAULT_THREAD_COUNT 10
 #define DEFAULT_RETRY_TIME 1000
 #define MEMORY_SIZE 100
 #define DEFAULT_DROP 3
@@ -230,6 +235,7 @@ static int iax2_encryption = 0;
 static struct ast_flags globalflags = { 0 };
 
 static pthread_t netthreadid = AST_PTHREADT_NULL;
+static pthread_t schedthreadid = AST_PTHREADT_NULL;
 
 enum {
        IAX_STATE_STARTED =             (1 << 0),
@@ -429,6 +435,8 @@ static int max_jitter_buffer = MAX_JITTER_BUFFER;
 /* If we have less than this much excess real jitter buffer, enlarge it. */
 static int min_jitter_buffer = MIN_JITTER_BUFFER;
 
+static int iaxthreadcount = DEFAULT_THREAD_COUNT;
+
 struct iax_rr {
        int jitter;
        int losspct;
@@ -660,6 +668,35 @@ static struct iax2_peer *realtime_peer(const char *peername, struct sockaddr_in
 static void destroy_peer(struct iax2_peer *peer);
 static int ast_cli_netstats(int fd, int limit_fmt);
 
+#define IAX_IOSTATE_IDLE               0
+#define IAX_IOSTATE_READY              1
+#define IAX_IOSTATE_PROCESSING 2
+#define IAX_IOSTATE_SCHEDREADY 3
+
+struct iax2_thread {
+       ASTOBJ_COMPONENTS(struct iax2_thread);
+       int iostate;
+#ifdef SCHED_MULTITHREADED
+       void (*schedfunc)(void *);
+       void *scheddata;
+#endif
+       int actions;
+       int halt;
+       pthread_t threadid;
+       int threadnum;
+       struct sockaddr_in iosin;
+       unsigned char buf[4096]; 
+       int iores;
+       int iofd;
+       time_t checktime;
+};
+
+struct iax2_thread_list {
+       ASTOBJ_CONTAINER_COMPONENTS(struct iax2_thread);
+};
+
+static struct iax2_thread_list idlelist, activelist;
+
 static void iax_debug_output(const char *data)
 {
        if (iaxdebug)
@@ -771,18 +808,58 @@ static const struct ast_channel_tech iax2_tech = {
        .fixup = iax2_fixup,
 };
 
+static struct iax2_thread *find_idle_thread(void)
+{
+       struct iax2_thread *thread;
+       thread = ASTOBJ_CONTAINER_UNLINK_START(&idlelist);
+       return thread;
+}
+
+#ifdef SCHED_MULTITHREADED
+static int schedule_action(void (*func)(void *data), void *data)
+{
+       struct iax2_thread *thread;
+       static time_t lasterror;
+       static time_t t;
+       thread = find_idle_thread();
+       if (thread) {
+               thread->schedfunc = func;
+               thread->scheddata = data;
+               thread->iostate = IAX_IOSTATE_SCHEDREADY;
+               pthread_kill(thread->threadid, SIGURG);
+               return 0;
+       }
+       time(&t);
+       if (t != lasterror) 
+               ast_log(LOG_NOTICE, "Out of idle IAX2 threads for scheduling!\n");
+       lasterror = t;
+       return -1;
+}
+#endif
+
+static void __send_ping(void *data)
+{
+       int callno = (long)data;
+       send_command(iaxs[callno], AST_FRAME_IAX, IAX_COMMAND_PING, 0, NULL, 0, -1);
+}
+
 static int send_ping(void *data)
 {
        int callno = (long)data;
-       /* Ping only if it's real, not if it's bridged */
        if (iaxs[callno]) {
 #ifdef BRIDGE_OPTIMIZATION
-               if (!iaxs[callno]->bridgecallno)
+               if (!iaxs[callno]->bridgecallno) 
 #endif
-                       send_command(iaxs[callno], AST_FRAME_IAX, IAX_COMMAND_PING, 0, NULL, 0, -1);
+               {               
+#ifdef SCHED_MULTITHREADED
+                       if (schedule_action(__send_ping, data))
+#endif         
+                               __send_ping(data);
+               }
                return 1;
        } else
                return 0;
+       return 0;
 }
 
 static int get_encrypt_methods(const char *s)
@@ -797,18 +874,30 @@ static int get_encrypt_methods(const char *s)
        return e;
 }
 
-static int send_lagrq(void *data)
+static void __send_lagrq(void *data)
 {
        int callno = (long)data;
        /* Ping only if it's real not if it's bridged */
+       send_command(iaxs[callno], AST_FRAME_IAX, IAX_COMMAND_LAGRQ, 0, NULL, 0, -1);
+}
+
+static int send_lagrq(void *data)
+{
+       int callno = (long)data;
        if (iaxs[callno]) {
 #ifdef BRIDGE_OPTIMIZATION
-               if (!iaxs[callno]->bridgecallno)
+               if (!iaxs[callno]->bridgecallno) 
+#endif
+               {               
+#ifdef SCHED_MULTITHREADED
+                       if (schedule_action(__send_lagrq, data))
 #endif         
-                       send_command(iaxs[callno], AST_FRAME_IAX, IAX_COMMAND_LAGRQ, 0, NULL, 0, -1);
+                               __send_lagrq(data);
+               }
                return 1;
        } else
                return 0;
+       return 0;
 }
 
 static unsigned char compress_subclass(int subclass)
@@ -1422,7 +1511,7 @@ static int __do_deliver(void *data)
 }
 
 #ifndef NEWJB
-static int do_deliver(void *data)
+static int __real_do_deliver(void *data)
 {
        /* Locking version of __do_deliver */
        struct iax_frame *fr = data;
@@ -1433,6 +1522,14 @@ static int do_deliver(void *data)
        ast_mutex_unlock(&iaxsl[callno]);
        return res;
 }
+static int do_deliver(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+       if (schedule_action(__do_deliver, data))
+#endif         
+               __real_do_deliver(data);
+       return 0;
+}
 #endif /* NEWJB */
 
 static int handle_error(void)
@@ -1693,7 +1790,8 @@ static int update_packet(struct iax_frame *f)
        return 0;
 }
 
-static int attempt_transmit(void *data)
+static int attempt_transmit(void *data);
+static void __attempt_transmit(void *data)
 {
        /* Attempt to transmit the frame to the remote peer...
           Called without iaxsl held. */
@@ -1780,6 +1878,14 @@ static int attempt_transmit(void *data)
                /* Free the IAX frame */
                iax2_frame_free(f);
        }
+}
+
+static int attempt_transmit(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+       if (schedule_action(__attempt_transmit, data))
+#endif         
+               __attempt_transmit(data);
        return 0;
 }
 
@@ -2166,9 +2272,10 @@ static void update_jbsched(struct chan_iax2_pvt *pvt) {
     }
 
     pvt->jbid = ast_sched_add(sched, when, get_from_jb, (void *)pvt);
+       pthread_kill(schedthreadid, SIGURG);
 }
 
-static int get_from_jb(void *p) 
+static void __get_from_jb(void *p) 
 {
        /* make sure pvt is valid! */   
     struct chan_iax2_pvt *pvt = p;
@@ -2238,7 +2345,15 @@ static int get_from_jb(void *p)
     }
     update_jbsched(pvt);
     ast_mutex_unlock(&iaxsl[pvt->callno]);
-    return 0;
+}
+
+static int get_from_jb(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+       if (schedule_action(__get_from_jb, data))
+#endif         
+               __get_from_jb(data);
+       return 0;
 }
 #endif
 
@@ -2491,6 +2606,7 @@ static int schedule_delivery(struct iax_frame *fr, int updatehistory, int fromtr
                if (option_debug && iaxdebug)
                        ast_log(LOG_DEBUG, "schedule_delivery: Scheduling delivery in %d ms\n", delay);
                fr->retrans = ast_sched_add(sched, delay, do_deliver, fr);
+               pthread_kill(schedthreadid, SIGURG);
        }
 #endif
        if (tsout)
@@ -2524,8 +2640,9 @@ static int iax2_transmit(struct iax_frame *fr)
        }
        iaxq.count++;
        ast_mutex_unlock(&iaxq.lock);
-       /* Wake up the network thread */
+       /* Wake up the network and scheduler thread */
        pthread_kill(netthreadid, SIGURG);
+       pthread_kill(schedthreadid, SIGURG);
        return 0;
 }
 
@@ -2827,7 +2944,7 @@ static int create_addr(const char *peername, struct sockaddr_in *sin, struct cre
        return 0;
 }
 
-static int auto_congest(void *nothing)
+static void __auto_congest(void *nothing)
 {
        int callno = PTR_TO_CALLNO(nothing);
        struct ast_frame f = { AST_FRAME_CONTROL, AST_CONTROL_CONGESTION };
@@ -2838,6 +2955,14 @@ static int auto_congest(void *nothing)
                ast_log(LOG_NOTICE, "Auto-congesting call due to slow response\n");
        }
        ast_mutex_unlock(&iaxsl[callno]);
+}
+
+static int auto_congest(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+       if (schedule_action(__auto_congest, data))
+#endif         
+               __auto_congest(data);
        return 0;
 }
 
@@ -4280,6 +4405,31 @@ static int __iax2_show_peers(int manager, int fd, int argc, char *argv[])
 #undef FORMAT2
 }
 
+static int iax2_show_threads(int fd, int argc, char *argv[])
+{
+       time_t t;
+       int threadcount = 0;
+       if (argc != 3)
+               return RESULT_SHOWUSAGE;
+               
+       ast_cli(fd, "IAX2 Thread Information\n");
+       time(&t);
+       ast_cli(fd, "Idle Threads:\n");
+       ASTOBJ_CONTAINER_TRAVERSE(&idlelist, 1, {
+               ast_cli(fd, "Thread %d: state %d, last update: %d seconds ago, %d actions handled, refcnt = %d\n", 
+                       iterator->threadnum, iterator->iostate, (int)(t - iterator->checktime), iterator->actions, iterator->refcount);
+               threadcount++;
+       });
+       ast_cli(fd, "Active Threads:\n");
+       ASTOBJ_CONTAINER_TRAVERSE(&activelist, 1, {
+               ast_cli(fd, "Thread %d: state %d, last update: %d seconds ago, %d actions handled, refcnt = %d\n", 
+                       iterator->threadnum, iterator->iostate, (int)(t - iterator->checktime), iterator->actions, iterator->refcount);
+               threadcount++;
+       });
+       ast_cli(fd, "%d of %d threads accounted for\n", threadcount, iaxthreadcount);
+       return RESULT_SUCCESS;
+}
+
 static int iax2_show_peers(int fd, int argc, char *argv[])
 {
        return __iax2_show_peers(0, fd, argc, argv);
@@ -5277,11 +5427,19 @@ static int authenticate_reply(struct chan_iax2_pvt *p, struct sockaddr_in *sin,
 
 static int iax2_do_register(struct iax2_registry *reg);
 
-static int iax2_do_register_s(void *data)
+static void __iax2_do_register_s(void *data)
 {
        struct iax2_registry *reg = data;
        reg->expire = -1;
        iax2_do_register(reg);
+}
+
+static int iax2_do_register_s(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+       if (schedule_action(__iax2_do_register_s, data))
+#endif         
+               __iax2_do_register_s(data);
        return 0;
 }
 
@@ -5562,7 +5720,7 @@ static void register_peer_exten(struct iax2_peer *peer, int onoff)
 }
 static void prune_peers(void);
 
-static int expire_registry(void *data)
+static void __expire_registry(void *data)
 {
        struct iax2_peer *p = data;
 
@@ -5584,11 +5742,17 @@ static int expire_registry(void *data)
                ast_set_flag(p, IAX_DELME);
                prune_peers();
        }
+}
 
+static int expire_registry(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+       if (schedule_action(__expire_registry, data))
+#endif         
+               __expire_registry(data);
        return 0;
 }
 
-
 static int iax2_poke_peer(struct iax2_peer *peer, int heldcall);
 
 static void reg_source_db(struct iax2_peer *p)
@@ -5828,7 +5992,7 @@ static int stop_stuff(int callno)
                return 0;
 }
 
-static int auth_reject(void *nothing)
+static void __auth_reject(void *nothing)
 {
        /* Called from IAX thread only, without iaxs lock */
        int callno = (int)(long)(nothing);
@@ -5847,6 +6011,14 @@ static int auth_reject(void *nothing)
                send_command_final(iaxs[callno], AST_FRAME_IAX, iaxs[callno]->authfail, 0, ied.buf, ied.pos, -1);
        }
        ast_mutex_unlock(&iaxsl[callno]);
+}
+
+static int auth_reject(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+       if (schedule_action(__auth_reject, data))
+#endif         
+               __auth_reject(data);
        return 0;
 }
 
@@ -5868,7 +6040,7 @@ static int auth_fail(int callno, int failcode)
        return 0;
 }
 
-static int auto_hangup(void *nothing)
+static void __auto_hangup(void *nothing)
 {
        /* Called from IAX thread only, without iaxs lock */
        int callno = (int)(long)(nothing);
@@ -5882,6 +6054,14 @@ static int auto_hangup(void *nothing)
                send_command_final(iaxs[callno], AST_FRAME_IAX, IAX_COMMAND_HANGUP, 0, ied.buf, ied.pos, -1);
        }
        ast_mutex_unlock(&iaxsl[callno]);
+}
+
+static int auto_hangup(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+       if (schedule_action(__auto_hangup, data))
+#endif         
+               __auto_hangup(data);
        return 0;
 }
 
@@ -5919,11 +6099,19 @@ static void vnak_retransmit(int callno, int last)
        ast_mutex_unlock(&iaxq.lock);
 }
 
-static int iax2_poke_peer_s(void *data)
+static void __iax2_poke_peer_s(void *data)
 {
        struct iax2_peer *peer = data;
        peer->pokeexpire = -1;
        iax2_poke_peer(peer, 0);
+}
+
+static int iax2_poke_peer_s(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+       if (schedule_action(__iax2_poke_peer_s, data))
+#endif         
+               __iax2_poke_peer_s(data);
        return 0;
 }
 
@@ -6258,18 +6446,50 @@ static void save_rr(struct iax_frame *fr, struct iax_ies *ies)
 
 static int socket_read(int *id, int fd, short events, void *cbdata)
 {
+       struct iax2_thread *thread;
+       socklen_t len;
+       thread = find_idle_thread();
+       time_t t;
+       static time_t last_errtime=0;
+       if (thread) {
+               len = sizeof(thread->iosin);
+               thread->iofd = fd;
+               thread->iores = recvfrom(fd, thread->buf, sizeof(thread->buf), 0,(struct sockaddr *) &thread->iosin, &len);
+               if (thread->iores < 0) {
+                       if (errno != ECONNREFUSED)
+                               ast_log(LOG_WARNING, "Error: %s\n", strerror(errno));
+                       handle_error();
+                       return 1;
+               }
+               if(test_losspct) { /* simulate random loss condition */
+                       if( (100.0*rand()/(RAND_MAX+1.0)) < test_losspct) 
+                               return 1;
+               }
+               /* Mark as ready and send on its way */
+               thread->iostate = IAX_IOSTATE_READY;
+               pthread_kill(thread->threadid, SIGURG);
+       } else {
+               time(&t);
+               if (t != last_errtime)
+                       ast_log(LOG_NOTICE, "Out of idle IAX2 threads for I/O, pausing!\n");
+               last_errtime = t;
+               usleep(1);
+       }
+       return 1;
+}
+
+static int socket_process(struct iax2_thread *thread)
+{
        struct sockaddr_in sin;
        int res;
        int updatehistory=1;
        int new = NEW_PREVENT;
-       unsigned char buf[4096]; 
        void *ptr;
-       socklen_t len = sizeof(sin);
        int dcallno = 0;
-       struct ast_iax2_full_hdr *fh = (struct ast_iax2_full_hdr *)buf;
-       struct ast_iax2_mini_hdr *mh = (struct ast_iax2_mini_hdr *)buf;
-       struct ast_iax2_meta_hdr *meta = (struct ast_iax2_meta_hdr *)buf;
-       struct ast_iax2_video_hdr *vh = (struct ast_iax2_video_hdr *)buf;
+       struct ast_iax2_full_hdr *fh = (struct ast_iax2_full_hdr *)thread->buf;
+       struct ast_iax2_mini_hdr *mh = (struct ast_iax2_mini_hdr *)thread->buf;
+       struct ast_iax2_meta_hdr *meta = (struct ast_iax2_meta_hdr *)thread->buf;
+       struct ast_iax2_video_hdr *vh = (struct ast_iax2_video_hdr *)thread->buf;
        struct ast_iax2_meta_trunk_hdr *mth;
        struct ast_iax2_meta_trunk_entry *mte;
        struct ast_iax2_meta_trunk_mini *mtm;
@@ -6286,6 +6506,7 @@ static int socket_read(int *id, int fd, short events, void *cbdata)
        struct iax_ies ies;
        struct iax_ie_data ied0, ied1;
        int format;
+       int fd;
        int exists;
        int minivid = 0;
        unsigned int ts;
@@ -6298,19 +6519,12 @@ static int socket_read(int *id, int fd, short events, void *cbdata)
 
        dblbuf[0] = 0;  /* Keep GCC from whining */
        fr.callno = 0;
-       
-       res = recvfrom(fd, buf, sizeof(buf), 0,(struct sockaddr *) &sin, &len);
-       if (res < 0) {
-               if (errno != ECONNREFUSED)
-                       ast_log(LOG_WARNING, "Error: %s\n", strerror(errno));
-               handle_error();
-               return 1;
-       }
-       if(test_losspct) { /* simulate random loss condition */
-               if( (100.0*rand()/(RAND_MAX+1.0)) < test_losspct) 
-                       return 1;
-       }
+
+       /* Copy frequently used parameters to the stack */
+       res = thread->iores;
+       fd = thread->iofd;
+       memcpy(&sin, &thread->iosin, sizeof(sin));
+
        if (res < sizeof(struct ast_iax2_mini_hdr)) {
                ast_log(LOG_WARNING, "midget packet received (%d of %d min)\n", res, (int)sizeof(struct ast_iax2_mini_hdr));
                return 1;
@@ -6643,14 +6857,14 @@ static int socket_read(int *id, int fd, short events, void *cbdata)
 
                if (f.datalen) {
                        if (f.frametype == AST_FRAME_IAX) {
-                               if (iax_parse_ies(&ies, buf + sizeof(struct ast_iax2_full_hdr), f.datalen)) {
+                               if (iax_parse_ies(&ies, thread->buf + sizeof(struct ast_iax2_full_hdr), f.datalen)) {
                                        ast_log(LOG_WARNING, "Undecodable frame received from '%s'\n", ast_inet_ntoa(iabuf, sizeof(iabuf), sin.sin_addr));
                                        ast_mutex_unlock(&iaxsl[fr.callno]);
                                        return 1;
                                }
                                f.data = NULL;
                        } else
-                               f.data = buf + sizeof(struct ast_iax2_full_hdr);
+                               f.data = thread->buf + sizeof(struct ast_iax2_full_hdr);
                } else {
                        if (f.frametype == AST_FRAME_IAX)
                                f.data = NULL;
@@ -7526,7 +7740,7 @@ retryowner2:
                }
                f.datalen = res - sizeof(struct ast_iax2_video_hdr);
                if (f.datalen)
-                       f.data = buf + sizeof(struct ast_iax2_video_hdr);
+                       f.data = thread->buf + sizeof(struct ast_iax2_video_hdr);
                else
                        f.data = NULL;
 #ifdef IAXTESTS
@@ -7553,7 +7767,7 @@ retryowner2:
                        return 1;
                }
                if (f.datalen)
-                       f.data = buf + sizeof(struct ast_iax2_mini_hdr);
+                       f.data = thread->buf + sizeof(struct ast_iax2_mini_hdr);
                else
                        f.data = NULL;
 #ifdef IAXTESTS
@@ -7620,6 +7834,59 @@ retryowner2:
        return 1;
 }
 
+static void destroy_helper(struct iax2_thread *thread)
+{
+       ast_log(LOG_DEBUG, "Destroying helper %d!\n", thread->threadnum);
+       free(thread);
+}
+
+static void *iax2_process_thread(void *data)
+{
+       struct iax2_thread *thread_copy, *thread = data;
+       struct timeval tv;
+       for(;;) {
+               /* Sleep for up to 1 second */
+               tv.tv_sec = 1;
+               tv.tv_usec = 0;
+               select(0, NULL, NULL, NULL, &tv);
+               /* Unlink from idlelist / activelist if there*/
+               ASTOBJ_CONTAINER_UNLINK(&idlelist, thread);
+               ASTOBJ_CONTAINER_UNLINK(&activelist, thread);
+               /* If instructed to halt, stop now */
+               if (thread->halt) {
+                       ast_log(LOG_DEBUG, "Halting, refcount = %d\n", thread->refcount);
+                       ASTOBJ_UNREF(thread, destroy_helper);
+                       break;
+               }
+               /* Remove our reference */
+               ASTOBJ_CONTAINER_LINK_END(&activelist, thread);
+               switch(thread->iostate) {
+               case IAX_IOSTATE_READY:
+                       thread->actions++;
+                       thread->iostate = IAX_IOSTATE_PROCESSING;
+                       socket_process(thread);
+                       break;
+               case IAX_IOSTATE_SCHEDREADY:
+                       thread->actions++;
+                       thread->iostate = IAX_IOSTATE_PROCESSING;
+                       thread->schedfunc(thread->scheddata);
+                       break;
+               }
+               time(&thread->checktime);
+               thread->iostate = IAX_IOSTATE_IDLE;
+               ASTOBJ_CONTAINER_UNLINK(&activelist, thread);
+               ASTOBJ_CONTAINER_LINK_END(&idlelist, thread);
+               /* Make a copy so we don't lose thread, but if 
+                  we become unreferenced here, our thread gets
+                  cancelled anyway, so it's okay */
+               thread_copy = thread;
+               ASTOBJ_UNREF(thread_copy, destroy_helper);
+               thread_copy = thread;
+               ASTOBJ_UNREF(thread_copy, destroy_helper);
+       }
+       return NULL;
+}
+
 static int iax2_do_register(struct iax2_registry *reg)
 {
        struct iax_ie_data ied;
@@ -7771,7 +8038,7 @@ static int iax2_prov_cmd(int fd, int argc, char *argv[])
        return RESULT_SUCCESS;
 }
 
-static int iax2_poke_noanswer(void *data)
+static void __iax2_poke_noanswer(void *data)
 {
        struct iax2_peer *peer = data;
        peer->pokeexpire = -1;
@@ -7786,6 +8053,14 @@ static int iax2_poke_noanswer(void *data)
        peer->lastms = -1;
        /* Try again quickly */
        peer->pokeexpire = ast_sched_add(sched, peer->pokefreqnotok, iax2_poke_peer_s, peer);
+}
+
+static int iax2_poke_noanswer(void *data)
+{
+#ifdef SCHED_MULTITHREADED
+       if (schedule_action(__iax2_poke_noanswer, data))
+#endif         
+               __iax2_poke_noanswer(data);
        return 0;
 }
 
@@ -7918,6 +8193,26 @@ static struct ast_channel *iax2_request(const char *type, int format, void *data
        return c;
 }
 
+static void *sched_thread(void *ignore)
+{
+       int count;
+       int res;
+       for (;;) {
+               res = ast_sched_wait(sched);
+               if ((res > 1000) || (res < 0))
+                       res = 1000;
+               res = poll(NULL, 0, res);
+               if (res < 0) {
+                       if ((errno != EAGAIN) && (errno != EINTR))
+                               ast_log(LOG_WARNING, "poll failed: %s\n", strerror(errno));
+               }
+               count = ast_sched_runq(sched);
+               if (count >= 20)
+                       ast_log(LOG_DEBUG, "chan_iax2: ast_sched_runq ran %d scheduled tasks all at once\n", count);
+       }
+       return NULL;
+}
+
 static void *network_thread(void *ignore)
 {
        /* Our job is simple: Send queued messages, retrying if necessary.  Read frames 
@@ -7958,6 +8253,7 @@ static void *network_thread(void *ignore)
                                        /* We need reliable delivery.  Schedule a retransmission */
                                        f->retries++;
                                        f->retrans = ast_sched_add(sched, f->retrytime, attempt_transmit, f);
+                                       pthread_kill(schedthreadid, SIGURG);
                                }
                        }
                        f = f->next;
@@ -7969,16 +8265,10 @@ static void *network_thread(void *ignore)
                        ast_log(LOG_DEBUG, "chan_iax2: Sent %d queued outbound frames all at once\n", count);
 
                /* Now do the IO, and run scheduled tasks */
-               res = ast_sched_wait(sched);
-               if ((res > 1000) || (res < 0))
-                       res = 1000;
-               res = ast_io_wait(io, res);
+               res = ast_io_wait(io, -1);
                if (res >= 0) {
                        if (res >= 20)
                                ast_log(LOG_DEBUG, "chan_iax2: ast_io_wait ran %d I/Os all at once\n", res);
-                       count = ast_sched_runq(sched);
-                       if (count >= 20)
-                               ast_log(LOG_DEBUG, "chan_iax2: ast_sched_runq ran %d scheduled tasks all at once\n", count);
                }
        }
        return NULL;
@@ -7986,7 +8276,29 @@ static void *network_thread(void *ignore)
 
 static int start_network_thread(void)
 {
-       return ast_pthread_create(&netthreadid, NULL, network_thread, NULL);
+       int threadcount = 0;
+       int x;
+       ASTOBJ_CONTAINER_INIT(&idlelist);
+       ASTOBJ_CONTAINER_INIT(&activelist);
+       for (x = 0; x < iaxthreadcount; x++) {
+               struct iax2_thread *thread = ast_calloc(1, sizeof(struct iax2_thread));
+               if (thread) {
+                       ASTOBJ_INIT(thread);
+                       thread->threadnum = ++threadcount;
+                       if (ast_pthread_create(&thread->threadid, NULL, iax2_process_thread, thread)) {
+                               ast_log(LOG_WARNING, "Failed to create new thread!\n");
+                               free(thread);
+                               thread = NULL;
+                       }
+                       ASTOBJ_CONTAINER_LINK_END(&idlelist, thread);
+                       ASTOBJ_UNREF(thread, destroy_helper);
+               }
+       }
+       ast_pthread_create(&schedthreadid, NULL, sched_thread, NULL);
+       ast_pthread_create(&netthreadid, NULL, network_thread, NULL);
+       if (option_verbose > 1)
+               ast_verbose(VERBOSE_PREFIX_2 "%d helper threaads started\n", threadcount);
+       return 0;
 }
 
 static struct iax2_context *build_context(char *context)
@@ -8620,7 +8932,21 @@ static int set_config(char *config_file, int reload)
                                portno = atoi(v->value);
                } else if (!strcasecmp(v->name, "pingtime")) 
                        ping_time = atoi(v->value);
-               else if (!strcasecmp(v->name, "nochecksums")) {
+               else if (!strcasecmp(v->name, "iaxthreadcount")) {
+                       if (reload) {
+                               if (atoi(v->value) != iaxthreadcount)
+                                       ast_log(LOG_NOTICE, "Ignoring any changes to iaxthreadcount during reload\n");
+                       } else {
+                               iaxthreadcount = atoi(v->value);
+                               if (iaxthreadcount < 1) {
+                                       ast_log(LOG_NOTICE, "iaxthreadcount must be at least 1.\n");
+                                       iaxthreadcount = 1;
+                               } else if (iaxthreadcount > 256) {
+                                       ast_log(LOG_NOTICE, "limiting iaxthreadcount to 256\n");
+                                       iaxthreadcount = 256;
+                               }
+                       }
+               } else if (!strcasecmp(v->name, "nochecksums")) {
 #ifdef SO_NO_CHECK
                        if (ast_true(v->value))
                                nochecksums = 1;
@@ -8759,7 +9085,7 @@ static int set_config(char *config_file, int reload)
                                amaflags = format;
                        }
                } else if (!strcasecmp(v->name, "language")) {
-                        ast_copy_string(language, v->value, sizeof(language));
+                       ast_copy_string(language, v->value, sizeof(language));
                } /*else if (strcasecmp(v->name,"type")) */
                /*      ast_log(LOG_WARNING, "Ignoring %s\n", v->name); */
                v = v->next;
@@ -9366,6 +9692,10 @@ static char show_netstats_usage[] =
 "Usage: iax2 show netstats\n"
 "       Lists network status for all currently active IAX channels.\n";
 
+static char show_threads_usage[] = 
+"Usage: iax2 show threads\n"
+"       Lists status of IAX helper threads\n";
+
 static char show_peers_usage[] = 
 "Usage: iax2 show peers [registered] [like <pattern>]\n"
 "       Lists all known IAX2 peers.\n"
@@ -9445,6 +9775,8 @@ static struct ast_cli_entry iax2_cli[] = {
          "Show active IAX channel netstats", show_netstats_usage },
        { { "iax2", "show", "peers", NULL }, iax2_show_peers,
          "Show defined IAX peers", show_peers_usage },
+       { { "iax2", "show", "threads", NULL }, iax2_show_threads,
+         "Show IAX helper thread info", show_threads_usage },
        { { "iax2", "show", "registry", NULL }, iax2_show_registry,
          "Show IAX registration status", show_reg_usage },
        { { "iax2", "debug", NULL }, iax2_do_debug,
@@ -9481,6 +9813,23 @@ static int __unload_module(void)
                pthread_cancel(netthreadid);
                pthread_join(netthreadid, NULL);
        }
+       if (schedthreadid != AST_PTHREADT_NULL) {
+               pthread_cancel(schedthreadid);
+               pthread_join(schedthreadid, NULL);
+       }
+       while (idlelist.head || activelist.head) {
+               ASTOBJ_CONTAINER_TRAVERSE(&idlelist, 1, {
+                       iterator->halt = 1;
+                       pthread_kill(iterator->threadid, SIGURG);
+               });
+               ASTOBJ_CONTAINER_TRAVERSE(&activelist, 1, {
+                       iterator->halt = 1;
+                       pthread_kill(iterator->threadid, SIGURG);
+               });
+               usleep(100000);
+       }
+       ASTOBJ_CONTAINER_DESTROY(&idlelist);
+       ASTOBJ_CONTAINER_DESTROY(&activelist);
        ast_netsock_release(netsock);
        for (x=0;x<IAX_MAX_CALLS;x++)
                if (iaxs[x])
index 26d637d..5c4b226 100644 (file)
@@ -163,6 +163,10 @@ forcejitterbuffer=no
 ; minregexpire = 60
 ; maxregexpire = 60
 ;
+; IAX helper threads
+; Establishes the number of iax helper threads to handle I/O.
+; iaxthreadcount = 10
+;
 ; We can register with another IAX server to let him know where we are
 ; in case we have a dynamic IP address for example
 ;