Add auto-increment option and accompanying test.
[asterisk/asterisk.git] / tests / test_threadpool.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \brief threadpool unit tests
22  *
23  * \author Mark Michelson <mmichelson@digium.com>
24  *
25  */
26
27 /*** MODULEINFO
28         <depend>TEST_FRAMEWORK</depend>
29         <support_level>core</support_level>
30  ***/
31
32 #include "asterisk.h"
33
34 #include "asterisk/test.h"
35 #include "asterisk/threadpool.h"
36 #include "asterisk/module.h"
37 #include "asterisk/lock.h"
38 #include "asterisk/astobj2.h"
39 #include "asterisk/logger.h"
40
41 struct test_listener_data {
42         int num_active;
43         int num_idle;
44         int task_pushed;
45         int num_tasks;
46         int empty_notice;
47         int was_empty;
48         ast_mutex_t lock;
49         ast_cond_t cond;
50 };
51
52 static void *test_alloc(struct ast_threadpool_listener *listener)
53 {
54         struct test_listener_data *tld = ast_calloc(1, sizeof(*tld));
55         if (!tld) {
56                 return NULL;
57         }
58         ast_mutex_init(&tld->lock);
59         ast_cond_init(&tld->cond, NULL);
60         return tld;
61 }
62
63 static void test_state_changed(struct ast_threadpool *pool,
64                 struct ast_threadpool_listener *listener,
65                 int active_threads,
66                 int idle_threads)
67 {
68         struct test_listener_data *tld = listener->private_data;
69         SCOPED_MUTEX(lock, &tld->lock);
70         tld->num_active = active_threads;
71         tld->num_idle = idle_threads;
72         ast_log(LOG_NOTICE, "Thread state: %d active, %d idle\n", tld->num_active, tld->num_idle);
73         ast_cond_signal(&tld->cond);
74 }
75
76 static void test_task_pushed(struct ast_threadpool *pool,
77                 struct ast_threadpool_listener *listener,
78                 int was_empty)
79 {
80         struct test_listener_data *tld = listener->private_data;
81         SCOPED_MUTEX(lock, &tld->lock);
82         tld->task_pushed = 1;
83         ++tld->num_tasks;
84         tld->was_empty = was_empty;
85         ast_cond_signal(&tld->cond);
86 }
87
88 static void test_emptied(struct ast_threadpool *pool,
89                 struct ast_threadpool_listener *listener)
90 {
91         struct test_listener_data *tld = listener->private_data;
92         SCOPED_MUTEX(lock, &tld->lock);
93         tld->empty_notice = 1;
94         ast_cond_signal(&tld->cond);
95 }
96
97 static void test_destroy(void *private_data)
98 {
99         struct test_listener_data *tld = private_data;
100         ast_cond_destroy(&tld->cond);
101         ast_mutex_destroy(&tld->lock);
102         ast_free(tld);
103 }
104
105 static const struct ast_threadpool_listener_callbacks test_callbacks = {
106         .alloc = test_alloc,
107         .state_changed = test_state_changed,
108         .task_pushed = test_task_pushed,
109         .emptied = test_emptied,
110         .destroy = test_destroy,
111 };
112
113 struct simple_task_data {
114         int task_executed;
115         ast_mutex_t lock;
116         ast_cond_t cond;
117 };
118
119 static struct simple_task_data *simple_task_data_alloc(void)
120 {
121         struct simple_task_data *std = ast_calloc(1, sizeof(*std));
122
123         if (!std) {
124                 return NULL;
125         }
126         ast_mutex_init(&std->lock);
127         ast_cond_init(&std->cond, NULL);
128         return std;
129 }
130
131 static int simple_task(void *data)
132 {
133         struct simple_task_data *std = data;
134         SCOPED_MUTEX(lock, &std->lock);
135         std->task_executed = 1;
136         ast_cond_signal(&std->cond);
137         return 0;
138 }
139
140 static enum ast_test_result_state wait_until_thread_state(struct ast_test *test, struct test_listener_data *tld, int num_active, int num_idle)
141 {
142         struct timeval start = ast_tvnow();
143         struct timespec end = {
144                 .tv_sec = start.tv_sec + 5,
145                 .tv_nsec = start.tv_usec * 1000
146         };
147         enum ast_test_result_state res = AST_TEST_PASS;
148         SCOPED_MUTEX(lock, &tld->lock);
149
150         while (!(tld->num_active == num_active && tld->num_idle == num_idle)) {
151                 if (ast_cond_timedwait(&tld->cond, &tld->lock, &end) == ETIMEDOUT) {
152                         break;
153                 }
154         }
155
156         if (tld->num_active != num_active && tld->num_idle != num_idle) {
157                 ast_test_status_update(test, "Number of active threads and idle threads not what was expected.\n");
158                 ast_test_status_update(test, "Expected %d active threads but got %d\n", num_active, tld->num_active);
159                 ast_test_status_update(test, "Expected %d idle threads but got %d\n", num_idle, tld->num_idle);
160                 res = AST_TEST_FAIL;
161         }
162
163         return res;
164 }
165
166 static void wait_for_task_pushed(struct ast_threadpool_listener *listener)
167 {
168         struct test_listener_data *tld = listener->private_data;
169         struct timeval start = ast_tvnow();
170         struct timespec end = {
171                 .tv_sec = start.tv_sec + 5,
172                 .tv_nsec = start.tv_usec * 1000
173         };
174         SCOPED_MUTEX(lock, &tld->lock);
175
176         while (!tld->task_pushed) {
177                 if (ast_cond_timedwait(&tld->cond, lock, &end) == ETIMEDOUT) {
178                         break;
179                 }
180         }
181 }
182
183 static enum ast_test_result_state wait_for_completion(struct ast_test *test, struct simple_task_data *std)
184 {
185         struct timeval start = ast_tvnow();
186         struct timespec end = {
187                 .tv_sec = start.tv_sec + 5,
188                 .tv_nsec = start.tv_usec * 1000
189         };
190         enum ast_test_result_state res = AST_TEST_PASS;
191         SCOPED_MUTEX(lock, &std->lock);
192
193         while (!std->task_executed) {
194                 if (ast_cond_timedwait(&std->cond, lock, &end) == ETIMEDOUT) {
195                         break;
196                 }
197         }
198
199         if (!std->task_executed) {
200                 ast_test_status_update(test, "Task execution did not occur\n");
201                 res = AST_TEST_FAIL;
202         }
203         return res;
204 }
205
206 static enum ast_test_result_state wait_for_empty_notice(struct ast_test *test, struct test_listener_data *tld)
207 {
208         struct timeval start = ast_tvnow();
209         struct timespec end = {
210                 .tv_sec = start.tv_sec + 5,
211                 .tv_nsec = start.tv_usec * 1000
212         };
213         enum ast_test_result_state res = AST_TEST_PASS;
214         SCOPED_MUTEX(lock, &tld->lock);
215
216         while (!tld->empty_notice) {
217                 if (ast_cond_timedwait(&tld->cond, lock, &end) == ETIMEDOUT) {
218                         break;
219                 }
220         }
221
222         if (!tld->empty_notice) {
223                 ast_test_status_update(test, "Test listener not notified that threadpool is empty\n");
224                 res = AST_TEST_FAIL;
225         }
226
227         return res;
228 }
229
230 static enum ast_test_result_state listener_check(
231                 struct ast_test *test,
232                 struct ast_threadpool_listener *listener,
233                 int task_pushed,
234                 int was_empty,
235                 int num_tasks,
236                 int num_active,
237                 int num_idle,
238                 int empty_notice)
239 {
240         struct test_listener_data *tld = listener->private_data;
241         enum ast_test_result_state res = AST_TEST_PASS;
242
243         if (tld->task_pushed != task_pushed) {
244                 ast_test_status_update(test, "Expected task %sto be pushed, but it was%s\n",
245                                 task_pushed ? "" : "not ", tld->task_pushed ? "" : " not");
246                 res = AST_TEST_FAIL;
247         }
248         if (tld->was_empty != was_empty) {
249                 ast_test_status_update(test, "Expected %sto be empty, but it was%s\n",
250                                 was_empty ? "" : "not ", tld->was_empty ? "" : " not");
251                 res = AST_TEST_FAIL;
252         }
253         if (tld->num_tasks!= num_tasks) {
254                 ast_test_status_update(test, "Expected %d tasks to be pushed, but got %d\n",
255                                 num_tasks, tld->num_tasks);
256                 res = AST_TEST_FAIL;
257         }
258         if (tld->num_active != num_active) {
259                 ast_test_status_update(test, "Expected %d active threads, but got %d\n",
260                                 num_active, tld->num_active);
261                 res = AST_TEST_FAIL;
262         }
263         if (tld->num_idle != num_idle) {
264                 ast_test_status_update(test, "Expected %d idle threads, but got %d\n",
265                                 num_idle, tld->num_idle);
266                 res = AST_TEST_FAIL;
267         }
268         if (tld->empty_notice != empty_notice) {
269                 ast_test_status_update(test, "Expected %s empty notice, but got %s\n",
270                                 was_empty ? "an" : "no", tld->task_pushed ? "one" : "none");
271                 res = AST_TEST_FAIL;
272         }
273
274         return res;
275 }
276
277 AST_TEST_DEFINE(threadpool_push)
278 {
279         struct ast_threadpool *pool = NULL;
280         struct ast_threadpool_listener *listener = NULL;
281         struct simple_task_data *std = NULL;
282         enum ast_test_result_state res = AST_TEST_FAIL;
283         struct ast_threadpool_options options = {
284                 .version = AST_THREADPOOL_OPTIONS_VERSION,
285                 .idle_timeout = 0,
286                 .auto_increment = 0,
287         };
288
289         switch (cmd) {
290         case TEST_INIT:
291                 info->name = "push";
292                 info->category = "/main/threadpool/";
293                 info->summary = "Test task";
294                 info->description =
295                         "Basic threadpool test";
296                 return AST_TEST_NOT_RUN;
297         case TEST_EXECUTE:
298                 break;
299         }
300
301         listener = ast_threadpool_listener_alloc(&test_callbacks);
302         if (!listener) {
303                 return AST_TEST_FAIL;
304         }
305
306         pool = ast_threadpool_create(info->name, listener, 0, &options);
307         if (!pool) {
308                 goto end;
309         }
310
311         std = simple_task_data_alloc();
312         if (!std) {
313                 goto end;
314         }
315
316         ast_threadpool_push(pool, simple_task, std);
317
318         wait_for_task_pushed(listener);
319
320         res = listener_check(test, listener, 1, 1, 1, 0, 0, 0);
321
322 end:
323         if (pool) {
324                 ast_threadpool_shutdown(pool);
325         }
326         ao2_cleanup(listener);
327         ast_free(std);
328         return res;
329 }
330
331 AST_TEST_DEFINE(threadpool_thread_creation)
332 {
333         struct ast_threadpool *pool = NULL;
334         struct ast_threadpool_listener *listener = NULL;
335         enum ast_test_result_state res = AST_TEST_FAIL;
336         struct test_listener_data *tld;
337         struct ast_threadpool_options options = {
338                 .version = AST_THREADPOOL_OPTIONS_VERSION,
339                 .idle_timeout = 0,
340                 .auto_increment = 0,
341         };
342
343         switch (cmd) {
344         case TEST_INIT:
345                 info->name = "thread_creation";
346                 info->category = "/main/threadpool/";
347                 info->summary = "Test threadpool thread creation";
348                 info->description =
349                         "Ensure that threads can be added to a threadpool";
350                 return AST_TEST_NOT_RUN;
351         case TEST_EXECUTE:
352                 break;
353         }
354
355         listener = ast_threadpool_listener_alloc(&test_callbacks);
356         if (!listener) {
357                 return AST_TEST_FAIL;
358         }
359         tld = listener->private_data;
360
361         pool = ast_threadpool_create(info->name, listener, 0, &options);
362         if (!pool) {
363                 goto end;
364         }
365
366         /* Now let's create a thread. It should start active, then go
367          * idle immediately
368          */
369         ast_threadpool_set_size(pool, 1);
370
371         res = wait_until_thread_state(test, tld, 0, 1);
372
373 end:
374         if (pool) {
375                 ast_threadpool_shutdown(pool);
376         }
377         ao2_cleanup(listener);
378         return res;
379 }
380
381 AST_TEST_DEFINE(threadpool_thread_destruction)
382 {
383         struct ast_threadpool *pool = NULL;
384         struct ast_threadpool_listener *listener = NULL;
385         enum ast_test_result_state res = AST_TEST_FAIL;
386         struct test_listener_data *tld;
387         struct ast_threadpool_options options = {
388                 .version = AST_THREADPOOL_OPTIONS_VERSION,
389                 .idle_timeout = 0,
390                 .auto_increment = 0,
391         };
392
393         switch (cmd) {
394         case TEST_INIT:
395                 info->name = "thread_destruction";
396                 info->category = "/main/threadpool/";
397                 info->summary = "Test threadpool thread destruction";
398                 info->description =
399                         "Ensure that threads are properly destroyed in a threadpool";
400                 return AST_TEST_NOT_RUN;
401         case TEST_EXECUTE:
402                 break;
403         }
404
405         listener = ast_threadpool_listener_alloc(&test_callbacks);
406         if (!listener) {
407                 return AST_TEST_FAIL;
408         }
409         tld = listener->private_data;
410
411         pool = ast_threadpool_create(info->name, listener, 0, &options);
412         if (!pool) {
413                 goto end;
414         }
415
416         ast_threadpool_set_size(pool, 3);
417
418         res = wait_until_thread_state(test, tld, 0, 3);
419         if (res == AST_TEST_FAIL) {
420                 goto end;
421         }
422
423         res = listener_check(test, listener, 0, 0, 0, 0, 3, 0);
424         if (res == AST_TEST_FAIL) {
425                 goto end;
426         }
427
428         ast_threadpool_set_size(pool, 2);
429
430         res = wait_until_thread_state(test, tld, 0, 2);
431
432 end:
433         if (pool) {
434                 ast_threadpool_shutdown(pool);
435         }
436         ao2_cleanup(listener);
437         return res;
438 }
439
440 AST_TEST_DEFINE(threadpool_thread_timeout)
441 {
442         struct ast_threadpool *pool = NULL;
443         struct ast_threadpool_listener *listener = NULL;
444         enum ast_test_result_state res = AST_TEST_FAIL;
445         struct test_listener_data *tld;
446         struct ast_threadpool_options options = {
447                 .version = AST_THREADPOOL_OPTIONS_VERSION,
448                 .idle_timeout = 2,
449                 .auto_increment = 0,
450         };
451
452         switch (cmd) {
453         case TEST_INIT:
454                 info->name = "thread_timeout";
455                 info->category = "/main/threadpool/";
456                 info->summary = "Test threadpool thread timeout";
457                 info->description =
458                         "Ensure that a thread with a two second timeout dies as expected.";
459                 return AST_TEST_NOT_RUN;
460         case TEST_EXECUTE:
461                 break;
462         }
463
464         listener = ast_threadpool_listener_alloc(&test_callbacks);
465         if (!listener) {
466                 return AST_TEST_FAIL;
467         }
468         tld = listener->private_data;
469
470         pool = ast_threadpool_create(info->name, listener, 0, &options);
471         if (!pool) {
472                 goto end;
473         }
474
475         ast_threadpool_set_size(pool, 1);
476
477         res = wait_until_thread_state(test, tld, 0, 1);
478         if (res == AST_TEST_FAIL) {
479                 goto end;
480         }
481
482         res = listener_check(test, listener, 0, 0, 0, 0, 1, 0);
483         if (res == AST_TEST_FAIL) {
484                 goto end;
485         }
486
487         res = wait_until_thread_state(test, tld, 0, 0);
488         if (res == AST_TEST_FAIL) {
489                 goto end;
490         }
491
492         res = listener_check(test, listener, 0, 0, 0, 0, 0, 0);
493
494 end:
495         if (pool) {
496                 ast_threadpool_shutdown(pool);
497         }
498         ao2_cleanup(listener);
499         return res;
500 }
501
502 AST_TEST_DEFINE(threadpool_one_task_one_thread)
503 {
504         struct ast_threadpool *pool = NULL;
505         struct ast_threadpool_listener *listener = NULL;
506         struct simple_task_data *std = NULL;
507         enum ast_test_result_state res = AST_TEST_FAIL;
508         struct test_listener_data *tld;
509         struct ast_threadpool_options options = {
510                 .version = AST_THREADPOOL_OPTIONS_VERSION,
511                 .idle_timeout = 0,
512                 .auto_increment = 0,
513         };
514
515         switch (cmd) {
516         case TEST_INIT:
517                 info->name = "one_task_one_thread";
518                 info->category = "/main/threadpool/";
519                 info->summary = "Test a single task with a single thread";
520                 info->description =
521                         "Push a task into an empty threadpool, then add a thread to the pool.";
522                 return AST_TEST_NOT_RUN;
523         case TEST_EXECUTE:
524                 break;
525         }
526
527         listener = ast_threadpool_listener_alloc(&test_callbacks);
528         if (!listener) {
529                 return AST_TEST_FAIL;
530         }
531         tld = listener->private_data;
532
533         pool = ast_threadpool_create(info->name, listener, 0, &options);
534         if (!pool) {
535                 goto end;
536         }
537
538         std = simple_task_data_alloc();
539         if (!std) {
540                 goto end;
541         }
542
543         ast_threadpool_push(pool, simple_task, std);
544
545         ast_threadpool_set_size(pool, 1);
546
547         /* Threads added to the pool are active when they start,
548          * so the newly-created thread should immediately execute
549          * the waiting task.
550          */
551         res = wait_for_completion(test, std);
552         if (res == AST_TEST_FAIL) {
553                 goto end;
554         }
555
556         res = wait_for_empty_notice(test, tld);
557         if (res == AST_TEST_FAIL) {
558                 goto end;
559         }
560         
561         /* After completing the task, the thread should go idle */
562         res = wait_until_thread_state(test, tld, 0, 1);
563         if (res == AST_TEST_FAIL) {
564                 goto end;
565         }
566
567         res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
568
569 end:
570         if (pool) {
571                 ast_threadpool_shutdown(pool);
572         }
573         ao2_cleanup(listener);
574         ast_free(std);
575         return res;
576
577 }
578
579 AST_TEST_DEFINE(threadpool_one_thread_one_task)
580 {
581         struct ast_threadpool *pool = NULL;
582         struct ast_threadpool_listener *listener = NULL;
583         struct simple_task_data *std = NULL;
584         enum ast_test_result_state res = AST_TEST_FAIL;
585         struct test_listener_data *tld;
586         struct ast_threadpool_options options = {
587                 .version = AST_THREADPOOL_OPTIONS_VERSION,
588                 .idle_timeout = 0,
589                 .auto_increment = 0,
590         };
591
592         switch (cmd) {
593         case TEST_INIT:
594                 info->name = "one_thread_one_task";
595                 info->category = "/main/threadpool/";
596                 info->summary = "Test a single thread with a single task";
597                 info->description =
598                         "Add a thread to the pool and then push a task to it.";
599                 return AST_TEST_NOT_RUN;
600         case TEST_EXECUTE:
601                 break;
602         }
603
604         listener = ast_threadpool_listener_alloc(&test_callbacks);
605         if (!listener) {
606                 return AST_TEST_FAIL;
607         }
608         tld = listener->private_data;
609
610         pool = ast_threadpool_create(info->name, listener, 0, &options);
611         if (!pool) {
612                 goto end;
613         }
614
615         std = simple_task_data_alloc();
616         if (!std) {
617                 goto end;
618         }
619
620         ast_threadpool_set_size(pool, 1);
621
622         res = wait_until_thread_state(test, tld, 0, 1);
623         if (res == AST_TEST_FAIL) {
624                 goto end;
625         }
626
627         ast_threadpool_push(pool, simple_task, std);
628
629         res = wait_for_completion(test, std);
630         if (res == AST_TEST_FAIL) {
631                 goto end;
632         }
633
634         res = wait_for_empty_notice(test, tld);
635         if (res == AST_TEST_FAIL) {
636                 goto end;
637         }
638
639         /* After completing the task, the thread should go idle */
640         res = wait_until_thread_state(test, tld, 0, 1);
641         if (res == AST_TEST_FAIL) {
642                 goto end;
643         }
644
645         res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
646
647 end:
648         if (pool) {
649                 ast_threadpool_shutdown(pool);
650         }
651         ao2_cleanup(listener);
652         ast_free(std);
653         return res;
654 }
655
656 AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks)
657 {
658         struct ast_threadpool *pool = NULL;
659         struct ast_threadpool_listener *listener = NULL;
660         struct simple_task_data *std1 = NULL;
661         struct simple_task_data *std2 = NULL;
662         struct simple_task_data *std3 = NULL;
663         enum ast_test_result_state res = AST_TEST_FAIL;
664         struct test_listener_data *tld;
665         struct ast_threadpool_options options = {
666                 .version = AST_THREADPOOL_OPTIONS_VERSION,
667                 .idle_timeout = 0,
668                 .auto_increment = 0,
669         };
670
671         switch (cmd) {
672         case TEST_INIT:
673                 info->name = "one_thread_multiple_tasks";
674                 info->category = "/main/threadpool/";
675                 info->summary = "Test a single thread with multiple tasks";
676                 info->description =
677                         "Add a thread to the pool and then push three tasks to it.";
678                 return AST_TEST_NOT_RUN;
679         case TEST_EXECUTE:
680                 break;
681         }
682
683         listener = ast_threadpool_listener_alloc(&test_callbacks);
684         if (!listener) {
685                 return AST_TEST_FAIL;
686         }
687         tld = listener->private_data;
688
689         pool = ast_threadpool_create(info->name, listener, 0, &options);
690         if (!pool) {
691                 goto end;
692         }
693
694         std1 = simple_task_data_alloc();
695         std2 = simple_task_data_alloc();
696         std3 = simple_task_data_alloc();
697         if (!std1 || !std2 || !std3) {
698                 goto end;
699         }
700
701         ast_threadpool_set_size(pool, 1);
702
703         res = wait_until_thread_state(test, tld, 0, 1);
704         if (res == AST_TEST_FAIL) {
705                 goto end;
706         }
707
708         ast_threadpool_push(pool, simple_task, std1);
709         ast_threadpool_push(pool, simple_task, std2);
710         ast_threadpool_push(pool, simple_task, std3);
711
712         res = wait_for_completion(test, std1);
713         if (res == AST_TEST_FAIL) {
714                 goto end;
715         }
716         res = wait_for_completion(test, std2);
717         if (res == AST_TEST_FAIL) {
718                 goto end;
719         }
720         res = wait_for_completion(test, std3);
721         if (res == AST_TEST_FAIL) {
722                 goto end;
723         }
724
725         res = wait_for_empty_notice(test, tld);
726         if (res == AST_TEST_FAIL) {
727                 goto end;
728         }
729
730         res = wait_until_thread_state(test, tld, 0, 1);
731         if (res == AST_TEST_FAIL) {
732                 goto end;
733         }
734
735         res = listener_check(test, listener, 1, 0, 3, 0, 1, 1);
736
737 end:
738         if (pool) {
739                 ast_threadpool_shutdown(pool);
740         }
741         ao2_cleanup(listener);
742         ast_free(std1);
743         ast_free(std2);
744         ast_free(std3);
745         return res;
746 }
747
748 AST_TEST_DEFINE(threadpool_auto_increment)
749 {
750         struct ast_threadpool *pool = NULL;
751         struct ast_threadpool_listener *listener = NULL;
752         struct simple_task_data *std = NULL;
753         enum ast_test_result_state res = AST_TEST_FAIL;
754         struct test_listener_data *tld;
755         struct ast_threadpool_options options = {
756                 .version = AST_THREADPOOL_OPTIONS_VERSION,
757                 .idle_timeout = 0,
758                 .auto_increment = 3,
759         };
760
761         switch (cmd) {
762         case TEST_INIT:
763                 info->name = "auto_increment";
764                 info->category = "/main/threadpool/";
765                 info->summary = "Test that the threadpool grows as tasks are added";
766                 info->description =
767                         "Create an empty threadpool and push a task to it. Once the task is\n"
768                         "pushed, the threadpool should add three threads and be able to\n"
769                         "handle the task. The threads should then go idle\n";
770                 return AST_TEST_NOT_RUN;
771         case TEST_EXECUTE:
772                 break;
773         }
774
775         listener = ast_threadpool_listener_alloc(&test_callbacks);
776         if (!listener) {
777                 return AST_TEST_FAIL;
778         }
779         tld = listener->private_data;
780
781         pool = ast_threadpool_create(info->name, listener, 0, &options);
782         if (!pool) {
783                 goto end;
784         }
785
786         std = simple_task_data_alloc();
787         if (!std) {
788                 goto end;
789         }
790
791         ast_threadpool_push(pool, simple_task, std);
792
793         /* Pushing the task should result in the threadpool growing
794          * by three threads. This will allow the task to actually execute
795          */
796         res = wait_for_completion(test, std);
797         if (res == AST_TEST_FAIL) {
798                 goto end;
799         }
800
801         res = wait_for_empty_notice(test, tld);
802         if (res == AST_TEST_FAIL) {
803                 goto end;
804         }
805
806         res = wait_until_thread_state(test, tld, 0, 3);
807         if (res == AST_TEST_FAIL) {
808                 goto end;
809         }
810
811         res = listener_check(test, listener, 1, 1, 1, 0, 3, 1);
812
813 end:
814         if (pool) {
815                 ast_threadpool_shutdown(pool);
816         }
817         ao2_cleanup(listener);
818         ast_free(std);
819         return res;
820 }
821
822 AST_TEST_DEFINE(threadpool_reactivation)
823 {
824         struct ast_threadpool *pool = NULL;
825         struct ast_threadpool_listener *listener = NULL;
826         struct simple_task_data *std1 = NULL;
827         struct simple_task_data *std2 = NULL;
828         enum ast_test_result_state res = AST_TEST_FAIL;
829         struct test_listener_data *tld;
830         struct ast_threadpool_options options = {
831                 .version = AST_THREADPOOL_OPTIONS_VERSION,
832                 .idle_timeout = 0,
833                 .auto_increment = 0,
834         };
835
836         switch (cmd) {
837         case TEST_INIT:
838                 info->name = "reactivation";
839                 info->category = "/main/threadpool/";
840                 info->summary = "Test that a threadpool reactivates when work is added";
841                 info->description =
842                         "Push a task into a threadpool. Make sure the task executes and the\n"
843                         "thread goes idle. Then push a second task and ensure that the thread\n"
844                         "awakens and executes the second task.\n";
845                 return AST_TEST_NOT_RUN;
846         case TEST_EXECUTE:
847                 break;
848         }
849
850         listener = ast_threadpool_listener_alloc(&test_callbacks);
851         if (!listener) {
852                 return AST_TEST_FAIL;
853         }
854         tld = listener->private_data;
855
856         pool = ast_threadpool_create(info->name, listener, 0, &options);
857         if (!pool) {
858                 goto end;
859         }
860
861         std1 = simple_task_data_alloc();
862         std2 = simple_task_data_alloc();
863         if (!std1 || !std2) {
864                 goto end;
865         }
866
867         ast_threadpool_push(pool, simple_task, std1);
868
869         ast_threadpool_set_size(pool, 1);
870
871         res = wait_for_completion(test, std1);
872         if (res == AST_TEST_FAIL) {
873                 goto end;
874         }
875
876         res = wait_for_empty_notice(test, tld);
877         if (res == AST_TEST_FAIL) {
878                 goto end;
879         }
880         
881         res = wait_until_thread_state(test, tld, 0, 1);
882         if (res == AST_TEST_FAIL) {
883                 goto end;
884         }
885
886         res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
887         if (res == AST_TEST_FAIL) {
888                 goto end;
889         }
890
891         /* Now make sure the threadpool reactivates when we add a second task */
892         ast_threadpool_push(pool, simple_task, std2);
893
894         res = wait_for_completion(test, std2);
895         if (res == AST_TEST_FAIL) {
896                 goto end;
897         }
898
899         res = wait_for_empty_notice(test, tld);
900         if (res == AST_TEST_FAIL) {
901                 goto end;
902         }
903         
904         res = wait_until_thread_state(test, tld, 0, 1);
905         if (res == AST_TEST_FAIL) {
906                 goto end;
907         }
908
909         res = listener_check(test, listener, 1, 1, 2, 0, 1, 1);
910
911 end:
912         if (pool) {
913                 ast_threadpool_shutdown(pool);
914         }
915         ao2_cleanup(listener);
916         ast_free(std1);
917         ast_free(std2);
918         return res;
919
920 }
921
922 struct complex_task_data {
923         int task_executed;
924         int continue_task;
925         ast_mutex_t lock;
926         ast_cond_t stall_cond;
927         ast_cond_t done_cond;
928 };
929
930 static struct complex_task_data *complex_task_data_alloc(void)
931 {
932         struct complex_task_data *ctd = ast_calloc(1, sizeof(*ctd));
933
934         if (!ctd) {
935                 return NULL;
936         }
937         ast_mutex_init(&ctd->lock);
938         ast_cond_init(&ctd->stall_cond, NULL);
939         ast_cond_init(&ctd->done_cond, NULL);
940         return ctd;
941 }
942
943 static int complex_task(void *data)
944 {
945         struct complex_task_data *ctd = data;
946         SCOPED_MUTEX(lock, &ctd->lock);
947         while (!ctd->continue_task) {
948                 ast_cond_wait(&ctd->stall_cond, lock);
949         }
950         /* We got poked. Finish up */
951         ctd->task_executed = 1;
952         ast_cond_signal(&ctd->done_cond);
953         return 0;
954 }
955
956 static void poke_worker(struct complex_task_data *ctd)
957 {
958         SCOPED_MUTEX(lock, &ctd->lock);
959         ctd->continue_task = 1;
960         ast_cond_signal(&ctd->stall_cond);
961 }
962
963 static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd)
964 {
965         struct timeval start = ast_tvnow();
966         struct timespec end = {
967                 .tv_sec = start.tv_sec + 5,
968                 .tv_nsec = start.tv_usec * 1000
969         };
970         enum ast_test_result_state res = AST_TEST_PASS;
971         SCOPED_MUTEX(lock, &ctd->lock);
972
973         while (!ctd->task_executed) {
974                 if (ast_cond_timedwait(&ctd->done_cond, lock, &end) == ETIMEDOUT) {
975                         break;
976                 }
977         }
978
979         if (!ctd->task_executed) {
980                 res = AST_TEST_FAIL;
981         }
982         return res;
983 }
984
985 AST_TEST_DEFINE(threadpool_task_distribution)
986 {
987         struct ast_threadpool *pool = NULL;
988         struct ast_threadpool_listener *listener = NULL;
989         struct complex_task_data *ctd1 = NULL;
990         struct complex_task_data *ctd2 = NULL;
991         enum ast_test_result_state res = AST_TEST_FAIL;
992         struct test_listener_data *tld;
993         struct ast_threadpool_options options = {
994                 .version = AST_THREADPOOL_OPTIONS_VERSION,
995                 .idle_timeout = 0,
996                 .auto_increment = 0,
997         };
998
999         switch (cmd) {
1000         case TEST_INIT:
1001                 info->name = "task_distribution";
1002                 info->category = "/main/threadpool/";
1003                 info->summary = "Test that tasks are evenly distributed to threads";
1004                 info->description =
1005                         "Push two tasks into a threadpool. Ensure that each is handled by\n"
1006                         "a separate thread\n";
1007                 return AST_TEST_NOT_RUN;
1008         case TEST_EXECUTE:
1009                 break;
1010         }
1011
1012         listener = ast_threadpool_listener_alloc(&test_callbacks);
1013         if (!listener) {
1014                 return AST_TEST_FAIL;
1015         }
1016         tld = listener->private_data;
1017
1018         pool = ast_threadpool_create(info->name, listener, 0, &options);
1019         if (!pool) {
1020                 goto end;
1021         }
1022
1023         ctd1 = complex_task_data_alloc();
1024         ctd2 = complex_task_data_alloc();
1025         if (!ctd1 || !ctd2) {
1026                 goto end;
1027         }
1028
1029         ast_threadpool_push(pool, complex_task, ctd1);
1030         ast_threadpool_push(pool, complex_task, ctd2);
1031
1032         ast_threadpool_set_size(pool, 2);
1033
1034         res = wait_until_thread_state(test, tld, 2, 0);
1035         if (res == AST_TEST_FAIL) {
1036                 goto end;
1037         }
1038
1039         res = listener_check(test, listener, 1, 0, 2, 2, 0, 0);
1040         if (res == AST_TEST_FAIL) {
1041                 goto end;
1042         }
1043
1044         /* The tasks are stalled until we poke them */
1045         poke_worker(ctd1);
1046         poke_worker(ctd2);
1047
1048         res = wait_for_complex_completion(ctd1);
1049         if (res == AST_TEST_FAIL) {
1050                 goto end;
1051         }
1052         res = wait_for_complex_completion(ctd2);
1053         if (res == AST_TEST_FAIL) {
1054                 goto end;
1055         }
1056
1057         res = wait_until_thread_state(test, tld, 0, 2);
1058         if (res == AST_TEST_FAIL) {
1059                 goto end;
1060         }
1061
1062         res = listener_check(test, listener, 1, 0, 2, 0, 2, 1);
1063
1064 end:
1065         if (pool) {
1066                 ast_threadpool_shutdown(pool);
1067         }
1068         ao2_cleanup(listener);
1069         ast_free(ctd1);
1070         ast_free(ctd2);
1071         return res;
1072 }
1073
1074 AST_TEST_DEFINE(threadpool_more_destruction)
1075 {
1076         struct ast_threadpool *pool = NULL;
1077         struct ast_threadpool_listener *listener = NULL;
1078         struct complex_task_data *ctd1 = NULL;
1079         struct complex_task_data *ctd2 = NULL;
1080         enum ast_test_result_state res = AST_TEST_FAIL;
1081         struct test_listener_data *tld;
1082         struct ast_threadpool_options options = {
1083                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1084                 .idle_timeout = 0,
1085                 .auto_increment = 0,
1086         };
1087
1088         switch (cmd) {
1089         case TEST_INIT:
1090                 info->name = "more_destruction";
1091                 info->category = "/main/threadpool/";
1092                 info->summary = "Test that threads are destroyed as expected";
1093                 info->description =
1094                         "Push two tasks into a threadpool. Set the threadpool size to 4\n"
1095                         "Ensure that there are 2 active and 2 idle threads. Then shrink the\n"
1096                         "threadpool down to 1 thread. Ensure that the thread leftove is active\n"
1097                         "and ensure that both tasks complete.\n";
1098                 return AST_TEST_NOT_RUN;
1099         case TEST_EXECUTE:
1100                 break;
1101         }
1102
1103         listener = ast_threadpool_listener_alloc(&test_callbacks);
1104         if (!listener) {
1105                 return AST_TEST_FAIL;
1106         }
1107         tld = listener->private_data;
1108
1109         pool = ast_threadpool_create(info->name, listener, 0, &options);
1110         if (!pool) {
1111                 goto end;
1112         }
1113
1114         ctd1 = complex_task_data_alloc();
1115         ctd2 = complex_task_data_alloc();
1116         if (!ctd1 || !ctd2) {
1117                 goto end;
1118         }
1119
1120         ast_threadpool_push(pool, complex_task, ctd1);
1121         ast_threadpool_push(pool, complex_task, ctd2);
1122
1123         ast_threadpool_set_size(pool, 4);
1124
1125         res = wait_until_thread_state(test, tld, 2, 2);
1126         if (res == AST_TEST_FAIL) {
1127                 goto end;
1128         }
1129
1130         res = listener_check(test, listener, 1, 0, 2, 2, 2, 0);
1131         if (res == AST_TEST_FAIL) {
1132                 goto end;
1133         }
1134
1135         ast_threadpool_set_size(pool, 1);
1136
1137         /* Shrinking the threadpool should kill off the two idle threads
1138          * and one of the active threads.
1139          */
1140         res = wait_until_thread_state(test, tld, 1, 0);
1141         if (res == AST_TEST_FAIL) {
1142                 goto end;
1143         }
1144
1145         res = listener_check(test, listener, 1, 0, 2, 1, 0, 0);
1146         if (res == AST_TEST_FAIL) {
1147                 goto end;
1148         }
1149
1150         /* The tasks are stalled until we poke them */
1151         poke_worker(ctd1);
1152         poke_worker(ctd2);
1153
1154         res = wait_for_complex_completion(ctd1);
1155         if (res == AST_TEST_FAIL) {
1156                 goto end;
1157         }
1158         res = wait_for_complex_completion(ctd2);
1159         if (res == AST_TEST_FAIL) {
1160                 goto end;
1161         }
1162
1163         res = wait_until_thread_state(test, tld, 0, 1);
1164         if (res == AST_TEST_FAIL) {
1165                 goto end;
1166         }
1167
1168         res = listener_check(test, listener, 1, 0, 2, 0, 1, 1);
1169
1170 end:
1171         if (pool) {
1172                 ast_threadpool_shutdown(pool);
1173         }
1174         ao2_cleanup(listener);
1175         ast_free(ctd1);
1176         ast_free(ctd2);
1177         return res;
1178 }
1179
1180 static int unload_module(void)
1181 {
1182         ast_test_unregister(threadpool_push);
1183         ast_test_unregister(threadpool_thread_creation);
1184         ast_test_unregister(threadpool_thread_destruction);
1185         ast_test_unregister(threadpool_thread_timeout);
1186         ast_test_unregister(threadpool_one_task_one_thread);
1187         ast_test_unregister(threadpool_one_thread_one_task);
1188         ast_test_unregister(threadpool_one_thread_multiple_tasks);
1189         ast_test_unregister(threadpool_auto_increment);
1190         ast_test_unregister(threadpool_reactivation);
1191         ast_test_unregister(threadpool_task_distribution);
1192         ast_test_unregister(threadpool_more_destruction);
1193         return 0;
1194 }
1195
1196 static int load_module(void)
1197 {
1198         ast_test_register(threadpool_push);
1199         ast_test_register(threadpool_thread_creation);
1200         ast_test_register(threadpool_thread_destruction);
1201         ast_test_register(threadpool_thread_timeout);
1202         ast_test_register(threadpool_one_task_one_thread);
1203         ast_test_register(threadpool_one_thread_one_task);
1204         ast_test_register(threadpool_one_thread_multiple_tasks);
1205         ast_test_register(threadpool_auto_increment);
1206         ast_test_register(threadpool_reactivation);
1207         ast_test_register(threadpool_task_distribution);
1208         ast_test_register(threadpool_more_destruction);
1209         return AST_MODULE_LOAD_SUCCESS;
1210 }
1211
1212 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "threadpool test module");