Convert chan_iax2 to use linked lists for multithreading, and add dynamic threads...
authorJoshua Colp <jcolp@digium.com>
Tue, 11 Apr 2006 16:44:10 +0000 (16:44 +0000)
committerJoshua Colp <jcolp@digium.com>
Tue, 11 Apr 2006 16:44:10 +0000 (16:44 +0000)
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@19254 65c4cc65-6c06-0410-ace0-fbb531ad65f3

channels/chan_iax2.c
configs/iax.conf.sample

index 75b3f66..33152b1 100644 (file)
@@ -92,6 +92,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/devicestate.h"
 #include "asterisk/netsock.h"
 #include "asterisk/stringfields.h"
+#include "asterisk/linkedlists.h"
 
 #include "iax2.h"
 #include "iax2-parser.h"
@@ -134,6 +135,7 @@ static int nochecksums = 0;
 #define CALLNO_TO_PTR(a) ((void *)(unsigned long)(a))
 
 #define DEFAULT_THREAD_COUNT 10
+#define DEFAULT_MAX_THREAD_COUNT 100
 #define DEFAULT_RETRY_TIME 1000
 #define MEMORY_SIZE 100
 #define DEFAULT_DROP 3
@@ -444,6 +446,8 @@ static int max_jitter_buffer = MAX_JITTER_BUFFER;
 static int min_jitter_buffer = MIN_JITTER_BUFFER;
 
 static int iaxthreadcount = DEFAULT_THREAD_COUNT;
