Add threadpool options and accompanying test.
authorMark Michelson <mmichelson@digium.com>
Mon, 10 Dec 2012 06:13:09 +0000 (06:13 +0000)
committerMark Michelson <mmichelson@digium.com>
Mon, 10 Dec 2012 06:13:09 +0000 (06:13 +0000)
The only test added so far is an idle thread timeout
option. This will greatly aid threadpool users who wish
to maintain a threadpool by allowing for idle threads to
die out as necessary.

Test passes.

git-svn-id: https://origsvn.digium.com/svn/asterisk/team/mmichelson/threadpool@377580 65c4cc65-6c06-0410-ace0-fbb531ad65f3

include/asterisk/threadpool.h
main/threadpool.c
tests/test_threadpool.c

index 6bb0dcc..3f36341 100644 (file)
@@ -82,6 +82,18 @@ struct ast_threadpool_listener {
        void *private_data;
 };
 
+struct ast_threadpool_options {
+#define AST_THREADPOOL_OPTIONS_VERSION 1
+       /*! Version of thradpool options in use */
+       int version;
+       /* !
+        * \brief Time limit in seconds for idle threads
+        *
+        * A time of 0 or less will mean an infinite timeout.
+        */
+       int idle_timeout;
+};
+
 /*!
  * \brief Allocate a threadpool listener
  *
@@ -106,7 +118,8 @@ struct ast_threadpool_listener *ast_threadpool_listener_alloc(
  * \retval NULL Failed to create the threadpool
  * \retval non-NULL The newly-created threadpool
  */
-struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size);
+struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener,
+               int initial_size, const struct ast_threadpool_options *options);
 
 /*!
  * \brief Set the number of threads for the thread pool
index 45e8638..18c1349 100644 (file)
@@ -95,6 +95,8 @@ struct ast_threadpool {
        struct ast_taskprocessor *control_tps;
        /*! True if the threadpool is in the processof shutting down */
        int shutting_down;
+       /*! Threadpool-specific options */
+       struct ast_threadpool_options options;
 };
 
 /*!
@@ -266,6 +268,32 @@ static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
        ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
 }
 
+static int queued_idle_thread_dead(void *data)
+{
+       struct thread_worker_pair *pair = data;
+
+       ao2_unlink(pair->pool->idle_threads, pair->worker);
+       threadpool_send_state_changed(pair->pool);
+
+       ao2_ref(pair, -1);
+       return 0;
+}
+
+static void threadpool_idle_thread_dead(struct ast_threadpool *pool,
+               struct worker_thread *worker)
+{
+       struct thread_worker_pair *pair;
+       SCOPED_AO2LOCK(lock, pool);
+       if (pool->shutting_down) {
+               return;
+       }
+       pair = thread_worker_pair_alloc(pool, worker);
+       if (!pair) {
+               return;
+       }
+       ast_taskprocessor_push(pool->control_tps, queued_idle_thread_dead, pair);
+}
+
 /*!
  * \brief Execute a task in the threadpool
  * 
@@ -749,7 +777,13 @@ struct ast_threadpool_listener *ast_threadpool_listener_alloc(
        return listener;
 }
 
-struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)
+struct pool_options_pair {
+       struct ast_threadpool *pool;
+       struct ast_threadpool_options options;
+};
+
+struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener,
+               int initial_size, const struct ast_threadpool_options *options)
 {
        struct ast_threadpool *pool;
        struct ast_taskprocessor *tps;
@@ -771,6 +805,7 @@ struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *lis
        pool->tps = tps;
        ao2_ref(listener, +1);
        pool->listener = listener;
+       pool->options = *options;
        ast_threadpool_set_size(pool, initial_size);
        return pool;
 }
@@ -814,6 +849,8 @@ struct worker_thread {
        enum worker_state state;
        /*! A boolean used to determine if an idle thread should become active */
        int wake_up;
