Address review board feedback from Matt and Richard
[asterisk/asterisk.git] / tests / test_threadpool.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \brief threadpool unit tests
22  *
23  * \author Mark Michelson <mmichelson@digium.com>
24  *
25  */
26
27 /*** MODULEINFO
28         <depend>TEST_FRAMEWORK</depend>
29         <support_level>core</support_level>
30  ***/
31
32 #include "asterisk.h"
33
34 #include "asterisk/test.h"
35 #include "asterisk/threadpool.h"
36 #include "asterisk/module.h"
37 #include "asterisk/lock.h"
38 #include "asterisk/astobj2.h"
39 #include "asterisk/logger.h"
40
41 struct test_listener_data {
42         int num_active;
43         int num_idle;
44         int task_pushed;
45         int num_tasks;
46         int empty_notice;
47         int was_empty;
48         ast_mutex_t lock;
49         ast_cond_t cond;
50 };
51
52 static void *test_alloc(struct ast_threadpool_listener *listener)
53 {
54         struct test_listener_data *tld = ast_calloc(1, sizeof(*tld));
55         if (!tld) {
56                 return NULL;
57         }
58         ast_mutex_init(&tld->lock);
59         ast_cond_init(&tld->cond, NULL);
60         return tld;
61 }
62
63 static void test_state_changed(struct ast_threadpool *pool,
64                 struct ast_threadpool_listener *listener,
65                 int active_threads,
66                 int idle_threads)
67 {
68         struct test_listener_data *tld = listener->private_data;
69         SCOPED_MUTEX(lock, &tld->lock);
70         tld->num_active = active_threads;
71         tld->num_idle = idle_threads;
72         ast_log(LOG_NOTICE, "Thread state: %d active, %d idle\n", tld->num_active, tld->num_idle);
73         ast_cond_signal(&tld->cond);
74 }
75
76 static void test_task_pushed(struct ast_threadpool *pool,
77                 struct ast_threadpool_listener *listener,
78                 int was_empty)
79 {
80         struct test_listener_data *tld = listener->private_data;
81         SCOPED_MUTEX(lock, &tld->lock);
82         tld->task_pushed = 1;
83         ++tld->num_tasks;
84         tld->was_empty = was_empty;
85         ast_cond_signal(&tld->cond);
86 }
87
88 static void test_emptied(struct ast_threadpool *pool,
89                 struct ast_threadpool_listener *listener)
90 {
91         struct test_listener_data *tld = listener->private_data;
92         SCOPED_MUTEX(lock, &tld->lock);
93         tld->empty_notice = 1;
94         ast_cond_signal(&tld->cond);
95 }
96
97 static void test_destroy(void *private_data)
98 {
99         struct test_listener_data *tld = private_data;
100         ast_cond_destroy(&tld->cond);
101         ast_mutex_destroy(&tld->lock);
102         ast_free(tld);
103 }
104
105 static const struct ast_threadpool_listener_callbacks test_callbacks = {
106         .alloc = test_alloc,
107         .state_changed = test_state_changed,
108         .task_pushed = test_task_pushed,
109         .emptied = test_emptied,
110         .destroy = test_destroy,
111 };
112
113 struct simple_task_data {
114         int task_executed;
115         ast_mutex_t lock;
116         ast_cond_t cond;
117 };
118
119 static struct simple_task_data *simple_task_data_alloc(void)
120 {
121         struct simple_task_data *std = ast_calloc(1, sizeof(*std));
122
123         if (!std) {
124                 return NULL;
125         }
126         ast_mutex_init(&std->lock);
127         ast_cond_init(&std->cond, NULL);
128         return std;
129 }
130
131 static int simple_task(void *data)
132 {
133         struct simple_task_data *std = data;
134         SCOPED_MUTEX(lock, &std->lock);
135         std->task_executed = 1;
136         ast_cond_signal(&std->cond);
137         return 0;
138 }
139
140 static enum ast_test_result_state wait_until_thread_state(struct ast_test *test, struct test_listener_data *tld, int num_active, int num_idle)
141 {
142         struct timeval start = ast_tvnow();
143         struct timespec end = {
144                 .tv_sec = start.tv_sec + 5,
145                 .tv_nsec = start.tv_usec * 1000
146         };
147         enum ast_test_result_state res = AST_TEST_PASS;
148         SCOPED_MUTEX(lock, &tld->lock);
149
150         while (!(tld->num_active == num_active && tld->num_idle == num_idle)) {
151                 if (ast_cond_timedwait(&tld->cond, &tld->lock, &end) == ETIMEDOUT) {
152                         break;
153                 }
154         }
155
156         if (tld->num_active != num_active && tld->num_idle != num_idle) {
157                 ast_test_status_update(test, "Number of active threads and idle threads not what was expected.\n");
158                 ast_test_status_update(test, "Expected %d active threads but got %d\n", num_active, tld->num_active);
159                 ast_test_status_update(test, "Expected %d idle threads but got %d\n", num_idle, tld->num_idle);
160                 res = AST_TEST_FAIL;
161         }
162
163         return res;
164 }
165
166 static void wait_for_task_pushed(struct ast_threadpool_listener *listener)
167 {
168         struct test_listener_data *tld = listener->private_data;
169         struct timeval start = ast_tvnow();
170         struct timespec end = {
171                 .tv_sec = start.tv_sec + 5,
172                 .tv_nsec = start.tv_usec * 1000
173         };
174         SCOPED_MUTEX(lock, &tld->lock);
175
176         while (!tld->task_pushed) {
177                 if (ast_cond_timedwait(&tld->cond, lock, &end) == ETIMEDOUT) {
178                         break;
179                 }
180         }
181 }
182
183 static enum ast_test_result_state wait_for_completion(struct ast_test *test, struct simple_task_data *std)
184 {
185         struct timeval start = ast_tvnow();
186         struct timespec end = {
187                 .tv_sec = start.tv_sec + 5,
188                 .tv_nsec = start.tv_usec * 1000
189         };
190         enum ast_test_result_state res = AST_TEST_PASS;
191         SCOPED_MUTEX(lock, &std->lock);
192
193         while (!std->task_executed) {
194                 if (ast_cond_timedwait(&std->cond, lock, &end) == ETIMEDOUT) {
195                         break;
196                 }
197         }
198
199         if (!std->task_executed) {
200                 ast_test_status_update(test, "Task execution did not occur\n");
201                 res = AST_TEST_FAIL;
202         }
203         return res;
204 }
205
206 static enum ast_test_result_state wait_for_empty_notice(struct ast_test *test, struct test_listener_data *tld)
207 {
208         struct timeval start = ast_tvnow();
209         struct timespec end = {
210                 .tv_sec = start.tv_sec + 5,
211                 .tv_nsec = start.tv_usec * 1000
212         };
213         enum ast_test_result_state res = AST_TEST_PASS;
214         SCOPED_MUTEX(lock, &tld->lock);
215
216         while (!tld->empty_notice) {
217                 if (ast_cond_timedwait(&tld->cond, lock, &end) == ETIMEDOUT) {
218                         break;
219                 }
220         }
221
222         if (!tld->empty_notice) {
223                 ast_test_status_update(test, "Test listener not notified that threadpool is empty\n");
224                 res = AST_TEST_FAIL;
225         }
226
227         return res;
228 }
229
230 static enum ast_test_result_state listener_check(
231                 struct ast_test *test,
232                 struct ast_threadpool_listener *listener,
233                 int task_pushed,
234                 int was_empty,
235                 int num_tasks,
236                 int num_active,
237                 int num_idle,
238                 int empty_notice)
239 {
240         struct test_listener_data *tld = listener->private_data;
241         enum ast_test_result_state res = AST_TEST_PASS;
242
243         if (tld->task_pushed != task_pushed) {
244                 ast_test_status_update(test, "Expected task %sto be pushed, but it was%s\n",
245                                 task_pushed ? "" : "not ", tld->task_pushed ? "" : " not");
246                 res = AST_TEST_FAIL;
247         }
248         if (tld->was_empty != was_empty) {
249                 ast_test_status_update(test, "Expected %sto be empty, but it was%s\n",
250                                 was_empty ? "" : "not ", tld->was_empty ? "" : " not");
251                 res = AST_TEST_FAIL;
252         }
253         if (tld->num_tasks!= num_tasks) {
254                 ast_test_status_update(test, "Expected %d tasks to be pushed, but got %d\n",
255                                 num_tasks, tld->num_tasks);
256                 res = AST_TEST_FAIL;
257         }
258         if (tld->num_active != num_active) {
259                 ast_test_status_update(test, "Expected %d active threads, but got %d\n",
260                                 num_active, tld->num_active);
261                 res = AST_TEST_FAIL;
262         }
263         if (tld->num_idle != num_idle) {
264                 ast_test_status_update(test, "Expected %d idle threads, but got %d\n",
265                                 num_idle, tld->num_idle);
266                 res = AST_TEST_FAIL;
267         }
268         if (tld->empty_notice != empty_notice) {
269                 ast_test_status_update(test, "Expected %s empty notice, but got %s\n",
270                                 was_empty ? "an" : "no", tld->task_pushed ? "one" : "none");
271                 res = AST_TEST_FAIL;
272         }
273
274         return res;
275 }
276
277 AST_TEST_DEFINE(threadpool_push)
278 {
279         struct ast_threadpool *pool = NULL;
280         struct ast_threadpool_listener *listener = NULL;
281         struct simple_task_data *std = NULL;
282         enum ast_test_result_state res = AST_TEST_FAIL;
283         struct ast_threadpool_options options = {
284                 .version = AST_THREADPOOL_OPTIONS_VERSION,
285                 .idle_timeout = 0,
286                 .auto_increment = 0,
287         };
288
289         switch (cmd) {
290         case TEST_INIT:
291                 info->name = "push";
292                 info->category = "/main/threadpool/";
293                 info->summary = "Test task";
294                 info->description =
295                         "Basic threadpool test";
296                 return AST_TEST_NOT_RUN;
297         case TEST_EXECUTE:
298                 break;
299         }
300
301         listener = ast_threadpool_listener_alloc(&test_callbacks);
302         if (!listener) {
303                 return AST_TEST_FAIL;
304         }
305
306         pool = ast_threadpool_create(info->name, listener, 0, &options);
307         if (!pool) {
308                 goto end;
309         }
310
311         std = simple_task_data_alloc();
312         if (!std) {
313                 goto end;
314         }
315
316         ast_threadpool_push(pool, simple_task, std);
317
318         wait_for_task_pushed(listener);
319
320         res = listener_check(test, listener, 1, 1, 1, 0, 0, 0);
321
322 end:
323         if (pool) {
324                 ast_threadpool_shutdown(pool);
325         }
326         ao2_cleanup(listener);
327         ast_free(std);
328         return res;
329 }
330
331 AST_TEST_DEFINE(threadpool_initial_threads)
332 {
333         struct ast_threadpool *pool = NULL;
334         struct ast_threadpool_listener *listener = NULL;
335         enum ast_test_result_state res = AST_TEST_FAIL;
336         struct test_listener_data *tld;
337         struct ast_threadpool_options options = {
338                 .version = AST_THREADPOOL_OPTIONS_VERSION,
339                 .idle_timeout = 0,
340                 .auto_increment = 0,
341         };
342
343         switch (cmd) {
344         case TEST_INIT:
345                 info->name = "initial_threads";
346                 info->category = "/main/threadpool/";
347                 info->summary = "Test threadpool initialization state";
348                 info->description =
349                         "Ensure that a threadpool created with a specific size contains the\n"
350                         "proper number of idle threads.";
351                 return AST_TEST_NOT_RUN;
352         case TEST_EXECUTE:
353                 break;
354         }
355
356         listener = ast_threadpool_listener_alloc(&test_callbacks);
357         if (!listener) {
358                 return AST_TEST_FAIL;
359         }
360         tld = listener->private_data;
361
362         pool = ast_threadpool_create(info->name, listener, 3, &options);
363         if (!pool) {
364                 goto end;
365         }
366
367         res = wait_until_thread_state(test, tld, 0, 3);
368
369 end:
370         if (pool) {
371                 ast_threadpool_shutdown(pool);
372         }
373         ao2_cleanup(listener);
374         return res;
375 }
376
377
378 AST_TEST_DEFINE(threadpool_thread_creation)
379 {
380         struct ast_threadpool *pool = NULL;
381         struct ast_threadpool_listener *listener = NULL;
382         enum ast_test_result_state res = AST_TEST_FAIL;
383         struct test_listener_data *tld;
384         struct ast_threadpool_options options = {
385                 .version = AST_THREADPOOL_OPTIONS_VERSION,
386                 .idle_timeout = 0,
387                 .auto_increment = 0,
388         };
389
390         switch (cmd) {
391         case TEST_INIT:
392                 info->name = "thread_creation";
393                 info->category = "/main/threadpool/";
394                 info->summary = "Test threadpool thread creation";
395                 info->description =
396                         "Ensure that threads can be added to a threadpool";
397                 return AST_TEST_NOT_RUN;
398         case TEST_EXECUTE:
399                 break;
400         }
401
402         listener = ast_threadpool_listener_alloc(&test_callbacks);
403         if (!listener) {
404                 return AST_TEST_FAIL;
405         }
406         tld = listener->private_data;
407
408         pool = ast_threadpool_create(info->name, listener, 0, &options);
409         if (!pool) {
410                 goto end;
411         }
412
413         /* Now let's create a thread. It should start active, then go
414          * idle immediately
415          */
416         ast_threadpool_set_size(pool, 1);
417
418         res = wait_until_thread_state(test, tld, 0, 1);
419
420 end:
421         if (pool) {
422                 ast_threadpool_shutdown(pool);
423         }
424         ao2_cleanup(listener);
425         return res;
426 }
427
428 AST_TEST_DEFINE(threadpool_thread_destruction)
429 {
430         struct ast_threadpool *pool = NULL;
431         struct ast_threadpool_listener *listener = NULL;
432         enum ast_test_result_state res = AST_TEST_FAIL;
433         struct test_listener_data *tld;
434         struct ast_threadpool_options options = {
435                 .version = AST_THREADPOOL_OPTIONS_VERSION,
436                 .idle_timeout = 0,
437                 .auto_increment = 0,
438         };
439
440         switch (cmd) {
441         case TEST_INIT:
442                 info->name = "thread_destruction";
443                 info->category = "/main/threadpool/";
444                 info->summary = "Test threadpool thread destruction";
445                 info->description =
446                         "Ensure that threads are properly destroyed in a threadpool";
447                 return AST_TEST_NOT_RUN;
448         case TEST_EXECUTE:
449                 break;
450         }
451
452         listener = ast_threadpool_listener_alloc(&test_callbacks);
453         if (!listener) {
454                 return AST_TEST_FAIL;
455         }
456         tld = listener->private_data;
457
458         pool = ast_threadpool_create(info->name, listener, 0, &options);
459         if (!pool) {
460                 goto end;
461         }
462
463         ast_threadpool_set_size(pool, 3);
464
465         res = wait_until_thread_state(test, tld, 0, 3);
466         if (res == AST_TEST_FAIL) {
467                 goto end;
468         }
469
470         res = listener_check(test, listener, 0, 0, 0, 0, 3, 0);
471         if (res == AST_TEST_FAIL) {
472                 goto end;
473         }
474
475         ast_threadpool_set_size(pool, 2);
476
477         res = wait_until_thread_state(test, tld, 0, 2);
478
479 end:
480         if (pool) {
481                 ast_threadpool_shutdown(pool);
482         }
483         ao2_cleanup(listener);
484         return res;
485 }
486
487 AST_TEST_DEFINE(threadpool_thread_timeout)
488 {
489         struct ast_threadpool *pool = NULL;
490         struct ast_threadpool_listener *listener = NULL;
491         enum ast_test_result_state res = AST_TEST_FAIL;
492         struct test_listener_data *tld;
493         struct ast_threadpool_options options = {
494                 .version = AST_THREADPOOL_OPTIONS_VERSION,
495                 .idle_timeout = 2,
496                 .auto_increment = 0,
497         };
498
499         switch (cmd) {
500         case TEST_INIT:
501                 info->name = "thread_timeout";
502                 info->category = "/main/threadpool/";
503                 info->summary = "Test threadpool thread timeout";
504                 info->description =
505                         "Ensure that a thread with a two second timeout dies as expected.";
506                 return AST_TEST_NOT_RUN;
507         case TEST_EXECUTE:
508                 break;
509         }
510
511         listener = ast_threadpool_listener_alloc(&test_callbacks);
512         if (!listener) {
513                 return AST_TEST_FAIL;
514         }
515         tld = listener->private_data;
516
517         pool = ast_threadpool_create(info->name, listener, 0, &options);
518         if (!pool) {
519                 goto end;
520         }
521
522         ast_threadpool_set_size(pool, 1);
523
524         res = wait_until_thread_state(test, tld, 0, 1);
525         if (res == AST_TEST_FAIL) {
526                 goto end;
527         }
528
529         res = listener_check(test, listener, 0, 0, 0, 0, 1, 0);
530         if (res == AST_TEST_FAIL) {
531                 goto end;
532         }
533
534         res = wait_until_thread_state(test, tld, 0, 0);
535         if (res == AST_TEST_FAIL) {
536                 goto end;
537         }
538
539         res = listener_check(test, listener, 0, 0, 0, 0, 0, 0);
540
541 end:
542         if (pool) {
543                 ast_threadpool_shutdown(pool);
544         }
545         ao2_cleanup(listener);
546         return res;
547 }
548
549 AST_TEST_DEFINE(threadpool_one_task_one_thread)
550 {
551         struct ast_threadpool *pool = NULL;
552         struct ast_threadpool_listener *listener = NULL;
553         struct simple_task_data *std = NULL;
554         enum ast_test_result_state res = AST_TEST_FAIL;
555         struct test_listener_data *tld;
556         struct ast_threadpool_options options = {
557                 .version = AST_THREADPOOL_OPTIONS_VERSION,
558                 .idle_timeout = 0,
559                 .auto_increment = 0,
560         };
561
562         switch (cmd) {
563         case TEST_INIT:
564                 info->name = "one_task_one_thread";
565                 info->category = "/main/threadpool/";
566                 info->summary = "Test a single task with a single thread";
567                 info->description =
568                         "Push a task into an empty threadpool, then add a thread to the pool.";
569                 return AST_TEST_NOT_RUN;
570         case TEST_EXECUTE:
571                 break;
572         }
573
574         listener = ast_threadpool_listener_alloc(&test_callbacks);
575         if (!listener) {
576                 return AST_TEST_FAIL;
577         }
578         tld = listener->private_data;
579
580         pool = ast_threadpool_create(info->name, listener, 0, &options);
581         if (!pool) {
582                 goto end;
583         }
584
585         std = simple_task_data_alloc();
586         if (!std) {
587                 goto end;
588         }
589
590         ast_threadpool_push(pool, simple_task, std);
591
592         ast_threadpool_set_size(pool, 1);
593
594         /* Threads added to the pool are active when they start,
595          * so the newly-created thread should immediately execute
596          * the waiting task.
597          */
598         res = wait_for_completion(test, std);
599         if (res == AST_TEST_FAIL) {
600                 goto end;
601         }
602
603         res = wait_for_empty_notice(test, tld);
604         if (res == AST_TEST_FAIL) {
605                 goto end;
606         }
607
608         /* After completing the task, the thread should go idle */
609         res = wait_until_thread_state(test, tld, 0, 1);
610         if (res == AST_TEST_FAIL) {
611                 goto end;
612         }
613
614         res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
615
616 end:
617         if (pool) {
618                 ast_threadpool_shutdown(pool);
619         }
620         ao2_cleanup(listener);
621         ast_free(std);
622         return res;
623
624 }
625
626 AST_TEST_DEFINE(threadpool_one_thread_one_task)
627 {
628         struct ast_threadpool *pool = NULL;
629         struct ast_threadpool_listener *listener = NULL;
630         struct simple_task_data *std = NULL;
631         enum ast_test_result_state res = AST_TEST_FAIL;
632         struct test_listener_data *tld;
633         struct ast_threadpool_options options = {
634                 .version = AST_THREADPOOL_OPTIONS_VERSION,
635                 .idle_timeout = 0,
636                 .auto_increment = 0,
637         };
638
639         switch (cmd) {
640         case TEST_INIT:
641                 info->name = "one_thread_one_task";
642                 info->category = "/main/threadpool/";
643                 info->summary = "Test a single thread with a single task";
644                 info->description =
645                         "Add a thread to the pool and then push a task to it.";
646                 return AST_TEST_NOT_RUN;
647         case TEST_EXECUTE:
648                 break;
649         }
650
651         listener = ast_threadpool_listener_alloc(&test_callbacks);
652         if (!listener) {
653                 return AST_TEST_FAIL;
654         }
655         tld = listener->private_data;
656
657         pool = ast_threadpool_create(info->name, listener, 0, &options);
658         if (!pool) {
659                 goto end;
660         }
661
662         std = simple_task_data_alloc();
663         if (!std) {
664                 goto end;
665         }
666
667         ast_threadpool_set_size(pool, 1);
668
669         res = wait_until_thread_state(test, tld, 0, 1);
670         if (res == AST_TEST_FAIL) {
671                 goto end;
672         }
673
674         ast_threadpool_push(pool, simple_task, std);
675
676         res = wait_for_completion(test, std);
677         if (res == AST_TEST_FAIL) {
678                 goto end;
679         }
680
681         res = wait_for_empty_notice(test, tld);
682         if (res == AST_TEST_FAIL) {
683                 goto end;
684         }
685
686         /* After completing the task, the thread should go idle */
687         res = wait_until_thread_state(test, tld, 0, 1);
688         if (res == AST_TEST_FAIL) {
689                 goto end;
690         }
691
692         res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
693
694 end:
695         if (pool) {
696                 ast_threadpool_shutdown(pool);
697         }
698         ao2_cleanup(listener);
699         ast_free(std);
700         return res;
701 }
702
703 AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks)
704 {
705         struct ast_threadpool *pool = NULL;
706         struct ast_threadpool_listener *listener = NULL;
707         struct simple_task_data *std1 = NULL;
708         struct simple_task_data *std2 = NULL;
709         struct simple_task_data *std3 = NULL;
710         enum ast_test_result_state res = AST_TEST_FAIL;
711         struct test_listener_data *tld;
712         struct ast_threadpool_options options = {
713                 .version = AST_THREADPOOL_OPTIONS_VERSION,
714                 .idle_timeout = 0,
715                 .auto_increment = 0,
716         };
717
718         switch (cmd) {
719         case TEST_INIT:
720                 info->name = "one_thread_multiple_tasks";
721                 info->category = "/main/threadpool/";
722                 info->summary = "Test a single thread with multiple tasks";
723                 info->description =
724                         "Add a thread to the pool and then push three tasks to it.";
725                 return AST_TEST_NOT_RUN;
726         case TEST_EXECUTE:
727                 break;
728         }
729
730         listener = ast_threadpool_listener_alloc(&test_callbacks);
731         if (!listener) {
732                 return AST_TEST_FAIL;
733         }
734         tld = listener->private_data;
735
736         pool = ast_threadpool_create(info->name, listener, 0, &options);
737         if (!pool) {
738                 goto end;
739         }
740
741         std1 = simple_task_data_alloc();
742         std2 = simple_task_data_alloc();
743         std3 = simple_task_data_alloc();
744         if (!std1 || !std2 || !std3) {
745                 goto end;
746         }
747
748         ast_threadpool_set_size(pool, 1);
749
750         res = wait_until_thread_state(test, tld, 0, 1);
751         if (res == AST_TEST_FAIL) {
752                 goto end;
753         }
754
755         ast_threadpool_push(pool, simple_task, std1);
756         ast_threadpool_push(pool, simple_task, std2);
757         ast_threadpool_push(pool, simple_task, std3);
758
759         res = wait_for_completion(test, std1);
760         if (res == AST_TEST_FAIL) {
761                 goto end;
762         }
763         res = wait_for_completion(test, std2);
764         if (res == AST_TEST_FAIL) {
765                 goto end;
766         }
767         res = wait_for_completion(test, std3);
768         if (res == AST_TEST_FAIL) {
769                 goto end;
770         }
771
772         res = wait_for_empty_notice(test, tld);
773         if (res == AST_TEST_FAIL) {
774                 goto end;
775         }
776
777         res = wait_until_thread_state(test, tld, 0, 1);
778         if (res == AST_TEST_FAIL) {
779                 goto end;
780         }
781
782         res = listener_check(test, listener, 1, 0, 3, 0, 1, 1);
783
784 end:
785         if (pool) {
786                 ast_threadpool_shutdown(pool);
787         }
788         ao2_cleanup(listener);
789         ast_free(std1);
790         ast_free(std2);
791         ast_free(std3);
792         return res;
793 }
794
795 AST_TEST_DEFINE(threadpool_auto_increment)
796 {
797         struct ast_threadpool *pool = NULL;
798         struct ast_threadpool_listener *listener = NULL;
799         struct simple_task_data *std1 = NULL;
800         struct simple_task_data *std2 = NULL;
801         struct simple_task_data *std3 = NULL;
802         struct simple_task_data *std4 = NULL;
803         enum ast_test_result_state res = AST_TEST_FAIL;
804         struct test_listener_data *tld;
805         struct ast_threadpool_options options = {
806                 .version = AST_THREADPOOL_OPTIONS_VERSION,
807                 .idle_timeout = 0,
808                 .auto_increment = 3,
809         };
810
811         switch (cmd) {
812         case TEST_INIT:
813                 info->name = "auto_increment";
814                 info->category = "/main/threadpool/";
815                 info->summary = "Test that the threadpool grows as tasks are added";
816                 info->description =
817                         "Create an empty threadpool and push a task to it. Once the task is\n"
818                         "pushed, the threadpool should add three threads and be able to\n"
819                         "handle the task. The threads should then go idle\n";
820                 return AST_TEST_NOT_RUN;
821         case TEST_EXECUTE:
822                 break;
823         }
824
825         listener = ast_threadpool_listener_alloc(&test_callbacks);
826         if (!listener) {
827                 return AST_TEST_FAIL;
828         }
829         tld = listener->private_data;
830
831         pool = ast_threadpool_create(info->name, listener, 0, &options);
832         if (!pool) {
833                 goto end;
834         }
835
836         std1 = simple_task_data_alloc();
837         std2 = simple_task_data_alloc();
838         std3 = simple_task_data_alloc();
839         std4 = simple_task_data_alloc();
840         if (!std1 || !std2 || !std3 || !std4) {
841                 goto end;
842         }
843
844         ast_threadpool_push(pool, simple_task, std1);
845
846         /* Pushing the task should result in the threadpool growing
847          * by three threads. This will allow the task to actually execute
848          */
849         res = wait_for_completion(test, std1);
850         if (res == AST_TEST_FAIL) {
851                 goto end;
852         }
853
854         res = wait_for_empty_notice(test, tld);
855         if (res == AST_TEST_FAIL) {
856                 goto end;
857         }
858
859         res = wait_until_thread_state(test, tld, 0, 3);
860         if (res == AST_TEST_FAIL) {
861                 goto end;
862         }
863
864         /* Now push three tasks into the pool and ensure the pool does not
865          * grow.
866          */
867         ast_threadpool_push(pool, simple_task, std2);
868         ast_threadpool_push(pool, simple_task, std3);
869         ast_threadpool_push(pool, simple_task, std4);
870
871         res = wait_for_completion(test, std2);
872         if (res == AST_TEST_FAIL) {
873                 goto end;
874         }
875         res = wait_for_completion(test, std3);
876         if (res == AST_TEST_FAIL) {
877                 goto end;
878         }
879         res = wait_for_completion(test, std4);
880         if (res == AST_TEST_FAIL) {
881                 goto end;
882         }
883
884         res = wait_for_empty_notice(test, tld);
885         if (res == AST_TEST_FAIL) {
886                 goto end;
887         }
888
889         res = wait_until_thread_state(test, tld, 0, 3);
890         if (res == AST_TEST_FAIL) {
891                 goto end;
892         }
893         res = listener_check(test, listener, 1, 0, 4, 0, 3, 1);
894
895 end:
896         if (pool) {
897                 ast_threadpool_shutdown(pool);
898         }
899         ao2_cleanup(listener);
900         ast_free(std1);
901         ast_free(std2);
902         ast_free(std3);
903         ast_free(std4);
904         return res;
905 }
906
907 AST_TEST_DEFINE(threadpool_reactivation)
908 {
909         struct ast_threadpool *pool = NULL;
910         struct ast_threadpool_listener *listener = NULL;
911         struct simple_task_data *std1 = NULL;
912         struct simple_task_data *std2 = NULL;
913         enum ast_test_result_state res = AST_TEST_FAIL;
914         struct test_listener_data *tld;
915         struct ast_threadpool_options options = {
916                 .version = AST_THREADPOOL_OPTIONS_VERSION,
917                 .idle_timeout = 0,
918                 .auto_increment = 0,
919         };
920
921         switch (cmd) {
922         case TEST_INIT:
923                 info->name = "reactivation";
924                 info->category = "/main/threadpool/";
925                 info->summary = "Test that a threadpool reactivates when work is added";
926                 info->description =
927                         "Push a task into a threadpool. Make sure the task executes and the\n"
928                         "thread goes idle. Then push a second task and ensure that the thread\n"
929                         "awakens and executes the second task.\n";
930                 return AST_TEST_NOT_RUN;
931         case TEST_EXECUTE:
932                 break;
933         }
934
935         listener = ast_threadpool_listener_alloc(&test_callbacks);
936         if (!listener) {
937                 return AST_TEST_FAIL;
938         }
939         tld = listener->private_data;
940
941         pool = ast_threadpool_create(info->name, listener, 0, &options);
942         if (!pool) {
943                 goto end;
944         }
945
946         std1 = simple_task_data_alloc();
947         std2 = simple_task_data_alloc();
948         if (!std1 || !std2) {
949                 goto end;
950         }
951
952         ast_threadpool_push(pool, simple_task, std1);
953
954         ast_threadpool_set_size(pool, 1);
955
956         res = wait_for_completion(test, std1);
957         if (res == AST_TEST_FAIL) {
958                 goto end;
959         }
960
961         res = wait_for_empty_notice(test, tld);
962         if (res == AST_TEST_FAIL) {
963                 goto end;
964         }
965
966         res = wait_until_thread_state(test, tld, 0, 1);
967         if (res == AST_TEST_FAIL) {
968                 goto end;
969         }
970
971         res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
972         if (res == AST_TEST_FAIL) {
973                 goto end;
974         }
975
976         /* Now make sure the threadpool reactivates when we add a second task */
977         ast_threadpool_push(pool, simple_task, std2);
978
979         res = wait_for_completion(test, std2);
980         if (res == AST_TEST_FAIL) {
981                 goto end;
982         }
983
984         res = wait_for_empty_notice(test, tld);
985         if (res == AST_TEST_FAIL) {
986                 goto end;
987         }
988
989         res = wait_until_thread_state(test, tld, 0, 1);
990         if (res == AST_TEST_FAIL) {
991                 goto end;
992         }
993
994         res = listener_check(test, listener, 1, 1, 2, 0, 1, 1);
995
996 end:
997         if (pool) {
998                 ast_threadpool_shutdown(pool);
999         }
1000         ao2_cleanup(listener);
1001         ast_free(std1);
1002         ast_free(std2);
1003         return res;
1004
1005 }
1006
1007 struct complex_task_data {
1008         int task_executed;
1009         int continue_task;
1010         ast_mutex_t lock;
1011         ast_cond_t stall_cond;
1012         ast_cond_t done_cond;
1013 };
1014
1015 static struct complex_task_data *complex_task_data_alloc(void)
1016 {
1017         struct complex_task_data *ctd = ast_calloc(1, sizeof(*ctd));
1018
1019         if (!ctd) {
1020                 return NULL;
1021         }
1022         ast_mutex_init(&ctd->lock);
1023         ast_cond_init(&ctd->stall_cond, NULL);
1024         ast_cond_init(&ctd->done_cond, NULL);
1025         return ctd;
1026 }
1027
1028 static int complex_task(void *data)
1029 {
1030         struct complex_task_data *ctd = data;
1031         SCOPED_MUTEX(lock, &ctd->lock);
1032         while (!ctd->continue_task) {
1033                 ast_cond_wait(&ctd->stall_cond, lock);
1034         }
1035         /* We got poked. Finish up */
1036         ctd->task_executed = 1;
1037         ast_cond_signal(&ctd->done_cond);
1038         return 0;
1039 }
1040
1041 static void poke_worker(struct complex_task_data *ctd)
1042 {
1043         SCOPED_MUTEX(lock, &ctd->lock);
1044         ctd->continue_task = 1;
1045         ast_cond_signal(&ctd->stall_cond);
1046 }
1047
1048 static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd)
1049 {
1050         struct timeval start = ast_tvnow();
1051         struct timespec end = {
1052                 .tv_sec = start.tv_sec + 5,
1053                 .tv_nsec = start.tv_usec * 1000
1054         };
1055         enum ast_test_result_state res = AST_TEST_PASS;
1056         SCOPED_MUTEX(lock, &ctd->lock);
1057
1058         while (!ctd->task_executed) {
1059                 if (ast_cond_timedwait(&ctd->done_cond, lock, &end) == ETIMEDOUT) {
1060                         break;
1061                 }
1062         }
1063
1064         if (!ctd->task_executed) {
1065                 res = AST_TEST_FAIL;
1066         }
1067         return res;
1068 }
1069
1070 AST_TEST_DEFINE(threadpool_task_distribution)
1071 {
1072         struct ast_threadpool *pool = NULL;
1073         struct ast_threadpool_listener *listener = NULL;
1074         struct complex_task_data *ctd1 = NULL;
1075         struct complex_task_data *ctd2 = NULL;
1076         enum ast_test_result_state res = AST_TEST_FAIL;
1077         struct test_listener_data *tld;
1078         struct ast_threadpool_options options = {
1079                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1080                 .idle_timeout = 0,
1081                 .auto_increment = 0,
1082         };
1083
1084         switch (cmd) {
1085         case TEST_INIT:
1086                 info->name = "task_distribution";
1087                 info->category = "/main/threadpool/";
1088                 info->summary = "Test that tasks are evenly distributed to threads";
1089                 info->description =
1090                         "Push two tasks into a threadpool. Ensure that each is handled by\n"
1091                         "a separate thread\n";
1092                 return AST_TEST_NOT_RUN;
1093         case TEST_EXECUTE:
1094                 break;
1095         }
1096
1097         listener = ast_threadpool_listener_alloc(&test_callbacks);
1098         if (!listener) {
1099                 return AST_TEST_FAIL;
1100         }
1101         tld = listener->private_data;
1102
1103         pool = ast_threadpool_create(info->name, listener, 0, &options);
1104         if (!pool) {
1105                 goto end;
1106         }
1107
1108         ctd1 = complex_task_data_alloc();
1109         ctd2 = complex_task_data_alloc();
1110         if (!ctd1 || !ctd2) {
1111                 goto end;
1112         }
1113
1114         ast_threadpool_push(pool, complex_task, ctd1);
1115         ast_threadpool_push(pool, complex_task, ctd2);
1116
1117         ast_threadpool_set_size(pool, 2);
1118
1119         res = wait_until_thread_state(test, tld, 2, 0);
1120         if (res == AST_TEST_FAIL) {
1121                 goto end;
1122         }
1123
1124         res = listener_check(test, listener, 1, 0, 2, 2, 0, 0);
1125         if (res == AST_TEST_FAIL) {
1126                 goto end;
1127         }
1128
1129         /* The tasks are stalled until we poke them */
1130         poke_worker(ctd1);
1131         poke_worker(ctd2);
1132
1133         res = wait_for_complex_completion(ctd1);
1134         if (res == AST_TEST_FAIL) {
1135                 goto end;
1136         }
1137         res = wait_for_complex_completion(ctd2);
1138         if (res == AST_TEST_FAIL) {
1139                 goto end;
1140         }
1141
1142         res = wait_until_thread_state(test, tld, 0, 2);
1143         if (res == AST_TEST_FAIL) {
1144                 goto end;
1145         }
1146
1147         res = listener_check(test, listener, 1, 0, 2, 0, 2, 1);
1148
1149 end:
1150         if (pool) {
1151                 ast_threadpool_shutdown(pool);
1152         }
1153         ao2_cleanup(listener);
1154         ast_free(ctd1);
1155         ast_free(ctd2);
1156         return res;
1157 }
1158
1159 AST_TEST_DEFINE(threadpool_more_destruction)
1160 {
1161         struct ast_threadpool *pool = NULL;
1162         struct ast_threadpool_listener *listener = NULL;
1163         struct complex_task_data *ctd1 = NULL;
1164         struct complex_task_data *ctd2 = NULL;
1165         enum ast_test_result_state res = AST_TEST_FAIL;
1166         struct test_listener_data *tld;
1167         struct ast_threadpool_options options = {
1168                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1169                 .idle_timeout = 0,
1170                 .auto_increment = 0,
1171         };
1172
1173         switch (cmd) {
1174         case TEST_INIT:
1175                 info->name = "more_destruction";
1176                 info->category = "/main/threadpool/";
1177                 info->summary = "Test that threads are destroyed as expected";
1178                 info->description =
1179                         "Push two tasks into a threadpool. Set the threadpool size to 4\n"
1180                         "Ensure that there are 2 active and 2 idle threads. Then shrink the\n"
1181                         "threadpool down to 1 thread. Ensure that the thread leftove is active\n"
1182                         "and ensure that both tasks complete.\n";
1183                 return AST_TEST_NOT_RUN;
1184         case TEST_EXECUTE:
1185                 break;
1186         }
1187
1188         listener = ast_threadpool_listener_alloc(&test_callbacks);
1189         if (!listener) {
1190                 return AST_TEST_FAIL;
1191         }
1192         tld = listener->private_data;
1193
1194         pool = ast_threadpool_create(info->name, listener, 0, &options);
1195         if (!pool) {
1196                 goto end;
1197         }
1198
1199         ctd1 = complex_task_data_alloc();
1200         ctd2 = complex_task_data_alloc();
1201         if (!ctd1 || !ctd2) {
1202                 goto end;
1203         }
1204
1205         ast_threadpool_push(pool, complex_task, ctd1);
1206         ast_threadpool_push(pool, complex_task, ctd2);
1207
1208         ast_threadpool_set_size(pool, 4);
1209
1210         res = wait_until_thread_state(test, tld, 2, 2);
1211         if (res == AST_TEST_FAIL) {
1212                 goto end;
1213         }
1214
1215         res = listener_check(test, listener, 1, 0, 2, 2, 2, 0);
1216         if (res == AST_TEST_FAIL) {
1217                 goto end;
1218         }
1219
1220         ast_threadpool_set_size(pool, 1);
1221
1222         /* Shrinking the threadpool should kill off the two idle threads
1223          * and one of the active threads.
1224          */
1225         res = wait_until_thread_state(test, tld, 1, 0);
1226         if (res == AST_TEST_FAIL) {
1227                 goto end;
1228         }
1229
1230         res = listener_check(test, listener, 1, 0, 2, 1, 0, 0);
1231         if (res == AST_TEST_FAIL) {
1232                 goto end;
1233         }
1234
1235         /* The tasks are stalled until we poke them */
1236         poke_worker(ctd1);
1237         poke_worker(ctd2);
1238
1239         res = wait_for_complex_completion(ctd1);
1240         if (res == AST_TEST_FAIL) {
1241                 goto end;
1242         }
1243         res = wait_for_complex_completion(ctd2);
1244         if (res == AST_TEST_FAIL) {
1245                 goto end;
1246         }
1247
1248         res = wait_until_thread_state(test, tld, 0, 1);
1249         if (res == AST_TEST_FAIL) {
1250                 goto end;
1251         }
1252
1253         res = listener_check(test, listener, 1, 0, 2, 0, 1, 1);
1254
1255 end:
1256         if (pool) {
1257                 ast_threadpool_shutdown(pool);
1258         }
1259         ao2_cleanup(listener);
1260         ast_free(ctd1);
1261         ast_free(ctd2);
1262         return res;
1263 }
1264
1265 static int unload_module(void)
1266 {
1267         ast_test_unregister(threadpool_push);
1268         ast_test_unregister(threadpool_initial_threads);
1269         ast_test_unregister(threadpool_thread_creation);
1270         ast_test_unregister(threadpool_thread_destruction);
1271         ast_test_unregister(threadpool_thread_timeout);
1272         ast_test_unregister(threadpool_one_task_one_thread);
1273         ast_test_unregister(threadpool_one_thread_one_task);
1274         ast_test_unregister(threadpool_one_thread_multiple_tasks);
1275         ast_test_unregister(threadpool_auto_increment);
1276         ast_test_unregister(threadpool_reactivation);
1277         ast_test_unregister(threadpool_task_distribution);
1278         ast_test_unregister(threadpool_more_destruction);
1279         return 0;
1280 }
1281
1282 static int load_module(void)
1283 {
1284         ast_test_register(threadpool_push);
1285         ast_test_register(threadpool_initial_threads);
1286         ast_test_register(threadpool_thread_creation);
1287         ast_test_register(threadpool_thread_destruction);
1288         ast_test_register(threadpool_thread_timeout);
1289         ast_test_register(threadpool_one_task_one_thread);
1290         ast_test_register(threadpool_one_thread_one_task);
1291         ast_test_register(threadpool_one_thread_multiple_tasks);
1292         ast_test_register(threadpool_auto_increment);
1293         ast_test_register(threadpool_reactivation);
1294         ast_test_register(threadpool_task_distribution);
1295         ast_test_register(threadpool_more_destruction);
1296         return AST_MODULE_LOAD_SUCCESS;
1297 }
1298
1299 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "threadpool test module");