+static int iaxmaxthreadcount = DEFAULT_MAX_THREAD_COUNT;
+static int iaxdynamicthreadcount = 0;
 
 struct iax_rr {
        int jitter;
@@ -681,8 +685,12 @@ static int ast_cli_netstats(struct mansession *s, int fd, int limit_fmt);
 #define IAX_IOSTATE_PROCESSING 2
 #define IAX_IOSTATE_SCHEDREADY 3
 
+#define IAX_TYPE_POOL    1
+#define IAX_TYPE_DYNAMIC 2
+
 struct iax2_thread {
-       ASTOBJ_COMPONENTS(struct iax2_thread);
+       AST_LIST_ENTRY(iax2_thread) list;
+       int type;
        int iostate;
 #ifdef SCHED_MULTITHREADED
        void (*schedfunc)(void *);
@@ -704,11 +712,12 @@ struct iax2_thread {
        ast_cond_t cond;
 };
 
-struct iax2_thread_list {
-       ASTOBJ_CONTAINER_COMPONENTS(struct iax2_thread);
-};
+/* Thread lists */
+static AST_LIST_HEAD_STATIC(idle_list, iax2_thread);
+static AST_LIST_HEAD_STATIC(active_list, iax2_thread);
+static AST_LIST_HEAD_STATIC(dynamic_list, iax2_thread);
 
-static struct iax2_thread_list idlelist, activelist;
+static void *iax2_process_thread(void *data);
 
 static void signal_condition(ast_mutex_t *lock, ast_cond_t *cond)
 {
@@ -831,19 +840,59 @@ static const struct ast_channel_tech iax2_tech = {
 
 static struct iax2_thread *find_idle_thread(void)
 {
-       struct iax2_thread *thread;
-       thread = ASTOBJ_CONTAINER_UNLINK_START(&idlelist);
+       struct iax2_thread *thread = NULL;
+
+       /* Find free idle thread in the list, get a pointer to it, and remove it from the list */
+       AST_LIST_LOCK(&idle_list);
+       thread = AST_LIST_FIRST(&idle_list);
+       if (thread != NULL) {
+               AST_LIST_REMOVE(&idle_list, thread, list);
+               thread->list.next = NULL;
+       }
+       AST_LIST_UNLOCK(&idle_list);
+
+       /* If no idle thread is available from the regular list, try dynamic */
+       if (thread == NULL) {
+               AST_LIST_LOCK(&dynamic_list);
+               thread = AST_LIST_FIRST(&dynamic_list);
+               if (thread != NULL) {
+                       AST_LIST_REMOVE(&dynamic_list, thread, list);
+                       thread->list.next = NULL;
+               }
+               /* Make sure we absolutely have a thread... if not, try to make one if allowed */
+               if (thread == NULL && iaxmaxthreadcount > iaxdynamicthreadcount) {
+                       /* We need to MAKE a thread! */
+                       thread = ast_calloc(1, sizeof(*thread));
+                       if (thread != NULL) {
+                               thread->threadnum = iaxdynamicthreadcount;
+                               thread->type = IAX_TYPE_DYNAMIC;
+                               ast_mutex_init(&thread->lock);
+                               ast_cond_init(&thread->cond, NULL);
+                               if (ast_pthread_create(&thread->threadid, NULL, iax2_process_thread, thread)) {
+                                       free(thread);
+                                       thread = NULL;
+                               } else {
+                                       /* All went well and the thread is up, so increment our count */
+                                       iaxdynamicthreadcount++;
+                               }
+                       }
+               }
+               AST_LIST_UNLOCK(&dynamic_list);
+       }
+
        return thread;
 }
 
 #ifdef SCHED_MULTITHREADED
 static int __schedule_action(void (*func)(void *data), void *data, const char *funcname)
 {
-       struct iax2_thread *thread;
+       struct iax2_thread *thread = NULL;
        static time_t lasterror;
        static time_t t;
+
        thread = find_idle_thread();
-       if (thread) {
+
+       if (thread != NULL) {
                thread->schedfunc = func;
                thread->scheddata = data;
                thread->iostate = IAX_IOSTATE_SCHEDREADY;
@@ -857,6 +906,7 @@ static int __schedule_action(void (*func)(void *data), void *data, const char *f
        if (t != lasterror) 
                ast_log(LOG_NOTICE, "Out of idle IAX2 threads for scheduling!\n");
        lasterror = t;
+
        return -1;
 }
 #define schedule_action(func, data) __schedule_action(func, data, __PRETTY_FUNCTION__)
@@ -4419,42 +4469,60 @@ static int __iax2_show_peers(int manager, int fd, struct mansession *s, int argc
 
 static int iax2_show_threads(int fd, int argc, char *argv[])
 {
+       struct iax2_thread *thread = NULL;
        time_t t;
-       int threadcount = 0;
+       int threadcount = 0, dynamiccount = 0;
+       char type;
+
        if (argc != 3)
                return RESULT_SHOWUSAGE;
                
        ast_cli(fd, "IAX2 Thread Information\n");
        time(&t);
        ast_cli(fd, "Idle Threads:\n");
+       AST_LIST_LOCK(&idle_list);
+       AST_LIST_TRAVERSE(&idle_list, thread, list) {
 #ifdef DEBUG_SCHED_MULTITHREAD
-       ASTOBJ_CONTAINER_TRAVERSE(&idlelist, 1, {
-               ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, refcnt=%d, func ='%s'\n", 
-                       iterator->threadnum, iterator->iostate, (int)(t - iterator->checktime), iterator->actions, iterator->refcount, iterator->curfunc);
-               threadcount++;
-       });
+               ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, func ='%s'\n", 
+                       thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions, thread->curfunc);
 #else
-       ASTOBJ_CONTAINER_TRAVERSE(&idlelist, 1, {
-               ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, refcnt=%d\n", 
-                       iterator->threadnum, iterator->iostate, (int)(t - iterator->checktime), iterator->actions, iterator->refcount);
-               threadcount++;
-       });
+               ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d\n", 
+                       thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions);
 #endif
+               threadcount++;
+       }
+       AST_LIST_UNLOCK(&idle_list);
        ast_cli(fd, "Active Threads:\n");
+       AST_LIST_LOCK(&active_list);
+       AST_LIST_TRAVERSE(&active_list, thread, list) {
+               if (thread->type == IAX_TYPE_DYNAMIC)
+                       type = 'D';
+               else
+                       type = 'P';
 #ifdef DEBUG_SCHED_MULTITHREAD
-       ASTOBJ_CONTAINER_TRAVERSE(&activelist, 1, {
-               ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, refcnt=%d, func ='%s'\n", 
-                       iterator->threadnum, iterator->iostate, (int)(t - iterator->checktime), iterator->actions, iterator->refcount, iterator->curfunc);
-               threadcount++;
-       });
+               ast_cli(fd, "Thread %c%d: state=%d, update=%d, actions=%d, func ='%s'\n", 
+                       type, thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions, thread->curfunc);
 #else
-       ASTOBJ_CONTAINER_TRAVERSE(&activelist, 1, {
-               ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, refcnt=%d\n", 
-                       iterator->threadnum, iterator->iostate, (int)(t - iterator->checktime), iterator->actions, iterator->refcount);
+               ast_cli(fd, "Thread %c%d: state=%d, update=%d, actions=%d\n", 
+                       type, thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions);
+#endif
                threadcount++;
-       });
+       }
+       AST_LIST_UNLOCK(&active_list);
+       ast_cli(fd, "Dynamic Threads:\n");
+        AST_LIST_LOCK(&dynamic_list);
+        AST_LIST_TRAVERSE(&dynamic_list, thread, list) {
+#ifdef DEBUG_SCHED_MULTITHREAD
+                ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d, func ='%s'\n",
+                        thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions, thread->curfunc);
+#else
+                ast_cli(fd, "Thread %d: state=%d, update=%d, actions=%d\n",
+                        thread->threadnum, thread->iostate, (int)(t - thread->checktime), thread->actions);
 #endif
-       ast_cli(fd, "%d of %d threads accounted for\n", threadcount, iaxthreadcount);
+               dynamiccount++;
+        }
+        AST_LIST_UNLOCK(&dynamic_list);
+       ast_cli(fd, "%d of %d threads accounted for with %d dynamic threads\n", threadcount, iaxthreadcount, dynamiccount);
        return RESULT_SUCCESS;
 }
 
@@ -6512,11 +6580,15 @@ static int socket_read(int *id, int fd, short events, void *cbdata)
                        if (errno != ECONNREFUSED)
                                ast_log(LOG_WARNING, "Error: %s\n", strerror(errno));
                        handle_error();
-                       ASTOBJ_CONTAINER_LINK_END(&idlelist, thread);
+                       AST_LIST_LOCK(&idle_list);
+                       AST_LIST_INSERT_TAIL(&idle_list, thread, list);
+                       AST_LIST_UNLOCK(&idle_list);
                        return 1;
                }
                if (test_losspct && ((100.0 * ast_random() / (RAND_MAX + 1.0)) < test_losspct)) { /* simulate random loss condition */
-                       ASTOBJ_CONTAINER_LINK_END(&idlelist, thread); 
+                       AST_LIST_LOCK(&idle_list);
+                       AST_LIST_INSERT_TAIL(&idle_list, thread, list);
+                       AST_LIST_UNLOCK(&idle_list);
                        return 1;
                }
                /* Mark as ready and send on its way */
