Merge "Revert "app_voicemail: Remove need to subscribe to stasis""
[asterisk/asterisk.git] / tests / test_threadpool.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012-2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \brief threadpool unit tests
22  *
23  * \author Mark Michelson <mmichelson@digium.com>
24  *
25  */
26
27 /*** MODULEINFO
28         <depend>TEST_FRAMEWORK</depend>
29         <support_level>core</support_level>
30  ***/
31
32 #include "asterisk.h"
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/lock.h"
36 #include "asterisk/logger.h"
37 #include "asterisk/module.h"
38 #include "asterisk/taskprocessor.h"
39 #include "asterisk/test.h"
40 #include "asterisk/threadpool.h"
41
42 struct test_listener_data {
43         int num_active;
44         int num_idle;
45         int task_pushed;
46         int num_tasks;
47         int empty_notice;
48         int was_empty;
49         ast_mutex_t lock;
50         ast_cond_t cond;
51 };
52
53 static struct test_listener_data *test_alloc(void)
54 {
55         struct test_listener_data *tld = ast_calloc(1, sizeof(*tld));
56         if (!tld) {
57                 return NULL;
58         }
59         ast_mutex_init(&tld->lock);
60         ast_cond_init(&tld->cond, NULL);
61         return tld;
62 }
63
64 static void test_state_changed(struct ast_threadpool *pool,
65                 struct ast_threadpool_listener *listener,
66                 int active_threads,
67                 int idle_threads)
68 {
69         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
70         SCOPED_MUTEX(lock, &tld->lock);
71         tld->num_active = active_threads;
72         tld->num_idle = idle_threads;
73         ast_log(LOG_NOTICE, "Thread state: %d active, %d idle\n", tld->num_active, tld->num_idle);
74         ast_cond_signal(&tld->cond);
75 }
76
77 static void test_task_pushed(struct ast_threadpool *pool,
78                 struct ast_threadpool_listener *listener,
79                 int was_empty)
80 {
81         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
82         SCOPED_MUTEX(lock, &tld->lock);
83         tld->task_pushed = 1;
84         ++tld->num_tasks;
85         tld->was_empty = was_empty;
86         ast_cond_signal(&tld->cond);
87 }
88
89 static void test_emptied(struct ast_threadpool *pool,
90                 struct ast_threadpool_listener *listener)
91 {
92         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
93         SCOPED_MUTEX(lock, &tld->lock);
94         tld->empty_notice = 1;
95         ast_cond_signal(&tld->cond);
96 }
97
98 static void test_shutdown(struct ast_threadpool_listener *listener)
99 {
100         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
101         ast_cond_destroy(&tld->cond);
102         ast_mutex_destroy(&tld->lock);
103 }
104
105 static const struct ast_threadpool_listener_callbacks test_callbacks = {
106         .state_changed = test_state_changed,
107         .task_pushed = test_task_pushed,
108         .emptied = test_emptied,
109         .shutdown = test_shutdown,
110 };
111
112 struct simple_task_data {
113         int task_executed;
114         ast_mutex_t lock;
115         ast_cond_t cond;
116 };
117
118 static struct simple_task_data *simple_task_data_alloc(void)
119 {
120         struct simple_task_data *std = ast_calloc(1, sizeof(*std));
121
122         if (!std) {
123                 return NULL;
124         }
125         ast_mutex_init(&std->lock);
126         ast_cond_init(&std->cond, NULL);
127         return std;
128 }
129
130 static void simple_task_data_free(struct simple_task_data *std)
131 {
132         if (!std) {
133                 return;
134         }
135
136         ast_mutex_destroy(&std->lock);
137         ast_cond_destroy(&std->cond);
138
139         ast_free(std);
140 }
141
142 static int simple_task(void *data)
143 {
144         struct simple_task_data *std = data;
145         SCOPED_MUTEX(lock, &std->lock);
146         std->task_executed = 1;
147         ast_cond_signal(&std->cond);
148         return 0;
149 }
150
151 static enum ast_test_result_state wait_until_thread_state(struct ast_test *test, struct test_listener_data *tld, int num_active, int num_idle)
152 {
153         struct timeval start = ast_tvnow();
154         struct timespec end = {
155                 .tv_sec = start.tv_sec + 5,
156                 .tv_nsec = start.tv_usec * 1000
157         };
158         enum ast_test_result_state res = AST_TEST_PASS;
159         SCOPED_MUTEX(lock, &tld->lock);
160
161         while (!(tld->num_active == num_active && tld->num_idle == num_idle)) {
162                 if (ast_cond_timedwait(&tld->cond, &tld->lock, &end) == ETIMEDOUT) {
163                         break;
164                 }
165         }
166
167         if (tld->num_active != num_active && tld->num_idle != num_idle) {
168                 ast_test_status_update(test, "Number of active threads and idle threads not what was expected.\n");
169                 ast_test_status_update(test, "Expected %d active threads but got %d\n", num_active, tld->num_active);
170                 ast_test_status_update(test, "Expected %d idle threads but got %d\n", num_idle, tld->num_idle);
171                 res = AST_TEST_FAIL;
172         }
173
174         return res;
175 }
176
177 static void wait_for_task_pushed(struct ast_threadpool_listener *listener)
178 {
179         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
180         struct timeval start = ast_tvnow();
181         struct timespec end = {
182                 .tv_sec = start.tv_sec + 5,
183                 .tv_nsec = start.tv_usec * 1000
184         };
185         SCOPED_MUTEX(lock, &tld->lock);
186
187         while (!tld->task_pushed) {
188                 if (ast_cond_timedwait(&tld->cond, lock, &end) == ETIMEDOUT) {
189                         break;
190                 }
191         }
192 }
193
194 static enum ast_test_result_state wait_for_completion(struct ast_test *test, struct simple_task_data *std)
195 {
196         struct timeval start = ast_tvnow();
197         struct timespec end = {
198                 .tv_sec = start.tv_sec + 5,
199                 .tv_nsec = start.tv_usec * 1000
200         };
201         enum ast_test_result_state res = AST_TEST_PASS;
202         SCOPED_MUTEX(lock, &std->lock);
203
204         while (!std->task_executed) {
205                 if (ast_cond_timedwait(&std->cond, lock, &end) == ETIMEDOUT) {
206                         break;
207                 }
208         }
209
210         if (!std->task_executed) {
211                 ast_test_status_update(test, "Task execution did not occur\n");
212                 res = AST_TEST_FAIL;
213         }
214         return res;
215 }
216
217 static enum ast_test_result_state wait_for_empty_notice(struct ast_test *test, struct test_listener_data *tld)
218 {
219         struct timeval start = ast_tvnow();
220         struct timespec end = {
221                 .tv_sec = start.tv_sec + 5,
222                 .tv_nsec = start.tv_usec * 1000
223         };
224         enum ast_test_result_state res = AST_TEST_PASS;
225         SCOPED_MUTEX(lock, &tld->lock);
226
227         while (!tld->empty_notice) {
228                 if (ast_cond_timedwait(&tld->cond, lock, &end) == ETIMEDOUT) {
229                         break;
230                 }
231         }
232
233         if (!tld->empty_notice) {
234                 ast_test_status_update(test, "Test listener not notified that threadpool is empty\n");
235                 res = AST_TEST_FAIL;
236         }
237
238         return res;
239 }
240
241 static enum ast_test_result_state listener_check(
242                 struct ast_test *test,
243                 struct ast_threadpool_listener *listener,
244                 int task_pushed,
245                 int was_empty,
246                 int num_tasks,
247                 int num_active,
248                 int num_idle,
249                 int empty_notice)
250 {
251         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
252         enum ast_test_result_state res = AST_TEST_PASS;
253
254         if (tld->task_pushed != task_pushed) {
255                 ast_test_status_update(test, "Expected task %sto be pushed, but it was%s\n",
256                                 task_pushed ? "" : "not ", tld->task_pushed ? "" : " not");
257                 res = AST_TEST_FAIL;
258         }
259         if (tld->was_empty != was_empty) {
260                 ast_test_status_update(test, "Expected %sto be empty, but it was%s\n",
261                                 was_empty ? "" : "not ", tld->was_empty ? "" : " not");
262                 res = AST_TEST_FAIL;
263         }
264         if (tld->num_tasks!= num_tasks) {
265                 ast_test_status_update(test, "Expected %d tasks to be pushed, but got %d\n",
266                                 num_tasks, tld->num_tasks);
267                 res = AST_TEST_FAIL;
268         }
269         if (tld->num_active != num_active) {
270                 ast_test_status_update(test, "Expected %d active threads, but got %d\n",
271                                 num_active, tld->num_active);
272                 res = AST_TEST_FAIL;
273         }
274         if (tld->num_idle != num_idle) {
275                 ast_test_status_update(test, "Expected %d idle threads, but got %d\n",
276                                 num_idle, tld->num_idle);
277                 res = AST_TEST_FAIL;
278         }
279         if (tld->empty_notice != empty_notice) {
280                 ast_test_status_update(test, "Expected %s empty notice, but got %s\n",
281                                 was_empty ? "an" : "no", tld->task_pushed ? "one" : "none");
282                 res = AST_TEST_FAIL;
283         }
284
285         return res;
286 }
287
288 AST_TEST_DEFINE(threadpool_push)
289 {
290         struct ast_threadpool *pool = NULL;
291         struct ast_threadpool_listener *listener = NULL;
292         struct simple_task_data *std = NULL;
293         struct test_listener_data *tld = NULL;
294         enum ast_test_result_state res = AST_TEST_FAIL;
295         struct ast_threadpool_options options = {
296                 .version = AST_THREADPOOL_OPTIONS_VERSION,
297                 .idle_timeout = 0,
298                 .auto_increment = 0,
299                 .initial_size = 0,
300                 .max_size = 0,
301         };
302
303         switch (cmd) {
304         case TEST_INIT:
305                 info->name = "push";
306                 info->category = "/main/threadpool/";
307                 info->summary = "Test task";
308                 info->description =
309                         "Basic threadpool test";
310                 return AST_TEST_NOT_RUN;
311         case TEST_EXECUTE:
312                 break;
313         }
314         tld = test_alloc();
315         if (!tld) {
316                 return AST_TEST_FAIL;
317         }
318
319         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
320         if (!listener) {
321                 goto end;
322         }
323
324         pool = ast_threadpool_create(info->name, listener, &options);
325         if (!pool) {
326                 goto end;
327         }
328
329         std = simple_task_data_alloc();
330         if (!std) {
331                 goto end;
332         }
333
334         if (ast_threadpool_push(pool, simple_task, std)) {
335                 goto end;
336         }
337
338         wait_for_task_pushed(listener);
339
340         res = listener_check(test, listener, 1, 1, 1, 0, 0, 0);
341
342 end:
343         ast_threadpool_shutdown(pool);
344         ao2_cleanup(listener);
345         simple_task_data_free(std);
346         ast_free(tld);
347         return res;
348 }
349
350 AST_TEST_DEFINE(threadpool_initial_threads)
351 {
352         struct ast_threadpool *pool = NULL;
353         struct ast_threadpool_listener *listener = NULL;
354         enum ast_test_result_state res = AST_TEST_FAIL;
355         struct test_listener_data *tld = NULL;
356         struct ast_threadpool_options options = {
357                 .version = AST_THREADPOOL_OPTIONS_VERSION,
358                 .idle_timeout = 0,
359                 .auto_increment = 0,
360                 .initial_size = 3,
361                 .max_size = 0,
362         };
363
364         switch (cmd) {
365         case TEST_INIT:
366                 info->name = "initial_threads";
367                 info->category = "/main/threadpool/";
368                 info->summary = "Test threadpool initialization state";
369                 info->description =
370                         "Ensure that a threadpool created with a specific size contains the\n"
371                         "proper number of idle threads.";
372                 return AST_TEST_NOT_RUN;
373         case TEST_EXECUTE:
374                 break;
375         }
376
377         tld = test_alloc();
378         if (!tld) {
379                 return AST_TEST_FAIL;
380         }
381
382         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
383         if (!listener) {
384                 goto end;
385         }
386
387         pool = ast_threadpool_create(info->name, listener, &options);
388         if (!pool) {
389                 goto end;
390         }
391
392         res = wait_until_thread_state(test, tld, 0, 3);
393
394 end:
395         ast_threadpool_shutdown(pool);
396         ao2_cleanup(listener);
397         ast_free(tld);
398         return res;
399 }
400
401
402 AST_TEST_DEFINE(threadpool_thread_creation)
403 {
404         struct ast_threadpool *pool = NULL;
405         struct ast_threadpool_listener *listener = NULL;
406         enum ast_test_result_state res = AST_TEST_FAIL;
407         struct test_listener_data *tld = NULL;
408         struct ast_threadpool_options options = {
409                 .version = AST_THREADPOOL_OPTIONS_VERSION,
410                 .idle_timeout = 0,
411                 .auto_increment = 0,
412                 .initial_size = 0,
413                 .max_size = 0,
414         };
415
416         switch (cmd) {
417         case TEST_INIT:
418                 info->name = "thread_creation";
419                 info->category = "/main/threadpool/";
420                 info->summary = "Test threadpool thread creation";
421                 info->description =
422                         "Ensure that threads can be added to a threadpool";
423                 return AST_TEST_NOT_RUN;
424         case TEST_EXECUTE:
425                 break;
426         }
427
428         tld = test_alloc();
429         if (!tld) {
430                 return AST_TEST_FAIL;
431         }
432
433         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
434         if (!listener) {
435                 goto end;
436         }
437
438         pool = ast_threadpool_create(info->name, listener, &options);
439         if (!pool) {
440                 goto end;
441         }
442
443         /* Now let's create a thread. It should start active, then go
444          * idle immediately
445          */
446         ast_threadpool_set_size(pool, 1);
447
448         res = wait_until_thread_state(test, tld, 0, 1);
449
450 end:
451         ast_threadpool_shutdown(pool);
452         ao2_cleanup(listener);
453         ast_free(tld);
454         return res;
455 }
456
457 AST_TEST_DEFINE(threadpool_thread_destruction)
458 {
459         struct ast_threadpool *pool = NULL;
460         struct ast_threadpool_listener *listener = NULL;
461         enum ast_test_result_state res = AST_TEST_FAIL;
462         struct test_listener_data *tld = NULL;
463         struct ast_threadpool_options options = {
464                 .version = AST_THREADPOOL_OPTIONS_VERSION,
465                 .idle_timeout = 0,
466                 .auto_increment = 0,
467                 .initial_size = 0,
468                 .max_size = 0,
469         };
470
471         switch (cmd) {
472         case TEST_INIT:
473                 info->name = "thread_destruction";
474                 info->category = "/main/threadpool/";
475                 info->summary = "Test threadpool thread destruction";
476                 info->description =
477                         "Ensure that threads are properly destroyed in a threadpool";
478                 return AST_TEST_NOT_RUN;
479         case TEST_EXECUTE:
480                 break;
481         }
482
483         tld = test_alloc();
484         if (!tld) {
485                 return AST_TEST_FAIL;
486         }
487
488         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
489         if (!listener) {
490                 goto end;
491         }
492
493         pool = ast_threadpool_create(info->name, listener, &options);
494         if (!pool) {
495                 goto end;
496         }
497
498         ast_threadpool_set_size(pool, 3);
499
500         res = wait_until_thread_state(test, tld, 0, 3);
501         if (res == AST_TEST_FAIL) {
502                 goto end;
503         }
504
505         res = listener_check(test, listener, 0, 0, 0, 0, 3, 0);
506         if (res == AST_TEST_FAIL) {
507                 goto end;
508         }
509
510         ast_threadpool_set_size(pool, 2);
511
512         res = wait_until_thread_state(test, tld, 0, 2);
513
514 end:
515         ast_threadpool_shutdown(pool);
516         ao2_cleanup(listener);
517         ast_free(tld);
518         return res;
519 }
520
521 AST_TEST_DEFINE(threadpool_thread_timeout)
522 {
523         struct ast_threadpool *pool = NULL;
524         struct ast_threadpool_listener *listener = NULL;
525         enum ast_test_result_state res = AST_TEST_FAIL;
526         struct test_listener_data *tld = NULL;
527         struct ast_threadpool_options options = {
528                 .version = AST_THREADPOOL_OPTIONS_VERSION,
529                 .idle_timeout = 2,
530                 .auto_increment = 0,
531                 .initial_size = 0,
532                 .max_size = 0,
533         };
534
535         switch (cmd) {
536         case TEST_INIT:
537                 info->name = "thread_timeout";
538                 info->category = "/main/threadpool/";
539                 info->summary = "Test threadpool thread timeout";
540                 info->description =
541                         "Ensure that a thread with a two second timeout dies as expected.";
542                 return AST_TEST_NOT_RUN;
543         case TEST_EXECUTE:
544                 break;
545         }
546
547         tld = test_alloc();
548         if (!tld) {
549                 return AST_TEST_FAIL;
550         }
551
552         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
553         if (!listener) {
554                 goto end;
555         }
556
557         pool = ast_threadpool_create(info->name, listener, &options);
558         if (!pool) {
559                 goto end;
560         }
561
562         ast_threadpool_set_size(pool, 1);
563
564         res = wait_until_thread_state(test, tld, 0, 1);
565         if (res == AST_TEST_FAIL) {
566                 goto end;
567         }
568
569         res = listener_check(test, listener, 0, 0, 0, 0, 1, 0);
570         if (res == AST_TEST_FAIL) {
571                 goto end;
572         }
573
574         res = wait_until_thread_state(test, tld, 0, 0);
575         if (res == AST_TEST_FAIL) {
576                 goto end;
577         }
578
579         res = listener_check(test, listener, 0, 0, 0, 0, 0, 0);
580
581 end:
582         ast_threadpool_shutdown(pool);
583         ao2_cleanup(listener);
584         ast_free(tld);
585         return res;
586 }
587
588 AST_TEST_DEFINE(threadpool_thread_timeout_thrash)
589 {
590         struct ast_threadpool *pool = NULL;
591         struct ast_threadpool_listener *listener = NULL;
592         enum ast_test_result_state res = AST_TEST_FAIL;
593         struct test_listener_data *tld = NULL;
594         struct ast_threadpool_options options = {
595                 .version = AST_THREADPOOL_OPTIONS_VERSION,
596                 .idle_timeout = 1,
597                 .auto_increment = 1,
598                 .initial_size = 0,
599                 .max_size = 1,
600         };
601         int iteration;
602
603         switch (cmd) {
604         case TEST_INIT:
605                 info->name = "thread_timeout_thrash";
606                 info->category = "/main/threadpool/";
607                 info->summary = "Thrash threadpool thread timeout";
608                 info->description =
609                         "Repeatedly queue a task when a threadpool thread should timeout.";
610                 return AST_TEST_NOT_RUN;
611         case TEST_EXECUTE:
612                 break;
613         }
614
615         tld = test_alloc();
616         if (!tld) {
617                 return AST_TEST_FAIL;
618         }
619
620         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
621         if (!listener) {
622                 goto end;
623         }
624
625         pool = ast_threadpool_create(info->name, listener, &options);
626         if (!pool) {
627                 goto end;
628         }
629
630         ast_threadpool_set_size(pool, 1);
631
632         for (iteration = 0; iteration < 30; ++iteration) {
633                 struct simple_task_data *std = NULL;
634                 struct timeval start = ast_tvnow();
635                 struct timespec end = {
636                         .tv_sec = start.tv_sec + options.idle_timeout,
637                         .tv_nsec = start.tv_usec * 1000
638                 };
639
640                 std = simple_task_data_alloc();
641                 if (!std) {
642                         goto end;
643                 }
644
645                 /* Wait until the threadpool thread should timeout due to being idle */
646                 ast_mutex_lock(&tld->lock);
647                 while (ast_cond_timedwait(&tld->cond, &tld->lock, &end) != ETIMEDOUT) {
648                         /* This purposely left empty as we want to loop waiting for a time out */
649                 }
650                 ast_mutex_unlock(&tld->lock);
651
652                 if (ast_threadpool_push(pool, simple_task, std)) {
653                         res = AST_TEST_FAIL;
654                 } else {
655                         res = wait_for_completion(test, std);
656                 }
657
658                 simple_task_data_free(std);
659
660                 if (res == AST_TEST_FAIL) {
661                         goto end;
662                 }
663         }
664
665         res = wait_until_thread_state(test, tld, 0, 0);
666         if (res == AST_TEST_FAIL) {
667                 goto end;
668         }
669
670         res = listener_check(test, listener, 1, 1, 30, 0, 0, 1);
671
672 end:
673         ast_threadpool_shutdown(pool);
674         ao2_cleanup(listener);
675         ast_free(tld);
676         return res;
677 }
678
679 AST_TEST_DEFINE(threadpool_one_task_one_thread)
680 {
681         struct ast_threadpool *pool = NULL;
682         struct ast_threadpool_listener *listener = NULL;
683         struct simple_task_data *std = NULL;
684         enum ast_test_result_state res = AST_TEST_FAIL;
685         struct test_listener_data *tld = NULL;
686         struct ast_threadpool_options options = {
687                 .version = AST_THREADPOOL_OPTIONS_VERSION,
688                 .idle_timeout = 0,
689                 .auto_increment = 0,
690                 .initial_size = 0,
691                 .max_size = 0,
692         };
693
694         switch (cmd) {
695         case TEST_INIT:
696                 info->name = "one_task_one_thread";
697                 info->category = "/main/threadpool/";
698                 info->summary = "Test a single task with a single thread";
699                 info->description =
700                         "Push a task into an empty threadpool, then add a thread to the pool.";
701                 return AST_TEST_NOT_RUN;
702         case TEST_EXECUTE:
703                 break;
704         }
705
706         tld = test_alloc();
707         if (!tld) {
708                 return AST_TEST_FAIL;
709         }
710
711         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
712         if (!listener) {
713                 goto end;
714         }
715
716         pool = ast_threadpool_create(info->name, listener, &options);
717         if (!pool) {
718                 goto end;
719         }
720
721         std = simple_task_data_alloc();
722         if (!std) {
723                 goto end;
724         }
725
726         if (ast_threadpool_push(pool, simple_task, std)) {
727                 goto end;
728         }
729
730         ast_threadpool_set_size(pool, 1);
731
732         /* Threads added to the pool are active when they start,
733          * so the newly-created thread should immediately execute
734          * the waiting task.
735          */
736         res = wait_for_completion(test, std);
737         if (res == AST_TEST_FAIL) {
738                 goto end;
739         }
740
741         res = wait_for_empty_notice(test, tld);
742         if (res == AST_TEST_FAIL) {
743                 goto end;
744         }
745
746         /* After completing the task, the thread should go idle */
747         res = wait_until_thread_state(test, tld, 0, 1);
748         if (res == AST_TEST_FAIL) {
749                 goto end;
750         }
751
752         res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
753
754 end:
755         ast_threadpool_shutdown(pool);
756         ao2_cleanup(listener);
757         simple_task_data_free(std);
758         ast_free(tld);
759         return res;
760
761 }
762
763 AST_TEST_DEFINE(threadpool_one_thread_one_task)
764 {
765         struct ast_threadpool *pool = NULL;
766         struct ast_threadpool_listener *listener = NULL;
767         struct simple_task_data *std = NULL;
768         enum ast_test_result_state res = AST_TEST_FAIL;
769         struct test_listener_data *tld = NULL;
770         struct ast_threadpool_options options = {
771                 .version = AST_THREADPOOL_OPTIONS_VERSION,
772                 .idle_timeout = 0,
773                 .auto_increment = 0,
774                 .initial_size = 0,
775                 .max_size = 0,
776         };
777
778         switch (cmd) {
779         case TEST_INIT:
780                 info->name = "one_thread_one_task";
781                 info->category = "/main/threadpool/";
782                 info->summary = "Test a single thread with a single task";
783                 info->description =
784                         "Add a thread to the pool and then push a task to it.";
785                 return AST_TEST_NOT_RUN;
786         case TEST_EXECUTE:
787                 break;
788         }
789
790         tld = test_alloc();
791         if (!tld) {
792                 return AST_TEST_FAIL;
793         }
794
795         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
796         if (!listener) {
797                 goto end;
798         }
799
800         pool = ast_threadpool_create(info->name, listener, &options);
801         if (!pool) {
802                 goto end;
803         }
804
805         std = simple_task_data_alloc();
806         if (!std) {
807                 goto end;
808         }
809
810         ast_threadpool_set_size(pool, 1);
811
812         res = wait_until_thread_state(test, tld, 0, 1);
813         if (res == AST_TEST_FAIL) {
814                 goto end;
815         }
816
817         if (ast_threadpool_push(pool, simple_task, std)) {
818                 res = AST_TEST_FAIL;
819                 goto end;
820         }
821
822         res = wait_for_completion(test, std);
823         if (res == AST_TEST_FAIL) {
824                 goto end;
825         }
826
827         res = wait_for_empty_notice(test, tld);
828         if (res == AST_TEST_FAIL) {
829                 goto end;
830         }
831
832         /* After completing the task, the thread should go idle */
833         res = wait_until_thread_state(test, tld, 0, 1);
834         if (res == AST_TEST_FAIL) {
835                 goto end;
836         }
837
838         res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
839
840 end:
841         ast_threadpool_shutdown(pool);
842         ao2_cleanup(listener);
843         simple_task_data_free(std);
844         ast_free(tld);
845         return res;
846 }
847
848 AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks)
849 {
850         struct ast_threadpool *pool = NULL;
851         struct ast_threadpool_listener *listener = NULL;
852         struct simple_task_data *std1 = NULL;
853         struct simple_task_data *std2 = NULL;
854         struct simple_task_data *std3 = NULL;
855         enum ast_test_result_state res = AST_TEST_FAIL;
856         struct test_listener_data *tld = NULL;
857         struct ast_threadpool_options options = {
858                 .version = AST_THREADPOOL_OPTIONS_VERSION,
859                 .idle_timeout = 0,
860                 .auto_increment = 0,
861                 .initial_size = 0,
862                 .max_size = 0,
863         };
864
865         switch (cmd) {
866         case TEST_INIT:
867                 info->name = "one_thread_multiple_tasks";
868                 info->category = "/main/threadpool/";
869                 info->summary = "Test a single thread with multiple tasks";
870                 info->description =
871                         "Add a thread to the pool and then push three tasks to it.";
872                 return AST_TEST_NOT_RUN;
873         case TEST_EXECUTE:
874                 break;
875         }
876
877         tld = test_alloc();
878         if (!tld) {
879                 return AST_TEST_FAIL;
880         }
881
882         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
883         if (!listener) {
884                 goto end;
885         }
886
887         pool = ast_threadpool_create(info->name, listener, &options);
888         if (!pool) {
889                 goto end;
890         }
891
892         std1 = simple_task_data_alloc();
893         std2 = simple_task_data_alloc();
894         std3 = simple_task_data_alloc();
895         if (!std1 || !std2 || !std3) {
896                 goto end;
897         }
898
899         ast_threadpool_set_size(pool, 1);
900
901         res = wait_until_thread_state(test, tld, 0, 1);
902         if (res == AST_TEST_FAIL) {
903                 goto end;
904         }
905
906         res = AST_TEST_FAIL;
907         if (ast_threadpool_push(pool, simple_task, std1)) {
908                 goto end;
909         }
910
911         if (ast_threadpool_push(pool, simple_task, std2)) {
912                 goto end;
913         }
914
915         if (ast_threadpool_push(pool, simple_task, std3)) {
916                 goto end;
917         }
918
919         res = wait_for_completion(test, std1);
920         if (res == AST_TEST_FAIL) {
921                 goto end;
922         }
923         res = wait_for_completion(test, std2);
924         if (res == AST_TEST_FAIL) {
925                 goto end;
926         }
927         res = wait_for_completion(test, std3);
928         if (res == AST_TEST_FAIL) {
929                 goto end;
930         }
931
932         res = wait_for_empty_notice(test, tld);
933         if (res == AST_TEST_FAIL) {
934                 goto end;
935         }
936
937         res = wait_until_thread_state(test, tld, 0, 1);
938         if (res == AST_TEST_FAIL) {
939                 goto end;
940         }
941
942         res = listener_check(test, listener, 1, 0, 3, 0, 1, 1);
943
944 end:
945         ast_threadpool_shutdown(pool);
946         ao2_cleanup(listener);
947         simple_task_data_free(std1);
948         simple_task_data_free(std2);
949         simple_task_data_free(std3);
950         ast_free(tld);
951         return res;
952 }
953
954 static enum ast_test_result_state wait_until_thread_state_task_pushed(struct ast_test *test,
955                 struct test_listener_data *tld, int num_active, int num_idle, int num_tasks)
956 {
957         enum ast_test_result_state res = AST_TEST_PASS;
958         struct timeval start;
959         struct timespec end;
960
961         res = wait_until_thread_state(test, tld, num_active, num_idle);
962         if (res == AST_TEST_FAIL) {
963                 return res;
964         }
965
966         start = ast_tvnow();
967         end.tv_sec = start.tv_sec + 5;
968         end.tv_nsec = start.tv_usec * 1000;
969
970         ast_mutex_lock(&tld->lock);
971
972         while (tld->num_tasks != num_tasks) {
973                 if (ast_cond_timedwait(&tld->cond, &tld->lock, &end) == ETIMEDOUT) {
974                         break;
975                 }
976         }
977
978         if (tld->num_tasks != num_tasks) {
979                 ast_test_status_update(test, "Number of tasks pushed %d does not match expected %d\n",
980                                 tld->num_tasks, num_tasks);
981                 res = AST_TEST_FAIL;
982         }
983
984         ast_mutex_unlock(&tld->lock);
985
986         return res;
987 }
988
989 AST_TEST_DEFINE(threadpool_auto_increment)
990 {
991         struct ast_threadpool *pool = NULL;
992         struct ast_threadpool_listener *listener = NULL;
993         struct simple_task_data *std1 = NULL;
994         struct simple_task_data *std2 = NULL;
995         struct simple_task_data *std3 = NULL;
996         struct simple_task_data *std4 = NULL;
997         enum ast_test_result_state res = AST_TEST_FAIL;
998         struct test_listener_data *tld = NULL;
999         struct ast_threadpool_options options = {
1000                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1001                 .idle_timeout = 0,
1002                 .auto_increment = 3,
1003                 .initial_size = 0,
1004                 .max_size = 0,
1005         };
1006
1007         switch (cmd) {
1008         case TEST_INIT:
1009                 info->name = "auto_increment";
1010                 info->category = "/main/threadpool/";
1011                 info->summary = "Test that the threadpool grows as tasks are added";
1012                 info->description =
1013                         "Create an empty threadpool and push a task to it. Once the task is\n"
1014                         "pushed, the threadpool should add three threads and be able to\n"
1015                         "handle the task. The threads should then go idle";
1016                 return AST_TEST_NOT_RUN;
1017         case TEST_EXECUTE:
1018                 break;
1019         }
1020
1021         tld = test_alloc();
1022         if (!tld) {
1023                 return AST_TEST_FAIL;
1024         }
1025
1026         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1027         if (!listener) {
1028                 goto end;
1029         }
1030
1031         pool = ast_threadpool_create(info->name, listener, &options);
1032         if (!pool) {
1033                 goto end;
1034         }
1035
1036         std1 = simple_task_data_alloc();
1037         std2 = simple_task_data_alloc();
1038         std3 = simple_task_data_alloc();
1039         std4 = simple_task_data_alloc();
1040         if (!std1 || !std2 || !std3 || !std4) {
1041                 goto end;
1042         }
1043
1044         if (ast_threadpool_push(pool, simple_task, std1)) {
1045                 goto end;
1046         }
1047
1048         /* Pushing the task should result in the threadpool growing
1049          * by three threads. This will allow the task to actually execute
1050          */
1051         res = wait_for_completion(test, std1);
1052         if (res == AST_TEST_FAIL) {
1053                 goto end;
1054         }
1055
1056         res = wait_for_empty_notice(test, tld);
1057         if (res == AST_TEST_FAIL) {
1058                 goto end;
1059         }
1060
1061         res = wait_until_thread_state(test, tld, 0, 3);
1062         if (res == AST_TEST_FAIL) {
1063                 goto end;
1064         }
1065
1066         /* Now push three tasks into the pool and ensure the pool does not
1067          * grow.
1068          */
1069         res = AST_TEST_FAIL;
1070
1071         if (ast_threadpool_push(pool, simple_task, std2)) {
1072                 goto end;
1073         }
1074
1075         if (ast_threadpool_push(pool, simple_task, std3)) {
1076                 goto end;
1077         }
1078
1079         if (ast_threadpool_push(pool, simple_task, std4)) {
1080                 goto end;
1081         }
1082
1083         res = wait_for_completion(test, std2);
1084         if (res == AST_TEST_FAIL) {
1085                 goto end;
1086         }
1087         res = wait_for_completion(test, std3);
1088         if (res == AST_TEST_FAIL) {
1089                 goto end;
1090         }
1091         res = wait_for_completion(test, std4);
1092         if (res == AST_TEST_FAIL) {
1093                 goto end;
1094         }
1095
1096         res = wait_for_empty_notice(test, tld);
1097         if (res == AST_TEST_FAIL) {
1098                 goto end;
1099         }
1100
1101         res = wait_until_thread_state_task_pushed(test, tld, 0, 3, 4);
1102         if (res == AST_TEST_FAIL) {
1103                 goto end;
1104         }
1105
1106 end:
1107         ast_threadpool_shutdown(pool);
1108         ao2_cleanup(listener);
1109         simple_task_data_free(std1);
1110         simple_task_data_free(std2);
1111         simple_task_data_free(std3);
1112         simple_task_data_free(std4);
1113         ast_free(tld);
1114         return res;
1115 }
1116
1117 AST_TEST_DEFINE(threadpool_max_size)
1118 {
1119         struct ast_threadpool *pool = NULL;
1120         struct ast_threadpool_listener *listener = NULL;
1121         struct simple_task_data *std = NULL;
1122         enum ast_test_result_state res = AST_TEST_FAIL;
1123         struct test_listener_data *tld = NULL;
1124         struct ast_threadpool_options options = {
1125                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1126                 .idle_timeout = 0,
1127                 .auto_increment = 3,
1128                 .initial_size = 0,
1129                 .max_size = 2,
1130         };
1131
1132         switch (cmd) {
1133         case TEST_INIT:
1134                 info->name = "max_size";
1135                 info->category = "/main/threadpool/";
1136                 info->summary = "Test that the threadpool does not exceed its maximum size restriction";
1137                 info->description =
1138                         "Create an empty threadpool and push a task to it. Once the task is\n"
1139                         "pushed, the threadpool should attempt to grow by three threads, but the\n"
1140                         "pool's restrictions should only allow two threads to be added.";
1141                 return AST_TEST_NOT_RUN;
1142         case TEST_EXECUTE:
1143                 break;
1144         }
1145
1146         tld = test_alloc();
1147         if (!tld) {
1148                 return AST_TEST_FAIL;
1149         }
1150
1151         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1152         if (!listener) {
1153                 goto end;
1154         }
1155
1156         pool = ast_threadpool_create(info->name, listener, &options);
1157         if (!pool) {
1158                 goto end;
1159         }
1160
1161         std = simple_task_data_alloc();
1162         if (!std) {
1163                 goto end;
1164         }
1165
1166         if (ast_threadpool_push(pool, simple_task, std)) {
1167                 goto end;
1168         }
1169
1170         res = wait_for_completion(test, std);
1171         if (res == AST_TEST_FAIL) {
1172                 goto end;
1173         }
1174
1175         res = wait_until_thread_state(test, tld, 0, 2);
1176         if (res == AST_TEST_FAIL) {
1177                 goto end;
1178         }
1179
1180         res = listener_check(test, listener, 1, 1, 1, 0, 2, 1);
1181 end:
1182         ast_threadpool_shutdown(pool);
1183         ao2_cleanup(listener);
1184         simple_task_data_free(std);
1185         ast_free(tld);
1186         return res;
1187 }
1188
1189 AST_TEST_DEFINE(threadpool_reactivation)
1190 {
1191         struct ast_threadpool *pool = NULL;
1192         struct ast_threadpool_listener *listener = NULL;
1193         struct simple_task_data *std1 = NULL;
1194         struct simple_task_data *std2 = NULL;
1195         enum ast_test_result_state res = AST_TEST_FAIL;
1196         struct test_listener_data *tld = NULL;
1197         struct ast_threadpool_options options = {
1198                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1199                 .idle_timeout = 0,
1200                 .auto_increment = 0,
1201                 .initial_size = 0,
1202                 .max_size = 0,
1203         };
1204
1205         switch (cmd) {
1206         case TEST_INIT:
1207                 info->name = "reactivation";
1208                 info->category = "/main/threadpool/";
1209                 info->summary = "Test that a threadpool reactivates when work is added";
1210                 info->description =
1211                         "Push a task into a threadpool. Make sure the task executes and the\n"
1212                         "thread goes idle. Then push a second task and ensure that the thread\n"
1213                         "awakens and executes the second task.";
1214                 return AST_TEST_NOT_RUN;
1215         case TEST_EXECUTE:
1216                 break;
1217         }
1218
1219         tld = test_alloc();
1220         if (!tld) {
1221                 return AST_TEST_FAIL;
1222         }
1223
1224         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1225         if (!listener) {
1226                 goto end;
1227         }
1228
1229         pool = ast_threadpool_create(info->name, listener, &options);
1230         if (!pool) {
1231                 goto end;
1232         }
1233
1234         std1 = simple_task_data_alloc();
1235         std2 = simple_task_data_alloc();
1236         if (!std1 || !std2) {
1237                 goto end;
1238         }
1239
1240         if (ast_threadpool_push(pool, simple_task, std1)) {
1241                 goto end;
1242         }
1243
1244         ast_threadpool_set_size(pool, 1);
1245
1246         res = wait_for_completion(test, std1);
1247         if (res == AST_TEST_FAIL) {
1248                 goto end;
1249         }
1250
1251         res = wait_for_empty_notice(test, tld);
1252         if (res == AST_TEST_FAIL) {
1253                 goto end;
1254         }
1255
1256         res = wait_until_thread_state(test, tld, 0, 1);
1257         if (res == AST_TEST_FAIL) {
1258                 goto end;
1259         }
1260
1261         res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
1262         if (res == AST_TEST_FAIL) {
1263                 goto end;
1264         }
1265
1266         /* Now make sure the threadpool reactivates when we add a second task */
1267         if (ast_threadpool_push(pool, simple_task, std2)) {
1268                 res = AST_TEST_FAIL;
1269                 goto end;
1270         }
1271
1272         res = wait_for_completion(test, std2);
1273         if (res == AST_TEST_FAIL) {
1274                 goto end;
1275         }
1276
1277         res = wait_for_empty_notice(test, tld);
1278         if (res == AST_TEST_FAIL) {
1279                 goto end;
1280         }
1281
1282         res = wait_until_thread_state(test, tld, 0, 1);
1283         if (res == AST_TEST_FAIL) {
1284                 goto end;
1285         }
1286
1287         res = listener_check(test, listener, 1, 1, 2, 0, 1, 1);
1288
1289 end:
1290         ast_threadpool_shutdown(pool);
1291         ao2_cleanup(listener);
1292         simple_task_data_free(std1);
1293         simple_task_data_free(std2);
1294         ast_free(tld);
1295         return res;
1296
1297 }
1298
1299 struct complex_task_data {
1300         int task_started;
1301         int task_executed;
1302         int continue_task;
1303         ast_mutex_t lock;
1304         ast_cond_t stall_cond;
1305         ast_cond_t notify_cond;
1306 };
1307
1308 static struct complex_task_data *complex_task_data_alloc(void)
1309 {
1310         struct complex_task_data *ctd = ast_calloc(1, sizeof(*ctd));
1311
1312         if (!ctd) {
1313                 return NULL;
1314         }
1315         ast_mutex_init(&ctd->lock);
1316         ast_cond_init(&ctd->stall_cond, NULL);
1317         ast_cond_init(&ctd->notify_cond, NULL);
1318         return ctd;
1319 }
1320
1321 static void complex_task_data_free(struct complex_task_data *ctd)
1322 {
1323         if (!ctd) {
1324                 return;
1325         }
1326
1327         ast_mutex_destroy(&ctd->lock);
1328         ast_cond_destroy(&ctd->stall_cond);
1329         ast_cond_destroy(&ctd->notify_cond);
1330
1331         ast_free(ctd);
1332 }
1333
1334 static int complex_task(void *data)
1335 {
1336         struct complex_task_data *ctd = data;
1337         SCOPED_MUTEX(lock, &ctd->lock);
1338         /* Notify that we started */
1339         ctd->task_started = 1;
1340         ast_cond_signal(&ctd->notify_cond);
1341         while (!ctd->continue_task) {
1342                 ast_cond_wait(&ctd->stall_cond, lock);
1343         }
1344         /* We got poked. Finish up */
1345         ctd->task_executed = 1;
1346         ast_cond_signal(&ctd->notify_cond);
1347         return 0;
1348 }
1349
1350 static void poke_worker(struct complex_task_data *ctd)
1351 {
1352         SCOPED_MUTEX(lock, &ctd->lock);
1353         ctd->continue_task = 1;
1354         ast_cond_signal(&ctd->stall_cond);
1355 }
1356
1357 static int wait_for_complex_start(struct complex_task_data *ctd)
1358 {
1359         struct timeval start = ast_tvnow();
1360         struct timespec end = {
1361                 .tv_sec = start.tv_sec + 5,
1362                 .tv_nsec = start.tv_usec * 1000
1363         };
1364         SCOPED_MUTEX(lock, &ctd->lock);
1365
1366         while (!ctd->task_started) {
1367                 if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
1368                         break;
1369                 }
1370         }
1371
1372         return ctd->task_started;
1373 }
1374
1375 static int has_complex_started(struct complex_task_data *ctd)
1376 {
1377         struct timeval start = ast_tvnow();
1378         struct timespec end = {
1379                 .tv_sec = start.tv_sec + 1,
1380                 .tv_nsec = start.tv_usec * 1000
1381         };
1382         SCOPED_MUTEX(lock, &ctd->lock);
1383
1384         while (!ctd->task_started) {
1385                 if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
1386                         break;
1387                 }
1388         }
1389
1390         return ctd->task_started;
1391 }
1392
1393 static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd)
1394 {
1395         struct timeval start = ast_tvnow();
1396         struct timespec end = {
1397                 .tv_sec = start.tv_sec + 5,
1398                 .tv_nsec = start.tv_usec * 1000
1399         };
1400         enum ast_test_result_state res = AST_TEST_PASS;
1401         SCOPED_MUTEX(lock, &ctd->lock);
1402
1403         while (!ctd->task_executed) {
1404                 if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
1405                         break;
1406                 }
1407         }
1408
1409         if (!ctd->task_executed) {
1410                 res = AST_TEST_FAIL;
1411         }
1412         return res;
1413 }
1414
1415 AST_TEST_DEFINE(threadpool_task_distribution)
1416 {
1417         struct ast_threadpool *pool = NULL;
1418         struct ast_threadpool_listener *listener = NULL;
1419         struct complex_task_data *ctd1 = NULL;
1420         struct complex_task_data *ctd2 = NULL;
1421         enum ast_test_result_state res = AST_TEST_FAIL;
1422         struct test_listener_data *tld = NULL;
1423         struct ast_threadpool_options options = {
1424                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1425                 .idle_timeout = 0,
1426                 .auto_increment = 0,
1427                 .initial_size = 0,
1428                 .max_size = 0,
1429         };
1430
1431         switch (cmd) {
1432         case TEST_INIT:
1433                 info->name = "task_distribution";
1434                 info->category = "/main/threadpool/";
1435                 info->summary = "Test that tasks are evenly distributed to threads";
1436                 info->description =
1437                         "Push two tasks into a threadpool. Ensure that each is handled by\n"
1438                         "a separate thread";
1439                 return AST_TEST_NOT_RUN;
1440         case TEST_EXECUTE:
1441                 break;
1442         }
1443
1444         tld = test_alloc();
1445         if (!tld) {
1446                 return AST_TEST_FAIL;
1447         }
1448
1449         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1450         if (!listener) {
1451                 goto end;
1452         }
1453
1454         pool = ast_threadpool_create(info->name, listener, &options);
1455         if (!pool) {
1456                 goto end;
1457         }
1458
1459         ctd1 = complex_task_data_alloc();
1460         ctd2 = complex_task_data_alloc();
1461         if (!ctd1 || !ctd2) {
1462                 goto end;
1463         }
1464
1465         if (ast_threadpool_push(pool, complex_task, ctd1)) {
1466                 goto end;
1467         }
1468
1469         if (ast_threadpool_push(pool, complex_task, ctd2)) {
1470                 goto end;
1471         }
1472
1473         ast_threadpool_set_size(pool, 2);
1474
1475         res = wait_until_thread_state(test, tld, 2, 0);
1476         if (res == AST_TEST_FAIL) {
1477                 goto end;
1478         }
1479
1480         res = listener_check(test, listener, 1, 0, 2, 2, 0, 0);
1481         if (res == AST_TEST_FAIL) {
1482                 goto end;
1483         }
1484
1485         /* The tasks are stalled until we poke them */
1486         poke_worker(ctd1);
1487         poke_worker(ctd2);
1488
1489         res = wait_for_complex_completion(ctd1);
1490         if (res == AST_TEST_FAIL) {
1491                 goto end;
1492         }
1493         res = wait_for_complex_completion(ctd2);
1494         if (res == AST_TEST_FAIL) {
1495                 goto end;
1496         }
1497
1498         res = wait_until_thread_state(test, tld, 0, 2);
1499         if (res == AST_TEST_FAIL) {
1500                 goto end;
1501         }
1502
1503         res = listener_check(test, listener, 1, 0, 2, 0, 2, 1);
1504
1505 end:
1506         ast_threadpool_shutdown(pool);
1507         ao2_cleanup(listener);
1508         complex_task_data_free(ctd1);
1509         complex_task_data_free(ctd2);
1510         ast_free(tld);
1511         return res;
1512 }
1513
1514 AST_TEST_DEFINE(threadpool_more_destruction)
1515 {
1516         struct ast_threadpool *pool = NULL;
1517         struct ast_threadpool_listener *listener = NULL;
1518         struct complex_task_data *ctd1 = NULL;
1519         struct complex_task_data *ctd2 = NULL;
1520         enum ast_test_result_state res = AST_TEST_FAIL;
1521         struct test_listener_data *tld = NULL;
1522         struct ast_threadpool_options options = {
1523                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1524                 .idle_timeout = 0,
1525                 .auto_increment = 0,
1526                 .initial_size = 0,
1527                 .max_size = 0,
1528         };
1529
1530         switch (cmd) {
1531         case TEST_INIT:
1532                 info->name = "more_destruction";
1533                 info->category = "/main/threadpool/";
1534                 info->summary = "Test that threads are destroyed as expected";
1535                 info->description =
1536                         "Push two tasks into a threadpool. Set the threadpool size to 4\n"
1537                         "Ensure that there are 2 active and 2 idle threads. Then shrink the\n"
1538                         "threadpool down to 1 thread. Ensure that the thread leftover is active\n"
1539                         "and ensure that both tasks complete.";
1540                 return AST_TEST_NOT_RUN;
1541         case TEST_EXECUTE:
1542                 break;
1543         }
1544
1545         tld = test_alloc();
1546         if (!tld) {
1547                 return AST_TEST_FAIL;
1548         }
1549
1550         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1551         if (!listener) {
1552                 goto end;
1553         }
1554
1555         pool = ast_threadpool_create(info->name, listener, &options);
1556         if (!pool) {
1557                 goto end;
1558         }
1559
1560         ctd1 = complex_task_data_alloc();
1561         ctd2 = complex_task_data_alloc();
1562         if (!ctd1 || !ctd2) {
1563                 goto end;
1564         }
1565
1566         if (ast_threadpool_push(pool, complex_task, ctd1)) {
1567                 goto end;
1568         }
1569
1570         if (ast_threadpool_push(pool, complex_task, ctd2)) {
1571                 goto end;
1572         }
1573
1574         ast_threadpool_set_size(pool, 4);
1575
1576         res = wait_until_thread_state(test, tld, 2, 2);
1577         if (res == AST_TEST_FAIL) {
1578                 goto end;
1579         }
1580
1581         res = listener_check(test, listener, 1, 0, 2, 2, 2, 0);
1582         if (res == AST_TEST_FAIL) {
1583                 goto end;
1584         }
1585
1586         ast_threadpool_set_size(pool, 1);
1587
1588         /* Shrinking the threadpool should kill off the two idle threads
1589          * and one of the active threads.
1590          */
1591         res = wait_until_thread_state(test, tld, 1, 0);
1592         if (res == AST_TEST_FAIL) {
1593                 goto end;
1594         }
1595
1596         res = listener_check(test, listener, 1, 0, 2, 1, 0, 0);
1597         if (res == AST_TEST_FAIL) {
1598                 goto end;
1599         }
1600
1601         /* The tasks are stalled until we poke them */
1602         poke_worker(ctd1);
1603         poke_worker(ctd2);
1604
1605         res = wait_for_complex_completion(ctd1);
1606         if (res == AST_TEST_FAIL) {
1607                 goto end;
1608         }
1609         res = wait_for_complex_completion(ctd2);
1610         if (res == AST_TEST_FAIL) {
1611                 goto end;
1612         }
1613
1614         res = wait_until_thread_state(test, tld, 0, 1);
1615         if (res == AST_TEST_FAIL) {
1616                 goto end;
1617         }
1618
1619         res = listener_check(test, listener, 1, 0, 2, 0, 1, 1);
1620
1621 end:
1622         ast_threadpool_shutdown(pool);
1623         ao2_cleanup(listener);
1624         complex_task_data_free(ctd1);
1625         complex_task_data_free(ctd2);
1626         ast_free(tld);
1627         return res;
1628 }
1629
1630 AST_TEST_DEFINE(threadpool_serializer)
1631 {
1632         int started = 0;
1633         int finished = 0;
1634         enum ast_test_result_state res = AST_TEST_FAIL;
1635         struct ast_threadpool *pool = NULL;
1636         struct ast_taskprocessor *uut = NULL;
1637         struct complex_task_data *data1 = NULL;
1638         struct complex_task_data *data2 = NULL;
1639         struct complex_task_data *data3 = NULL;
1640         struct ast_threadpool_options options = {
1641                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1642                 .idle_timeout = 0,
1643                 .auto_increment = 0,
1644                 .initial_size = 2,
1645                 .max_size = 0,
1646         };
1647
1648         switch (cmd) {
1649         case TEST_INIT:
1650                 info->name = "threadpool_serializer";
1651                 info->category = "/main/threadpool/";
1652                 info->summary = "Test that serializers";
1653                 info->description =
1654                         "Ensures that tasks enqueued to a serialize execute in sequence.";
1655                 return AST_TEST_NOT_RUN;
1656         case TEST_EXECUTE:
1657                 break;
1658         }
1659
1660         pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
1661         if (!pool) {
1662                 ast_test_status_update(test, "Could not create threadpool\n");
1663                 goto end;
1664         }
1665         uut = ast_threadpool_serializer("ser1", pool);
1666         data1 = complex_task_data_alloc();
1667         data2 = complex_task_data_alloc();
1668         data3 = complex_task_data_alloc();
1669         if (!uut || !data1 || !data2 || !data3) {
1670                 ast_test_status_update(test, "Allocation failed\n");
1671                 goto end;
1672         }
1673
1674         /* This should start right away */
1675         if (ast_taskprocessor_push(uut, complex_task, data1)) {
1676                 ast_test_status_update(test, "Failed to enqueue data1\n");
1677                 goto end;
1678         }
1679         started = wait_for_complex_start(data1);
1680         if (!started) {
1681                 ast_test_status_update(test, "Failed to start data1\n");
1682                 goto end;
1683         }
1684
1685         /* This should not start until data 1 is complete */
1686         if (ast_taskprocessor_push(uut, complex_task, data2)) {
1687                 ast_test_status_update(test, "Failed to enqueue data2\n");
1688                 goto end;
1689         }
1690         started = has_complex_started(data2);
1691         if (started) {
1692                 ast_test_status_update(test, "data2 started out of order\n");
1693                 goto end;
1694         }
1695
1696         /* But the free thread in the pool can still run */
1697         if (ast_threadpool_push(pool, complex_task, data3)) {
1698                 ast_test_status_update(test, "Failed to enqueue data3\n");
1699         }
1700         started = wait_for_complex_start(data3);
1701         if (!started) {
1702                 ast_test_status_update(test, "Failed to start data3\n");
1703                 goto end;
1704         }
1705
1706         /* Finishing data1 should allow data2 to start */
1707         poke_worker(data1);
1708         finished = wait_for_complex_completion(data1) == AST_TEST_PASS;
1709         if (!finished) {
1710                 ast_test_status_update(test, "data1 couldn't finish\n");
1711                 goto end;
1712         }
1713         started = wait_for_complex_start(data2);
1714         if (!started) {
1715                 ast_test_status_update(test, "Failed to start data2\n");
1716                 goto end;
1717         }
1718
1719         /* Finish up */
1720         poke_worker(data2);
1721         finished = wait_for_complex_completion(data2) == AST_TEST_PASS;
1722         if (!finished) {
1723                 ast_test_status_update(test, "data2 couldn't finish\n");
1724                 goto end;
1725         }
1726         poke_worker(data3);
1727         finished = wait_for_complex_completion(data3) == AST_TEST_PASS;
1728         if (!finished) {
1729                 ast_test_status_update(test, "data3 couldn't finish\n");
1730                 goto end;
1731         }
1732
1733         res = AST_TEST_PASS;
1734
1735 end:
1736         poke_worker(data1);
1737         poke_worker(data2);
1738         poke_worker(data3);
1739         ast_taskprocessor_unreference(uut);
1740         ast_threadpool_shutdown(pool);
1741         complex_task_data_free(data1);
1742         complex_task_data_free(data2);
1743         complex_task_data_free(data3);
1744         return res;
1745 }
1746
1747 AST_TEST_DEFINE(threadpool_serializer_dupe)
1748 {
1749         enum ast_test_result_state res = AST_TEST_FAIL;
1750         struct ast_threadpool *pool = NULL;
1751         struct ast_taskprocessor *uut = NULL;
1752         struct ast_taskprocessor *there_can_be_only_one = NULL;
1753         struct ast_threadpool_options options = {
1754                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1755                 .idle_timeout = 0,
1756                 .auto_increment = 0,
1757                 .initial_size = 2,
1758                 .max_size = 0,
1759         };
1760
1761         switch (cmd) {
1762         case TEST_INIT:
1763                 info->name = "threadpool_serializer_dupe";
1764                 info->category = "/main/threadpool/";
1765                 info->summary = "Test that serializers are uniquely named";
1766                 info->description =
1767                         "Creating two serializers with the same name should\n"
1768                         "result in error.";
1769                 return AST_TEST_NOT_RUN;
1770         case TEST_EXECUTE:
1771                 break;
1772         }
1773
1774         pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
1775         if (!pool) {
1776                 ast_test_status_update(test, "Could not create threadpool\n");
1777                 goto end;
1778         }
1779
1780         uut = ast_threadpool_serializer("highlander", pool);
1781         if (!uut) {
1782                 ast_test_status_update(test, "Allocation failed\n");
1783                 goto end;
1784         }
1785
1786         there_can_be_only_one = ast_threadpool_serializer("highlander", pool);
1787         if (there_can_be_only_one) {
1788                 ast_taskprocessor_unreference(there_can_be_only_one);
1789                 ast_test_status_update(test, "Duplicate name error\n");
1790                 goto end;
1791         }
1792
1793         res = AST_TEST_PASS;
1794
1795 end:
1796         ast_taskprocessor_unreference(uut);
1797         ast_threadpool_shutdown(pool);
1798         return res;
1799 }
1800
1801 static int unload_module(void)
1802 {
1803         ast_test_unregister(threadpool_push);
1804         ast_test_unregister(threadpool_initial_threads);
1805         ast_test_unregister(threadpool_thread_creation);
1806         ast_test_unregister(threadpool_thread_destruction);
1807         ast_test_unregister(threadpool_thread_timeout);
1808         ast_test_unregister(threadpool_thread_timeout_thrash);
1809         ast_test_unregister(threadpool_one_task_one_thread);
1810         ast_test_unregister(threadpool_one_thread_one_task);
1811         ast_test_unregister(threadpool_one_thread_multiple_tasks);
1812         ast_test_unregister(threadpool_auto_increment);
1813         ast_test_unregister(threadpool_max_size);
1814         ast_test_unregister(threadpool_reactivation);
1815         ast_test_unregister(threadpool_task_distribution);
1816         ast_test_unregister(threadpool_more_destruction);
1817         ast_test_unregister(threadpool_serializer);
1818         ast_test_unregister(threadpool_serializer_dupe);
1819         return 0;
1820 }
1821
1822 static int load_module(void)
1823 {
1824         ast_test_register(threadpool_push);
1825         ast_test_register(threadpool_initial_threads);
1826         ast_test_register(threadpool_thread_creation);
1827         ast_test_register(threadpool_thread_destruction);
1828         ast_test_register(threadpool_thread_timeout);
1829         ast_test_register(threadpool_thread_timeout_thrash);
1830         ast_test_register(threadpool_one_task_one_thread);
1831         ast_test_register(threadpool_one_thread_one_task);
1832         ast_test_register(threadpool_one_thread_multiple_tasks);
1833         ast_test_register(threadpool_auto_increment);
1834         ast_test_register(threadpool_max_size);
1835         ast_test_register(threadpool_reactivation);
1836         ast_test_register(threadpool_task_distribution);
1837         ast_test_register(threadpool_more_destruction);
1838         ast_test_register(threadpool_serializer);
1839         ast_test_register(threadpool_serializer_dupe);
1840         return AST_MODULE_LOAD_SUCCESS;
1841 }
1842
1843 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "threadpool test module");