threadpool: Handle worker thread transitioning to dead when going active.
[asterisk/asterisk.git] / tests / test_threadpool.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012-2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \brief threadpool unit tests
22  *
23  * \author Mark Michelson <mmichelson@digium.com>
24  *
25  */
26
27 /*** MODULEINFO
28         <depend>TEST_FRAMEWORK</depend>
29         <support_level>core</support_level>
30  ***/
31
32 #include "asterisk.h"
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/lock.h"
36 #include "asterisk/logger.h"
37 #include "asterisk/module.h"
38 #include "asterisk/taskprocessor.h"
39 #include "asterisk/test.h"
40 #include "asterisk/threadpool.h"
41
42 struct test_listener_data {
43         int num_active;
44         int num_idle;
45         int task_pushed;
46         int num_tasks;
47         int empty_notice;
48         int was_empty;
49         ast_mutex_t lock;
50         ast_cond_t cond;
51 };
52
53 static struct test_listener_data *test_alloc(void)
54 {
55         struct test_listener_data *tld = ast_calloc(1, sizeof(*tld));
56         if (!tld) {
57                 return NULL;
58         }
59         ast_mutex_init(&tld->lock);
60         ast_cond_init(&tld->cond, NULL);
61         return tld;
62 }
63
64 static void test_state_changed(struct ast_threadpool *pool,
65                 struct ast_threadpool_listener *listener,
66                 int active_threads,
67                 int idle_threads)
68 {
69         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
70         SCOPED_MUTEX(lock, &tld->lock);
71         tld->num_active = active_threads;
72         tld->num_idle = idle_threads;
73         ast_log(LOG_NOTICE, "Thread state: %d active, %d idle\n", tld->num_active, tld->num_idle);
74         ast_cond_signal(&tld->cond);
75 }
76
77 static void test_task_pushed(struct ast_threadpool *pool,
78                 struct ast_threadpool_listener *listener,
79                 int was_empty)
80 {
81         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
82         SCOPED_MUTEX(lock, &tld->lock);
83         tld->task_pushed = 1;
84         ++tld->num_tasks;
85         tld->was_empty = was_empty;
86         ast_cond_signal(&tld->cond);
87 }
88
89 static void test_emptied(struct ast_threadpool *pool,
90                 struct ast_threadpool_listener *listener)
91 {
92         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
93         SCOPED_MUTEX(lock, &tld->lock);
94         tld->empty_notice = 1;
95         ast_cond_signal(&tld->cond);
96 }
97
98 static void test_shutdown(struct ast_threadpool_listener *listener)
99 {
100         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
101         ast_cond_destroy(&tld->cond);
102         ast_mutex_destroy(&tld->lock);
103 }
104
105 static const struct ast_threadpool_listener_callbacks test_callbacks = {
106         .state_changed = test_state_changed,
107         .task_pushed = test_task_pushed,
108         .emptied = test_emptied,
109         .shutdown = test_shutdown,
110 };
111
112 struct simple_task_data {
113         int task_executed;
114         ast_mutex_t lock;
115         ast_cond_t cond;
116 };
117
118 static struct simple_task_data *simple_task_data_alloc(void)
119 {
120         struct simple_task_data *std = ast_calloc(1, sizeof(*std));
121
122         if (!std) {
123                 return NULL;
124         }
125         ast_mutex_init(&std->lock);
126         ast_cond_init(&std->cond, NULL);
127         return std;
128 }
129
130 static int simple_task(void *data)
131 {
132         struct simple_task_data *std = data;
133         SCOPED_MUTEX(lock, &std->lock);
134         std->task_executed = 1;
135         ast_cond_signal(&std->cond);
136         return 0;
137 }
138
139 static enum ast_test_result_state wait_until_thread_state(struct ast_test *test, struct test_listener_data *tld, int num_active, int num_idle)
140 {
141         struct timeval start = ast_tvnow();
142         struct timespec end = {
143                 .tv_sec = start.tv_sec + 5,
144                 .tv_nsec = start.tv_usec * 1000
145         };
146         enum ast_test_result_state res = AST_TEST_PASS;
147         SCOPED_MUTEX(lock, &tld->lock);
148
149         while (!(tld->num_active == num_active && tld->num_idle == num_idle)) {
150                 if (ast_cond_timedwait(&tld->cond, &tld->lock, &end) == ETIMEDOUT) {
151                         break;
152                 }
153         }
154
155         if (tld->num_active != num_active && tld->num_idle != num_idle) {
156                 ast_test_status_update(test, "Number of active threads and idle threads not what was expected.\n");
157                 ast_test_status_update(test, "Expected %d active threads but got %d\n", num_active, tld->num_active);
158                 ast_test_status_update(test, "Expected %d idle threads but got %d\n", num_idle, tld->num_idle);
159                 res = AST_TEST_FAIL;
160         }
161
162         return res;
163 }
164
165 static void wait_for_task_pushed(struct ast_threadpool_listener *listener)
166 {
167         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
168         struct timeval start = ast_tvnow();
169         struct timespec end = {
170                 .tv_sec = start.tv_sec + 5,
171                 .tv_nsec = start.tv_usec * 1000
172         };
173         SCOPED_MUTEX(lock, &tld->lock);
174
175         while (!tld->task_pushed) {
176                 if (ast_cond_timedwait(&tld->cond, lock, &end) == ETIMEDOUT) {
177                         break;
178                 }
179         }
180 }
181
182 static enum ast_test_result_state wait_for_completion(struct ast_test *test, struct simple_task_data *std)
183 {
184         struct timeval start = ast_tvnow();
185         struct timespec end = {
186                 .tv_sec = start.tv_sec + 5,
187                 .tv_nsec = start.tv_usec * 1000
188         };
189         enum ast_test_result_state res = AST_TEST_PASS;
190         SCOPED_MUTEX(lock, &std->lock);
191
192         while (!std->task_executed) {
193                 if (ast_cond_timedwait(&std->cond, lock, &end) == ETIMEDOUT) {
194                         break;
195                 }
196         }
197
198         if (!std->task_executed) {
199                 ast_test_status_update(test, "Task execution did not occur\n");
200                 res = AST_TEST_FAIL;
201         }
202         return res;
203 }
204
205 static enum ast_test_result_state wait_for_empty_notice(struct ast_test *test, struct test_listener_data *tld)
206 {
207         struct timeval start = ast_tvnow();
208         struct timespec end = {
209                 .tv_sec = start.tv_sec + 5,
210                 .tv_nsec = start.tv_usec * 1000
211         };
212         enum ast_test_result_state res = AST_TEST_PASS;
213         SCOPED_MUTEX(lock, &tld->lock);
214
215         while (!tld->empty_notice) {
216                 if (ast_cond_timedwait(&tld->cond, lock, &end) == ETIMEDOUT) {
217                         break;
218                 }
219         }
220
221         if (!tld->empty_notice) {
222                 ast_test_status_update(test, "Test listener not notified that threadpool is empty\n");
223                 res = AST_TEST_FAIL;
224         }
225
226         return res;
227 }
228
229 static enum ast_test_result_state listener_check(
230                 struct ast_test *test,
231                 struct ast_threadpool_listener *listener,
232                 int task_pushed,
233                 int was_empty,
234                 int num_tasks,
235                 int num_active,
236                 int num_idle,
237                 int empty_notice)
238 {
239         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
240         enum ast_test_result_state res = AST_TEST_PASS;
241
242         if (tld->task_pushed != task_pushed) {
243                 ast_test_status_update(test, "Expected task %sto be pushed, but it was%s\n",
244                                 task_pushed ? "" : "not ", tld->task_pushed ? "" : " not");
245                 res = AST_TEST_FAIL;
246         }
247         if (tld->was_empty != was_empty) {
248                 ast_test_status_update(test, "Expected %sto be empty, but it was%s\n",
249                                 was_empty ? "" : "not ", tld->was_empty ? "" : " not");
250                 res = AST_TEST_FAIL;
251         }
252         if (tld->num_tasks!= num_tasks) {
253                 ast_test_status_update(test, "Expected %d tasks to be pushed, but got %d\n",
254                                 num_tasks, tld->num_tasks);
255                 res = AST_TEST_FAIL;
256         }
257         if (tld->num_active != num_active) {
258                 ast_test_status_update(test, "Expected %d active threads, but got %d\n",
259                                 num_active, tld->num_active);
260                 res = AST_TEST_FAIL;
261         }
262         if (tld->num_idle != num_idle) {
263                 ast_test_status_update(test, "Expected %d idle threads, but got %d\n",
264                                 num_idle, tld->num_idle);
265                 res = AST_TEST_FAIL;
266         }
267         if (tld->empty_notice != empty_notice) {
268                 ast_test_status_update(test, "Expected %s empty notice, but got %s\n",
269                                 was_empty ? "an" : "no", tld->task_pushed ? "one" : "none");
270                 res = AST_TEST_FAIL;
271         }
272
273         return res;
274 }
275
276 AST_TEST_DEFINE(threadpool_push)
277 {
278         struct ast_threadpool *pool = NULL;
279         struct ast_threadpool_listener *listener = NULL;
280         struct simple_task_data *std = NULL;
281         struct test_listener_data *tld = NULL;
282         enum ast_test_result_state res = AST_TEST_FAIL;
283         struct ast_threadpool_options options = {
284                 .version = AST_THREADPOOL_OPTIONS_VERSION,
285                 .idle_timeout = 0,
286                 .auto_increment = 0,
287                 .initial_size = 0,
288                 .max_size = 0,
289         };
290
291         switch (cmd) {
292         case TEST_INIT:
293                 info->name = "push";
294                 info->category = "/main/threadpool/";
295                 info->summary = "Test task";
296                 info->description =
297                         "Basic threadpool test";
298                 return AST_TEST_NOT_RUN;
299         case TEST_EXECUTE:
300                 break;
301         }
302         tld = test_alloc();
303         if (!tld) {
304                 return AST_TEST_FAIL;
305         }
306
307         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
308         if (!listener) {
309                 goto end;
310         }
311
312         pool = ast_threadpool_create(info->name, listener, &options);
313         if (!pool) {
314                 goto end;
315         }
316
317         std = simple_task_data_alloc();
318         if (!std) {
319                 goto end;
320         }
321
322         ast_threadpool_push(pool, simple_task, std);
323
324         wait_for_task_pushed(listener);
325
326         res = listener_check(test, listener, 1, 1, 1, 0, 0, 0);
327
328 end:
329         ast_threadpool_shutdown(pool);
330         ao2_cleanup(listener);
331         ast_free(std);
332         ast_free(tld);
333         return res;
334 }
335
336 AST_TEST_DEFINE(threadpool_initial_threads)
337 {
338         struct ast_threadpool *pool = NULL;
339         struct ast_threadpool_listener *listener = NULL;
340         enum ast_test_result_state res = AST_TEST_FAIL;
341         struct test_listener_data *tld = NULL;
342         struct ast_threadpool_options options = {
343                 .version = AST_THREADPOOL_OPTIONS_VERSION,
344                 .idle_timeout = 0,
345                 .auto_increment = 0,
346                 .initial_size = 3,
347                 .max_size = 0,
348         };
349
350         switch (cmd) {
351         case TEST_INIT:
352                 info->name = "initial_threads";
353                 info->category = "/main/threadpool/";
354                 info->summary = "Test threadpool initialization state";
355                 info->description =
356                         "Ensure that a threadpool created with a specific size contains the\n"
357                         "proper number of idle threads.";
358                 return AST_TEST_NOT_RUN;
359         case TEST_EXECUTE:
360                 break;
361         }
362
363         tld = test_alloc();
364         if (!tld) {
365                 return AST_TEST_FAIL;
366         }
367
368         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
369         if (!listener) {
370                 goto end;
371         }
372
373         pool = ast_threadpool_create(info->name, listener, &options);
374         if (!pool) {
375                 goto end;
376         }
377
378         res = wait_until_thread_state(test, tld, 0, 3);
379
380 end:
381         ast_threadpool_shutdown(pool);
382         ao2_cleanup(listener);
383         ast_free(tld);
384         return res;
385 }
386
387
388 AST_TEST_DEFINE(threadpool_thread_creation)
389 {
390         struct ast_threadpool *pool = NULL;
391         struct ast_threadpool_listener *listener = NULL;
392         enum ast_test_result_state res = AST_TEST_FAIL;
393         struct test_listener_data *tld = NULL;
394         struct ast_threadpool_options options = {
395                 .version = AST_THREADPOOL_OPTIONS_VERSION,
396                 .idle_timeout = 0,
397                 .auto_increment = 0,
398                 .initial_size = 0,
399                 .max_size = 0,
400         };
401
402         switch (cmd) {
403         case TEST_INIT:
404                 info->name = "thread_creation";
405                 info->category = "/main/threadpool/";
406                 info->summary = "Test threadpool thread creation";
407                 info->description =
408                         "Ensure that threads can be added to a threadpool";
409                 return AST_TEST_NOT_RUN;
410         case TEST_EXECUTE:
411                 break;
412         }
413
414         tld = test_alloc();
415         if (!tld) {
416                 return AST_TEST_FAIL;
417         }
418
419         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
420         if (!listener) {
421                 goto end;
422         }
423
424         pool = ast_threadpool_create(info->name, listener, &options);
425         if (!pool) {
426                 goto end;
427         }
428
429         /* Now let's create a thread. It should start active, then go
430          * idle immediately
431          */
432         ast_threadpool_set_size(pool, 1);
433
434         res = wait_until_thread_state(test, tld, 0, 1);
435
436 end:
437         ast_threadpool_shutdown(pool);
438         ao2_cleanup(listener);
439         ast_free(tld);
440         return res;
441 }
442
443 AST_TEST_DEFINE(threadpool_thread_destruction)
444 {
445         struct ast_threadpool *pool = NULL;
446         struct ast_threadpool_listener *listener = NULL;
447         enum ast_test_result_state res = AST_TEST_FAIL;
448         struct test_listener_data *tld = NULL;
449         struct ast_threadpool_options options = {
450                 .version = AST_THREADPOOL_OPTIONS_VERSION,
451                 .idle_timeout = 0,
452                 .auto_increment = 0,
453                 .initial_size = 0,
454                 .max_size = 0,
455         };
456
457         switch (cmd) {
458         case TEST_INIT:
459                 info->name = "thread_destruction";
460                 info->category = "/main/threadpool/";
461                 info->summary = "Test threadpool thread destruction";
462                 info->description =
463                         "Ensure that threads are properly destroyed in a threadpool";
464                 return AST_TEST_NOT_RUN;
465         case TEST_EXECUTE:
466                 break;
467         }
468
469         tld = test_alloc();
470         if (!tld) {
471                 return AST_TEST_FAIL;
472         }
473
474         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
475         if (!listener) {
476                 goto end;
477         }
478
479         pool = ast_threadpool_create(info->name, listener, &options);
480         if (!pool) {
481                 goto end;
482         }
483
484         ast_threadpool_set_size(pool, 3);
485
486         res = wait_until_thread_state(test, tld, 0, 3);
487         if (res == AST_TEST_FAIL) {
488                 goto end;
489         }
490
491         res = listener_check(test, listener, 0, 0, 0, 0, 3, 0);
492         if (res == AST_TEST_FAIL) {
493                 goto end;
494         }
495
496         ast_threadpool_set_size(pool, 2);
497
498         res = wait_until_thread_state(test, tld, 0, 2);
499
500 end:
501         ast_threadpool_shutdown(pool);
502         ao2_cleanup(listener);
503         ast_free(tld);
504         return res;
505 }
506
507 AST_TEST_DEFINE(threadpool_thread_timeout)
508 {
509         struct ast_threadpool *pool = NULL;
510         struct ast_threadpool_listener *listener = NULL;
511         enum ast_test_result_state res = AST_TEST_FAIL;
512         struct test_listener_data *tld = NULL;
513         struct ast_threadpool_options options = {
514                 .version = AST_THREADPOOL_OPTIONS_VERSION,
515                 .idle_timeout = 2,
516                 .auto_increment = 0,
517                 .initial_size = 0,
518                 .max_size = 0,
519         };
520
521         switch (cmd) {
522         case TEST_INIT:
523                 info->name = "thread_timeout";
524                 info->category = "/main/threadpool/";
525                 info->summary = "Test threadpool thread timeout";
526                 info->description =
527                         "Ensure that a thread with a two second timeout dies as expected.";
528                 return AST_TEST_NOT_RUN;
529         case TEST_EXECUTE:
530                 break;
531         }
532
533         tld = test_alloc();
534         if (!tld) {
535                 return AST_TEST_FAIL;
536         }
537
538         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
539         if (!listener) {
540                 goto end;
541         }
542
543         pool = ast_threadpool_create(info->name, listener, &options);
544         if (!pool) {
545                 goto end;
546         }
547
548         ast_threadpool_set_size(pool, 1);
549
550         res = wait_until_thread_state(test, tld, 0, 1);
551         if (res == AST_TEST_FAIL) {
552                 goto end;
553         }
554
555         res = listener_check(test, listener, 0, 0, 0, 0, 1, 0);
556         if (res == AST_TEST_FAIL) {
557                 goto end;
558         }
559
560         res = wait_until_thread_state(test, tld, 0, 0);
561         if (res == AST_TEST_FAIL) {
562                 goto end;
563         }
564
565         res = listener_check(test, listener, 0, 0, 0, 0, 0, 0);
566
567 end:
568         ast_threadpool_shutdown(pool);
569         ao2_cleanup(listener);
570         ast_free(tld);
571         return res;
572 }
573
574 AST_TEST_DEFINE(threadpool_thread_timeout_thrash)
575 {
576         struct ast_threadpool *pool = NULL;
577         struct ast_threadpool_listener *listener = NULL;
578         enum ast_test_result_state res = AST_TEST_FAIL;
579         struct test_listener_data *tld = NULL;
580         struct ast_threadpool_options options = {
581                 .version = AST_THREADPOOL_OPTIONS_VERSION,
582                 .idle_timeout = 1,
583                 .auto_increment = 1,
584                 .initial_size = 0,
585                 .max_size = 1,
586         };
587         int iteration;
588
589         switch (cmd) {
590         case TEST_INIT:
591                 info->name = "thread_timeout_thrash";
592                 info->category = "/main/threadpool/";
593                 info->summary = "Thrash threadpool thread timeout";
594                 info->description =
595                         "Repeatedly queue a task when a threadpool thread should timeout.";
596                 return AST_TEST_NOT_RUN;
597         case TEST_EXECUTE:
598                 break;
599         }
600
601         tld = test_alloc();
602         if (!tld) {
603                 return AST_TEST_FAIL;
604         }
605
606         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
607         if (!listener) {
608                 goto end;
609         }
610
611         pool = ast_threadpool_create(info->name, listener, &options);
612         if (!pool) {
613                 goto end;
614         }
615
616         ast_threadpool_set_size(pool, 1);
617
618         for (iteration = 0; iteration < 30; ++iteration) {
619                 struct simple_task_data *std = NULL;
620                 struct timeval start = ast_tvnow();
621                 struct timespec end = {
622                         .tv_sec = start.tv_sec + options.idle_timeout,
623                         .tv_nsec = start.tv_usec * 1000
624                 };
625
626                 std = simple_task_data_alloc();
627                 if (!std) {
628                         goto end;
629                 }
630
631                 /* Wait until the threadpool thread should timeout due to being idle */
632                 ast_mutex_lock(&tld->lock);
633                 while (ast_cond_timedwait(&tld->cond, &tld->lock, &end) != ETIMEDOUT) {
634                         /* This purposely left empty as we want to loop waiting for a time out */
635                 }
636                 ast_mutex_unlock(&tld->lock);
637
638                 ast_threadpool_push(pool, simple_task, std);
639         }
640
641         res = wait_until_thread_state(test, tld, 0, 0);
642         if (res == AST_TEST_FAIL) {
643                 goto end;
644         }
645
646         res = listener_check(test, listener, 1, 1, 30, 0, 0, 1);
647
648 end:
649         ast_threadpool_shutdown(pool);
650         ao2_cleanup(listener);
651         ast_free(tld);
652         return res;
653 }
654
655 AST_TEST_DEFINE(threadpool_one_task_one_thread)
656 {
657         struct ast_threadpool *pool = NULL;
658         struct ast_threadpool_listener *listener = NULL;
659         struct simple_task_data *std = NULL;
660         enum ast_test_result_state res = AST_TEST_FAIL;
661         struct test_listener_data *tld = NULL;
662         struct ast_threadpool_options options = {
663                 .version = AST_THREADPOOL_OPTIONS_VERSION,
664                 .idle_timeout = 0,
665                 .auto_increment = 0,
666                 .initial_size = 0,
667                 .max_size = 0,
668         };
669
670         switch (cmd) {
671         case TEST_INIT:
672                 info->name = "one_task_one_thread";
673                 info->category = "/main/threadpool/";
674                 info->summary = "Test a single task with a single thread";
675                 info->description =
676                         "Push a task into an empty threadpool, then add a thread to the pool.";
677                 return AST_TEST_NOT_RUN;
678         case TEST_EXECUTE:
679                 break;
680         }
681
682         tld = test_alloc();
683         if (!tld) {
684                 return AST_TEST_FAIL;
685         }
686
687         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
688         if (!listener) {
689                 goto end;
690         }
691
692         pool = ast_threadpool_create(info->name, listener, &options);
693         if (!pool) {
694                 goto end;
695         }
696
697         std = simple_task_data_alloc();
698         if (!std) {
699                 goto end;
700         }
701
702         ast_threadpool_push(pool, simple_task, std);
703
704         ast_threadpool_set_size(pool, 1);
705
706         /* Threads added to the pool are active when they start,
707          * so the newly-created thread should immediately execute
708          * the waiting task.
709          */
710         res = wait_for_completion(test, std);
711         if (res == AST_TEST_FAIL) {
712                 goto end;
713         }
714
715         res = wait_for_empty_notice(test, tld);
716         if (res == AST_TEST_FAIL) {
717                 goto end;
718         }
719
720         /* After completing the task, the thread should go idle */
721         res = wait_until_thread_state(test, tld, 0, 1);
722         if (res == AST_TEST_FAIL) {
723                 goto end;
724         }
725
726         res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
727
728 end:
729         ast_threadpool_shutdown(pool);
730         ao2_cleanup(listener);
731         ast_free(std);
732         ast_free(tld);
733         return res;
734
735 }
736
737 AST_TEST_DEFINE(threadpool_one_thread_one_task)
738 {
739         struct ast_threadpool *pool = NULL;
740         struct ast_threadpool_listener *listener = NULL;
741         struct simple_task_data *std = NULL;
742         enum ast_test_result_state res = AST_TEST_FAIL;
743         struct test_listener_data *tld = NULL;
744         struct ast_threadpool_options options = {
745                 .version = AST_THREADPOOL_OPTIONS_VERSION,
746                 .idle_timeout = 0,
747                 .auto_increment = 0,
748                 .initial_size = 0,
749                 .max_size = 0,
750         };
751
752         switch (cmd) {
753         case TEST_INIT:
754                 info->name = "one_thread_one_task";
755                 info->category = "/main/threadpool/";
756                 info->summary = "Test a single thread with a single task";
757                 info->description =
758                         "Add a thread to the pool and then push a task to it.";
759                 return AST_TEST_NOT_RUN;
760         case TEST_EXECUTE:
761                 break;
762         }
763
764         tld = test_alloc();
765         if (!tld) {
766                 return AST_TEST_FAIL;
767         }
768
769         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
770         if (!listener) {
771                 goto end;
772         }
773
774         pool = ast_threadpool_create(info->name, listener, &options);
775         if (!pool) {
776                 goto end;
777         }
778
779         std = simple_task_data_alloc();
780         if (!std) {
781                 goto end;
782         }
783
784         ast_threadpool_set_size(pool, 1);
785
786         res = wait_until_thread_state(test, tld, 0, 1);
787         if (res == AST_TEST_FAIL) {
788                 goto end;
789         }
790
791         ast_threadpool_push(pool, simple_task, std);
792
793         res = wait_for_completion(test, std);
794         if (res == AST_TEST_FAIL) {
795                 goto end;
796         }
797
798         res = wait_for_empty_notice(test, tld);
799         if (res == AST_TEST_FAIL) {
800                 goto end;
801         }
802
803         /* After completing the task, the thread should go idle */
804         res = wait_until_thread_state(test, tld, 0, 1);
805         if (res == AST_TEST_FAIL) {
806                 goto end;
807         }
808
809         res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
810
811 end:
812         ast_threadpool_shutdown(pool);
813         ao2_cleanup(listener);
814         ast_free(std);
815         ast_free(tld);
816         return res;
817 }
818
819 AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks)
820 {
821         struct ast_threadpool *pool = NULL;
822         struct ast_threadpool_listener *listener = NULL;
823         struct simple_task_data *std1 = NULL;
824         struct simple_task_data *std2 = NULL;
825         struct simple_task_data *std3 = NULL;
826         enum ast_test_result_state res = AST_TEST_FAIL;
827         struct test_listener_data *tld = NULL;
828         struct ast_threadpool_options options = {
829                 .version = AST_THREADPOOL_OPTIONS_VERSION,
830                 .idle_timeout = 0,
831                 .auto_increment = 0,
832                 .initial_size = 0,
833                 .max_size = 0,
834         };
835
836         switch (cmd) {
837         case TEST_INIT:
838                 info->name = "one_thread_multiple_tasks";
839                 info->category = "/main/threadpool/";
840                 info->summary = "Test a single thread with multiple tasks";
841                 info->description =
842                         "Add a thread to the pool and then push three tasks to it.";
843                 return AST_TEST_NOT_RUN;
844         case TEST_EXECUTE:
845                 break;
846         }
847
848         tld = test_alloc();
849         if (!tld) {
850                 return AST_TEST_FAIL;
851         }
852
853         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
854         if (!listener) {
855                 goto end;
856         }
857
858         pool = ast_threadpool_create(info->name, listener, &options);
859         if (!pool) {
860                 goto end;
861         }
862
863         std1 = simple_task_data_alloc();
864         std2 = simple_task_data_alloc();
865         std3 = simple_task_data_alloc();
866         if (!std1 || !std2 || !std3) {
867                 goto end;
868         }
869
870         ast_threadpool_set_size(pool, 1);
871
872         res = wait_until_thread_state(test, tld, 0, 1);
873         if (res == AST_TEST_FAIL) {
874                 goto end;
875         }
876
877         ast_threadpool_push(pool, simple_task, std1);
878         ast_threadpool_push(pool, simple_task, std2);
879         ast_threadpool_push(pool, simple_task, std3);
880
881         res = wait_for_completion(test, std1);
882         if (res == AST_TEST_FAIL) {
883                 goto end;
884         }
885         res = wait_for_completion(test, std2);
886         if (res == AST_TEST_FAIL) {
887                 goto end;
888         }
889         res = wait_for_completion(test, std3);
890         if (res == AST_TEST_FAIL) {
891                 goto end;
892         }
893
894         res = wait_for_empty_notice(test, tld);
895         if (res == AST_TEST_FAIL) {
896                 goto end;
897         }
898
899         res = wait_until_thread_state(test, tld, 0, 1);
900         if (res == AST_TEST_FAIL) {
901                 goto end;
902         }
903
904         res = listener_check(test, listener, 1, 0, 3, 0, 1, 1);
905
906 end:
907         ast_threadpool_shutdown(pool);
908         ao2_cleanup(listener);
909         ast_free(std1);
910         ast_free(std2);
911         ast_free(std3);
912         ast_free(tld);
913         return res;
914 }
915
916 AST_TEST_DEFINE(threadpool_auto_increment)
917 {
918         struct ast_threadpool *pool = NULL;
919         struct ast_threadpool_listener *listener = NULL;
920         struct simple_task_data *std1 = NULL;
921         struct simple_task_data *std2 = NULL;
922         struct simple_task_data *std3 = NULL;
923         struct simple_task_data *std4 = NULL;
924         enum ast_test_result_state res = AST_TEST_FAIL;
925         struct test_listener_data *tld = NULL;
926         struct ast_threadpool_options options = {
927                 .version = AST_THREADPOOL_OPTIONS_VERSION,
928                 .idle_timeout = 0,
929                 .auto_increment = 3,
930                 .initial_size = 0,
931                 .max_size = 0,
932         };
933
934         switch (cmd) {
935         case TEST_INIT:
936                 info->name = "auto_increment";
937                 info->category = "/main/threadpool/";
938                 info->summary = "Test that the threadpool grows as tasks are added";
939                 info->description =
940                         "Create an empty threadpool and push a task to it. Once the task is\n"
941                         "pushed, the threadpool should add three threads and be able to\n"
942                         "handle the task. The threads should then go idle";
943                 return AST_TEST_NOT_RUN;
944         case TEST_EXECUTE:
945                 break;
946         }
947
948         tld = test_alloc();
949         if (!tld) {
950                 return AST_TEST_FAIL;
951         }
952
953         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
954         if (!listener) {
955                 goto end;
956         }
957
958         pool = ast_threadpool_create(info->name, listener, &options);
959         if (!pool) {
960                 goto end;
961         }
962
963         std1 = simple_task_data_alloc();
964         std2 = simple_task_data_alloc();
965         std3 = simple_task_data_alloc();
966         std4 = simple_task_data_alloc();
967         if (!std1 || !std2 || !std3 || !std4) {
968                 goto end;
969         }
970
971         ast_threadpool_push(pool, simple_task, std1);
972
973         /* Pushing the task should result in the threadpool growing
974          * by three threads. This will allow the task to actually execute
975          */
976         res = wait_for_completion(test, std1);
977         if (res == AST_TEST_FAIL) {
978                 goto end;
979         }
980
981         res = wait_for_empty_notice(test, tld);
982         if (res == AST_TEST_FAIL) {
983                 goto end;
984         }
985
986         res = wait_until_thread_state(test, tld, 0, 3);
987         if (res == AST_TEST_FAIL) {
988                 goto end;
989         }
990
991         /* Now push three tasks into the pool and ensure the pool does not
992          * grow.
993          */
994         ast_threadpool_push(pool, simple_task, std2);
995         ast_threadpool_push(pool, simple_task, std3);
996         ast_threadpool_push(pool, simple_task, std4);
997
998         res = wait_for_completion(test, std2);
999         if (res == AST_TEST_FAIL) {
1000                 goto end;
1001         }
1002         res = wait_for_completion(test, std3);
1003         if (res == AST_TEST_FAIL) {
1004                 goto end;
1005         }
1006         res = wait_for_completion(test, std4);
1007         if (res == AST_TEST_FAIL) {
1008                 goto end;
1009         }
1010
1011         res = wait_for_empty_notice(test, tld);
1012         if (res == AST_TEST_FAIL) {
1013                 goto end;
1014         }
1015
1016         res = wait_until_thread_state(test, tld, 0, 3);
1017         if (res == AST_TEST_FAIL) {
1018                 goto end;
1019         }
1020         res = listener_check(test, listener, 1, 0, 4, 0, 3, 1);
1021
1022 end:
1023         ast_threadpool_shutdown(pool);
1024         ao2_cleanup(listener);
1025         ast_free(std1);
1026         ast_free(std2);
1027         ast_free(std3);
1028         ast_free(std4);
1029         ast_free(tld);
1030         return res;
1031 }
1032
1033 AST_TEST_DEFINE(threadpool_max_size)
1034 {
1035         struct ast_threadpool *pool = NULL;
1036         struct ast_threadpool_listener *listener = NULL;
1037         struct simple_task_data *std = NULL;
1038         enum ast_test_result_state res = AST_TEST_FAIL;
1039         struct test_listener_data *tld = NULL;
1040         struct ast_threadpool_options options = {
1041                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1042                 .idle_timeout = 0,
1043                 .auto_increment = 3,
1044                 .initial_size = 0,
1045                 .max_size = 2,
1046         };
1047
1048         switch (cmd) {
1049         case TEST_INIT:
1050                 info->name = "max_size";
1051                 info->category = "/main/threadpool/";
1052                 info->summary = "Test that the threadpool does not exceed its maximum size restriction";
1053                 info->description =
1054                         "Create an empty threadpool and push a task to it. Once the task is\n"
1055                         "pushed, the threadpool should attempt to grow by three threads, but the\n"
1056                         "pool's restrictions should only allow two threads to be added.";
1057                 return AST_TEST_NOT_RUN;
1058         case TEST_EXECUTE:
1059                 break;
1060         }
1061
1062         tld = test_alloc();
1063         if (!tld) {
1064                 return AST_TEST_FAIL;
1065         }
1066
1067         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1068         if (!listener) {
1069                 goto end;
1070         }
1071
1072         pool = ast_threadpool_create(info->name, listener, &options);
1073         if (!pool) {
1074                 goto end;
1075         }
1076
1077         std = simple_task_data_alloc();
1078         if (!std) {
1079                 goto end;
1080         }
1081
1082         ast_threadpool_push(pool, simple_task, std);
1083
1084         res = wait_for_completion(test, std);
1085         if (res == AST_TEST_FAIL) {
1086                 goto end;
1087         }
1088
1089         res = wait_until_thread_state(test, tld, 0, 2);
1090         if (res == AST_TEST_FAIL) {
1091                 goto end;
1092         }
1093
1094         res = listener_check(test, listener, 1, 1, 1, 0, 2, 1);
1095 end:
1096         ast_threadpool_shutdown(pool);
1097         ao2_cleanup(listener);
1098         ast_free(std);
1099         ast_free(tld);
1100         return res;
1101 }
1102
1103 AST_TEST_DEFINE(threadpool_reactivation)
1104 {
1105         struct ast_threadpool *pool = NULL;
1106         struct ast_threadpool_listener *listener = NULL;
1107         struct simple_task_data *std1 = NULL;
1108         struct simple_task_data *std2 = NULL;
1109         enum ast_test_result_state res = AST_TEST_FAIL;
1110         struct test_listener_data *tld = NULL;
1111         struct ast_threadpool_options options = {
1112                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1113                 .idle_timeout = 0,
1114                 .auto_increment = 0,
1115                 .initial_size = 0,
1116                 .max_size = 0,
1117         };
1118
1119         switch (cmd) {
1120         case TEST_INIT:
1121                 info->name = "reactivation";
1122                 info->category = "/main/threadpool/";
1123                 info->summary = "Test that a threadpool reactivates when work is added";
1124                 info->description =
1125                         "Push a task into a threadpool. Make sure the task executes and the\n"
1126                         "thread goes idle. Then push a second task and ensure that the thread\n"
1127                         "awakens and executes the second task.";
1128                 return AST_TEST_NOT_RUN;
1129         case TEST_EXECUTE:
1130                 break;
1131         }
1132
1133         tld = test_alloc();
1134         if (!tld) {
1135                 return AST_TEST_FAIL;
1136         }
1137
1138         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1139         if (!listener) {
1140                 goto end;
1141         }
1142
1143         pool = ast_threadpool_create(info->name, listener, &options);
1144         if (!pool) {
1145                 goto end;
1146         }
1147
1148         std1 = simple_task_data_alloc();
1149         std2 = simple_task_data_alloc();
1150         if (!std1 || !std2) {
1151                 goto end;
1152         }
1153
1154         ast_threadpool_push(pool, simple_task, std1);
1155
1156         ast_threadpool_set_size(pool, 1);
1157
1158         res = wait_for_completion(test, std1);
1159         if (res == AST_TEST_FAIL) {
1160                 goto end;
1161         }
1162
1163         res = wait_for_empty_notice(test, tld);
1164         if (res == AST_TEST_FAIL) {
1165                 goto end;
1166         }
1167
1168         res = wait_until_thread_state(test, tld, 0, 1);
1169         if (res == AST_TEST_FAIL) {
1170                 goto end;
1171         }
1172
1173         res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
1174         if (res == AST_TEST_FAIL) {
1175                 goto end;
1176         }
1177
1178         /* Now make sure the threadpool reactivates when we add a second task */
1179         ast_threadpool_push(pool, simple_task, std2);
1180
1181         res = wait_for_completion(test, std2);
1182         if (res == AST_TEST_FAIL) {
1183                 goto end;
1184         }
1185
1186         res = wait_for_empty_notice(test, tld);
1187         if (res == AST_TEST_FAIL) {
1188                 goto end;
1189         }
1190
1191         res = wait_until_thread_state(test, tld, 0, 1);
1192         if (res == AST_TEST_FAIL) {
1193                 goto end;
1194         }
1195
1196         res = listener_check(test, listener, 1, 1, 2, 0, 1, 1);
1197
1198 end:
1199         ast_threadpool_shutdown(pool);
1200         ao2_cleanup(listener);
1201         ast_free(std1);
1202         ast_free(std2);
1203         ast_free(tld);
1204         return res;
1205
1206 }
1207
1208 struct complex_task_data {
1209         int task_started;
1210         int task_executed;
1211         int continue_task;
1212         ast_mutex_t lock;
1213         ast_cond_t stall_cond;
1214         ast_cond_t notify_cond;
1215 };
1216
1217 static struct complex_task_data *complex_task_data_alloc(void)
1218 {
1219         struct complex_task_data *ctd = ast_calloc(1, sizeof(*ctd));
1220
1221         if (!ctd) {
1222                 return NULL;
1223         }
1224         ast_mutex_init(&ctd->lock);
1225         ast_cond_init(&ctd->stall_cond, NULL);
1226         ast_cond_init(&ctd->notify_cond, NULL);
1227         return ctd;
1228 }
1229
1230 static int complex_task(void *data)
1231 {
1232         struct complex_task_data *ctd = data;
1233         SCOPED_MUTEX(lock, &ctd->lock);
1234         /* Notify that we started */
1235         ctd->task_started = 1;
1236         ast_cond_signal(&ctd->notify_cond);
1237         while (!ctd->continue_task) {
1238                 ast_cond_wait(&ctd->stall_cond, lock);
1239         }
1240         /* We got poked. Finish up */
1241         ctd->task_executed = 1;
1242         ast_cond_signal(&ctd->notify_cond);
1243         return 0;
1244 }
1245
1246 static void poke_worker(struct complex_task_data *ctd)
1247 {
1248         SCOPED_MUTEX(lock, &ctd->lock);
1249         ctd->continue_task = 1;
1250         ast_cond_signal(&ctd->stall_cond);
1251 }
1252
1253 static int wait_for_complex_start(struct complex_task_data *ctd)
1254 {
1255         struct timeval start = ast_tvnow();
1256         struct timespec end = {
1257                 .tv_sec = start.tv_sec + 5,
1258                 .tv_nsec = start.tv_usec * 1000
1259         };
1260         SCOPED_MUTEX(lock, &ctd->lock);
1261
1262         while (!ctd->task_started) {
1263                 if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
1264                         break;
1265                 }
1266         }
1267
1268         return ctd->task_started;
1269 }
1270
1271 static int has_complex_started(struct complex_task_data *ctd)
1272 {
1273         struct timeval start = ast_tvnow();
1274         struct timespec end = {
1275                 .tv_sec = start.tv_sec + 1,
1276                 .tv_nsec = start.tv_usec * 1000
1277         };
1278         SCOPED_MUTEX(lock, &ctd->lock);
1279
1280         while (!ctd->task_started) {
1281                 if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
1282                         break;
1283                 }
1284         }
1285
1286         return ctd->task_started;
1287 }
1288
1289 static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd)
1290 {
1291         struct timeval start = ast_tvnow();
1292         struct timespec end = {
1293                 .tv_sec = start.tv_sec + 5,
1294                 .tv_nsec = start.tv_usec * 1000
1295         };
1296         enum ast_test_result_state res = AST_TEST_PASS;
1297         SCOPED_MUTEX(lock, &ctd->lock);
1298
1299         while (!ctd->task_executed) {
1300                 if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
1301                         break;
1302                 }
1303         }
1304
1305         if (!ctd->task_executed) {
1306                 res = AST_TEST_FAIL;
1307         }
1308         return res;
1309 }
1310
1311 AST_TEST_DEFINE(threadpool_task_distribution)
1312 {
1313         struct ast_threadpool *pool = NULL;
1314         struct ast_threadpool_listener *listener = NULL;
1315         struct complex_task_data *ctd1 = NULL;
1316         struct complex_task_data *ctd2 = NULL;
1317         enum ast_test_result_state res = AST_TEST_FAIL;
1318         struct test_listener_data *tld = NULL;
1319         struct ast_threadpool_options options = {
1320                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1321                 .idle_timeout = 0,
1322                 .auto_increment = 0,
1323                 .initial_size = 0,
1324                 .max_size = 0,
1325         };
1326
1327         switch (cmd) {
1328         case TEST_INIT:
1329                 info->name = "task_distribution";
1330                 info->category = "/main/threadpool/";
1331                 info->summary = "Test that tasks are evenly distributed to threads";
1332                 info->description =
1333                         "Push two tasks into a threadpool. Ensure that each is handled by\n"
1334                         "a separate thread";
1335                 return AST_TEST_NOT_RUN;
1336         case TEST_EXECUTE:
1337                 break;
1338         }
1339
1340         tld = test_alloc();
1341         if (!tld) {
1342                 return AST_TEST_FAIL;
1343         }
1344
1345         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1346         if (!listener) {
1347                 goto end;
1348         }
1349
1350         pool = ast_threadpool_create(info->name, listener, &options);
1351         if (!pool) {
1352                 goto end;
1353         }
1354
1355         ctd1 = complex_task_data_alloc();
1356         ctd2 = complex_task_data_alloc();
1357         if (!ctd1 || !ctd2) {
1358                 goto end;
1359         }
1360
1361         ast_threadpool_push(pool, complex_task, ctd1);
1362         ast_threadpool_push(pool, complex_task, ctd2);
1363
1364         ast_threadpool_set_size(pool, 2);
1365
1366         res = wait_until_thread_state(test, tld, 2, 0);
1367         if (res == AST_TEST_FAIL) {
1368                 goto end;
1369         }
1370
1371         res = listener_check(test, listener, 1, 0, 2, 2, 0, 0);
1372         if (res == AST_TEST_FAIL) {
1373                 goto end;
1374         }
1375
1376         /* The tasks are stalled until we poke them */
1377         poke_worker(ctd1);
1378         poke_worker(ctd2);
1379
1380         res = wait_for_complex_completion(ctd1);
1381         if (res == AST_TEST_FAIL) {
1382                 goto end;
1383         }
1384         res = wait_for_complex_completion(ctd2);
1385         if (res == AST_TEST_FAIL) {
1386                 goto end;
1387         }
1388
1389         res = wait_until_thread_state(test, tld, 0, 2);
1390         if (res == AST_TEST_FAIL) {
1391                 goto end;
1392         }
1393
1394         res = listener_check(test, listener, 1, 0, 2, 0, 2, 1);
1395
1396 end:
1397         ast_threadpool_shutdown(pool);
1398         ao2_cleanup(listener);
1399         ast_free(ctd1);
1400         ast_free(ctd2);
1401         ast_free(tld);
1402         return res;
1403 }
1404
1405 AST_TEST_DEFINE(threadpool_more_destruction)
1406 {
1407         struct ast_threadpool *pool = NULL;
1408         struct ast_threadpool_listener *listener = NULL;
1409         struct complex_task_data *ctd1 = NULL;
1410         struct complex_task_data *ctd2 = NULL;
1411         enum ast_test_result_state res = AST_TEST_FAIL;
1412         struct test_listener_data *tld = NULL;
1413         struct ast_threadpool_options options = {
1414                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1415                 .idle_timeout = 0,
1416                 .auto_increment = 0,
1417                 .initial_size = 0,
1418                 .max_size = 0,
1419         };
1420
1421         switch (cmd) {
1422         case TEST_INIT:
1423                 info->name = "more_destruction";
1424                 info->category = "/main/threadpool/";
1425                 info->summary = "Test that threads are destroyed as expected";
1426                 info->description =
1427                         "Push two tasks into a threadpool. Set the threadpool size to 4\n"
1428                         "Ensure that there are 2 active and 2 idle threads. Then shrink the\n"
1429                         "threadpool down to 1 thread. Ensure that the thread leftover is active\n"
1430                         "and ensure that both tasks complete.";
1431                 return AST_TEST_NOT_RUN;
1432         case TEST_EXECUTE:
1433                 break;
1434         }
1435
1436         tld = test_alloc();
1437         if (!tld) {
1438                 return AST_TEST_FAIL;
1439         }
1440
1441         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1442         if (!listener) {
1443                 goto end;
1444         }
1445
1446         pool = ast_threadpool_create(info->name, listener, &options);
1447         if (!pool) {
1448                 goto end;
1449         }
1450
1451         ctd1 = complex_task_data_alloc();
1452         ctd2 = complex_task_data_alloc();
1453         if (!ctd1 || !ctd2) {
1454                 goto end;
1455         }
1456
1457         ast_threadpool_push(pool, complex_task, ctd1);
1458         ast_threadpool_push(pool, complex_task, ctd2);
1459
1460         ast_threadpool_set_size(pool, 4);
1461
1462         res = wait_until_thread_state(test, tld, 2, 2);
1463         if (res == AST_TEST_FAIL) {
1464                 goto end;
1465         }
1466
1467         res = listener_check(test, listener, 1, 0, 2, 2, 2, 0);
1468         if (res == AST_TEST_FAIL) {
1469                 goto end;
1470         }
1471
1472         ast_threadpool_set_size(pool, 1);
1473
1474         /* Shrinking the threadpool should kill off the two idle threads
1475          * and one of the active threads.
1476          */
1477         res = wait_until_thread_state(test, tld, 1, 0);
1478         if (res == AST_TEST_FAIL) {
1479                 goto end;
1480         }
1481
1482         res = listener_check(test, listener, 1, 0, 2, 1, 0, 0);
1483         if (res == AST_TEST_FAIL) {
1484                 goto end;
1485         }
1486
1487         /* The tasks are stalled until we poke them */
1488         poke_worker(ctd1);
1489         poke_worker(ctd2);
1490
1491         res = wait_for_complex_completion(ctd1);
1492         if (res == AST_TEST_FAIL) {
1493                 goto end;
1494         }
1495         res = wait_for_complex_completion(ctd2);
1496         if (res == AST_TEST_FAIL) {
1497                 goto end;
1498         }
1499
1500         res = wait_until_thread_state(test, tld, 0, 1);
1501         if (res == AST_TEST_FAIL) {
1502                 goto end;
1503         }
1504
1505         res = listener_check(test, listener, 1, 0, 2, 0, 1, 1);
1506
1507 end:
1508         ast_threadpool_shutdown(pool);
1509         ao2_cleanup(listener);
1510         ast_free(ctd1);
1511         ast_free(ctd2);
1512         ast_free(tld);
1513         return res;
1514 }
1515
1516 AST_TEST_DEFINE(threadpool_serializer)
1517 {
1518         int started = 0;
1519         int finished = 0;
1520         enum ast_test_result_state res = AST_TEST_FAIL;
1521         struct ast_threadpool *pool = NULL;
1522         struct ast_taskprocessor *uut = NULL;
1523         struct complex_task_data *data1 = NULL;
1524         struct complex_task_data *data2 = NULL;
1525         struct complex_task_data *data3 = NULL;
1526         struct ast_threadpool_options options = {
1527                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1528                 .idle_timeout = 0,
1529                 .auto_increment = 0,
1530                 .initial_size = 2,
1531                 .max_size = 0,
1532         };
1533
1534         switch (cmd) {
1535         case TEST_INIT:
1536                 info->name = "threadpool_serializer";
1537                 info->category = "/main/threadpool/";
1538                 info->summary = "Test that serializers";
1539                 info->description =
1540                         "Ensures that tasks enqueued to a serialize execute in sequence.";
1541                 return AST_TEST_NOT_RUN;
1542         case TEST_EXECUTE:
1543                 break;
1544         }
1545
1546         pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
1547         if (!pool) {
1548                 ast_test_status_update(test, "Could not create threadpool\n");
1549                 goto end;
1550         }
1551         uut = ast_threadpool_serializer("ser1", pool);
1552         data1 = complex_task_data_alloc();
1553         data2 = complex_task_data_alloc();
1554         data3 = complex_task_data_alloc();
1555         if (!uut || !data1 || !data2 || !data3) {
1556                 ast_test_status_update(test, "Allocation failed\n");
1557                 goto end;
1558         }
1559
1560         /* This should start right away */
1561         if (ast_taskprocessor_push(uut, complex_task, data1)) {
1562                 ast_test_status_update(test, "Failed to enqueue data1\n");
1563                 goto end;
1564         }
1565         started = wait_for_complex_start(data1);
1566         if (!started) {
1567                 ast_test_status_update(test, "Failed to start data1\n");
1568                 goto end;
1569         }
1570
1571         /* This should not start until data 1 is complete */
1572         if (ast_taskprocessor_push(uut, complex_task, data2)) {
1573                 ast_test_status_update(test, "Failed to enqueue data2\n");
1574                 goto end;
1575         }
1576         started = has_complex_started(data2);
1577         if (started) {
1578                 ast_test_status_update(test, "data2 started out of order\n");
1579                 goto end;
1580         }
1581
1582         /* But the free thread in the pool can still run */
1583         if (ast_threadpool_push(pool, complex_task, data3)) {
1584                 ast_test_status_update(test, "Failed to enqueue data3\n");
1585         }
1586         started = wait_for_complex_start(data3);
1587         if (!started) {
1588                 ast_test_status_update(test, "Failed to start data3\n");
1589                 goto end;
1590         }
1591
1592         /* Finishing data1 should allow data2 to start */
1593         poke_worker(data1);
1594         finished = wait_for_complex_completion(data1) == AST_TEST_PASS;
1595         if (!finished) {
1596                 ast_test_status_update(test, "data1 couldn't finish\n");
1597                 goto end;
1598         }
1599         started = wait_for_complex_start(data2);
1600         if (!started) {
1601                 ast_test_status_update(test, "Failed to start data2\n");
1602                 goto end;
1603         }
1604
1605         /* Finish up */
1606         poke_worker(data2);
1607         finished = wait_for_complex_completion(data2) == AST_TEST_PASS;
1608         if (!finished) {
1609                 ast_test_status_update(test, "data2 couldn't finish\n");
1610                 goto end;
1611         }
1612         poke_worker(data3);
1613         finished = wait_for_complex_completion(data3) == AST_TEST_PASS;
1614         if (!finished) {
1615                 ast_test_status_update(test, "data3 couldn't finish\n");
1616                 goto end;
1617         }
1618
1619         res = AST_TEST_PASS;
1620
1621 end:
1622         poke_worker(data1);
1623         poke_worker(data2);
1624         poke_worker(data3);
1625         ast_taskprocessor_unreference(uut);
1626         ast_threadpool_shutdown(pool);
1627         ast_free(data1);
1628         ast_free(data2);
1629         ast_free(data3);
1630         return res;
1631 }
1632
1633 AST_TEST_DEFINE(threadpool_serializer_dupe)
1634 {
1635         enum ast_test_result_state res = AST_TEST_FAIL;
1636         struct ast_threadpool *pool = NULL;
1637         struct ast_taskprocessor *uut = NULL;
1638         struct ast_taskprocessor *there_can_be_only_one = NULL;
1639         struct ast_threadpool_options options = {
1640                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1641                 .idle_timeout = 0,
1642                 .auto_increment = 0,
1643                 .initial_size = 2,
1644                 .max_size = 0,
1645         };
1646
1647         switch (cmd) {
1648         case TEST_INIT:
1649                 info->name = "threadpool_serializer_dupe";
1650                 info->category = "/main/threadpool/";
1651                 info->summary = "Test that serializers are uniquely named";
1652                 info->description =
1653                         "Creating two serializers with the same name should\n"
1654                         "result in error.";
1655                 return AST_TEST_NOT_RUN;
1656         case TEST_EXECUTE:
1657                 break;
1658         }
1659
1660         pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
1661         if (!pool) {
1662                 ast_test_status_update(test, "Could not create threadpool\n");
1663                 goto end;
1664         }
1665
1666         uut = ast_threadpool_serializer("highlander", pool);
1667         if (!uut) {
1668                 ast_test_status_update(test, "Allocation failed\n");
1669                 goto end;
1670         }
1671
1672         there_can_be_only_one = ast_threadpool_serializer("highlander", pool);
1673         if (there_can_be_only_one) {
1674                 ast_taskprocessor_unreference(there_can_be_only_one);
1675                 ast_test_status_update(test, "Duplicate name error\n");
1676                 goto end;
1677         }
1678
1679         res = AST_TEST_PASS;
1680
1681 end:
1682         ast_taskprocessor_unreference(uut);
1683         ast_threadpool_shutdown(pool);
1684         return res;
1685 }
1686
1687 static int unload_module(void)
1688 {
1689         ast_test_unregister(threadpool_push);
1690         ast_test_unregister(threadpool_initial_threads);
1691         ast_test_unregister(threadpool_thread_creation);
1692         ast_test_unregister(threadpool_thread_destruction);
1693         ast_test_unregister(threadpool_thread_timeout);
1694         ast_test_unregister(threadpool_thread_timeout_thrash);
1695         ast_test_unregister(threadpool_one_task_one_thread);
1696         ast_test_unregister(threadpool_one_thread_one_task);
1697         ast_test_unregister(threadpool_one_thread_multiple_tasks);
1698         ast_test_unregister(threadpool_auto_increment);
1699         ast_test_unregister(threadpool_max_size);
1700         ast_test_unregister(threadpool_reactivation);
1701         ast_test_unregister(threadpool_task_distribution);
1702         ast_test_unregister(threadpool_more_destruction);
1703         ast_test_unregister(threadpool_serializer);
1704         ast_test_unregister(threadpool_serializer_dupe);
1705         return 0;
1706 }
1707
1708 static int load_module(void)
1709 {
1710         ast_test_register(threadpool_push);
1711         ast_test_register(threadpool_initial_threads);
1712         ast_test_register(threadpool_thread_creation);
1713         ast_test_register(threadpool_thread_destruction);
1714         ast_test_register(threadpool_thread_timeout);
1715         ast_test_register(threadpool_thread_timeout_thrash);
1716         ast_test_register(threadpool_one_task_one_thread);
1717         ast_test_register(threadpool_one_thread_one_task);
1718         ast_test_register(threadpool_one_thread_multiple_tasks);
1719         ast_test_register(threadpool_auto_increment);
1720         ast_test_register(threadpool_max_size);
1721         ast_test_register(threadpool_reactivation);
1722         ast_test_register(threadpool_task_distribution);
1723         ast_test_register(threadpool_more_destruction);
1724         ast_test_register(threadpool_serializer);
1725         ast_test_register(threadpool_serializer_dupe);
1726         return AST_MODULE_LOAD_SUCCESS;
1727 }
1728
1729 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "threadpool test module");