@@ -7891,34 +7963,44 @@ retryowner2:
        return 1;
 }
 
-static void destroy_helper(struct iax2_thread *thread)
-{
-       ast_log(LOG_DEBUG, "Destroying helper %d!\n", thread->threadnum);
-       ast_mutex_destroy(&thread->lock);
-       ast_cond_destroy(&thread->cond);
-       free(thread);
-}
-
 static void *iax2_process_thread(void *data)
 {
-       struct iax2_thread *thread_copy, *thread = data;
+       struct iax2_thread *thread = data;
+       struct timeval tv;
+       struct timespec ts;
 
        for(;;) {
                /* Wait for something to signal us to be awake */
                ast_mutex_lock(&thread->lock);
-               ast_cond_wait(&thread->cond, &thread->lock);
+               if (thread->type == IAX_TYPE_DYNAMIC) {
+                       /* Wait to be signalled or time out */
+                       tv = ast_tvadd(ast_tvnow(), ast_samp2tv(30000, 1000));
+                       ts.tv_sec = tv.tv_sec;
+                       ts.tv_nsec = tv.tv_usec * 1000;
+                       if (ast_cond_timedwait(&thread->cond, &thread->lock, &ts) == ETIMEDOUT) {
+                               ast_mutex_unlock(&thread->lock);
+                               AST_LIST_LOCK(&dynamic_list);
+                               AST_LIST_REMOVE(&dynamic_list, thread, list);
+                               iaxdynamicthreadcount--;
+                               AST_LIST_UNLOCK(&dynamic_list);
+                               break;
+                       }
+               } else {
+                       ast_cond_wait(&thread->cond, &thread->lock);
+               }
                ast_mutex_unlock(&thread->lock);
-               /* Unlink from idlelist / activelist if there*/
-               ASTOBJ_CONTAINER_UNLINK(&idlelist, thread);
-               ASTOBJ_CONTAINER_UNLINK(&activelist, thread);
-               /* If instructed to halt, stop now */
+
+               /* If we were signalled, then we are already out of both lists or we are shutting down */
                if (thread->halt) {
-                       ast_log(LOG_DEBUG, "Halting, refcount = %d\n", thread->refcount);
-                       ASTOBJ_UNREF(thread, destroy_helper);
                        break;
                }
-               /* Remove our reference */
-               ASTOBJ_CONTAINER_LINK_END(&activelist, thread);
+
+               /* Add ourselves to the active list now */
+               AST_LIST_LOCK(&active_list);
+               AST_LIST_INSERT_HEAD(&active_list, thread, list);
+               AST_LIST_UNLOCK(&active_list);
+
+               /* See what we need to do */
                switch(thread->iostate) {
                case IAX_IOSTATE_READY:
                        thread->actions++;
@@ -7938,16 +8020,31 @@ static void *iax2_process_thread(void *data)
 #ifdef DEBUG_SCHED_MULTITHREAD
                thread->curfunc[0]='\0';
 #endif         
-               ASTOBJ_CONTAINER_UNLINK(&activelist, thread);
-               ASTOBJ_CONTAINER_LINK_END(&idlelist, thread);
-               /* Make a copy so we don't lose thread, but if 
-                  we become unreferenced here, our thread gets
-                  cancelled anyway, so it's okay */
-               thread_copy = thread;
-               ASTOBJ_UNREF(thread_copy, destroy_helper);
-               thread_copy = thread;
-               ASTOBJ_UNREF(thread_copy, destroy_helper);
+
+               /* Now... remove ourselves from the active list, and return to the idle list */
+               AST_LIST_LOCK(&active_list);
+               AST_LIST_REMOVE(&active_list, thread, list);
+               thread->list.next = NULL;
+               AST_LIST_UNLOCK(&active_list);
+
+               /* Go back into our respective list */
+               if (thread->type == IAX_TYPE_DYNAMIC) {
+                       AST_LIST_LOCK(&dynamic_list);
+                       AST_LIST_INSERT_TAIL(&dynamic_list, thread, list);
+                       AST_LIST_UNLOCK(&dynamic_list);
+               } else {
+                       AST_LIST_LOCK(&idle_list);
+                       AST_LIST_INSERT_TAIL(&idle_list, thread, list);
+                       AST_LIST_UNLOCK(&idle_list);
+               }
        }
+
+       /* Free our own memory */
+       ast_mutex_destroy(&thread->lock);
+       ast_cond_destroy(&thread->cond);
+       free(thread);
+       thread = NULL;
+
        return NULL;
 }
 
@@ -8352,12 +8449,10 @@ static int start_network_thread(void)
 {
        int threadcount = 0;
        int x;
-       ASTOBJ_CONTAINER_INIT(&idlelist);
-       ASTOBJ_CONTAINER_INIT(&activelist);
        for (x = 0; x < iaxthreadcount; x++) {
                struct iax2_thread *thread = ast_calloc(1, sizeof(struct iax2_thread));
                if (thread) {
-                       ASTOBJ_INIT(thread);
+                       thread->type = IAX_TYPE_POOL;
                        thread->threadnum = ++threadcount;
                        ast_mutex_init(&thread->lock);
                        ast_cond_init(&thread->cond, NULL);
@@ -8366,8 +8461,9 @@ static int start_network_thread(void)
                                free(thread);
                                thread = NULL;
                        }
-                       ASTOBJ_CONTAINER_LINK_END(&idlelist, thread);
-                       ASTOBJ_UNREF(thread, destroy_helper);
+                       AST_LIST_LOCK(&idle_list);
+                       AST_LIST_INSERT_TAIL(&idle_list, thread, list);
+                       AST_LIST_UNLOCK(&idle_list);
                }
        }
        ast_pthread_create(&schedthreadid, NULL, sched_thread, NULL);
@@ -9020,6 +9116,21 @@ static int set_config(char *config_file, int reload)
                                        iaxthreadcount = 256;
                                }
                        }