+       /*! Options for this threadpool */
+       struct ast_threadpool_options options;
 };
 
 /*!
@@ -864,7 +901,7 @@ static void worker_shutdown(struct worker_thread *worker)
 static void worker_thread_destroy(void *obj)
 {
        struct worker_thread *worker = obj;
-       ast_log(LOG_NOTICE, "Worker dying\n");
+       ast_debug(1, "Destroying worker thread\n");
        worker_shutdown(worker);
        ast_mutex_destroy(&worker->lock);
        ast_cond_destroy(&worker->cond);
@@ -909,6 +946,7 @@ static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
        worker->pool = pool;
        worker->thread = AST_PTHREADT_NULL;
        worker->state = ALIVE;
+       worker->options = pool->options;
        if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) {
                ast_log(LOG_ERROR, "Unable to start worker thread!\n");
                ao2_ref(worker, -1);
@@ -961,13 +999,28 @@ static void worker_active(struct worker_thread *worker)
  */
 static int worker_idle(struct worker_thread *worker)
 {
+       struct timeval start = ast_tvnow();
+       struct timespec end = {
+               .tv_sec = start.tv_sec + worker->options.idle_timeout,
+               .tv_nsec = start.tv_usec * 1000,
+       };
        SCOPED_MUTEX(lock, &worker->lock);
        if (worker->state != ALIVE) {
                return 0;
        }
        threadpool_active_thread_idle(worker->pool, worker);
        while (!worker->wake_up) {
-               ast_cond_wait(&worker->cond, lock);
+               if (worker->options.idle_timeout <= 0) {
+                       ast_cond_wait(&worker->cond, lock);
+               } else if (ast_cond_timedwait(&worker->cond, lock, &end) == ETIMEDOUT) {
+                       break;
+               }
+       }
+
+       if (!worker->wake_up) {
+               ast_debug(1, "Worker thread idle timeout reached. Dying.\n");
+               threadpool_idle_thread_dead(worker->pool, worker);
+               worker->state = DEAD;
        }
        worker->wake_up = 0;
        return worker->state == ALIVE;
index 373d0c0..5de5168 100644 (file)
@@ -257,6 +257,10 @@ AST_TEST_DEFINE(threadpool_push)
        struct ast_threadpool_listener *listener = NULL;
        struct simple_task_data *std = NULL;
        enum ast_test_result_state res = AST_TEST_FAIL;
+       struct ast_threadpool_options options = {
+               .version = AST_THREADPOOL_OPTIONS_VERSION,
+               .idle_timeout = 0,
+       };
 
        switch (cmd) {
        case TEST_INIT:
@@ -275,7 +279,7 @@ AST_TEST_DEFINE(threadpool_push)
                return AST_TEST_FAIL;
        }
 
-       pool = ast_threadpool_create(listener, 0);
+       pool = ast_threadpool_create(listener, 0, &options);
        if (!pool) {
                goto end;
        }
@@ -306,6 +310,10 @@ AST_TEST_DEFINE(threadpool_thread_creation)
        struct ast_threadpool_listener *listener = NULL;
        enum ast_test_result_state res = AST_TEST_FAIL;
        struct test_listener_data *tld;
+       struct ast_threadpool_options options = {
+               .version = AST_THREADPOOL_OPTIONS_VERSION,
+               .idle_timeout = 0,
+       };
 
        switch (cmd) {
        case TEST_INIT:
@@ -325,7 +333,7 @@ AST_TEST_DEFINE(threadpool_thread_creation)
        }
        tld = listener->private_data;
 
-       pool = ast_threadpool_create(listener, 0);
+       pool = ast_threadpool_create(listener, 0, &options);
        if (!pool) {
                goto end;
        }
@@ -353,6 +361,10 @@ AST_TEST_DEFINE(threadpool_thread_destruction)
        struct ast_threadpool_listener *listener = NULL;
        enum ast_test_result_state res = AST_TEST_FAIL;
        struct test_listener_data *tld;
+       struct ast_threadpool_options options = {
+               .version = AST_THREADPOOL_OPTIONS_VERSION,
+               .idle_timeout = 0,
+       };
 
        switch (cmd) {
        case TEST_INIT:
@@ -372,7 +384,7 @@ AST_TEST_DEFINE(threadpool_thread_destruction)
        }
        tld = listener->private_data;
 
-       pool = ast_threadpool_create(listener, 0);
+       pool = ast_threadpool_create(listener, 0, &options);
        if (!pool) {
                goto end;
        }
@@ -400,6 +412,62 @@ end:
        return res;
 }
 
