This now compiles.
[asterisk/asterisk.git] / main / threadpool.c
index 362c765..7b7df4e 100644 (file)
 
 #include "asterisk/threadpool.h"
 #include "asterisk/taskprocessor.h"
+#include "asterisk/astobj2.h"
+#include "asterisk/utils.h"
 
-struct ast_threadpool;
+#define THREAD_BUCKETS 89
+
+static int id_counter;
+
+struct ast_threadpool {
+       struct ast_threadpool_listener *listener;
+       struct ao2_container *active_threads;
+       struct ao2_container *idle_threads;
+       struct ao2_container *zombie_threads;
+       struct ast_taskprocessor *tps;
+       struct ast_taskprocessor *control_tps;
+};
 
 enum worker_state {
        ALIVE,
@@ -31,29 +44,169 @@ enum worker_state {
 };
 
 struct worker_thread {
+       int id;
        ast_cond_t cond;
        ast_mutex_t lock;
        pthread_t thread;
        struct ast_threadpool *pool;
-       AST_LIST_ENTRY(struct worker_thread) next;
-       int wake_up;
        enum worker_state state;
+       int wake_up;
+};
+
+static int worker_thread_hash(const void *obj, int flags)
+{
+       const struct worker_thread *worker = obj;
+
+       return worker->id;
+}
+
+static int worker_thread_cmp(void *obj, void *arg, int flags)
+{
+       struct worker_thread *worker1 = obj;
+       struct worker_thread *worker2 = arg;
+
+       return worker1->id == worker2->id ? CMP_MATCH : 0;
+}
+
+static void worker_thread_destroy(void *obj)
+{
+       struct worker_thread *worker = obj;
+       ast_mutex_destroy(&worker->lock);
+       ast_cond_destroy(&worker->cond);
+}
+
+static int worker_active(struct worker_thread *worker);
+
+static void *worker_start(void *arg)
+{
+       struct worker_thread *worker = arg;
+
+       worker_active(worker);
+       return NULL;
+}
+
+static struct worker_thread *worker_thread_alloc(struct ast_threadpool *pool)
+{
+       struct worker_thread *worker = ao2_alloc(sizeof(*worker), worker_thread_destroy);
+       if (!worker) {
+               /* XXX Dangit! */
+               return NULL;
+       }
+       worker->id = ast_atomic_fetchadd_int(&id_counter, 1);
+       ast_mutex_init(&worker->lock);
+       ast_cond_init(&worker->cond, NULL);
+       worker->pool = pool;
+       worker->thread = AST_PTHREADT_NULL;
+       worker->state = ALIVE;
+       if (ast_pthread_create(&worker->thread, NULL, worker_start, worker) < 0) {
+               /* XXX Poop! */
+               ao2_ref(worker, -1);
+               return NULL;
+       }
+       return worker;
+}
+
+static void threadpool_send_state_changed(struct ast_threadpool *pool)
+{
+       int active_size = ao2_container_count(pool->active_threads);
+       int idle_size = ao2_container_count(pool->idle_threads);
+       int zombie_size = ao2_container_count(pool->zombie_threads);
+
+       pool->listener->callbacks->state_changed(pool->listener, active_size, idle_size, zombie_size);
+}
+
+struct thread_worker_pair {
+       struct ast_threadpool *pool;
+       struct worker_thread *worker;
 };
 
+static void thread_worker_pair_destructor(void *obj)
+{
+       struct thread_worker_pair *pair = obj;
+       ao2_ref(pair->pool, -1);
+       ao2_ref(pair->worker, -1);
+}
+
+static struct thread_worker_pair *thread_worker_pair_init(struct ast_threadpool *pool,
+               struct worker_thread *worker)
+{
+       struct thread_worker_pair *pair = ao2_alloc(sizeof(*pair), thread_worker_pair_destructor);
+       if (!pair) {
+               /*XXX Crap */
+               return NULL;
+       }
+       ao2_ref(pool, +1);
+       pair->pool = pool;
+       ao2_ref(worker, +1);
+       pair->worker = worker;
+       return pair;
+}
+
+static int queued_active_thread_idle(void *data)
+{
+       struct thread_worker_pair *pair = data;
+
+       ao2_link(pair->pool->idle_threads, pair->worker);
+       ao2_unlink(pair->pool->active_threads, pair->worker);
+
+       threadpool_send_state_changed(pair->pool);
+
+       ao2_ref(pair, -1);
+       return 0;
+}
+
+static void threadpool_active_thread_idle(struct ast_threadpool *pool,
+               struct worker_thread *worker)
+{
+       struct thread_worker_pair *pair = thread_worker_pair_init(pool, worker);
+       if (!pair) {
+               /*XXX Crap */
+               return;
+       }
+       ast_taskprocessor_push(pool->control_tps, queued_active_thread_idle, pair);
+}
+
+static int queued_zombie_thread_dead(void *data)
+{
+       struct thread_worker_pair *pair = data;
+
+       ao2_unlink(pair->pool->zombie_threads, pair->worker);
+       threadpool_send_state_changed(pair->pool);
+
+       ao2_ref(pair, -1);
+       return 0;
+}
+
+static void threadpool_zombie_thread_dead(struct ast_threadpool *pool,
+               struct worker_thread *worker)
+{
+       struct thread_worker_pair *pair = thread_worker_pair_init(pool, worker);
+       if (!pair) {
+               /* XXX Crap */
+               return;
+       }
+       ast_taskprocessor_push(pool->control_tps, queued_zombie_thread_dead, pair);
+}
+
 static int worker_idle(struct worker_thread *worker)
 {
        SCOPED_MUTEX(lock, &worker->lock);
        if (worker->state != ALIVE) {
-               return false;
+               return 0;
        }
        threadpool_active_thread_idle(worker->pool, worker);
        while (!worker->wake_up) {
                ast_cond_wait(&worker->cond, lock);
        }
-       worker->wake_up = false;
+       worker->wake_up = 0;
        return worker->state == ALIVE;
 }
 
+static int threadpool_execute(struct ast_threadpool *pool)
+{
+       return ast_taskprocessor_execute(pool->tps);
+}
+
 static int worker_active(struct worker_thread *worker)
 {
        int alive = 1;
@@ -78,39 +231,168 @@ static int worker_active(struct worker_thread *worker)
        return 0;
 }
 
-struct ast_threadpool {
-       struct ast_threadpool_listener *threadpool_listener;
-       int active_threads;
-       int idle_threads;
-       int zombie_threads;
+static void worker_set_state(struct worker_thread *worker, enum worker_state state)
+{
+       SCOPED_MUTEX(lock, &worker->lock);
+       worker->state = state;
+       worker->wake_up = 1;
+       ast_cond_signal(&worker->cond);
+}
+
+static int worker_shutdown(void *obj, void *arg, int flags)
+{
+       struct worker_thread *worker = obj;
+
+       worker_set_state(worker, DEAD);
+       if (worker->thread != AST_PTHREADT_NULL) {
+               pthread_join(worker->thread, NULL);
+               worker->thread = AST_PTHREADT_NULL;
+       }
+       return 0;
+}
+
+static void threadpool_tps_listener_destroy(void *private_data)
+{
+       struct ast_threadpool *pool = private_data;
+       /* XXX Probably should let the listener know we're being destroyed? */
+
+       /* Threads should all be shut down by now, so this should be a painless
+        * operation
+        */
+       ao2_ref(pool->active_threads, -1);
+       ao2_ref(pool->idle_threads, -1);
+       ao2_ref(pool->zombie_threads, -1);
+       ao2_ref(pool->listener, -1);
+       ao2_ref(pool, -1);
 }
 
 static void *threadpool_tps_listener_alloc(struct ast_taskprocessor_listener *listener)
 {
-       RAII_VAR(ast_threadpool *, threadpool,
-                       ao2_alloc(sizeof(*threadpool), threadpool_destroy), ao2_cleanup);
+       RAII_VAR(struct ast_threadpool *, pool,
+                       ao2_alloc(sizeof(*pool), threadpool_tps_listener_destroy), ao2_cleanup);
+
+       pool->control_tps = ast_taskprocessor_get("CHANGE THIS", TPS_REF_DEFAULT);
+       if (!pool->control_tps) {
+               return NULL;
+       }
+       pool->active_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
+       if (!pool->active_threads) {
+               return NULL;
+       }
+       pool->idle_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
+       if (!pool->idle_threads) {
+               return NULL;
+       }
+       pool->zombie_threads = ao2_container_alloc(THREAD_BUCKETS, worker_thread_hash, worker_thread_cmp);
+       if (!pool->zombie_threads) {
+               return NULL;
+       }
+
+       pool->tps = listener->tps;
 
-       return threadpool;
+       ao2_ref(pool, +1);
+       return pool;
 }
 
-static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener)
+struct task_pushed_data {
+       struct ast_threadpool *pool;
+       int was_empty;
+};
+
+static void task_pushed_data_destroy(void *obj)
 {
-       /* XXX stub */
+       struct task_pushed_data *tpd = obj;
+       ao2_ref(tpd->pool, -1);
 }
 
-static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
+static struct task_pushed_data *task_pushed_data_alloc(struct ast_threadpool *pool,
+               int was_empty)
 {
-       /* XXX stub */
+       struct task_pushed_data *tpd = ao2_alloc(sizeof(*tpd),
+                       task_pushed_data_destroy);
+
+       if (!tpd) {
+               return NULL;
+       }
+       ao2_ref(pool, +1);
+       tpd->pool = pool;
+       tpd->was_empty = was_empty;
+       return tpd;
 }
 
-static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
+static int activate_threads(void *obj, void *arg, int flags)
+{
+       struct worker_thread *worker = obj;
+       struct ast_threadpool *pool = arg;
+
+       ao2_link(pool->active_threads, worker);
+       worker_set_state(worker, ALIVE);
+       return 0;
+}
+
+static int handle_task_pushed(void *data)
+{
+       struct task_pushed_data *tpd = data;
+       struct ast_threadpool *pool = tpd->pool;
+       int was_empty = tpd->was_empty;
+
+       pool->listener->callbacks->tps_task_pushed(pool->listener, was_empty);
+       ao2_callback(pool->idle_threads, OBJ_UNLINK, activate_threads, pool);
+       ao2_ref(tpd, -1);
+       return 0;
+}
+
+static void threadpool_tps_task_pushed(struct ast_taskprocessor_listener *listener,
+               int was_empty)
+{
+       struct ast_threadpool *pool = listener->private_data;
+       struct task_pushed_data *tpd = task_pushed_data_alloc(pool, was_empty);
+
+       if (!tpd) {
+               /* XXX Drat! */
+               return;
+       }
+
+       ast_taskprocessor_push(pool->control_tps, handle_task_pushed, tpd);
+}
+
+static int handle_emptied(void *data)
+{
+       struct ast_threadpool *pool = data;
+
+       pool->listener->callbacks->emptied(pool->listener);
+       ao2_ref(pool, -1);
+       return 0;
+}
+
+static void threadpool_tps_emptied(struct ast_taskprocessor_listener *listener)
 {
-       /* XXX stub */
+       struct ast_threadpool *pool = listener->private_data;
+
+       ao2_ref(pool, +1);
+       ast_taskprocessor_push(pool->control_tps, handle_emptied, pool);
 }
 
-static void threadpool_tps_listener_destroy(struct ast_taskprocessor_listener *listener)
+static void threadpool_tps_shutdown(struct ast_taskprocessor_listener *listener)
 {
-       /* XXX stub */
+       /*
+        * The threadpool triggers the taskprocessor to shut down. As a result,
+        * we have the freedom of shutting things down in three stages:
+        *
+        * 1) Before the tasprocessor is shut down
+        * 2) During taskprocessor shutdown (here)
+        * 3) After taskprocessor shutdown
+        *
+        * In the spirit of the taskprocessor shutdown, this would be
+        * where we make sure that all the worker threads are no longer
+        * executing. We could just do this before we even shut down
+        * the taskprocessor, but this feels more "right".
+        */
+
+       struct ast_threadpool *pool = listener->private_data;
+       ao2_callback(pool->active_threads, 0, worker_shutdown, NULL);
+       ao2_callback(pool->idle_threads, 0, worker_shutdown, NULL);
+       ao2_callback(pool->zombie_threads, 0, worker_shutdown, NULL);
 }
 
 static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callbacks = {
@@ -121,41 +403,158 @@ static struct ast_taskprocessor_listener_callbacks threadpool_tps_listener_callb
        .destroy = threadpool_tps_listener_destroy,
 };
 
-/*!
- * \brief Allocate the taskprocessor to be used for the threadpool
- *
- * We use a custom taskprocessor listener. We allocate our custom
- * listener and then create a taskprocessor.
- */
-static struct ast_taskprocessor_listener *threadpool_tps_alloc(void)
+static void grow(struct ast_threadpool *pool, int delta)
 {
-       RAII_VAR(struct threadpool_tps_listener *, tps_listener,
-                       ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
-                       ao2_cleanup);
+       int i;
+       for (i = 0; i < delta; ++i) {
+               struct worker_thread *worker = worker_thread_alloc(pool);
+               if (!worker) {
+                       /* XXX Abandon */
+                       return;
+               }
+               ao2_link(pool->active_threads, worker);
+       }
+}
 
-       if (!tps_listener) {
+static int kill_threads(void *obj, void *arg, int flags)
+{
+       struct worker_thread *worker = obj;
+       int *num_to_kill = arg;
+
+       if ((*num_to_kill)-- > 0) {
+               worker_shutdown(worker, arg, flags);
+               return CMP_MATCH;
+       } else {
+               return CMP_STOP;
+       }
+}
+
+static int zombify_threads(void *obj, void *arg, void *data, int flags)
+{
+       struct worker_thread *worker = obj;
+       struct ast_threadpool *pool = arg;
+       int *num_to_zombify = data;
+
+       if ((*num_to_zombify)-- > 0) {
+               ao2_link(pool->zombie_threads, worker);
+               worker_set_state(worker, ZOMBIE);
+               return CMP_MATCH;
+       } else {
+               return CMP_STOP;
+       }
+}
+
+static void shrink(struct ast_threadpool *pool, int delta)
+{
+       /* 
+        * Preference is to kill idle threads, but
+        * we'll move on to deactivating active threads
+        * if we have to
+        */
+       int idle_threads = ao2_container_count(pool->idle_threads);
+       int idle_threads_to_kill = MIN(delta, idle_threads);
+       int active_threads_to_zombify = delta - idle_threads_to_kill;
+
+       ao2_callback(pool->idle_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE | OBJ_NOLOCK,
+                       kill_threads, &idle_threads_to_kill);
+
+       ao2_callback_data(pool->active_threads, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE | OBJ_NOLOCK,
+                       zombify_threads, pool, &active_threads_to_zombify);
+}
+
+struct set_size_data {
+       struct ast_threadpool *pool;
+       int size;
+};
+
+static void set_size_data_destroy(void *obj)
+{
+       struct set_size_data *ssd = obj;
+       ao2_ref(ssd->pool, -1);
+}
+
+static struct set_size_data *set_size_data_alloc(struct ast_threadpool *pool,
+               int size)
+{
+       struct set_size_data *ssd = ao2_alloc(sizeof(*ssd), set_size_data_destroy);
+       if (!ssd) {
+               /* XXX Crap */
                return NULL;
        }
 
-       return ast_taskprocessor_create_with_listener(tps_listener);
+       ao2_ref(pool, +1);
+       ssd->pool = pool;
+       ssd->size = size;
+       return ssd;
 }
 
-void ast_threadpool_set_size(struct ast_threadpool *pool, int size)
+static int queued_set_size(void *data)
 {
+       struct set_size_data *ssd = data;
+       struct ast_threadpool *pool = ssd->pool;
+       int num_threads = ssd->size;
+
+       /* We don't count zombie threads as being "live when potentially resizing */
+       int current_size = ao2_container_count(pool->active_threads) +
+               ao2_container_count(pool->idle_threads);
+
+       if (current_size == num_threads) {
+               return 0;
+       }
+
+       if (current_size < num_threads) {
+               grow(pool, num_threads - current_size);
+       } else {
+               shrink(pool, current_size - num_threads);
+       }
+
+       threadpool_send_state_changed(pool);
+       ao2_ref(ssd, -1);
+       return 0;
+}
+
+void ast_threadpool_set_size(struct ast_threadpool *pool, unsigned int size)
+{
+       struct set_size_data *ssd;
+
+       ssd = set_size_data_alloc(pool, size);
+       if (!ssd) {
+               /* XXX *groan* */
+               return;
+       }
+
+       ast_taskprocessor_push(pool->control_tps, queued_set_size, ssd);
 }
 
 struct ast_threadpool *ast_threadpool_create(struct ast_threadpool_listener *listener, int initial_size)
 {
        struct ast_threadpool *pool;
-       RAII_VAR(ast_taskprocessor *, tps, threadpool_tps_alloc(), ast_taskprocessor_unreference);
+       struct ast_taskprocessor *tps;
+       RAII_VAR(struct ast_taskprocessor_listener *, tps_listener,
+                       ast_taskprocessor_listener_alloc(&threadpool_tps_listener_callbacks),
+                       ao2_cleanup);
+
+       if (!tps_listener) {
+               return NULL;
+       }
+
+       tps = ast_taskprocessor_create_with_listener("XXX CHANGE THIS XXX", tps_listener);
 
        if (!tps) {
                return NULL;
        }
 
-       pool = tps->listener->private_data;
-       pool->tps = tps;
+       pool = tps_listener->private_data;
        ast_threadpool_set_size(pool, initial_size);
-
        return pool;
 }
+
+void ast_threadpool_shutdown(struct ast_threadpool *pool)
+{
+       /* Pretty simple really. We just shut down the
+        * taskprocessors and everything else just
+        * takes care of itself via the taskprocessor callbacks
+        */
+       ast_taskprocessor_unreference(pool->control_tps);
+       ast_taskprocessor_unreference(pool->tps);
+}