+               } else if (!strcasecmp(v->name, "iaxmaxthreadcount")) {
+                       if (reload) {
+                               AST_LIST_LOCK(&dynamic_list);
+                               iaxmaxthreadcount = atoi(v->value);
+                               AST_LIST_UNLOCK(&dynamic_list);
+                       } else {
+                               iaxmaxthreadcount = atoi(v->value);
+                               if (iaxmaxthreadcount < 0) {
+                                       ast_log(LOG_NOTICE, "iaxmaxthreadcount must be at least 0.\n");
+                                       iaxmaxthreadcount = 0;
+                               } else if (iaxmaxthreadcount > 256) {
+                                       ast_log(LOG_NOTICE, "Limiting iaxmaxthreadcount to 256\n");
+                                       iaxmaxthreadcount = 256;
+                               }
+                       }
                } else if (!strcasecmp(v->name, "nochecksums")) {
 #ifdef SO_NO_CHECK
                        if (ast_true(v->value))
@@ -9891,7 +10002,9 @@ static struct ast_cli_entry iax2_cli[] = {
 
 static int __unload_module(void)
 {
+       struct iax2_thread *thread = NULL;
        int x;
+
        /* Cancel the network thread, close the net socket */
        if (netthreadid != AST_PTHREADT_NULL) {
                pthread_cancel(netthreadid);
@@ -9905,19 +10018,29 @@ static int __unload_module(void)
                ast_mutex_unlock(&sched_lock);
                pthread_join(schedthreadid, NULL);
        }
-       while (idlelist.head || activelist.head) {
-               ASTOBJ_CONTAINER_TRAVERSE(&idlelist, 1, {
-                       iterator->halt = 1;
-                       signal_condition(&iterator->lock, &iterator->cond);
-               });
-               ASTOBJ_CONTAINER_TRAVERSE(&activelist, 1, {
-                       iterator->halt = 1;
-                       signal_condition(&iterator->lock, &iterator->cond);
-               });
-               usleep(100000);
-       }
-       ASTOBJ_CONTAINER_DESTROY(&idlelist);
-       ASTOBJ_CONTAINER_DESTROY(&activelist);
+
+       /* Call for all threads to halt */
+       AST_LIST_LOCK(&idle_list);
+       AST_LIST_TRAVERSE(&idle_list, thread, list) {
+               thread->halt = 1;
+               signal_condition(&thread->lock, &thread->cond);
+       }
+       AST_LIST_UNLOCK(&idle_list);
+
+       AST_LIST_LOCK(&active_list);
+       AST_LIST_TRAVERSE(&active_list, thread, list) {
+               thread->halt = 1;
+               signal_condition(&thread->lock, &thread->cond);
+       }
+       AST_LIST_UNLOCK(&active_list);
+
+       AST_LIST_LOCK(&dynamic_list);
+        AST_LIST_TRAVERSE(&dynamic_list, thread, list) {
+                thread->halt = 1;
+                signal_condition(&thread->lock, &thread->cond);
+        }
+        AST_LIST_UNLOCK(&dynamic_list);
+
        ast_netsock_release(netsock);
        for (x=0;x<IAX_MAX_CALLS;x++)
                if (iaxs[x])
index 2c5e030..3eeb760 100644 (file)
@@ -170,6 +170,8 @@ forcejitterbuffer=no
 ; IAX helper threads
 ; Establishes the number of iax helper threads to handle I/O.
 ; iaxthreadcount = 10
+; Establishes the number of extra dynamic threads that may be spawned to handle I/O
+; iaxmaxthreadcount = 100
 ;
 ; We can register with another IAX server to let him know where we are
 ; in case we have a dynamic IP address for example