Merge "doc/lang/language-criteria.txt: Link to wiki."
[asterisk/asterisk.git] / tests / test_threadpool.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012-2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \brief threadpool unit tests
22  *
23  * \author Mark Michelson <mmichelson@digium.com>
24  *
25  */
26
27 /*** MODULEINFO
28         <depend>TEST_FRAMEWORK</depend>
29         <support_level>core</support_level>
30  ***/
31
32 #include "asterisk.h"
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/lock.h"
36 #include "asterisk/logger.h"
37 #include "asterisk/module.h"
38 #include "asterisk/taskprocessor.h"
39 #include "asterisk/test.h"
40 #include "asterisk/threadpool.h"
41
42 struct test_listener_data {
43         int num_active;
44         int num_idle;
45         int task_pushed;
46         int num_tasks;
47         int empty_notice;
48         int was_empty;
49         ast_mutex_t lock;
50         ast_cond_t cond;
51 };
52
53 static struct test_listener_data *test_alloc(void)
54 {
55         struct test_listener_data *tld = ast_calloc(1, sizeof(*tld));
56         if (!tld) {
57                 return NULL;
58         }
59         ast_mutex_init(&tld->lock);
60         ast_cond_init(&tld->cond, NULL);
61         return tld;
62 }
63
64 static void test_state_changed(struct ast_threadpool *pool,
65                 struct ast_threadpool_listener *listener,
66                 int active_threads,
67                 int idle_threads)
68 {
69         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
70         SCOPED_MUTEX(lock, &tld->lock);
71         tld->num_active = active_threads;
72         tld->num_idle = idle_threads;
73         ast_log(LOG_NOTICE, "Thread state: %d active, %d idle\n", tld->num_active, tld->num_idle);
74         ast_cond_signal(&tld->cond);
75 }
76
77 static void test_task_pushed(struct ast_threadpool *pool,
78                 struct ast_threadpool_listener *listener,
79                 int was_empty)
80 {
81         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
82         SCOPED_MUTEX(lock, &tld->lock);
83         tld->task_pushed = 1;
84         ++tld->num_tasks;
85         tld->was_empty = was_empty;
86         ast_cond_signal(&tld->cond);
87 }
88
89 static void test_emptied(struct ast_threadpool *pool,
90                 struct ast_threadpool_listener *listener)
91 {
92         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
93         SCOPED_MUTEX(lock, &tld->lock);
94         tld->empty_notice = 1;
95         ast_cond_signal(&tld->cond);
96 }
97
98 static void test_shutdown(struct ast_threadpool_listener *listener)
99 {
100         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
101         ast_cond_destroy(&tld->cond);
102         ast_mutex_destroy(&tld->lock);
103 }
104
105 static const struct ast_threadpool_listener_callbacks test_callbacks = {
106         .state_changed = test_state_changed,
107         .task_pushed = test_task_pushed,
108         .emptied = test_emptied,
109         .shutdown = test_shutdown,
110 };
111
112 struct simple_task_data {
113         int task_executed;
114         ast_mutex_t lock;
115         ast_cond_t cond;
116 };
117
118 static struct simple_task_data *simple_task_data_alloc(void)
119 {
120         struct simple_task_data *std = ast_calloc(1, sizeof(*std));
121
122         if (!std) {
123                 return NULL;
124         }
125         ast_mutex_init(&std->lock);
126         ast_cond_init(&std->cond, NULL);
127         return std;
128 }
129
130 static int simple_task(void *data)
131 {
132         struct simple_task_data *std = data;
133         SCOPED_MUTEX(lock, &std->lock);
134         std->task_executed = 1;
135         ast_cond_signal(&std->cond);
136         return 0;
137 }
138
139 static enum ast_test_result_state wait_until_thread_state(struct ast_test *test, struct test_listener_data *tld, int num_active, int num_idle)
140 {
141         struct timeval start = ast_tvnow();
142         struct timespec end = {
143                 .tv_sec = start.tv_sec + 5,
144                 .tv_nsec = start.tv_usec * 1000
145         };
146         enum ast_test_result_state res = AST_TEST_PASS;
147         SCOPED_MUTEX(lock, &tld->lock);
148
149         while (!(tld->num_active == num_active && tld->num_idle == num_idle)) {
150                 if (ast_cond_timedwait(&tld->cond, &tld->lock, &end) == ETIMEDOUT) {
151                         break;
152                 }
153         }
154
155         if (tld->num_active != num_active && tld->num_idle != num_idle) {
156                 ast_test_status_update(test, "Number of active threads and idle threads not what was expected.\n");
157                 ast_test_status_update(test, "Expected %d active threads but got %d\n", num_active, tld->num_active);
158                 ast_test_status_update(test, "Expected %d idle threads but got %d\n", num_idle, tld->num_idle);
159                 res = AST_TEST_FAIL;
160         }
161
162         return res;
163 }
164
165 static void wait_for_task_pushed(struct ast_threadpool_listener *listener)
166 {
167         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
168         struct timeval start = ast_tvnow();
169         struct timespec end = {
170                 .tv_sec = start.tv_sec + 5,
171                 .tv_nsec = start.tv_usec * 1000
172         };
173         SCOPED_MUTEX(lock, &tld->lock);
174
175         while (!tld->task_pushed) {
176                 if (ast_cond_timedwait(&tld->cond, lock, &end) == ETIMEDOUT) {
177                         break;
178                 }
179         }
180 }
181
182 static enum ast_test_result_state wait_for_completion(struct ast_test *test, struct simple_task_data *std)
183 {
184         struct timeval start = ast_tvnow();
185         struct timespec end = {
186                 .tv_sec = start.tv_sec + 5,
187                 .tv_nsec = start.tv_usec * 1000
188         };
189         enum ast_test_result_state res = AST_TEST_PASS;
190         SCOPED_MUTEX(lock, &std->lock);
191
192         while (!std->task_executed) {
193                 if (ast_cond_timedwait(&std->cond, lock, &end) == ETIMEDOUT) {
194                         break;
195                 }
196         }
197
198         if (!std->task_executed) {
199                 ast_test_status_update(test, "Task execution did not occur\n");
200                 res = AST_TEST_FAIL;
201         }
202         return res;
203 }
204
205 static enum ast_test_result_state wait_for_empty_notice(struct ast_test *test, struct test_listener_data *tld)
206 {
207         struct timeval start = ast_tvnow();
208         struct timespec end = {
209                 .tv_sec = start.tv_sec + 5,
210                 .tv_nsec = start.tv_usec * 1000
211         };
212         enum ast_test_result_state res = AST_TEST_PASS;
213         SCOPED_MUTEX(lock, &tld->lock);
214
215         while (!tld->empty_notice) {
216                 if (ast_cond_timedwait(&tld->cond, lock, &end) == ETIMEDOUT) {
217                         break;
218                 }
219         }
220
221         if (!tld->empty_notice) {
222                 ast_test_status_update(test, "Test listener not notified that threadpool is empty\n");
223                 res = AST_TEST_FAIL;
224         }
225
226         return res;
227 }
228
229 static enum ast_test_result_state listener_check(
230                 struct ast_test *test,
231                 struct ast_threadpool_listener *listener,
232                 int task_pushed,
233                 int was_empty,
234                 int num_tasks,
235                 int num_active,
236                 int num_idle,
237                 int empty_notice)
238 {
239         struct test_listener_data *tld = ast_threadpool_listener_get_user_data(listener);
240         enum ast_test_result_state res = AST_TEST_PASS;
241
242         if (tld->task_pushed != task_pushed) {
243                 ast_test_status_update(test, "Expected task %sto be pushed, but it was%s\n",
244                                 task_pushed ? "" : "not ", tld->task_pushed ? "" : " not");
245                 res = AST_TEST_FAIL;
246         }
247         if (tld->was_empty != was_empty) {
248                 ast_test_status_update(test, "Expected %sto be empty, but it was%s\n",
249                                 was_empty ? "" : "not ", tld->was_empty ? "" : " not");
250                 res = AST_TEST_FAIL;
251         }
252         if (tld->num_tasks!= num_tasks) {
253                 ast_test_status_update(test, "Expected %d tasks to be pushed, but got %d\n",
254                                 num_tasks, tld->num_tasks);
255                 res = AST_TEST_FAIL;
256         }
257         if (tld->num_active != num_active) {
258                 ast_test_status_update(test, "Expected %d active threads, but got %d\n",
259                                 num_active, tld->num_active);
260                 res = AST_TEST_FAIL;
261         }
262         if (tld->num_idle != num_idle) {
263                 ast_test_status_update(test, "Expected %d idle threads, but got %d\n",
264                                 num_idle, tld->num_idle);
265                 res = AST_TEST_FAIL;
266         }
267         if (tld->empty_notice != empty_notice) {
268                 ast_test_status_update(test, "Expected %s empty notice, but got %s\n",
269                                 was_empty ? "an" : "no", tld->task_pushed ? "one" : "none");
270                 res = AST_TEST_FAIL;
271         }
272
273         return res;
274 }
275
276 AST_TEST_DEFINE(threadpool_push)
277 {
278         struct ast_threadpool *pool = NULL;
279         struct ast_threadpool_listener *listener = NULL;
280         struct simple_task_data *std = NULL;
281         struct test_listener_data *tld = NULL;
282         enum ast_test_result_state res = AST_TEST_FAIL;
283         struct ast_threadpool_options options = {
284                 .version = AST_THREADPOOL_OPTIONS_VERSION,
285                 .idle_timeout = 0,
286                 .auto_increment = 0,
287                 .initial_size = 0,
288                 .max_size = 0,
289         };
290
291         switch (cmd) {
292         case TEST_INIT:
293                 info->name = "push";
294                 info->category = "/main/threadpool/";
295                 info->summary = "Test task";
296                 info->description =
297                         "Basic threadpool test";
298                 return AST_TEST_NOT_RUN;
299         case TEST_EXECUTE:
300                 break;
301         }
302         tld = test_alloc();
303         if (!tld) {
304                 return AST_TEST_FAIL;
305         }
306
307         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
308         if (!listener) {
309                 goto end;
310         }
311
312         pool = ast_threadpool_create(info->name, listener, &options);
313         if (!pool) {
314                 goto end;
315         }
316
317         std = simple_task_data_alloc();
318         if (!std) {
319                 goto end;
320         }
321
322         ast_threadpool_push(pool, simple_task, std);
323
324         wait_for_task_pushed(listener);
325
326         res = listener_check(test, listener, 1, 1, 1, 0, 0, 0);
327
328 end:
329         ast_threadpool_shutdown(pool);
330         ao2_cleanup(listener);
331         ast_free(std);
332         ast_free(tld);
333         return res;
334 }
335
336 AST_TEST_DEFINE(threadpool_initial_threads)
337 {
338         struct ast_threadpool *pool = NULL;
339         struct ast_threadpool_listener *listener = NULL;
340         enum ast_test_result_state res = AST_TEST_FAIL;
341         struct test_listener_data *tld = NULL;
342         struct ast_threadpool_options options = {
343                 .version = AST_THREADPOOL_OPTIONS_VERSION,
344                 .idle_timeout = 0,
345                 .auto_increment = 0,
346                 .initial_size = 3,
347                 .max_size = 0,
348         };
349
350         switch (cmd) {
351         case TEST_INIT:
352                 info->name = "initial_threads";
353                 info->category = "/main/threadpool/";
354                 info->summary = "Test threadpool initialization state";
355                 info->description =
356                         "Ensure that a threadpool created with a specific size contains the\n"
357                         "proper number of idle threads.";
358                 return AST_TEST_NOT_RUN;
359         case TEST_EXECUTE:
360                 break;
361         }
362
363         tld = test_alloc();
364         if (!tld) {
365                 return AST_TEST_FAIL;
366         }
367
368         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
369         if (!listener) {
370                 goto end;
371         }
372
373         pool = ast_threadpool_create(info->name, listener, &options);
374         if (!pool) {
375                 goto end;
376         }
377
378         res = wait_until_thread_state(test, tld, 0, 3);
379
380 end:
381         ast_threadpool_shutdown(pool);
382         ao2_cleanup(listener);
383         ast_free(tld);
384         return res;
385 }
386
387
388 AST_TEST_DEFINE(threadpool_thread_creation)
389 {
390         struct ast_threadpool *pool = NULL;
391         struct ast_threadpool_listener *listener = NULL;
392         enum ast_test_result_state res = AST_TEST_FAIL;
393         struct test_listener_data *tld = NULL;
394         struct ast_threadpool_options options = {
395                 .version = AST_THREADPOOL_OPTIONS_VERSION,
396                 .idle_timeout = 0,
397                 .auto_increment = 0,
398                 .initial_size = 0,
399                 .max_size = 0,
400         };
401
402         switch (cmd) {
403         case TEST_INIT:
404                 info->name = "thread_creation";
405                 info->category = "/main/threadpool/";
406                 info->summary = "Test threadpool thread creation";
407                 info->description =
408                         "Ensure that threads can be added to a threadpool";
409                 return AST_TEST_NOT_RUN;
410         case TEST_EXECUTE:
411                 break;
412         }
413
414         tld = test_alloc();
415         if (!tld) {
416                 return AST_TEST_FAIL;
417         }
418
419         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
420         if (!listener) {
421                 goto end;
422         }
423
424         pool = ast_threadpool_create(info->name, listener, &options);
425         if (!pool) {
426                 goto end;
427         }
428
429         /* Now let's create a thread. It should start active, then go
430          * idle immediately
431          */
432         ast_threadpool_set_size(pool, 1);
433
434         res = wait_until_thread_state(test, tld, 0, 1);
435
436 end:
437         ast_threadpool_shutdown(pool);
438         ao2_cleanup(listener);
439         ast_free(tld);
440         return res;
441 }
442
443 AST_TEST_DEFINE(threadpool_thread_destruction)
444 {
445         struct ast_threadpool *pool = NULL;
446         struct ast_threadpool_listener *listener = NULL;
447         enum ast_test_result_state res = AST_TEST_FAIL;
448         struct test_listener_data *tld = NULL;
449         struct ast_threadpool_options options = {
450                 .version = AST_THREADPOOL_OPTIONS_VERSION,
451                 .idle_timeout = 0,
452                 .auto_increment = 0,
453                 .initial_size = 0,
454                 .max_size = 0,
455         };
456
457         switch (cmd) {
458         case TEST_INIT:
459                 info->name = "thread_destruction";
460                 info->category = "/main/threadpool/";
461                 info->summary = "Test threadpool thread destruction";
462                 info->description =
463                         "Ensure that threads are properly destroyed in a threadpool";
464                 return AST_TEST_NOT_RUN;
465         case TEST_EXECUTE:
466                 break;
467         }
468
469         tld = test_alloc();
470         if (!tld) {
471                 return AST_TEST_FAIL;
472         }
473
474         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
475         if (!listener) {
476                 goto end;
477         }
478
479         pool = ast_threadpool_create(info->name, listener, &options);
480         if (!pool) {
481                 goto end;
482         }
483
484         ast_threadpool_set_size(pool, 3);
485
486         res = wait_until_thread_state(test, tld, 0, 3);
487         if (res == AST_TEST_FAIL) {
488                 goto end;
489         }
490
491         res = listener_check(test, listener, 0, 0, 0, 0, 3, 0);
492         if (res == AST_TEST_FAIL) {
493                 goto end;
494         }
495
496         ast_threadpool_set_size(pool, 2);
497
498         res = wait_until_thread_state(test, tld, 0, 2);
499
500 end:
501         ast_threadpool_shutdown(pool);
502         ao2_cleanup(listener);
503         ast_free(tld);
504         return res;
505 }
506
507 AST_TEST_DEFINE(threadpool_thread_timeout)
508 {
509         struct ast_threadpool *pool = NULL;
510         struct ast_threadpool_listener *listener = NULL;
511         enum ast_test_result_state res = AST_TEST_FAIL;
512         struct test_listener_data *tld = NULL;
513         struct ast_threadpool_options options = {
514                 .version = AST_THREADPOOL_OPTIONS_VERSION,
515                 .idle_timeout = 2,
516                 .auto_increment = 0,
517                 .initial_size = 0,
518                 .max_size = 0,
519         };
520
521         switch (cmd) {
522         case TEST_INIT:
523                 info->name = "thread_timeout";
524                 info->category = "/main/threadpool/";
525                 info->summary = "Test threadpool thread timeout";
526                 info->description =
527                         "Ensure that a thread with a two second timeout dies as expected.";
528                 return AST_TEST_NOT_RUN;
529         case TEST_EXECUTE:
530                 break;
531         }
532
533         tld = test_alloc();
534         if (!tld) {
535                 return AST_TEST_FAIL;
536         }
537
538         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
539         if (!listener) {
540                 goto end;
541         }
542
543         pool = ast_threadpool_create(info->name, listener, &options);
544         if (!pool) {
545                 goto end;
546         }
547
548         ast_threadpool_set_size(pool, 1);
549
550         res = wait_until_thread_state(test, tld, 0, 1);
551         if (res == AST_TEST_FAIL) {
552                 goto end;
553         }
554
555         res = listener_check(test, listener, 0, 0, 0, 0, 1, 0);
556         if (res == AST_TEST_FAIL) {
557                 goto end;
558         }
559
560         res = wait_until_thread_state(test, tld, 0, 0);
561         if (res == AST_TEST_FAIL) {
562                 goto end;
563         }
564
565         res = listener_check(test, listener, 0, 0, 0, 0, 0, 0);
566
567 end:
568         ast_threadpool_shutdown(pool);
569         ao2_cleanup(listener);
570         ast_free(tld);
571         return res;
572 }
573
574 AST_TEST_DEFINE(threadpool_thread_timeout_thrash)
575 {
576         struct ast_threadpool *pool = NULL;
577         struct ast_threadpool_listener *listener = NULL;
578         enum ast_test_result_state res = AST_TEST_FAIL;
579         struct test_listener_data *tld = NULL;
580         struct ast_threadpool_options options = {
581                 .version = AST_THREADPOOL_OPTIONS_VERSION,
582                 .idle_timeout = 1,
583                 .auto_increment = 1,
584                 .initial_size = 0,
585                 .max_size = 1,
586         };
587         int iteration;
588
589         switch (cmd) {
590         case TEST_INIT:
591                 info->name = "thread_timeout_thrash";
592                 info->category = "/main/threadpool/";
593                 info->summary = "Thrash threadpool thread timeout";
594                 info->description =
595                         "Repeatedly queue a task when a threadpool thread should timeout.";
596                 return AST_TEST_NOT_RUN;
597         case TEST_EXECUTE:
598                 break;
599         }
600
601         tld = test_alloc();
602         if (!tld) {
603                 return AST_TEST_FAIL;
604         }
605
606         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
607         if (!listener) {
608                 goto end;
609         }
610
611         pool = ast_threadpool_create(info->name, listener, &options);
612         if (!pool) {
613                 goto end;
614         }
615
616         ast_threadpool_set_size(pool, 1);
617
618         for (iteration = 0; iteration < 30; ++iteration) {
619                 struct simple_task_data *std = NULL;
620                 struct timeval start = ast_tvnow();
621                 struct timespec end = {
622                         .tv_sec = start.tv_sec + options.idle_timeout,
623                         .tv_nsec = start.tv_usec * 1000
624                 };
625
626                 std = simple_task_data_alloc();
627                 if (!std) {
628                         goto end;
629                 }
630
631                 /* Wait until the threadpool thread should timeout due to being idle */
632                 ast_mutex_lock(&tld->lock);
633                 while (ast_cond_timedwait(&tld->cond, &tld->lock, &end) != ETIMEDOUT) {
634                         /* This purposely left empty as we want to loop waiting for a time out */
635                 }
636                 ast_mutex_unlock(&tld->lock);
637
638                 ast_threadpool_push(pool, simple_task, std);
639
640                 res = wait_for_completion(test, std);
641
642                 ast_free(std);
643
644                 if (res == AST_TEST_FAIL) {
645                         goto end;
646                 }
647         }
648
649         res = wait_until_thread_state(test, tld, 0, 0);
650         if (res == AST_TEST_FAIL) {
651                 goto end;
652         }
653
654         res = listener_check(test, listener, 1, 1, 30, 0, 0, 1);
655
656 end:
657         ast_threadpool_shutdown(pool);
658         ao2_cleanup(listener);
659         ast_free(tld);
660         return res;
661 }
662
663 AST_TEST_DEFINE(threadpool_one_task_one_thread)
664 {
665         struct ast_threadpool *pool = NULL;
666         struct ast_threadpool_listener *listener = NULL;
667         struct simple_task_data *std = NULL;
668         enum ast_test_result_state res = AST_TEST_FAIL;
669         struct test_listener_data *tld = NULL;
670         struct ast_threadpool_options options = {
671                 .version = AST_THREADPOOL_OPTIONS_VERSION,
672                 .idle_timeout = 0,
673                 .auto_increment = 0,
674                 .initial_size = 0,
675                 .max_size = 0,
676         };
677
678         switch (cmd) {
679         case TEST_INIT:
680                 info->name = "one_task_one_thread";
681                 info->category = "/main/threadpool/";
682                 info->summary = "Test a single task with a single thread";
683                 info->description =
684                         "Push a task into an empty threadpool, then add a thread to the pool.";
685                 return AST_TEST_NOT_RUN;
686         case TEST_EXECUTE:
687                 break;
688         }
689
690         tld = test_alloc();
691         if (!tld) {
692                 return AST_TEST_FAIL;
693         }
694
695         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
696         if (!listener) {
697                 goto end;
698         }
699
700         pool = ast_threadpool_create(info->name, listener, &options);
701         if (!pool) {
702                 goto end;
703         }
704
705         std = simple_task_data_alloc();
706         if (!std) {
707                 goto end;
708         }
709
710         ast_threadpool_push(pool, simple_task, std);
711
712         ast_threadpool_set_size(pool, 1);
713
714         /* Threads added to the pool are active when they start,
715          * so the newly-created thread should immediately execute
716          * the waiting task.
717          */
718         res = wait_for_completion(test, std);
719         if (res == AST_TEST_FAIL) {
720                 goto end;
721         }
722
723         res = wait_for_empty_notice(test, tld);
724         if (res == AST_TEST_FAIL) {
725                 goto end;
726         }
727
728         /* After completing the task, the thread should go idle */
729         res = wait_until_thread_state(test, tld, 0, 1);
730         if (res == AST_TEST_FAIL) {
731                 goto end;
732         }
733
734         res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
735
736 end:
737         ast_threadpool_shutdown(pool);
738         ao2_cleanup(listener);
739         ast_free(std);
740         ast_free(tld);
741         return res;
742
743 }
744
745 AST_TEST_DEFINE(threadpool_one_thread_one_task)
746 {
747         struct ast_threadpool *pool = NULL;
748         struct ast_threadpool_listener *listener = NULL;
749         struct simple_task_data *std = NULL;
750         enum ast_test_result_state res = AST_TEST_FAIL;
751         struct test_listener_data *tld = NULL;
752         struct ast_threadpool_options options = {
753                 .version = AST_THREADPOOL_OPTIONS_VERSION,
754                 .idle_timeout = 0,
755                 .auto_increment = 0,
756                 .initial_size = 0,
757                 .max_size = 0,
758         };
759
760         switch (cmd) {
761         case TEST_INIT:
762                 info->name = "one_thread_one_task";
763                 info->category = "/main/threadpool/";
764                 info->summary = "Test a single thread with a single task";
765                 info->description =
766                         "Add a thread to the pool and then push a task to it.";
767                 return AST_TEST_NOT_RUN;
768         case TEST_EXECUTE:
769                 break;
770         }
771
772         tld = test_alloc();
773         if (!tld) {
774                 return AST_TEST_FAIL;
775         }
776
777         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
778         if (!listener) {
779                 goto end;
780         }
781
782         pool = ast_threadpool_create(info->name, listener, &options);
783         if (!pool) {
784                 goto end;
785         }
786
787         std = simple_task_data_alloc();
788         if (!std) {
789                 goto end;
790         }
791
792         ast_threadpool_set_size(pool, 1);
793
794         res = wait_until_thread_state(test, tld, 0, 1);
795         if (res == AST_TEST_FAIL) {
796                 goto end;
797         }
798
799         ast_threadpool_push(pool, simple_task, std);
800
801         res = wait_for_completion(test, std);
802         if (res == AST_TEST_FAIL) {
803                 goto end;
804         }
805
806         res = wait_for_empty_notice(test, tld);
807         if (res == AST_TEST_FAIL) {
808                 goto end;
809         }
810
811         /* After completing the task, the thread should go idle */
812         res = wait_until_thread_state(test, tld, 0, 1);
813         if (res == AST_TEST_FAIL) {
814                 goto end;
815         }
816
817         res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
818
819 end:
820         ast_threadpool_shutdown(pool);
821         ao2_cleanup(listener);
822         ast_free(std);
823         ast_free(tld);
824         return res;
825 }
826
827 AST_TEST_DEFINE(threadpool_one_thread_multiple_tasks)
828 {
829         struct ast_threadpool *pool = NULL;
830         struct ast_threadpool_listener *listener = NULL;
831         struct simple_task_data *std1 = NULL;
832         struct simple_task_data *std2 = NULL;
833         struct simple_task_data *std3 = NULL;
834         enum ast_test_result_state res = AST_TEST_FAIL;
835         struct test_listener_data *tld = NULL;
836         struct ast_threadpool_options options = {
837                 .version = AST_THREADPOOL_OPTIONS_VERSION,
838                 .idle_timeout = 0,
839                 .auto_increment = 0,
840                 .initial_size = 0,
841                 .max_size = 0,
842         };
843
844         switch (cmd) {
845         case TEST_INIT:
846                 info->name = "one_thread_multiple_tasks";
847                 info->category = "/main/threadpool/";
848                 info->summary = "Test a single thread with multiple tasks";
849                 info->description =
850                         "Add a thread to the pool and then push three tasks to it.";
851                 return AST_TEST_NOT_RUN;
852         case TEST_EXECUTE:
853                 break;
854         }
855
856         tld = test_alloc();
857         if (!tld) {
858                 return AST_TEST_FAIL;
859         }
860
861         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
862         if (!listener) {
863                 goto end;
864         }
865
866         pool = ast_threadpool_create(info->name, listener, &options);
867         if (!pool) {
868                 goto end;
869         }
870
871         std1 = simple_task_data_alloc();
872         std2 = simple_task_data_alloc();
873         std3 = simple_task_data_alloc();
874         if (!std1 || !std2 || !std3) {
875                 goto end;
876         }
877
878         ast_threadpool_set_size(pool, 1);
879
880         res = wait_until_thread_state(test, tld, 0, 1);
881         if (res == AST_TEST_FAIL) {
882                 goto end;
883         }
884
885         ast_threadpool_push(pool, simple_task, std1);
886         ast_threadpool_push(pool, simple_task, std2);
887         ast_threadpool_push(pool, simple_task, std3);
888
889         res = wait_for_completion(test, std1);
890         if (res == AST_TEST_FAIL) {
891                 goto end;
892         }
893         res = wait_for_completion(test, std2);
894         if (res == AST_TEST_FAIL) {
895                 goto end;
896         }
897         res = wait_for_completion(test, std3);
898         if (res == AST_TEST_FAIL) {
899                 goto end;
900         }
901
902         res = wait_for_empty_notice(test, tld);
903         if (res == AST_TEST_FAIL) {
904                 goto end;
905         }
906
907         res = wait_until_thread_state(test, tld, 0, 1);
908         if (res == AST_TEST_FAIL) {
909                 goto end;
910         }
911
912         res = listener_check(test, listener, 1, 0, 3, 0, 1, 1);
913
914 end:
915         ast_threadpool_shutdown(pool);
916         ao2_cleanup(listener);
917         ast_free(std1);
918         ast_free(std2);
919         ast_free(std3);
920         ast_free(tld);
921         return res;
922 }
923
924 static enum ast_test_result_state wait_until_thread_state_task_pushed(struct ast_test *test,
925                 struct test_listener_data *tld, int num_active, int num_idle, int num_tasks)
926 {
927         enum ast_test_result_state res = AST_TEST_PASS;
928         struct timeval start;
929         struct timespec end;
930
931         res = wait_until_thread_state(test, tld, num_active, num_idle);
932         if (res == AST_TEST_FAIL) {
933                 return res;
934         }
935
936         start = ast_tvnow();
937         end.tv_sec = start.tv_sec + 5;
938         end.tv_nsec = start.tv_usec * 1000;
939
940         ast_mutex_lock(&tld->lock);
941
942         while (tld->num_tasks != num_tasks) {
943                 if (ast_cond_timedwait(&tld->cond, &tld->lock, &end) == ETIMEDOUT) {
944                         break;
945                 }
946         }
947
948         if (tld->num_tasks != num_tasks) {
949                 ast_test_status_update(test, "Number of tasks pushed %d does not match expected %d\n",
950                                 tld->num_tasks, num_tasks);
951                 res = AST_TEST_FAIL;
952         }
953
954         ast_mutex_unlock(&tld->lock);
955
956         return res;
957 }
958
959 AST_TEST_DEFINE(threadpool_auto_increment)
960 {
961         struct ast_threadpool *pool = NULL;
962         struct ast_threadpool_listener *listener = NULL;
963         struct simple_task_data *std1 = NULL;
964         struct simple_task_data *std2 = NULL;
965         struct simple_task_data *std3 = NULL;
966         struct simple_task_data *std4 = NULL;
967         enum ast_test_result_state res = AST_TEST_FAIL;
968         struct test_listener_data *tld = NULL;
969         struct ast_threadpool_options options = {
970                 .version = AST_THREADPOOL_OPTIONS_VERSION,
971                 .idle_timeout = 0,
972                 .auto_increment = 3,
973                 .initial_size = 0,
974                 .max_size = 0,
975         };
976
977         switch (cmd) {
978         case TEST_INIT:
979                 info->name = "auto_increment";
980                 info->category = "/main/threadpool/";
981                 info->summary = "Test that the threadpool grows as tasks are added";
982                 info->description =
983                         "Create an empty threadpool and push a task to it. Once the task is\n"
984                         "pushed, the threadpool should add three threads and be able to\n"
985                         "handle the task. The threads should then go idle";
986                 return AST_TEST_NOT_RUN;
987         case TEST_EXECUTE:
988                 break;
989         }
990
991         tld = test_alloc();
992         if (!tld) {
993                 return AST_TEST_FAIL;
994         }
995
996         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
997         if (!listener) {
998                 goto end;
999         }
1000
1001         pool = ast_threadpool_create(info->name, listener, &options);
1002         if (!pool) {
1003                 goto end;
1004         }
1005
1006         std1 = simple_task_data_alloc();
1007         std2 = simple_task_data_alloc();
1008         std3 = simple_task_data_alloc();
1009         std4 = simple_task_data_alloc();
1010         if (!std1 || !std2 || !std3 || !std4) {
1011                 goto end;
1012         }
1013
1014         ast_threadpool_push(pool, simple_task, std1);
1015
1016         /* Pushing the task should result in the threadpool growing
1017          * by three threads. This will allow the task to actually execute
1018          */
1019         res = wait_for_completion(test, std1);
1020         if (res == AST_TEST_FAIL) {
1021                 goto end;
1022         }
1023
1024         res = wait_for_empty_notice(test, tld);
1025         if (res == AST_TEST_FAIL) {
1026                 goto end;
1027         }
1028
1029         res = wait_until_thread_state(test, tld, 0, 3);
1030         if (res == AST_TEST_FAIL) {
1031                 goto end;
1032         }
1033
1034         /* Now push three tasks into the pool and ensure the pool does not
1035          * grow.
1036          */
1037         ast_threadpool_push(pool, simple_task, std2);
1038         ast_threadpool_push(pool, simple_task, std3);
1039         ast_threadpool_push(pool, simple_task, std4);
1040
1041         res = wait_for_completion(test, std2);
1042         if (res == AST_TEST_FAIL) {
1043                 goto end;
1044         }
1045         res = wait_for_completion(test, std3);
1046         if (res == AST_TEST_FAIL) {
1047                 goto end;
1048         }
1049         res = wait_for_completion(test, std4);
1050         if (res == AST_TEST_FAIL) {
1051                 goto end;
1052         }
1053
1054         res = wait_for_empty_notice(test, tld);
1055         if (res == AST_TEST_FAIL) {
1056                 goto end;
1057         }
1058
1059         res = wait_until_thread_state_task_pushed(test, tld, 0, 3, 4);
1060         if (res == AST_TEST_FAIL) {
1061                 goto end;
1062         }
1063
1064 end:
1065         ast_threadpool_shutdown(pool);
1066         ao2_cleanup(listener);
1067         ast_free(std1);
1068         ast_free(std2);
1069         ast_free(std3);
1070         ast_free(std4);
1071         ast_free(tld);
1072         return res;
1073 }
1074
1075 AST_TEST_DEFINE(threadpool_max_size)
1076 {
1077         struct ast_threadpool *pool = NULL;
1078         struct ast_threadpool_listener *listener = NULL;
1079         struct simple_task_data *std = NULL;
1080         enum ast_test_result_state res = AST_TEST_FAIL;
1081         struct test_listener_data *tld = NULL;
1082         struct ast_threadpool_options options = {
1083                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1084                 .idle_timeout = 0,
1085                 .auto_increment = 3,
1086                 .initial_size = 0,
1087                 .max_size = 2,
1088         };
1089
1090         switch (cmd) {
1091         case TEST_INIT:
1092                 info->name = "max_size";
1093                 info->category = "/main/threadpool/";
1094                 info->summary = "Test that the threadpool does not exceed its maximum size restriction";
1095                 info->description =
1096                         "Create an empty threadpool and push a task to it. Once the task is\n"
1097                         "pushed, the threadpool should attempt to grow by three threads, but the\n"
1098                         "pool's restrictions should only allow two threads to be added.";
1099                 return AST_TEST_NOT_RUN;
1100         case TEST_EXECUTE:
1101                 break;
1102         }
1103
1104         tld = test_alloc();
1105         if (!tld) {
1106                 return AST_TEST_FAIL;
1107         }
1108
1109         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1110         if (!listener) {
1111                 goto end;
1112         }
1113
1114         pool = ast_threadpool_create(info->name, listener, &options);
1115         if (!pool) {
1116                 goto end;
1117         }
1118
1119         std = simple_task_data_alloc();
1120         if (!std) {
1121                 goto end;
1122         }
1123
1124         ast_threadpool_push(pool, simple_task, std);
1125
1126         res = wait_for_completion(test, std);
1127         if (res == AST_TEST_FAIL) {
1128                 goto end;
1129         }
1130
1131         res = wait_until_thread_state(test, tld, 0, 2);
1132         if (res == AST_TEST_FAIL) {
1133                 goto end;
1134         }
1135
1136         res = listener_check(test, listener, 1, 1, 1, 0, 2, 1);
1137 end:
1138         ast_threadpool_shutdown(pool);
1139         ao2_cleanup(listener);
1140         ast_free(std);
1141         ast_free(tld);
1142         return res;
1143 }
1144
1145 AST_TEST_DEFINE(threadpool_reactivation)
1146 {
1147         struct ast_threadpool *pool = NULL;
1148         struct ast_threadpool_listener *listener = NULL;
1149         struct simple_task_data *std1 = NULL;
1150         struct simple_task_data *std2 = NULL;
1151         enum ast_test_result_state res = AST_TEST_FAIL;
1152         struct test_listener_data *tld = NULL;
1153         struct ast_threadpool_options options = {
1154                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1155                 .idle_timeout = 0,
1156                 .auto_increment = 0,
1157                 .initial_size = 0,
1158                 .max_size = 0,
1159         };
1160
1161         switch (cmd) {
1162         case TEST_INIT:
1163                 info->name = "reactivation";
1164                 info->category = "/main/threadpool/";
1165                 info->summary = "Test that a threadpool reactivates when work is added";
1166                 info->description =
1167                         "Push a task into a threadpool. Make sure the task executes and the\n"
1168                         "thread goes idle. Then push a second task and ensure that the thread\n"
1169                         "awakens and executes the second task.";
1170                 return AST_TEST_NOT_RUN;
1171         case TEST_EXECUTE:
1172                 break;
1173         }
1174
1175         tld = test_alloc();
1176         if (!tld) {
1177                 return AST_TEST_FAIL;
1178         }
1179
1180         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1181         if (!listener) {
1182                 goto end;
1183         }
1184
1185         pool = ast_threadpool_create(info->name, listener, &options);
1186         if (!pool) {
1187                 goto end;
1188         }
1189
1190         std1 = simple_task_data_alloc();
1191         std2 = simple_task_data_alloc();
1192         if (!std1 || !std2) {
1193                 goto end;
1194         }
1195
1196         ast_threadpool_push(pool, simple_task, std1);
1197
1198         ast_threadpool_set_size(pool, 1);
1199
1200         res = wait_for_completion(test, std1);
1201         if (res == AST_TEST_FAIL) {
1202                 goto end;
1203         }
1204
1205         res = wait_for_empty_notice(test, tld);
1206         if (res == AST_TEST_FAIL) {
1207                 goto end;
1208         }
1209
1210         res = wait_until_thread_state(test, tld, 0, 1);
1211         if (res == AST_TEST_FAIL) {
1212                 goto end;
1213         }
1214
1215         res = listener_check(test, listener, 1, 1, 1, 0, 1, 1);
1216         if (res == AST_TEST_FAIL) {
1217                 goto end;
1218         }
1219
1220         /* Now make sure the threadpool reactivates when we add a second task */
1221         ast_threadpool_push(pool, simple_task, std2);
1222
1223         res = wait_for_completion(test, std2);
1224         if (res == AST_TEST_FAIL) {
1225                 goto end;
1226         }
1227
1228         res = wait_for_empty_notice(test, tld);
1229         if (res == AST_TEST_FAIL) {
1230                 goto end;
1231         }
1232
1233         res = wait_until_thread_state(test, tld, 0, 1);
1234         if (res == AST_TEST_FAIL) {
1235                 goto end;
1236         }
1237
1238         res = listener_check(test, listener, 1, 1, 2, 0, 1, 1);
1239
1240 end:
1241         ast_threadpool_shutdown(pool);
1242         ao2_cleanup(listener);
1243         ast_free(std1);
1244         ast_free(std2);
1245         ast_free(tld);
1246         return res;
1247
1248 }
1249
1250 struct complex_task_data {
1251         int task_started;
1252         int task_executed;
1253         int continue_task;
1254         ast_mutex_t lock;
1255         ast_cond_t stall_cond;
1256         ast_cond_t notify_cond;
1257 };
1258
1259 static struct complex_task_data *complex_task_data_alloc(void)
1260 {
1261         struct complex_task_data *ctd = ast_calloc(1, sizeof(*ctd));
1262
1263         if (!ctd) {
1264                 return NULL;
1265         }
1266         ast_mutex_init(&ctd->lock);
1267         ast_cond_init(&ctd->stall_cond, NULL);
1268         ast_cond_init(&ctd->notify_cond, NULL);
1269         return ctd;
1270 }
1271
1272 static int complex_task(void *data)
1273 {
1274         struct complex_task_data *ctd = data;
1275         SCOPED_MUTEX(lock, &ctd->lock);
1276         /* Notify that we started */
1277         ctd->task_started = 1;
1278         ast_cond_signal(&ctd->notify_cond);
1279         while (!ctd->continue_task) {
1280                 ast_cond_wait(&ctd->stall_cond, lock);
1281         }
1282         /* We got poked. Finish up */
1283         ctd->task_executed = 1;
1284         ast_cond_signal(&ctd->notify_cond);
1285         return 0;
1286 }
1287
1288 static void poke_worker(struct complex_task_data *ctd)
1289 {
1290         SCOPED_MUTEX(lock, &ctd->lock);
1291         ctd->continue_task = 1;
1292         ast_cond_signal(&ctd->stall_cond);
1293 }
1294
1295 static int wait_for_complex_start(struct complex_task_data *ctd)
1296 {
1297         struct timeval start = ast_tvnow();
1298         struct timespec end = {
1299                 .tv_sec = start.tv_sec + 5,
1300                 .tv_nsec = start.tv_usec * 1000
1301         };
1302         SCOPED_MUTEX(lock, &ctd->lock);
1303
1304         while (!ctd->task_started) {
1305                 if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
1306                         break;
1307                 }
1308         }
1309
1310         return ctd->task_started;
1311 }
1312
1313 static int has_complex_started(struct complex_task_data *ctd)
1314 {
1315         struct timeval start = ast_tvnow();
1316         struct timespec end = {
1317                 .tv_sec = start.tv_sec + 1,
1318                 .tv_nsec = start.tv_usec * 1000
1319         };
1320         SCOPED_MUTEX(lock, &ctd->lock);
1321
1322         while (!ctd->task_started) {
1323                 if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
1324                         break;
1325                 }
1326         }
1327
1328         return ctd->task_started;
1329 }
1330
1331 static enum ast_test_result_state wait_for_complex_completion(struct complex_task_data *ctd)
1332 {
1333         struct timeval start = ast_tvnow();
1334         struct timespec end = {
1335                 .tv_sec = start.tv_sec + 5,
1336                 .tv_nsec = start.tv_usec * 1000
1337         };
1338         enum ast_test_result_state res = AST_TEST_PASS;
1339         SCOPED_MUTEX(lock, &ctd->lock);
1340
1341         while (!ctd->task_executed) {
1342                 if (ast_cond_timedwait(&ctd->notify_cond, lock, &end) == ETIMEDOUT) {
1343                         break;
1344                 }
1345         }
1346
1347         if (!ctd->task_executed) {
1348                 res = AST_TEST_FAIL;
1349         }
1350         return res;
1351 }
1352
1353 AST_TEST_DEFINE(threadpool_task_distribution)
1354 {
1355         struct ast_threadpool *pool = NULL;
1356         struct ast_threadpool_listener *listener = NULL;
1357         struct complex_task_data *ctd1 = NULL;
1358         struct complex_task_data *ctd2 = NULL;
1359         enum ast_test_result_state res = AST_TEST_FAIL;
1360         struct test_listener_data *tld = NULL;
1361         struct ast_threadpool_options options = {
1362                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1363                 .idle_timeout = 0,
1364                 .auto_increment = 0,
1365                 .initial_size = 0,
1366                 .max_size = 0,
1367         };
1368
1369         switch (cmd) {
1370         case TEST_INIT:
1371                 info->name = "task_distribution";
1372                 info->category = "/main/threadpool/";
1373                 info->summary = "Test that tasks are evenly distributed to threads";
1374                 info->description =
1375                         "Push two tasks into a threadpool. Ensure that each is handled by\n"
1376                         "a separate thread";
1377                 return AST_TEST_NOT_RUN;
1378         case TEST_EXECUTE:
1379                 break;
1380         }
1381
1382         tld = test_alloc();
1383         if (!tld) {
1384                 return AST_TEST_FAIL;
1385         }
1386
1387         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1388         if (!listener) {
1389                 goto end;
1390         }
1391
1392         pool = ast_threadpool_create(info->name, listener, &options);
1393         if (!pool) {
1394                 goto end;
1395         }
1396
1397         ctd1 = complex_task_data_alloc();
1398         ctd2 = complex_task_data_alloc();
1399         if (!ctd1 || !ctd2) {
1400                 goto end;
1401         }
1402
1403         ast_threadpool_push(pool, complex_task, ctd1);
1404         ast_threadpool_push(pool, complex_task, ctd2);
1405
1406         ast_threadpool_set_size(pool, 2);
1407
1408         res = wait_until_thread_state(test, tld, 2, 0);
1409         if (res == AST_TEST_FAIL) {
1410                 goto end;
1411         }
1412
1413         res = listener_check(test, listener, 1, 0, 2, 2, 0, 0);
1414         if (res == AST_TEST_FAIL) {
1415                 goto end;
1416         }
1417
1418         /* The tasks are stalled until we poke them */
1419         poke_worker(ctd1);
1420         poke_worker(ctd2);
1421
1422         res = wait_for_complex_completion(ctd1);
1423         if (res == AST_TEST_FAIL) {
1424                 goto end;
1425         }
1426         res = wait_for_complex_completion(ctd2);
1427         if (res == AST_TEST_FAIL) {
1428                 goto end;
1429         }
1430
1431         res = wait_until_thread_state(test, tld, 0, 2);
1432         if (res == AST_TEST_FAIL) {
1433                 goto end;
1434         }
1435
1436         res = listener_check(test, listener, 1, 0, 2, 0, 2, 1);
1437
1438 end:
1439         ast_threadpool_shutdown(pool);
1440         ao2_cleanup(listener);
1441         ast_free(ctd1);
1442         ast_free(ctd2);
1443         ast_free(tld);
1444         return res;
1445 }
1446
1447 AST_TEST_DEFINE(threadpool_more_destruction)
1448 {
1449         struct ast_threadpool *pool = NULL;
1450         struct ast_threadpool_listener *listener = NULL;
1451         struct complex_task_data *ctd1 = NULL;
1452         struct complex_task_data *ctd2 = NULL;
1453         enum ast_test_result_state res = AST_TEST_FAIL;
1454         struct test_listener_data *tld = NULL;
1455         struct ast_threadpool_options options = {
1456                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1457                 .idle_timeout = 0,
1458                 .auto_increment = 0,
1459                 .initial_size = 0,
1460                 .max_size = 0,
1461         };
1462
1463         switch (cmd) {
1464         case TEST_INIT:
1465                 info->name = "more_destruction";
1466                 info->category = "/main/threadpool/";
1467                 info->summary = "Test that threads are destroyed as expected";
1468                 info->description =
1469                         "Push two tasks into a threadpool. Set the threadpool size to 4\n"
1470                         "Ensure that there are 2 active and 2 idle threads. Then shrink the\n"
1471                         "threadpool down to 1 thread. Ensure that the thread leftover is active\n"
1472                         "and ensure that both tasks complete.";
1473                 return AST_TEST_NOT_RUN;
1474         case TEST_EXECUTE:
1475                 break;
1476         }
1477
1478         tld = test_alloc();
1479         if (!tld) {
1480                 return AST_TEST_FAIL;
1481         }
1482
1483         listener = ast_threadpool_listener_alloc(&test_callbacks, tld);
1484         if (!listener) {
1485                 goto end;
1486         }
1487
1488         pool = ast_threadpool_create(info->name, listener, &options);
1489         if (!pool) {
1490                 goto end;
1491         }
1492
1493         ctd1 = complex_task_data_alloc();
1494         ctd2 = complex_task_data_alloc();
1495         if (!ctd1 || !ctd2) {
1496                 goto end;
1497         }
1498
1499         ast_threadpool_push(pool, complex_task, ctd1);
1500         ast_threadpool_push(pool, complex_task, ctd2);
1501
1502         ast_threadpool_set_size(pool, 4);
1503
1504         res = wait_until_thread_state(test, tld, 2, 2);
1505         if (res == AST_TEST_FAIL) {
1506                 goto end;
1507         }
1508
1509         res = listener_check(test, listener, 1, 0, 2, 2, 2, 0);
1510         if (res == AST_TEST_FAIL) {
1511                 goto end;
1512         }
1513
1514         ast_threadpool_set_size(pool, 1);
1515
1516         /* Shrinking the threadpool should kill off the two idle threads
1517          * and one of the active threads.
1518          */
1519         res = wait_until_thread_state(test, tld, 1, 0);
1520         if (res == AST_TEST_FAIL) {
1521                 goto end;
1522         }
1523
1524         res = listener_check(test, listener, 1, 0, 2, 1, 0, 0);
1525         if (res == AST_TEST_FAIL) {
1526                 goto end;
1527         }
1528
1529         /* The tasks are stalled until we poke them */
1530         poke_worker(ctd1);
1531         poke_worker(ctd2);
1532
1533         res = wait_for_complex_completion(ctd1);
1534         if (res == AST_TEST_FAIL) {
1535                 goto end;
1536         }
1537         res = wait_for_complex_completion(ctd2);
1538         if (res == AST_TEST_FAIL) {
1539                 goto end;
1540         }
1541
1542         res = wait_until_thread_state(test, tld, 0, 1);
1543         if (res == AST_TEST_FAIL) {
1544                 goto end;
1545         }
1546
1547         res = listener_check(test, listener, 1, 0, 2, 0, 1, 1);
1548
1549 end:
1550         ast_threadpool_shutdown(pool);
1551         ao2_cleanup(listener);
1552         ast_free(ctd1);
1553         ast_free(ctd2);
1554         ast_free(tld);
1555         return res;
1556 }
1557
1558 AST_TEST_DEFINE(threadpool_serializer)
1559 {
1560         int started = 0;
1561         int finished = 0;
1562         enum ast_test_result_state res = AST_TEST_FAIL;
1563         struct ast_threadpool *pool = NULL;
1564         struct ast_taskprocessor *uut = NULL;
1565         struct complex_task_data *data1 = NULL;
1566         struct complex_task_data *data2 = NULL;
1567         struct complex_task_data *data3 = NULL;
1568         struct ast_threadpool_options options = {
1569                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1570                 .idle_timeout = 0,
1571                 .auto_increment = 0,
1572                 .initial_size = 2,
1573                 .max_size = 0,
1574         };
1575
1576         switch (cmd) {
1577         case TEST_INIT:
1578                 info->name = "threadpool_serializer";
1579                 info->category = "/main/threadpool/";
1580                 info->summary = "Test that serializers";
1581                 info->description =
1582                         "Ensures that tasks enqueued to a serialize execute in sequence.";
1583                 return AST_TEST_NOT_RUN;
1584         case TEST_EXECUTE:
1585                 break;
1586         }
1587
1588         pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
1589         if (!pool) {
1590                 ast_test_status_update(test, "Could not create threadpool\n");
1591                 goto end;
1592         }
1593         uut = ast_threadpool_serializer("ser1", pool);
1594         data1 = complex_task_data_alloc();
1595         data2 = complex_task_data_alloc();
1596         data3 = complex_task_data_alloc();
1597         if (!uut || !data1 || !data2 || !data3) {
1598                 ast_test_status_update(test, "Allocation failed\n");
1599                 goto end;
1600         }
1601
1602         /* This should start right away */
1603         if (ast_taskprocessor_push(uut, complex_task, data1)) {
1604                 ast_test_status_update(test, "Failed to enqueue data1\n");
1605                 goto end;
1606         }
1607         started = wait_for_complex_start(data1);
1608         if (!started) {
1609                 ast_test_status_update(test, "Failed to start data1\n");
1610                 goto end;
1611         }
1612
1613         /* This should not start until data 1 is complete */
1614         if (ast_taskprocessor_push(uut, complex_task, data2)) {
1615                 ast_test_status_update(test, "Failed to enqueue data2\n");
1616                 goto end;
1617         }
1618         started = has_complex_started(data2);
1619         if (started) {
1620                 ast_test_status_update(test, "data2 started out of order\n");
1621                 goto end;
1622         }
1623
1624         /* But the free thread in the pool can still run */
1625         if (ast_threadpool_push(pool, complex_task, data3)) {
1626                 ast_test_status_update(test, "Failed to enqueue data3\n");
1627         }
1628         started = wait_for_complex_start(data3);
1629         if (!started) {
1630                 ast_test_status_update(test, "Failed to start data3\n");
1631                 goto end;
1632         }
1633
1634         /* Finishing data1 should allow data2 to start */
1635         poke_worker(data1);
1636         finished = wait_for_complex_completion(data1) == AST_TEST_PASS;
1637         if (!finished) {
1638                 ast_test_status_update(test, "data1 couldn't finish\n");
1639                 goto end;
1640         }
1641         started = wait_for_complex_start(data2);
1642         if (!started) {
1643                 ast_test_status_update(test, "Failed to start data2\n");
1644                 goto end;
1645         }
1646
1647         /* Finish up */
1648         poke_worker(data2);
1649         finished = wait_for_complex_completion(data2) == AST_TEST_PASS;
1650         if (!finished) {
1651                 ast_test_status_update(test, "data2 couldn't finish\n");
1652                 goto end;
1653         }
1654         poke_worker(data3);
1655         finished = wait_for_complex_completion(data3) == AST_TEST_PASS;
1656         if (!finished) {
1657                 ast_test_status_update(test, "data3 couldn't finish\n");
1658                 goto end;
1659         }
1660
1661         res = AST_TEST_PASS;
1662
1663 end:
1664         poke_worker(data1);
1665         poke_worker(data2);
1666         poke_worker(data3);
1667         ast_taskprocessor_unreference(uut);
1668         ast_threadpool_shutdown(pool);
1669         ast_free(data1);
1670         ast_free(data2);
1671         ast_free(data3);
1672         return res;
1673 }
1674
1675 AST_TEST_DEFINE(threadpool_serializer_dupe)
1676 {
1677         enum ast_test_result_state res = AST_TEST_FAIL;
1678         struct ast_threadpool *pool = NULL;
1679         struct ast_taskprocessor *uut = NULL;
1680         struct ast_taskprocessor *there_can_be_only_one = NULL;
1681         struct ast_threadpool_options options = {
1682                 .version = AST_THREADPOOL_OPTIONS_VERSION,
1683                 .idle_timeout = 0,
1684                 .auto_increment = 0,
1685                 .initial_size = 2,
1686                 .max_size = 0,
1687         };
1688
1689         switch (cmd) {
1690         case TEST_INIT:
1691                 info->name = "threadpool_serializer_dupe";
1692                 info->category = "/main/threadpool/";
1693                 info->summary = "Test that serializers are uniquely named";
1694                 info->description =
1695                         "Creating two serializers with the same name should\n"
1696                         "result in error.";
1697                 return AST_TEST_NOT_RUN;
1698         case TEST_EXECUTE:
1699                 break;
1700         }
1701
1702         pool = ast_threadpool_create("threadpool_serializer", NULL, &options);
1703         if (!pool) {
1704                 ast_test_status_update(test, "Could not create threadpool\n");
1705                 goto end;
1706         }
1707
1708         uut = ast_threadpool_serializer("highlander", pool);
1709         if (!uut) {
1710                 ast_test_status_update(test, "Allocation failed\n");
1711                 goto end;
1712         }
1713
1714         there_can_be_only_one = ast_threadpool_serializer("highlander", pool);
1715         if (there_can_be_only_one) {
1716                 ast_taskprocessor_unreference(there_can_be_only_one);
1717                 ast_test_status_update(test, "Duplicate name error\n");
1718                 goto end;
1719         }
1720
1721         res = AST_TEST_PASS;
1722
1723 end:
1724         ast_taskprocessor_unreference(uut);
1725         ast_threadpool_shutdown(pool);
1726         return res;
1727 }
1728
1729 static int unload_module(void)
1730 {
1731         ast_test_unregister(threadpool_push);
1732         ast_test_unregister(threadpool_initial_threads);
1733         ast_test_unregister(threadpool_thread_creation);
1734         ast_test_unregister(threadpool_thread_destruction);
1735         ast_test_unregister(threadpool_thread_timeout);
1736         ast_test_unregister(threadpool_thread_timeout_thrash);
1737         ast_test_unregister(threadpool_one_task_one_thread);
1738         ast_test_unregister(threadpool_one_thread_one_task);
1739         ast_test_unregister(threadpool_one_thread_multiple_tasks);
1740         ast_test_unregister(threadpool_auto_increment);
1741         ast_test_unregister(threadpool_max_size);
1742         ast_test_unregister(threadpool_reactivation);
1743         ast_test_unregister(threadpool_task_distribution);
1744         ast_test_unregister(threadpool_more_destruction);
1745         ast_test_unregister(threadpool_serializer);
1746         ast_test_unregister(threadpool_serializer_dupe);
1747         return 0;
1748 }
1749
1750 static int load_module(void)
1751 {
1752         ast_test_register(threadpool_push);
1753         ast_test_register(threadpool_initial_threads);
1754         ast_test_register(threadpool_thread_creation);
1755         ast_test_register(threadpool_thread_destruction);
1756         ast_test_register(threadpool_thread_timeout);
1757         ast_test_register(threadpool_thread_timeout_thrash);
1758         ast_test_register(threadpool_one_task_one_thread);
1759         ast_test_register(threadpool_one_thread_one_task);
1760         ast_test_register(threadpool_one_thread_multiple_tasks);
1761         ast_test_register(threadpool_auto_increment);
1762         ast_test_register(threadpool_max_size);
1763         ast_test_register(threadpool_reactivation);
1764         ast_test_register(threadpool_task_distribution);
1765         ast_test_register(threadpool_more_destruction);
1766         ast_test_register(threadpool_serializer);
1767         ast_test_register(threadpool_serializer_dupe);
1768         return AST_MODULE_LOAD_SUCCESS;
1769 }
1770
1771 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "threadpool test module");