Fix threadpool rapid growth problem.
author: Mark Michelson <mmichelson@digium.com>
Thu, 20 Jun 2013 16:29:35 +0000 (16:29 +0000)
committer: Mark Michelson <mmichelson@digium.com>
Thu, 20 Jun 2013 16:29:35 +0000 (16:29 +0000)
When a threadpool is set to autoincrement its thread count, an issue
may arise when multiple tasks are queued into the threadpool at once. Since
new threads start active, the pool still has no idle threads after growing,
so each new task results in autoincrementing the thread count again. So if
all threads were active, and the threadpool's autoincrement value were 5,
then 3 new tasks would result in 15 threads being created even though the
initial autoincrement was sufficient to handle the number of tasks.

This change introduces three behavior changes:

1) New threads in the threadpool start idle instead of active.
2) When a threadpool autoincrements, one thread is activated after the growth.
3) When a threadpool's size is incremented manually, all added threads are activated.

For a more detailed explanation about the changes, please see the Review Board link
at the bottom of this commit.

Review: https://reviewboard.asterisk.org/r/2629

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@392318 65c4cc65-6c06-0410-ace0-fbb531ad65f3

main/threadpool.c

index 1ff7601..6cb241b 100644 (file)
@@ -510,7 +510,7 @@ static void grow(struct ast_threadpool *pool, int delta)
                if (!worker) {
                        return;
                }
-               if (ao2_link(pool->active_threads, worker)) {
+               if (ao2_link(pool->idle_threads, worker)) {
                        if (worker_thread_start(worker)) {
                                ast_log(LOG_ERROR, "Unable to start worker thread %d. Destroying.\n", worker->id);
                                ao2_unlink(pool->active_threads, worker);
@@ -536,24 +536,21 @@ static int queued_task_pushed(void *data)
        struct task_pushed_data *tpd = data;
        struct ast_threadpool *pool = tpd->pool;
        int was_empty = tpd->was_empty;
-       int state_changed;
 
        if (pool->listener && pool->listener->callbacks->task_pushed) {
                pool->listener->callbacks->task_pushed(pool, pool->listener, was_empty);
        }
        if (ao2_container_count(pool->idle_threads) == 0) {
-               if (pool->options.auto_increment > 0) {
-                       grow(pool, pool->options.auto_increment);
-                       state_changed = 1;
+               if (!pool->options.auto_increment) {
+                       return 0;
                }
-       } else {
-               ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
-                               activate_thread, pool);
-               state_changed = 1;
-       }
-       if (state_changed) {
-               threadpool_send_state_changed(pool);
+               grow(pool, pool->options.auto_increment);
        }
+
+       ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA,
+                       activate_thread, pool);
+
+       threadpool_send_state_changed(pool);
        ao2_ref(tpd, -1);
        return 0;
 }
@@ -808,6 +805,8 @@ static int queued_set_size(void *data)
 
        if (current_size < num_threads) {
                grow(pool, num_threads - current_size);
+               ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
+                               activate_thread, pool);
        } else {
                shrink(pool, current_size - num_threads);
        }
@@ -986,7 +985,31 @@ static void *worker_start(void *arg)
        if (worker->options.thread_start) {
                worker->options.thread_start();
        }
-       worker_active(worker);
+
+       ast_mutex_lock(&worker->lock);
+       while (worker_idle(worker)) {
+               ast_mutex_unlock(&worker->lock);
+               worker_active(worker);
+               ast_mutex_lock(&worker->lock);
+               if (worker->state != ALIVE) {
+                       break;
+               }
+               threadpool_active_thread_idle(worker->pool, worker);
+       }
+       ast_mutex_unlock(&worker->lock);
+
+       /* Reaching this portion means the thread is
+        * on death's door. It may have been killed while
+        * it was idle, in which case it can just die
+        * peacefully. If it's a zombie, though, then
+        * it needs to let the pool know so
+        * that the thread can be removed from the
+        * list of zombie threads.
+        */
+       if (worker->state == ZOMBIE) {
+               threadpool_zombie_thread_dead(worker->pool, worker);
+       }
+
        if (worker->options.thread_end) {
                worker->options.thread_end();
        }
@@ -1035,24 +1058,19 @@ static int worker_thread_start(struct worker_thread *worker)
  */
 static void worker_active(struct worker_thread *worker)
 {
-       int alive = 1;
-       while (alive) {
-               if (!threadpool_execute(worker->pool)) {
-                       alive = worker_idle(worker);
-               }
-       }
+       int alive;
 
-       /* Reaching this portion means the thread is
-        * on death's door. It may have been killed while
-        * it was idle, in which case it can just die
-        * peacefully. If it's a zombie, though, then
-        * it needs to let the pool know so
-        * that the thread can be removed from the
-        * list of zombie threads.
+       /* The following is equivalent to 
+        *
+        * while (threadpool_execute(worker->pool));
+        *
+        * However, reviewers have suggested in the past
+        * doing that can cause optimizers to (wrongly)
+        * optimize the code away.
         */
-       if (worker->state == ZOMBIE) {
-               threadpool_zombie_thread_dead(worker->pool, worker);
-       }
+       do {
+               alive = threadpool_execute(worker->pool);
+       } while (alive);
 }
 
 /*!
@@ -1061,6 +1079,8 @@ static void worker_active(struct worker_thread *worker)
  * The worker waits here until it gets told by the threadpool
  * to wake up.
  *
+ * worker is locked before entering this function.
+ *
  * \param worker The idle worker
  * \retval 0 The thread is being woken up so that it can conclude.
  * \retval non-zero The thread is being woken up to do more work.
@@ -1072,15 +1092,10 @@ static int worker_idle(struct worker_thread *worker)
                .tv_sec = start.tv_sec + worker->options.idle_timeout,
                .tv_nsec = start.tv_usec * 1000,
        };
-       SCOPED_MUTEX(lock, &worker->lock);
-       if (worker->state != ALIVE) {
-               return 0;
-       }
-       threadpool_active_thread_idle(worker->pool, worker);
        while (!worker->wake_up) {
                if (worker->options.idle_timeout <= 0) {
-                       ast_cond_wait(&worker->cond, lock);
-               } else if (ast_cond_timedwait(&worker->cond, lock, &end) == ETIMEDOUT) {
+                       ast_cond_wait(&worker->cond, &worker->lock);
+               } else if (ast_cond_timedwait(&worker->cond, &worker->lock, &end) == ETIMEDOUT) {
                        break;
                }
        }