Create longer thread destruction test.
authorMark Michelson <mmichelson@digium.com>
Sun, 9 Dec 2012 18:56:25 +0000 (18:56 +0000)
committerMark Michelson <mmichelson@digium.com>
Sun, 9 Dec 2012 18:56:25 +0000 (18:56 +0000)
This one involves shrinking the threadpool in such
a way that both idle and active threads are affected.

This test made me re-realize why the zombie state exists,
so I re-added it. We don't want to clog up the control
taskprocessor by waiting on active threads to complete
what they are doing. Instead, we mark them as zombies so
that when they are done, they can clean themselves up
properly.

Without the zombie state available, the new test actually
will deadlock.

git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@377474 65c4cc65-6c06-0410-ace0-fbb531ad65f3

main/threadpool.c
tests/test_threadpool.c

index 5ed9a42..1443716 100644 (file)
@@ -46,6 +46,11 @@ struct ast_threadpool {
         */
        struct ao2_container *idle_threads;
        /*! 
+        * \brief The container of zombie threads.
+        * Zombie threads may be running tasks, but they are scheduled to die soon
+        */
+       struct ao2_container *zombie_threads;
+       /*! 
         * \brief The main taskprocessor
         * 
         * Tasks that are queued in this taskprocessor are
@@ -79,7 +84,7 @@ struct ast_threadpool {
         * This is done for three main reasons
         * 1) It ensures that listeners are given an accurate portrayal
         * of the threadpool's current state. In other words, when a listener
-        * gets told a count of active and idle threads, it does not
+        * gets told a count of active, idle and zombie threads, it does not
         * need to worry that internal state of the threadpool might be different
         * from what it has been told.
         * 2) It minimizes the locking required in both the threadpool and in
@@ -96,7 +101,20 @@ struct ast_threadpool {
 enum worker_state {
        /*! The worker is either active or idle */
        ALIVE,
-       /*! The worker has been asked to shut down. */
+       /*!
+        * The worker has been asked to shut down but
+        * may still be in the process of executing tasks.
+        * This transition happens when the threadpool needs
+        * to shrink and needs to kill active threads in order
+        * to do so.
+        */
+       ZOMBIE,
+       /*!
+        * The worker has been asked to shut down. Typically
+        * only idle threads go to this state directly, but
+        * active threads may go straight to this state when
+        * the threadpool is shut down.
+        */
        DEAD,
 };
 