+AST_TEST_DEFINE(threadpool_thread_timeout)
+{
+       struct ast_threadpool *pool = NULL;
+       struct ast_threadpool_listener *listener = NULL;
+       enum ast_test_result_state res = AST_TEST_FAIL;
+       struct test_listener_data *tld;
+       struct ast_threadpool_options options = {
+               .version = AST_THREADPOOL_OPTIONS_VERSION,
+               .idle_timeout = 5,
+       };
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = "threadpool_thread_timeout";
+               info->category = "/main/threadpool/";
+               info->summary = "Test threadpool thread timeout";
+               info->description =
+                       "Ensure that a thread with a five second timeout dies as expected.";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       listener = ast_threadpool_listener_alloc(&test_callbacks);
+       if (!listener) {
+               return AST_TEST_FAIL;
+       }
+       tld = listener->private_data;
+
+       pool = ast_threadpool_create(listener, 0, &options);
+       if (!pool) {
+               goto end;
+       }
+
+       ast_threadpool_set_size(pool, 1);
+
+       WAIT_WHILE(tld, tld->num_idle < 1);
+
+       res = listener_check(test, listener, 0, 0, 0, 0, 1, 0);
+       if (res == AST_TEST_FAIL) {
+               goto end;
+       }
+
+       /* The thread should time out after 5 seconds */
+       WAIT_WHILE(tld, tld->num_idle > 0);
+
+       res = listener_check(test, listener, 0, 0, 0, 0, 0, 0);
+
+end:
+       if (pool) {
+               ast_threadpool_shutdown(pool);
+       }
+       ao2_cleanup(listener);
+       return res;
+}
+
 AST_TEST_DEFINE(threadpool_one_task_one_thread)
 {
        struct ast_threadpool *pool = NULL;
@@ -407,6 +475,10 @@ AST_TEST_DEFINE(threadpool_one_task_one_thread)
        struct simple_task_data *std = NULL;
        enum ast_test_result_state res = AST_TEST_FAIL;
        struct test_listener_data *tld;
+       struct ast_threadpool_options options = {
+               .version = AST_THREADPOOL_OPTIONS_VERSION,
+               .idle_timeout = 0,
+       };
 
        switch (cmd) {
        case TEST_INIT:
@@ -426,7 +498,7 @@ AST_TEST_DEFINE(threadpool_one_task_one_thread)
        }
        tld = listener->private_data;
 
-       pool = ast_threadpool_create(listener, 0);
+       pool = ast_threadpool_create(listener, 0, &options);
        if (!pool) {
                goto end;
        }
@@ -476,6 +548,10 @@ AST_TEST_DEFINE(threadpool_one_thread_one_task)
        struct simple_task_data *std = NULL;
        enum ast_test_result_state res = AST_TEST_FAIL;
        struct test_listener_data *tld;
+       struct ast_threadpool_options options = {
+               .version = AST_THREADPOOL_OPTIONS_VERSION,
+               .idle_timeout = 0,
+       };
 
        switch (cmd) {
        case TEST_INIT:
@@ -495,7 +571,7 @@ AST_TEST_DEFINE(threadpool_one_thread_one_task)
        }
        tld = listener->private_data;
 
-       pool = ast_threadpool_create(listener, 0);
+       pool = ast_threadpool_create(listener, 0, &options);
        if (!pool) {
                goto end;
        }
@@ -545,6 +621,10 @@ AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks)
        struct simple_task_data *std3 = NULL;
        enum ast_test_result_state res = AST_TEST_FAIL;
        struct test_listener_data *tld;
