stasis cache: Enhance to keep track of an item from different entities.
[asterisk/asterisk.git] / tests / test_threadpool.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012-2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \brief threadpool unit tests
22  *
23  * \author Mark Michelson <mmichelson@digium.com>
24  *
25  */
26
27 /*** MODULEINFO
28         <depend>TEST_FRAMEWORK</depend>
29         <support_level>core</support_level>
30  ***/
31
32 #include "asterisk.h"
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/lock.h"
36 #include "asterisk/logger.h"
37 #include "asterisk/module.h"
38 #include "asterisk/taskprocessor.h"
39 #include "asterisk/test.h"
40 #include "asterisk/threadpool.h"
41
42 struct test_listener_data {
43         int num_active;
44         int num_idle;
45         int task_pushed;
46         int num_tasks;
47         int empty_notice;
48         int was_empty;
49         ast_mutex_t lock;
50         ast_cond_t cond;
51 };
52
53 static struct test_listener_data *test_alloc(void)
54 {
55         struct test_listener_data *tld = ast_calloc(1, sizeof(*tld));
56         if (!tld) {
57                 return NULL;
58         }
59         ast_mutex_init(&tld->lock);
60         ast_cond_init(&tld->cond, NULL);
61         return tld;
62 }
63
64 static void test_state_changed(struct ast_threadpool *pool,
65                 struct ast_threadpool_listener *listener,
66                 int active_threads,
67                 int idle_threads)
68 {
69         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
70         SCOPED_MUTEX(lock, &tld->lock);
71         tld->num_active = active_threads;
72         tld->num_idle = idle_threads;
73         ast_log(LOG_NOTICE, "Thread state: %d active, %d idle\n", tld->num_active, tld->num_idle);
74         ast_cond_signal(&tld->cond);
75 }
76
77 static void test_task_pushed(struct ast_threadpool *pool,
78                 struct ast_threadpool_listener *listener,
79                 int was_empty)
80 {
81         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
82         SCOPED_MUTEX(lock, &tld->lock);
83         tld->task_pushed = 1;
84         ++tld->num_tasks;
85         tld->was_empty = was_empty;
86         ast_cond_signal(&tld->cond);
87 }
88
89 static void test_emptied(struct ast_threadpool *pool,
90                 struct ast_threadpool_listener *listener)
91 {
92         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
93         SCOPED_MUTEX(lock, &tld->lock);
94         tld->empty_notice = 1;
95         ast_cond_signal(&tld->cond);
96 }
97
98 static void test_shutdown(struct ast_threadpool_listener *listener)
99 {
100         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
101         ast_cond_destroy(&tld->cond);
102         ast_mutex_destroy(&tld->lock);
103 }
104
105 static const struct ast_threadpool_listener_callbacks test_callbacks = {
106         .state_changed = test_state_changed,
107         .task_pushed = test_task_pushed,
108         .emptied = test_emptied,
109         .shutdown = test_shutdown,
110 };
111
112 struct simple_task_data {
113         int task_executed;
114         ast_mutex_t lock;
115         ast_cond_t cond;
116 };
117
118 static struct simple_task_data *simple_task_data_alloc(void)
119 {
120         struct simple_task_data *std = ast_calloc(1, sizeof(*std));
121
122         if (!std) {
123                 return NULL;
124         }
125         ast_mutex_init(&std->lock);
126         ast_cond_init(&std->cond, NULL);
127         return std;
128 }
129
130 static int simple_task(void *data)
131 {
132         struct simple_task_data *std = data;
133         SCOPED_MUTEX(lock, &std->lock);
134         std->task_executed = 1;
135         ast_cond_signal(&std->cond);
136         return 0;
137 }
138
139 static enum ast_test_result_state wait_until_thread_state(struct ast_test *test, struct test_listener_data *tld, int num_active, int num_idle)
140 {
141         struct timeval start = ast_tvnow();
142         struct timespec end = {
143                 .tv_sec = start.tv_sec + 5,
144                 .tv_nsec = start.tv_usec * 1000
145         };
146         enum ast_test_result_state res = AST_TEST_PASS;
147         SCOPED_MUTEX(lock, &tld->lock);
148
149         while (!(tld->num_active == num_active && tld->num_idle == num_idle)) {
150                 if (ast_cond_timedwait(&tld->cond, &tld->lock, &end) == ETIMEDOUT) {
151                         break;
152                 }
153         }
154
155         if (tld->num_active != num_active && tld->num_idle != num_idle) {
156                 ast_test_status_update(test, "Number of active threads and idle threads not what was expected.\n");
157                 ast_test_status_update(test, "Expected %d active threads but got %d\n", num_active, tld->num_active);
158                 ast_test_status_update(test, "Expected %d idle threads but got %d\n", num_idle, tld->num_idle);
159                 res = AST_TEST_FAIL;
160         }
161
162         return res;
163 }
164
165 static void wait_for_task_pushed(struct ast_threadpool_listener *listener)
166 {
167         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
168         struct timeval start = ast_tvnow();
169         struct timespec end = {
170                 .tv_sec = start.tv_sec + 5,
171                 .tv_nsec = start.tv_usec * 1000
172         };
173         SCOPED_MUTEX(lock, &tld->lock);
174
175         while (!tld->task_pushed) {
176                 if (ast_cond_timedwait(&tld->cond, lock, &end) == ETIMEDOUT) {
177                         break;
178                 }
179         }
180 }
181
182 static enum ast_test_result_state wait_for_completion(struct ast_test *test, struct simple_task_data *std)
183 {
184         struct timeval start = ast_tvnow();
185         struct timespec end = {
186                 .tv_sec = start.tv_sec + 5,
187                 .tv_nsec = start.tv_usec * 1000
188         };
189         enum ast_test_result_state res = AST_TEST_PASS;
190         SCOPED_MUTEX(lock, &std->lock);
191
192         while (!std->task_executed) {
193                 if (ast_cond_timedwait(&std->cond, lock, &end) == ETIMEDOUT) {
194                         break;
195                 }
196         }
197
198         if (!std->task_executed) {
199                 ast_test_status_update(test, "Task execution did not occur\n");
200                 res = AST_TEST_FAIL;
201         }
202         return res;
203 }
204
205 static enum ast_test_result_state wait_for_empty_notice(struct ast_test *test, struct test_listener_data *tld)
206 {
207         struct timeval start = ast_tvnow();
208         struct timespec end = {
209                 .tv_sec = start.tv_sec + 5,
210                 .tv_nsec = start.tv_usec * 1000
211         };
212         enum ast_test_result_state res = AST_TEST_PASS;
213         SCOPED_MUTEX(lock, &tld->lock);
214
215         while (!tld->empty_notice) {
216                 if (ast_cond_timedwait(&tld->cond, lock, &end) == ETIMEDOUT) {
217                         break;
218                 }
219         }
220
221         if (!tld->empty_notice) {
222                 ast_test_status_update(test, "Test listener not notified that threadpool is empty\n");
223                 res = AST_TEST_FAIL;
224         }
225
226         return res;
227 }
228
229 static enum ast_test_result_state listener_check(
230                 struct ast_test *test,
231                 struct ast_threadpool_listener *listener,
232                 int task_pushed,
233                 int was_empty,
234                 int num_tasks,
235                 int num_active,
236                 int num_idle,
237                 int empty_notice)
238 {
239         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
240         enum ast_test_result_state res = AST_TEST_PASS;
241
242         if (tld->task_pushed != task_pushed) {
243                 ast_test_status_update(test, "Expected task %sto be pushed, but it was%s\n",
244                                 task_pushed ? "" : "not ", tld->task_pushed ? "" : " not");
245                 res = AST_TEST_FAIL;
246         }
247         if (tld->was_empty != was_empty) {
248                 ast_test_status_update(test, "Expected %sto be empty, but it was%s\n",
249                                 was_empty ? "" : "not ", tld->was_empty ? "" : " not");
250                 res = AST_TEST_FAIL;
251         }
252         if (tld->num_tasks!= num_tasks) {
253                 ast_test_status_update(test, "Expected %d tasks to be pushed, but got %d\n",
254                                 num_tasks, tld->num_tasks);
255                 res = AST_TEST_FAIL;
256         }
257         if (tld->num_active != num_active) {
258                 ast_test_status_update(test, "Expected %d active threads, but got %d\n",
259                                 num_active, tld->num_active);
260                 res = AST_TEST_FAIL;
261         }
262         if (tld->num_idle != num_idle) {
263                 ast_test_status_update(test, "Expected %d idle threads, but got %d\n",
264                                 num_idle, tld->num_idle);
265                 res = AST_TEST_FAIL;
266         }
267         if (tld->empty_notice != empty_notice) {
268                 ast_test_status_update(test, "Expected %s empty notice, but got %s\n",
269                                 was_empty ? "an" : "no", tld->task_pushed ? "one" : "none");
270                 res = AST_TEST_FAIL;
271         }
272
273         return res;
274 }
275
276 AST_TEST_DEFINE(threadpool_push)
277 {
278         struct ast_threadpool *pool = NULL;
279         struct ast_threadpool_listener *listener = NULL;
280         struct simple_task_data *std = NULL;
281         struct test_listener_data *tld = NULL;
282         enum ast_test_result_state res = AST_TEST_FAIL;
283         struct ast_threadpool_options options = {
284                 .version = AST_THREADPOOL_OPTIONS_VERSION,
285                 .idle_timeout = 0,
286                 .auto_increment = 0,
287                 .initial_size = 0,
288                 .max_size = 0,
289         };
290
291         switch (cmd) {
292         case TEST_INIT:
293                 info->name = "push";
294                 info->category = "/main/threadpool/";
295                 info->summary = "Test task";
296                 info->description =
297                         "Basic threadpool test";
298                 return AST_TEST_NOT_RUN;
299         case TEST_EXECUTE:
300                 break;
301         }
302         tld = test_alloc();
303         if (!tld) {
304                 return AST_TEST_FAIL;
305         }
306
307         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
308         if (!listener) {
309                 goto end;
310         }
311
312         pool = ast_threadpool_create(info->name, listener, &options);
313         if (!pool) {
314                 goto end;
315         }
316
317         std = simple_task_data_alloc();
318         if (!std) {
319                 goto end;
320         }
321
322         ast_threadpool_push(pool, simple_task, std);
323
324         wait_for_task_pushed(listener);
325
326         res = listener_check(test, listener, 1, 1, 1, 0, 0, 0);
327
328 end:
329         ast_threadpool_shutdown(pool);
330         ao2_cleanup(listener);
331         ast_free(std);
332         ast_free(tld);
333         return res;
334 }
335
336 AST_TEST_DEFINE(threadpool_initial_threads)
337 {
338         struct ast_threadpool *pool = NULL;
339         struct ast_threadpool_listener *listener = NULL;
340         enum ast_test_result_state res = AST_TEST_FAIL;
341         struct test_listener_data *tld = NULL;
342         struct ast_threadpool_options options = {
343                 .version = AST_THREADPOOL_OPTIONS_VERSION,
344                 .idle_timeout = 0,
345                 .auto_increment = 0,
346                 .initial_size = 3,
347                 .max_size = 0,
348         };
349
350         switch (cmd) {
351         case TEST_INIT:
352                 info->name = "initial_threads";
353                 info->category = "/main/threadpool/";
354                 info->summary = "Test threadpool initialization state";
355                 info->description =
356                         "Ensure that a threadpool created with a specific size contains the\n"
357                         "proper number of idle threads.";
358                 return AST_TEST_NOT_RUN;
359         case TEST_EXECUTE:
360                 break;
361         }
362
363         tld = test_alloc();
364         if (!tld) {
365                 return AST_TEST_FAIL;
366         }
367
368         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
369         if (!listener) {
370                 goto end;
371         }
372
373         pool = ast_threadpool_create(info->name, listener, &options);
374         if (!pool) {
375                 goto end;
376         }
377
378         res = wait_until_thread_state(test, tld, 0, 3);
379
380 end:
381         ast_threadpool_shutdown(pool);
382         ao2_cleanup(listener);
383         ast_free(tld);
384         return res;
385 }
386
387
388 AST_TEST_DEFINE(threadpool_thread_creation)
389 {
390         struct ast_threadpool *pool = NULL;
391         struct ast_threadpool_listener *listener = NULL;
392         enum ast_test_result_state res = AST_TEST_FAIL;
393         struct test_listener_data *tld = NULL;
394         struct ast_threadpool_options options = {
395                 .version = AST_THREADPOOL_OPTIONS_VERSION,
396                 .idle_timeout = 0,
397                 .auto_increment = 0,
398                 .initial_size = 0,
399                 .max_size = 0,
400         };
401
402         switch (cmd) {
403         case TEST_INIT:
404                 info->name = "thread_creation";
405                 info->category = "/main/threadpool/";
406                 info->summary = "Test threadpool thread creation";
407                 info->description =
408                         "Ensure that threads can be added to a threadpool";
409                 return AST_TEST_NOT_RUN;
410         case TEST_EXECUTE:
411                 break;
412         }
413
414         tld = test_alloc();
415         if (!tld) {
416                 return AST_TEST_FAIL;
417         }
418
419         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
420         if (!listener) {
421                 goto end;
422         }
423
424         pool = ast_threadpool_create(info->name, listener, &options);
425         if (!pool) {
426                 goto end;
427         }
428
429         /* Now let's create a thread. It should start active, then go
430          * idle immediately
431          */
432         ast_threadpool_set_size(pool, 1);
433
434         res = wait_until_thread_state(test, tld, 0, 1);
435
436 end:
437         ast_threadpool_shutdown(pool);
438         ao2_cleanup(listener);
439         ast_free(tld);
440         return res;
441 }
442
443 AST_TEST_DEFINE(threadpool_thread_destruction)
444 {
445         struct ast_threadpool *pool = NULL;
446         struct ast_threadpool_listener *listener = NULL;
447         enum ast_test_result_state res = AST_TEST_FAIL;
448         struct test_listener_data *tld = NULL;
449         struct ast_threadpool_options options = {
450                 .version = AST_THREADPOOL_OPTIONS_VERSION,
451                 .idle_timeout = 0,
452                 .auto_increment = 0,
453                 .initial_size = 0,
454                 .max_size = 0,
455         };
456
457         switch (cmd) {
458         case TEST_INIT:
459                 info->name = "thread_destruction";
460                 info->category = "/main/threadpool/";
461                 info->summary = "Test threadpool thread destruction";
462                 info->description =
463                         "Ensure that threads are properly destroyed in a threadpool";
464                 return AST_TEST_NOT_RUN;
465         case TEST_EXECUTE:
466                 break;
467         }
468
469         tld = test_alloc();
470         if (!tld) {
471                 return AST_TEST_FAIL;
472         }
473
474         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
475         if (!listener) {
476                 goto end;
477         }
478
479         pool = ast_threadpool_create(info->name, listener, &options);
480         if (!pool) {
481                 goto end;
482         }
483
484         ast_threadpool_set_size(pool, 3);
485
486         res = wait_until_thread_state(test, tld, 0, 3);
487         if (res == AST_TEST_FAIL) {
488                 goto end;
489         }
490
491         res = listener_check(test, listener, 0, 0, 0, 0, 3, 0);
492         if (res == AST_TEST_FAIL) {
493                 goto end;
494         }
495
496         ast_threadpool_set_size(pool, 2);
497
498         res = wait_until_thread_state(test, tld, 0, 2);
499
500 end:
501         ast_threadpool_shutdown(pool);
502         ao2_cleanup(listener);
503         ast_free(tld);
504         return res;
505 }
506
507 AST_TEST_DEFINE(threadpool_thread_timeout)
508 {
509         struct ast_threadpool *pool = NULL;
510         struct ast_threadpool_listener *listener = NULL;
511         enum ast_test_result_state res = AST_TEST_FAIL;
512         struct test_listener_data *tld = NULL;
513         struct ast_threadpool_options options = {
514                 .version = AST_THREADPOOL_OPTIONS_VERSION,
515                 .idle_timeout = 2,
516                 .auto_increment = 0,
517                 .initial_size = 0,
518                 .max_size = 0,
519         };
520
521         switch (cmd) {
522         case TEST_INIT:
523                 info->name = "thread_timeout";
524                 info->category = "/main/threadpool/";
525                 info->summary = "Test threadpool thread timeout";
526                 info->description =
527                         "Ensure that a thread with a two second timeout dies as expected.";
528                 return AST_TEST_NOT_RUN;
529         case TEST_EXECUTE:
530                 break;
531         }
532
533         tld = test_alloc();
534         if (!tld) {
535                 return AST_TEST_FAIL;
536         }
537
538         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
539         if (!listener) {
540                 goto end;
541         }
542
543         pool = ast_threadpool_create(info->name, listener, &options);
544         if (!pool) {
545                 goto end;
546         }
547
548         ast_threadpool_set_size(pool, 1);
549
550         res = wait_until_thread_state(test, tld, 0, 1);
551         if (res == AST_TEST_FAIL) {
552                 goto end;
553         }
554
555         res = listener_check(test, listener, 0, 0, 0, 0, 1, 0);
556         if (res == AST_TEST_FAIL) {
557                 goto end;
558         }
559
560         res = wait_until_thread_state(test, tld, 0, 0);
561         if (res == AST_TEST_FAIL) {
562                 goto end;
563         }
564
565         res = listener_check(test, listener, 0, 0, 0, 0, 0, 0);
566
567 end:
568         ast_threadpool_shutdown(pool);
569         ao2_cleanup(listener);
570         ast_free(tld);
571         return res;
572 }
573
574 AST_TEST_DEFINE(threadpool_one_task_one_thread)
575 {
576         struct ast_threadpool *pool = NULL;
577         struct ast_threadpool_listener *listener = NULL;
578         struct simple_task_data *std = NULL;
579         enum ast_test_result_state res = AST_TEST_FAIL;
580         struct test_listener_data *tld = NULL;
581         struct ast_threadpool_options options = {
582                 .version = AST_THREADPOOL_OPTIONS_VERSION,
583                 .idle_timeout = 0,
584                 .auto_increment = 0,
585                 .initial_size = 0,
586                 .max_size = 0,
587         };
588
589         switch (cmd) {
590         case TEST_INIT:
591                 info->name = "one_task_one_thread";
592                 info->category = "/main/threadpool/";
593                 info->summary = "Test a single task with a single thread";
594                 info->description =
595                         "Push a task into an empty threadpool, then add a thread to the pool.";
596                 return AST_TEST_NOT_RUN;
597         case TEST_EXECUTE:
598                 break;
599         }
600
601         tld = test_alloc();
602         if (!tld) {
603                 return AST_TEST_FAIL;
604         }
605
606         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
607         if (!listener) {
608                 goto end;
609         }
610
611         pool = ast_threadpool_create(info->name, listener, &options);
612         if (!pool) {
613                 goto end;
614         }
615
616         std = simple_task_data_alloc();
617         if (!std) {
618                 goto end;
619         }
620
621         ast_threadpool_push(pool, simple_task, std);
622
623         ast_threadpool_set_size(pool, 1);
624
625         /* Threads added to the pool are active when they start,
626          * so the newly-created thread should immediately execute
627          * the waiting task.
628          */
629         res = wait_for_completion(test, std);
630         if (res == AST_TEST_FAIL) {
631                 goto end;
632         }
633
634         res = wait_for_empty_notice(test, tld);
635         if (res == AST_TEST_FAIL) {
636                 goto end;
637         }
638
639         /* After completing the task, the thread should go idle */
640         res = wait_until_thread_state(test, tld, 0, 1);
641         if (res == AST_TEST_FAIL) {
642                 goto end;
643         }
644
645         res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
646
647 end:
648         ast_threadpool_shutdown(pool);
649         ao2_cleanup(listener);
650         ast_free(std);
651         ast_free(tld);
652         return res;
653
654 }
655
656 AST_TEST_DEFINE(threadpool_one_thread_one_task)
657 {
658         struct ast_threadpool *pool = NULL;
659         struct ast_threadpool_listener *listener = NULL;
660         struct simple_task_data *std = NULL;
661         enum ast_test_result_state res = AST_TEST_FAIL;
662         struct test_listener_data *tld = NULL;
663         struct ast_threadpool_options options = {
664                 .version = AST_THREADPOOL_OPTIONS_VERSION,
665                 .idle_timeout = 0,
666                 .auto_increment = 0,
667                 .initial_size = 0,
668                 .max_size = 0,
669         };
670
671         switch (cmd) {
672         case TEST_INIT:
673                 info->name = "one_thread_one_task";
674                 info->category = "/main/threadpool/";
675                 info->summary = "Test a single thread with a single task";
676                 info->description =
677                         "Add a thread to the pool and then push a task to it.";
678                 return AST_TEST_NOT_RUN;
679         case TEST_EXECUTE:
680                 break;
681         }
682
683         tld = test_alloc();
684         if (!tld) {
685                 return AST_TEST_FAIL;
686         }
687
688         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
689         if (!listener) {
690                 goto end;
691         }
692
693         pool = ast_threadpool_create(info->name, listener, &options);
694         if (!pool) {
695                 goto end;
696         }
697
698         std = simple_task_data_alloc();
699         if (!std) {
700                 goto end;
701         }
702
703         ast_threadpool_set_size(pool, 1);
704
705         res = wait_until_thread_state(test, tld, 0, 1);
706         if (res == AST_TEST_FAIL) {
707                 goto end;
708         }
709
710         ast_threadpool_push(pool, simple_task, std);
711
712         res = wait_for_completion(test, std);
713         if (res == AST_TEST_FAIL) {
714                 goto end;
715         }
716
717         res = wait_for_empty_notice(test, tld);
718         if (res == AST_TEST_FAIL) {
719                 goto end;
720         }
721
722         /* After completing the task, the thread should go idle */
723         res = wait_until_thread_state(test, tld, 0, 1);
724         if (res == AST_TEST_FAIL) {
725                 goto end;
726         }
727
728         res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
729
730 end:
731         ast_threadpool_shutdown(pool);
732         ao2_cleanup(listener);
733         ast_free(std);
734         ast_free(tld);
735         return res;
736 }
737
738 AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks)
739 {
740         struct ast_threadpool *pool = NULL;
741         struct ast_threadpool_listener *listener = NULL;
742         struct simple_task_data *std1 = NULL;
743         struct simple_task_data *std2 = NULL;
744         struct simple_task_data *std3 = NULL;
745         enum ast_test_result_state res = AST_TEST_FAIL;
746         struct test_listener_data *tld = NULL;
747         struct ast_threadpool_options options = {
748                 .version = AST_THREADPOOL_OPTIONS_VERSION,
749                 .idle_timeout = 0,
750                 .auto_increment = 0,
751                 .initial_size = 0,
752                 .max_size = 0,
753         };
754
755         switch (cmd) {
756         case TEST_INIT:
757                 info->name = "one_thread_multiple_tasks";
758                 info->category = "/main/threadpool/";
759                 info->summary = "Test a single thread with multiple tasks";
760                 info->description =
761                         "Add a thread to the pool and then push three tasks to it.";
762                 return AST_TEST_NOT_RUN;
763         case TEST_EXECUTE:
764                 break;
765         }
766
767         tld = test_alloc();
768         if (!tld) {
769                 return AST_TEST_FAIL;
770         }
771
772         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
773         if (!listener) {
774                 goto end;
775         }
776
777         pool = ast_threadpool_create(info->name, listener, &options);
778         if (!pool) {
779                 goto end;
780         }
781
782         std1 = simple_task_data_alloc();
783         std2 = simple_task_data_alloc();
784         std3 = simple_task_data_alloc();
785         if (!std1 || !std2 || !std3) {
786                 goto end;
787         }
788
789         ast_threadpool_set_size(pool, 1);
790
791         res = wait_until_thread_state(test, tld, 0, 1);
792         if (res == AST_TEST_FAIL) {
793                 goto end;
794         }
795
796         ast_threadpool_push(pool, simple_task, std1);
797         ast_threadpool_push(pool, simple_task, std2);
798         ast_threadpool_push(pool, simple_task, std3);
799
800         res = wait_for_completion(test, std1);
801         if (res == AST_TEST_FAIL) {
802                 goto end;
803         }
804         res = wait_for_completion(test, std2);
805         if (res == AST_TEST_FAIL) {
806                 goto end;
807         }
808         res = wait_for_completion(test, std3);
809         if (res == AST_TEST_FAIL) {
810                 goto end;
811         }
812
813         res = wait_for_empty_notice(test, tld);
814         if (res == AST_TEST_FAIL) {
815                 goto end;
816         }
817
818         res = wait_until_thread_state(test, tld, 0, 1);
819         if (res == AST_TEST_FAIL) {
820                 goto end;
821         }
822
823         res = listener_check(test, listener, 1, 0, 3, 0, 1, 1);
824
825 end:
826         ast_threadpool_shutdown(pool);
827         ao2_cleanup(listener);
828         ast_free(std1);
829         ast_free(std2);
830         ast_free(std3);
831         ast_free(tld);
832         return res;
833 }
834
835 AST_TEST_DEFINE(threadpool_auto_increment)
836 {
837         struct ast_threadpool *pool = NULL;
838         struct ast_threadpool_listener *listener = NULL;
839         struct simple_task_data *std1 = NULL;
840         struct simple_task_data *std2 = NULL;
841         struct simple_task_data *std3 = NULL;
842         struct simple_task_data *std4 = NULL;
843         enum ast_test_result_state res = AST_TEST_FAIL;
844         struct test_listener_data *tld = NULL;
845         struct ast_threadpool_options options = {
846                 .version = AST_THREADPOOL_OPTIONS_VERSION,
847                 .idle_timeout = 0,
848                 .auto_increment = 3,
849                 .initial_size = 0,
850                 .max_size = 0,
851         };
852
853         switch (cmd) {
854         case TEST_INIT:
855                 info->name = "auto_increment";
856                 info->category = "/main/threadpool/";
857                 info->summary = "Test that the threadpool grows as tasks are added";
858                 info->description =
859                         "Create an empty threadpool and push a task to it. Once the task is\n"
860                         "pushed, the threadpool should add three threads and be able to\n"
861                         "handle the task. The threads should then go idle\n";
862                 return AST_TEST_NOT_RUN;
863         case TEST_EXECUTE:
864                 break;
865         }
866
867         tld = test_alloc();
868         if (!tld) {
869                 return AST_TEST_FAIL;
870         }
871
872         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
873         if (!listener) {
874                 goto end;
875         }
876
877         pool = ast_threadpool_create(info->name, listener, &options);
878         if (!pool) {
879                 goto end;
880         }
881
882         std1 = simple_task_data_alloc();
883         std2 = simple_task_data_alloc();
884         std3 = simple_task_data_alloc();
885         std4 = simple_task_data_alloc();
886         if (!std1 || !std2 || !std3 || !std4) {
887                 goto end;
888         }
889
890         ast_threadpool_push(pool, simple_task, std1);
891
892         /* Pushing the task should result in the threadpool growing
893          * by three threads. This will allow the task to actually execute
894          */
895         res = wait_for_completion(test, std1);
896         if (res == AST_TEST_FAIL) {
897                 goto end;
898         }
899
900         res = wait_for_empty_notice(test, tld);
901         if (res == AST_TEST_FAIL) {
902                 goto end;
903         }
904
905         res = wait_until_thread_state(test, tld, 0, 3);
906         if (res == AST_TEST_FAIL) {
907                 goto end;
908         }
909
910         /* Now push three tasks into the pool and ensure the pool does not
911          * grow.
912          */
913         ast_threadpool_push(pool, simple_task, std2);
914         ast_threadpool_push(pool, simple_task, std3);
915         ast_threadpool_push(pool, simple_task, std4);
916
917         res = wait_for_completion(test, std2);
918         if (res == AST_TEST_FAIL) {
919                 goto end;
920         }
921         res = wait_for_completion(test, std3);
922         if (res == AST_TEST_FAIL) {
923                 goto end;
924         }
925         res = wait_for_completion(test, std4);
926         if (res == AST_TEST_FAIL) {
927                 goto end;
928         }
929
930         res = wait_for_empty_notice(test, tld);
931         if (res == AST_TEST_FAIL) {
932                 goto end;
933         }
934
935         res = wait_until_thread_state(test, tld, 0, 3);
936         if (res == AST_TEST_FAIL) {
937                 goto end;
938         }
939         res = listener_check(test, listener, 1, 0, 4, 0, 3, 1);
940
941 end:
942         ast_threadpool_shutdown(pool);
943         ao2_cleanup(listener);
944         ast_free(std1);
945         ast_free(std2);
946         ast_free(std3);
947         ast_free(std4);
948         ast_free(tld);
949         return res;
950 }
951
952 AST_TEST_DEFINE(threadpool_max_size)
953 {
954         struct ast_threadpool *pool = NULL;
955         struct ast_threadpool_listener *listener = NULL;
956         struct simple_task_data *std = NULL;
957         enum ast_test_result_state res = AST_TEST_FAIL;
958         struct test_listener_data *tld = NULL;
959         struct ast_threadpool_options options = {
960                 .version = AST_THREADPOOL_OPTIONS_VERSION,
961                 .idle_timeout = 0,
962                 .auto_increment = 3,
963                 .initial_size = 0,
964                 .max_size = 2,
965         };
966
967         switch (cmd) {
968         case TEST_INIT:
969                 info->name = "max_size";
970                 info->category = "/main/threadpool/";
971                 info->summary = "Test that the threadpool does not exceed its maximum size restriction";
972                 info->description =
973                         "Create an empty threadpool and push a task to it. Once the task is\n"
974                         "pushed, the threadpool should attempt to grow by three threads, but the\n"
975                         "pool's restrictions should only allow two threads to be added.\n";
976                 return AST_TEST_NOT_RUN;
977         case TEST_EXECUTE:
978                 break;
979         }
980
981         tld = test_alloc();
982         if (!tld) {
983                 return AST_TEST_FAIL;
984         }
985
986         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
987         if (!listener) {
988                 goto end;
989         }
990
991         pool = ast_threadpool_create(info->name, listener, &options);
992         if (!pool) {
993                 goto end;
994         }
995
996         std = simple_task_data_alloc();
997         if (!std) {
998                 goto end;
999         }
1000
1001         ast_threadpool_push(pool, simple_task, std);
1002
1003         res = wait_for_completion(test, std);
1004         if (res == AST_TEST_FAIL) {
1005                 goto end;
1006         }
1007
1008         res = wait_until_thread_state(test, tld, 0, 2);
1009         if (res == AST_TEST_FAIL) {
1010                 goto end;
1011         }
1012
1013         res = listener_check(test, listener, 1, 1, 1, 0, 2, 1);
1014 end:
1015         ast_threadpool_shutdown(pool);
1016         ao2_cleanup(listener);
1017         ast_free(std);
1018         ast_free(tld);
1019         return res;
1020 }
1021
1022 AST_TEST_DEFINE(threadpool_reactivation)
1023 {
1024         struct ast_threadpool *pool = NULL;
1025         struct ast_threadpool_listener *listener = NULL;
1026         struct simple_task_data *std1 = NULL;
1027         struct simple_task_data *std2 = NULL;
1028         enum ast_test_result_state res = AST_TEST_FAIL;
1029         struct test_listener_data *tld = NULL;
1030         struct ast_threadpool_options options = {
1031                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1032                 .idle_timeout = 0,
1033                 .auto_increment = 0,
1034                 .initial_size = 0,
1035                 .max_size = 0,
1036         };
1037
1038         switch (cmd) {
1039         case TEST_INIT:
1040                 info->name = "reactivation";
1041                 info->category = "/main/threadpool/";
1042                 info->summary = "Test that a threadpool reactivates when work is added";
1043                 info->description =
1044                         "Push a task into a threadpool. Make sure the task executes and the\n"
1045                         "thread goes idle. Then push a second task and ensure that the thread\n"
1046                         "awakens and executes the second task.\n";
1047                 return AST_TEST_NOT_RUN;
1048         case TEST_EXECUTE:
1049                 break;
1050         }
1051
1052         tld = test_alloc();
1053         if (!tld) {
1054                 return AST_TEST_FAIL;
1055         }
1056
1057         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1058         if (!listener) {
1059                 goto end;
1060         }
1061
1062         pool = ast_threadpool_create(info->name, listener, &options);
1063         if (!pool) {
1064                 goto end;
1065         }
1066
1067         std1 = simple_task_data_alloc();
1068         std2 = simple_task_data_alloc();
1069         if (!std1 || !std2) {
1070                 goto end;
1071         }
1072
1073         ast_threadpool_push(pool, simple_task, std1);
1074
1075         ast_threadpool_set_size(pool, 1);
1076
1077         res = wait_for_completion(test, std1);
1078         if (res == AST_TEST_FAIL) {
1079                 goto end;
1080         }
1081
1082         res = wait_for_empty_notice(test, tld);
1083         if (res == AST_TEST_FAIL) {
1084                 goto end;
1085         }
1086
1087         res = wait_until_thread_state(test, tld, 0, 1);
1088         if (res == AST_TEST_FAIL) {
1089                 goto end;
1090         }
1091
1092         res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
1093         if (res == AST_TEST_FAIL) {
1094                 goto end;
1095         }
1096
1097         /* Now make sure the threadpool reactivates when we add a second task */
1098         ast_threadpool_push(pool, simple_task, std2);
1099
1100         res = wait_for_completion(test, std2);
1101         if (res == AST_TEST_FAIL) {
1102                 goto end;
1103         }
1104
1105         res = wait_for_empty_notice(test, tld);
1106         if (res == AST_TEST_FAIL) {
1107                 goto end;
1108         }
1109
1110         res = wait_until_thread_state(test, tld, 0, 1);
1111         if (res == AST_TEST_FAIL) {
1112                 goto end;
1113         }
1114
1115         res = listener_check(test, listener, 1, 1, 2, 0, 1, 1);
1116
1117 end:
1118         ast_threadpool_shutdown(pool);
1119         ao2_cleanup(listener);
1120         ast_free(std1);
1121         ast_free(std2);
1122         ast_free(tld);
1123         return res;
1124
1125 }
1126
1127 struct complex_task_data {
1128         int task_started;
1129         int task_executed;
1130         int continue_task;
1131         ast_mutex_t lock;
1132         ast_cond_t stall_cond;
1133         ast_cond_t notify_cond;
1134 };
1135
1136 static struct complex_task_data *complex_task_data_alloc(void)
1137 {
1138         struct complex_task_data *ctd = ast_calloc(1, sizeof(*ctd));
1139
1140         if (!ctd) {
1141                 return NULL;
1142         }
1143         ast_mutex_init(&ctd->lock);
1144         ast_cond_init(&ctd->stall_cond, NULL);
1145         ast_cond_init(&ctd->notify_cond, NULL);
1146         return ctd;
1147 }
1148
1149 static int complex_task(void *data)
1150 {
1151         struct complex_task_data *ctd = data;
1152         SCOPED_MUTEX(lock, &ctd->lock);
1153         /* Notify that we started */
1154         ctd->task_started = 1;
1155         ast_cond_signal(&ctd->notify_cond);
1156         while (!ctd->continue_task) {
1157                 ast_cond_wait(&ctd->stall_cond, lock);
1158         }
1159         /* We got poked. Finish up */
1160         ctd->task_executed = 1;
1161         ast_cond_signal(&ctd->notify_cond);
1162         return 0;
1163 }
1164
1165 static void poke_worker(struct complex_task_data *ctd)
1166 {
1167         SCOPED_MUTEX(lock, &ctd->lock);
1168         ctd->continue_task = 1;
1169         ast_cond_signal(&ctd->stall_cond);
1170 }
1171
1172 static int wait_for_complex_start(struct complex_task_data *ctd)
1173 {
1174         struct timeval start = ast_tvnow();
1175         struct timespec end = {
1176                 .tv_sec = start.tv_sec + 5,
1177                 .tv_nsec = start.tv_usec * 1000
1178         };
1179         SCOPED_MUTEX(lock, &ctd->lock);
1180
1181         while (!ctd->task_started) {
1182                 if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
1183                         break;
1184                 }
1185         }
1186
1187         return ctd->task_started;
1188 }
1189
1190 static int has_complex_started(struct complex_task_data *ctd)
1191 {
1192         struct timeval start = ast_tvnow();
1193         struct timespec end = {
1194                 .tv_sec = start.tv_sec + 1,
1195                 .tv_nsec = start.tv_usec * 1000
1196         };
1197         SCOPED_MUTEX(lock, &ctd->lock);
1198
1199         while (!ctd->task_started) {
1200                 if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
1201                         break;
1202                 }
1203         }
1204
1205         return ctd->task_started;
1206 }
1207
1208 static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd)
1209 {
1210         struct timeval start = ast_tvnow();
1211         struct timespec end = {
1212                 .tv_sec = start.tv_sec + 5,
1213                 .tv_nsec = start.tv_usec * 1000
1214         };
1215         enum ast_test_result_state res = AST_TEST_PASS;
1216         SCOPED_MUTEX(lock, &ctd->lock);
1217
1218         while (!ctd->task_executed) {
1219                 if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
1220                         break;
1221                 }
1222         }
1223
1224         if (!ctd->task_executed) {
1225                 res = AST_TEST_FAIL;
1226         }
1227         return res;
1228 }
1229
1230 AST_TEST_DEFINE(threadpool_task_distribution)
1231 {
1232         struct ast_threadpool *pool = NULL;
1233         struct ast_threadpool_listener *listener = NULL;
1234         struct complex_task_data *ctd1 = NULL;
1235         struct complex_task_data *ctd2 = NULL;
1236         enum ast_test_result_state res = AST_TEST_FAIL;
1237         struct test_listener_data *tld = NULL;
1238         struct ast_threadpool_options options = {
1239                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1240                 .idle_timeout = 0,
1241                 .auto_increment = 0,
1242                 .initial_size = 0,
1243                 .max_size = 0,
1244         };
1245
1246         switch (cmd) {
1247         case TEST_INIT:
1248                 info->name = "task_distribution";
1249                 info->category = "/main/threadpool/";
1250                 info->summary = "Test that tasks are evenly distributed to threads";
1251                 info->description =
1252                         "Push two tasks into a threadpool. Ensure that each is handled by\n"
1253                         "a separate thread\n";
1254                 return AST_TEST_NOT_RUN;
1255         case TEST_EXECUTE:
1256                 break;
1257         }
1258
1259         tld = test_alloc();
1260         if (!tld) {
1261                 return AST_TEST_FAIL;
1262         }
1263
1264         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1265         if (!listener) {
1266                 goto end;
1267         }
1268
1269         pool = ast_threadpool_create(info->name, listener, &options);
1270         if (!pool) {
1271                 goto end;
1272         }
1273
1274         ctd1 = complex_task_data_alloc();
1275         ctd2 = complex_task_data_alloc();
1276         if (!ctd1 || !ctd2) {
1277                 goto end;
1278         }
1279
1280         ast_threadpool_push(pool, complex_task, ctd1);
1281         ast_threadpool_push(pool, complex_task, ctd2);
1282
1283         ast_threadpool_set_size(pool, 2);
1284
1285         res = wait_until_thread_state(test, tld, 2, 0);
1286         if (res == AST_TEST_FAIL) {
1287                 goto end;
1288         }
1289
1290         res = listener_check(test, listener, 1, 0, 2, 2, 0, 0);
1291         if (res == AST_TEST_FAIL) {
1292                 goto end;
1293         }
1294
1295         /* The tasks are stalled until we poke them */
1296         poke_worker(ctd1);
1297         poke_worker(ctd2);
1298
1299         res = wait_for_complex_completion(ctd1);
1300         if (res == AST_TEST_FAIL) {
1301                 goto end;
1302         }
1303         res = wait_for_complex_completion(ctd2);
1304         if (res == AST_TEST_FAIL) {
1305                 goto end;
1306         }
1307
1308         res = wait_until_thread_state(test, tld, 0, 2);
1309         if (res == AST_TEST_FAIL) {
1310                 goto end;
1311         }
1312
1313         res = listener_check(test, listener, 1, 0, 2, 0, 2, 1);
1314
1315 end:
1316         ast_threadpool_shutdown(pool);
1317         ao2_cleanup(listener);
1318         ast_free(ctd1);
1319         ast_free(ctd2);
1320         ast_free(tld);
1321         return res;
1322 }
1323
1324 AST_TEST_DEFINE(threadpool_more_destruction)
1325 {
1326         struct ast_threadpool *pool = NULL;
1327         struct ast_threadpool_listener *listener = NULL;
1328         struct complex_task_data *ctd1 = NULL;
1329         struct complex_task_data *ctd2 = NULL;
1330         enum ast_test_result_state res = AST_TEST_FAIL;
1331         struct test_listener_data *tld = NULL;
1332         struct ast_threadpool_options options = {
1333                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1334                 .idle_timeout = 0,
1335                 .auto_increment = 0,
1336                 .initial_size = 0,
1337                 .max_size = 0,
1338         };
1339
1340         switch (cmd) {
1341         case TEST_INIT:
1342                 info->name = "more_destruction";
1343                 info->category = "/main/threadpool/";
1344                 info->summary = "Test that threads are destroyed as expected";
1345                 info->description =
1346                         "Push two tasks into a threadpool. Set the threadpool size to 4\n"
1347                         "Ensure that there are 2 active and 2 idle threads. Then shrink the\n"
1348                         "threadpool down to 1 thread. Ensure that the thread leftover is active\n"
1349                         "and ensure that both tasks complete.\n";
1350                 return AST_TEST_NOT_RUN;
1351         case TEST_EXECUTE:
1352                 break;
1353         }
1354
1355         tld = test_alloc();
1356         if (!tld) {
1357                 return AST_TEST_FAIL;
1358         }
1359
1360         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1361         if (!listener) {
1362                 goto end;
1363         }
1364
1365         pool = ast_threadpool_create(info->name, listener, &options);
1366         if (!pool) {
1367                 goto end;
1368         }
1369
1370         ctd1 = complex_task_data_alloc();
1371         ctd2 = complex_task_data_alloc();
1372         if (!ctd1 || !ctd2) {
1373                 goto end;
1374         }
1375
1376         ast_threadpool_push(pool, complex_task, ctd1);
1377         ast_threadpool_push(pool, complex_task, ctd2);
1378
1379         ast_threadpool_set_size(pool, 4);
1380
1381         res = wait_until_thread_state(test, tld, 2, 2);
1382         if (res == AST_TEST_FAIL) {
1383                 goto end;
1384         }
1385
1386         res = listener_check(test, listener, 1, 0, 2, 2, 2, 0);
1387         if (res == AST_TEST_FAIL) {
1388                 goto end;
1389         }
1390
1391         ast_threadpool_set_size(pool, 1);
1392
1393         /* Shrinking the threadpool should kill off the two idle threads
1394          * and one of the active threads.
1395          */
1396         res = wait_until_thread_state(test, tld, 1, 0);
1397         if (res == AST_TEST_FAIL) {
1398                 goto end;
1399         }
1400
1401         res = listener_check(test, listener, 1, 0, 2, 1, 0, 0);
1402         if (res == AST_TEST_FAIL) {
1403                 goto end;
1404         }
1405
1406         /* The tasks are stalled until we poke them */
1407         poke_worker(ctd1);
1408         poke_worker(ctd2);
1409
1410         res = wait_for_complex_completion(ctd1);
1411         if (res == AST_TEST_FAIL) {
1412                 goto end;
1413         }
1414         res = wait_for_complex_completion(ctd2);
1415         if (res == AST_TEST_FAIL) {
1416                 goto end;
1417         }
1418
1419         res = wait_until_thread_state(test, tld, 0, 1);
1420         if (res == AST_TEST_FAIL) {
1421                 goto end;
1422         }
1423
1424         res = listener_check(test, listener, 1, 0, 2, 0, 1, 1);
1425
1426 end:
1427         ast_threadpool_shutdown(pool);
1428         ao2_cleanup(listener);
1429         ast_free(ctd1);
1430         ast_free(ctd2);
1431         ast_free(tld);
1432         return res;
1433 }
1434
1435 AST_TEST_DEFINE(threadpool_serializer)
1436 {
1437         int started = 0;
1438         int finished = 0;
1439         enum ast_test_result_state res = AST_TEST_FAIL;
1440         struct ast_threadpool *pool = NULL;
1441         struct ast_taskprocessor *uut = NULL;
1442         struct complex_task_data *data1 = NULL;
1443         struct complex_task_data *data2 = NULL;
1444         struct complex_task_data *data3 = NULL;
1445         struct ast_threadpool_options options = {
1446                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1447                 .idle_timeout = 0,
1448                 .auto_increment = 0,
1449                 .initial_size = 2,
1450                 .max_size = 0,
1451         };
1452
1453         switch (cmd) {
1454         case TEST_INIT:
1455                 info->name = "threadpool_serializer";
1456                 info->category = "/main/threadpool/";
1457                 info->summary = "Test that serializers";
1458                 info->description =
1459                         "Ensures that tasks enqueued to a serialize execute in sequence.\n";
1460                 return AST_TEST_NOT_RUN;
1461         case TEST_EXECUTE:
1462                 break;
1463         }
1464
1465         pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
1466         if (!pool) {
1467                 ast_test_status_update(test, "Could not create threadpool\n");
1468                 goto end;
1469         }
1470         uut = ast_threadpool_serializer("ser1", pool);
1471         data1 = complex_task_data_alloc();
1472         data2 = complex_task_data_alloc();
1473         data3 = complex_task_data_alloc();
1474         if (!uut || !data1 || !data2 || !data3) {
1475                 ast_test_status_update(test, "Allocation failed\n");
1476                 goto end;
1477         }
1478
1479         /* This should start right away */
1480         if (ast_taskprocessor_push(uut, complex_task, data1)) {
1481                 ast_test_status_update(test, "Failed to enqueue data1\n");
1482                 goto end;
1483         }
1484         started = wait_for_complex_start(data1);
1485         if (!started) {
1486                 ast_test_status_update(test, "Failed to start data1\n");
1487                 goto end;
1488         }
1489
1490         /* This should not start until data 1 is complete */
1491         if (ast_taskprocessor_push(uut, complex_task, data2)) {
1492                 ast_test_status_update(test, "Failed to enqueue data2\n");
1493                 goto end;
1494         }
1495         started = has_complex_started(data2);
1496         if (started) {
1497                 ast_test_status_update(test, "data2 started out of order\n");
1498                 goto end;
1499         }
1500
1501         /* But the free thread in the pool can still run */
1502         if (ast_threadpool_push(pool, complex_task, data3)) {
1503                 ast_test_status_update(test, "Failed to enqueue data3\n");
1504         }
1505         started = wait_for_complex_start(data3);
1506         if (!started) {
1507                 ast_test_status_update(test, "Failed to start data3\n");
1508                 goto end;
1509         }
1510
1511         /* Finishing data1 should allow data2 to start */
1512         poke_worker(data1);
1513         finished = wait_for_complex_completion(data1) == AST_TEST_PASS;
1514         if (!finished) {
1515                 ast_test_status_update(test, "data1 couldn't finish\n");
1516                 goto end;
1517         }
1518         started = wait_for_complex_start(data2);
1519         if (!started) {
1520                 ast_test_status_update(test, "Failed to start data2\n");
1521                 goto end;
1522         }
1523
1524         /* Finish up */
1525         poke_worker(data2);
1526         finished = wait_for_complex_completion(data2) == AST_TEST_PASS;
1527         if (!finished) {
1528                 ast_test_status_update(test, "data2 couldn't finish\n");
1529                 goto end;
1530         }
1531         poke_worker(data3);
1532         finished = wait_for_complex_completion(data3) == AST_TEST_PASS;
1533         if (!finished) {
1534                 ast_test_status_update(test, "data3 couldn't finish\n");
1535                 goto end;
1536         }
1537
1538         res = AST_TEST_PASS;
1539
1540 end:
1541         poke_worker(data1);
1542         poke_worker(data2);
1543         poke_worker(data3);
1544         ast_taskprocessor_unreference(uut);
1545         ast_threadpool_shutdown(pool);
1546         ast_free(data1);
1547         ast_free(data2);
1548         ast_free(data3);
1549         return res;
1550 }
1551
1552 AST_TEST_DEFINE(threadpool_serializer_dupe)
1553 {
1554         enum ast_test_result_state res = AST_TEST_FAIL;
1555         struct ast_threadpool *pool = NULL;
1556         struct ast_taskprocessor *uut = NULL;
1557         struct ast_taskprocessor *there_can_be_only_one = NULL;
1558         struct ast_threadpool_options options = {
1559                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1560                 .idle_timeout = 0,
1561                 .auto_increment = 0,
1562                 .initial_size = 2,
1563                 .max_size = 0,
1564         };
1565
1566         switch (cmd) {
1567         case TEST_INIT:
1568                 info->name = "threadpool_serializer_dupe";
1569                 info->category = "/main/threadpool/";
1570                 info->summary = "Test that serializers are uniquely named";
1571                 info->description =
1572                         "Creating two serializers with the same name should\n"
1573                         "result in error.\n";
1574                 return AST_TEST_NOT_RUN;
1575         case TEST_EXECUTE:
1576                 break;
1577         }
1578
1579         pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
1580         if (!pool) {
1581                 ast_test_status_update(test, "Could not create threadpool\n");
1582                 goto end;
1583         }
1584
1585         uut = ast_threadpool_serializer("highlander", pool);
1586         if (!uut) {
1587                 ast_test_status_update(test, "Allocation failed\n");
1588                 goto end;
1589         }
1590
1591         there_can_be_only_one = ast_threadpool_serializer("highlander", pool);
1592         if (there_can_be_only_one) {
1593                 ast_taskprocessor_unreference(there_can_be_only_one);
1594                 ast_test_status_update(test, "Duplicate name error\n");
1595                 goto end;
1596         }
1597
1598         res = AST_TEST_PASS;
1599
1600 end:
1601         ast_taskprocessor_unreference(uut);
1602         ast_threadpool_shutdown(pool);
1603         return res;
1604 }
1605
1606 static int unload_module(void)
1607 {
1608         ast_test_unregister(threadpool_push);
1609         ast_test_unregister(threadpool_initial_threads);
1610         ast_test_unregister(threadpool_thread_creation);
1611         ast_test_unregister(threadpool_thread_destruction);
1612         ast_test_unregister(threadpool_thread_timeout);
1613         ast_test_unregister(threadpool_one_task_one_thread);
1614         ast_test_unregister(threadpool_one_thread_one_task);
1615         ast_test_unregister(threadpool_one_thread_multiple_tasks);
1616         ast_test_unregister(threadpool_auto_increment);
1617         ast_test_unregister(threadpool_max_size);
1618         ast_test_unregister(threadpool_reactivation);
1619         ast_test_unregister(threadpool_task_distribution);
1620         ast_test_unregister(threadpool_more_destruction);
1621         ast_test_unregister(threadpool_serializer);
1622         ast_test_unregister(threadpool_serializer_dupe);
1623         return 0;
1624 }
1625
1626 static int load_module(void)
1627 {
1628         ast_test_register(threadpool_push);
1629         ast_test_register(threadpool_initial_threads);
1630         ast_test_register(threadpool_thread_creation);
1631         ast_test_register(threadpool_thread_destruction);
1632         ast_test_register(threadpool_thread_timeout);
1633         ast_test_register(threadpool_one_task_one_thread);
1634         ast_test_register(threadpool_one_thread_one_task);
1635         ast_test_register(threadpool_one_thread_multiple_tasks);
1636         ast_test_register(threadpool_auto_increment);
1637         ast_test_register(threadpool_max_size);
1638         ast_test_register(threadpool_reactivation);
1639         ast_test_register(threadpool_task_distribution);
1640         ast_test_register(threadpool_more_destruction);
1641         ast_test_register(threadpool_serializer);
1642         ast_test_register(threadpool_serializer_dupe);
1643         return AST_MODULE_LOAD_SUCCESS;
1644 }
1645
1646 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "threadpool test module");