@@ -202,6 +220,41 @@ static void threadpool_active_thread_idle(struct ast_threadpool *pool,
 }
 
 /*!
+ * \brief Kill a zombie thread
+ *
+ * This runs from the threadpool's control taskprocessor thread.
+ *
+ * \param data A thread_worker_pair containing the threadpool and the zombie thread
+ * \return 0
+ */
+static int queued_zombie_thread_dead(void *data)
+{
+       struct thread_worker_pair *pair = data;
+
+       ao2_unlink(pair->pool->zombie_threads, pair->worker);
+       threadpool_send_state_changed(pair->pool);
+
+       ao2_ref(pair, -1);
+       return 0;
+}
+
+/*!
+ * \brief Queue a task to kill a zombie thread
+ *
+ * This is called by a worker thread when it acknowledges that it is time for
+ * it to die.
+ */
+static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
+               struct worker_thread *worker)
+{
+       struct thread_worker_pair *pair = thread_worker_pair_alloc(pool, worker);
+       if (!pair) {
+               return;
+       }
+       ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
+}
+
+/*!
  * \brief Execute a task in the threadpool
  * 
  * This is the function that worker threads call in order to execute tasks
@@ -263,6 +316,10 @@ static void *threadpool_alloc(struct ast_taskprocessor_listener *listener)
        if (!pool->idle_threads) {
                return NULL;
        }
+       pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
+       if (!pool->zombie_threads) {
+               return NULL;
+       }
 
        ao2_ref(pool, +1);
        return pool;
@@ -413,6 +470,7 @@ static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
 
        ao2_cleanup(pool->active_threads);
        ao2_cleanup(pool->idle_threads);
+       ao2_cleanup(pool->zombie_threads);
 }
 
 /*!
@@ -459,6 +517,7 @@ static void grow(struct ast_threadpool *pool, int delta)
                        return;
                }
                ao2_link(pool->active_threads, worker);
+               ao2_ref(worker, -1);
        }
 }
 
@@ -478,7 +537,46 @@ static int kill_threads(void *obj, void *arg, int flags)
 {
        int *num_to_kill = arg;
 
-       if ((*num_to_kill)-- > 0) {
+       ast_log(LOG_NOTICE, "num to kill is %d\n", *num_to_kill);
+
+       if (*num_to_kill > 0) {
+               --(*num_to_kill);
+               ast_log(LOG_NOTICE, "Should be killing a thread\n");
+               return CMP_MATCH;
+       } else {
+               return CMP_STOP;
+       }
+}
+
+/*!
+ * \brief ao2 callback to zombify a set number of threads.
+ *
+ * Threads will be zombified as long as as the counter has not reached
+ * zero. The counter is decremented with each thread that is zombified.
+ *
+ * Zombifying a thread involves removing it from its current container,
+ * adding it to the zombie container, and changing the state of the
+ * worker to a zombie
+ *
+ * This callback is called from the threadpool control taskprocessor thread.
+ *
+ * \param obj The worker thread that may be zombified
+ * \param arg The pool to which the worker belongs
+ * \param data The counter
+ * \param flags Unused
+ * \retval CMP_MATCH The zombified thread should be removed from its current container
+ * \retval CMP_STOP Stop attempting to zombify threads
+ */
+static int zombify_threads(void *obj, void *arg, void *data, int flags)
+{
+       struct worker_thread *worker = obj;
+       struct ast_threadpool *pool = arg;
+       int *num_to_zombify = data;
+
+       if ((*num_to_zombify)-- > 0) {
+               ast_log(LOG_NOTICE, "Should be zombifying a thread\n");
+               ao2_link(pool->zombie_threads, worker);
+               worker_set_state(worker, ZOMBIE);
                return CMP_MATCH;
        } else {
                return CMP_STOP;
@@ -490,7 +588,7 @@ static int kill_threads(void *obj, void *arg, int flags)
  *
  * The preference is to kill idle threads. However, if there are
  * more threads to remove than there are idle threads, then active
- * threads will be removed too.
+ * threads will be zombified instead.
  *
  * This function is called from the threadpool control taskprocessor thread.
  *
@@ -499,15 +597,21 @@ static int kill_threads(void *obj, void *arg, int flags)
  */
 static void shrink(struct ast_threadpool *pool, int delta)
 {
+       /* 
+        * Preference is to kill idle threads, but
+        * we'll move on to deactivating active threads
+        * if we have to
+        */
        int idle_threads = ao2_container_count(pool->idle_threads);
        int idle_threads_to_kill = MIN(delta, idle_threads);
-       int active_threads_to_kill = delta - idle_threads_to_kill;
+       int active_threads_to_zombify = delta - idle_threads_to_kill;
 
-       ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK,
+       ast_log(LOG_NOTICE, "Going to kill off %d idle threads\n", idle_threads_to_kill);
+       ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
                        kill_threads, &idle_threads_to_kill);
 
-       ao2_callback(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK,
-                       kill_threads, &active_threads_to_kill);
+       ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NOLOCK | OBJ_NODATA | OBJ_MULTIPLE,
+                       zombify_threads, pool, &active_threads_to_zombify);
 }
 
 /*!
@@ -553,20 +657,22 @@ static int queued_set_size(void *data)
 {
        struct set_size_data *ssd = data;
        struct ast_threadpool *pool = ssd->pool;
-       unsigned int new_size = ssd->size;
+       unsigned int num_threads = ssd->size;
+
+       /* We don't count zombie threads as being "live when potentially resizing */
        unsigned int current_size = ao2_container_count(pool->active_threads) +
                ao2_container_count(pool->idle_threads);
 
-       if (current_size == new_size) {
+       if (current_size == num_threads) {
                ast_log(LOG_NOTICE, "Not changing threadpool size since new size %u is the same as current %u\n",
-                               new_size, current_size);
+                               num_threads, current_size);
                return 0;
        }
 
-       if (current_size < new_size) {
-               grow(pool, new_size - current_size);
+       if (current_size < num_threads) {
+               grow(pool, num_threads - current_size);
        } else {
-               shrink(pool, current_size - new_size);
+               shrink(pool, current_size - num_threads);
        }
 
        threadpool_send_state_changed(pool);
@@ -788,6 +894,18 @@ static void worker_active(struct worker_thread *worker)
                        alive = worker_idle(worker);
                }
        }