+       struct ast_threadpool_options options = {
+               .version = AST_THREADPOOL_OPTIONS_VERSION,
+               .idle_timeout = 0,
+       };
 
        switch (cmd) {
        case TEST_INIT:
@@ -564,7 +644,7 @@ AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks)
        }
        tld = listener->private_data;
 
-       pool = ast_threadpool_create(listener, 0);
+       pool = ast_threadpool_create(listener, 0, &options);
        if (!pool) {
                goto end;
        }
@@ -626,6 +706,10 @@ AST_TEST_DEFINE(threadpool_reactivation)
        struct simple_task_data *std2 = NULL;
        enum ast_test_result_state res = AST_TEST_FAIL;
        struct test_listener_data *tld;
+       struct ast_threadpool_options options = {
+               .version = AST_THREADPOOL_OPTIONS_VERSION,
+               .idle_timeout = 0,
+       };
 
        switch (cmd) {
        case TEST_INIT:
@@ -647,7 +731,7 @@ AST_TEST_DEFINE(threadpool_reactivation)
        }
        tld = listener->private_data;
 
-       pool = ast_threadpool_create(listener, 0);
+       pool = ast_threadpool_create(listener, 0, &options);
        if (!pool) {
                goto end;
        }
@@ -773,6 +857,10 @@ AST_TEST_DEFINE(threadpool_task_distribution)
        struct complex_task_data *ctd2 = NULL;
        enum ast_test_result_state res = AST_TEST_FAIL;
        struct test_listener_data *tld;
+       struct ast_threadpool_options options = {
+               .version = AST_THREADPOOL_OPTIONS_VERSION,
+               .idle_timeout = 0,
+       };
 
        switch (cmd) {
        case TEST_INIT:
@@ -793,7 +881,7 @@ AST_TEST_DEFINE(threadpool_task_distribution)
        }
        tld = listener->private_data;
 
-       pool = ast_threadpool_create(listener, 0);
+       pool = ast_threadpool_create(listener, 0, &options);
        if (!pool) {
                goto end;
        }
@@ -851,6 +939,10 @@ AST_TEST_DEFINE(threadpool_more_destruction)
        struct complex_task_data *ctd2 = NULL;
        enum ast_test_result_state res = AST_TEST_FAIL;
        struct test_listener_data *tld;
+       struct ast_threadpool_options options = {
+               .version = AST_THREADPOOL_OPTIONS_VERSION,
+               .idle_timeout = 0,
+       };
 
        switch (cmd) {
        case TEST_INIT:
@@ -873,7 +965,7 @@ AST_TEST_DEFINE(threadpool_more_destruction)
        }
        tld = listener->private_data;
 
-       pool = ast_threadpool_create(listener, 0);
+       pool = ast_threadpool_create(listener, 0, &options);
        if (!pool) {
                goto end;
        }
@@ -940,6 +1032,7 @@ static int unload_module(void)
        ast_test_unregister(threadpool_push);
        ast_test_unregister(threadpool_thread_creation);
        ast_test_unregister(threadpool_thread_destruction);
+       ast_test_unregister(threadpool_thread_timeout);
        ast_test_unregister(threadpool_one_task_one_thread);
        ast_test_unregister(threadpool_one_thread_one_task);
        ast_test_unregister(threadpool_one_thread_multiple_tasks);
@@ -954,6 +1047,7 @@ static int load_module(void)
        ast_test_register(threadpool_push);
        ast_test_register(threadpool_thread_creation);
        ast_test_register(threadpool_thread_destruction);
+       ast_test_register(threadpool_thread_timeout);
        ast_test_register(threadpool_one_task_one_thread);
        ast_test_register(threadpool_one_thread_one_task);
        ast_test_register(threadpool_one_thread_multiple_tasks);