+
+       /* Reaching this portion means the thread is
+        * on death's door. It may have been killed while
+        * it was idle, in which case it can just die
+        * peacefully. If it's a zombie, though, then
+        * it needs to let the pool know so
+        * that the thread can be removed from the
+        * list of zombie threads.
+        */
+       if (worker->state == ZOMBIE) {
+               threadpool_zombie_thread_dead(worker->pool, worker);
+       }
 }
 
 /*!
index 3435b28..3d64fe9 100644 (file)
@@ -842,6 +842,98 @@ end:
        return res;
 }
 
+AST_TEST_DEFINE(threadpool_more_destruction)
+{
+       struct ast_threadpool *pool = NULL;
+       struct ast_threadpool_listener *listener = NULL;
+       struct complex_task_data *ctd1 = NULL;
+       struct complex_task_data *ctd2 = NULL;
+       enum ast_test_result_state res = AST_TEST_FAIL;
+       struct test_listener_data *tld;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = "threadpool_more_destruction";
+               info->category = "/main/threadpool/";
+               info->summary = "Test that threads are destroyed as expected";
+               info->description =
+                       "Push two tasks into a threadpool. Set the threadpool size to 4\n"
+                       "Ensure that there are 2 active and 2 idle threads. Then shrink the\n"
+                       "threadpool down to 1 thread. Ensure that the thread leftove is active\n"
+                       "and ensure that both tasks complete.\n";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       listener = ast_threadpool_listener_alloc(&test_callbacks);
+       if (!listener) {
+               return AST_TEST_FAIL;
+       }
+       tld = listener->private_data;
+
+       pool = ast_threadpool_create(listener, 0);
+       if (!pool) {
+               goto end;
+       }
+
+       ctd1 = complex_task_data_alloc();
+       ctd2 = complex_task_data_alloc();
+       if (!ctd1 || !ctd2) {
+               goto end;
+       }
+
+       ast_threadpool_push(pool, complex_task, ctd1);
+       ast_threadpool_push(pool, complex_task, ctd2);
+
+       ast_threadpool_set_size(pool, 4);
+
+       WAIT_WHILE(tld, tld->num_idle < 2);
+
+       res = listener_check(test, listener, 1, 0, 2, 2, 2, 0);
+       if (res == AST_TEST_FAIL) {
+               goto end;
+       }
+
+       ast_threadpool_set_size(pool, 1);
+
+       /* Shrinking the threadpool should kill off the two idle threads
+        * and one of the active threads.
+        */
+       WAIT_WHILE(tld, tld->num_idle > 0 || tld->num_active > 1);
+
+       res = listener_check(test, listener, 1, 0, 2, 1, 0, 0);
+       if (res == AST_TEST_FAIL) {
+               goto end;
+       }
+
+       /* The tasks are stalled until we poke them */
+       poke_worker(ctd1);
+       poke_worker(ctd2);
+
+       res = wait_for_complex_completion(ctd1);
+       if (res == AST_TEST_FAIL) {
+               goto end;
+       }
+       res = wait_for_complex_completion(ctd2);
+       if (res == AST_TEST_FAIL) {
+               goto end;
+       }
+
+       WAIT_WHILE(tld, tld->num_idle < 1);
+
+       res = listener_check(test, listener, 1, 0, 2, 0, 1, 1);
+
+end:
+       if (pool) {
+               ast_threadpool_shutdown(pool);
+       }
+       ao2_cleanup(listener);
+       ast_free(ctd1);
+       ast_free(ctd2);
+       return res;
+}
+
 static int unload_module(void)
 {
        ast_test_unregister(threadpool_push);
@@ -852,6 +944,7 @@ static int unload_module(void)
        ast_test_unregister(threadpool_one_thread_multiple_tasks);
        ast_test_unregister(threadpool_reactivation);
        ast_test_unregister(threadpool_task_distribution);
+       ast_test_unregister(threadpool_more_destruction);
        return 0;
 }
 
@@ -865,6 +958,7 @@ static int load_module(void)
        ast_test_register(threadpool_one_thread_multiple_tasks);
        ast_test_register(threadpool_reactivation);
        ast_test_register(threadpool_task_distribution);
+       ast_test_register(threadpool_more_destruction);
        return AST_MODULE_LOAD_SUCCESS;
 }