Shuffle RESTful URL's around.
[asterisk/asterisk.git] / tests / test_taskprocessor.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012-2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \brief taskprocessor unit tests
22  *
23  * \author Mark Michelson <mmichelson@digium.com>
24  *
25  */
26
27 /*** MODULEINFO
28         <depend>TEST_FRAMEWORK</depend>
29         <support_level>core</support_level>
30  ***/
31
32 #include "asterisk.h"
33
34 #include "asterisk/test.h"
35 #include "asterisk/taskprocessor.h"
36 #include "asterisk/module.h"
37 #include "asterisk/astobj2.h"
38
39 /*!
40  * \brief userdata associated with baseline taskprocessor test
41  */
42 struct task_data {
43         /* Condition used to signal to queuing thread that task was executed */
44         ast_cond_t cond;
45         /* Lock protecting the condition */
46         ast_mutex_t lock;
47         /*! Boolean indicating that the task was run */
48         int task_complete;
49 };
50
51 /*!
52  * \brief Queued task for baseline test.
53  *
54  * The task simply sets a boolean to indicate the
55  * task has been run and then signals a condition
56  * saying it's complete
57  */
58 static int task(void *data)
59 {
60         struct task_data *task_data = data;
61         SCOPED_MUTEX(lock, &task_data->lock);
62         task_data->task_complete = 1;
63         ast_cond_signal(&task_data->cond);
64         return 0;
65 }
66
67 /*!
68  * \brief Baseline test for default taskprocessor
69  *
70  * This test ensures that when a task is added to a taskprocessor that
71  * has been allocated with a default listener that the task gets executed
72  * as expected
73  */
74 AST_TEST_DEFINE(default_taskprocessor)
75 {
76         struct ast_taskprocessor *tps;
77         struct task_data task_data;
78         struct timeval start;
79         struct timespec ts;
80         enum ast_test_result_state res = AST_TEST_PASS;
81         int timedwait_res;
82
83         switch (cmd) {
84         case TEST_INIT:
85                 info->name = "default_taskprocessor";
86                 info->category = "/main/taskprocessor/";
87                 info->summary = "Test of default taskproccesor";
88                 info->description =
89                         "Ensures that a queued task gets executed.";
90                 return AST_TEST_NOT_RUN;
91         case TEST_EXECUTE:
92                 break;
93         }
94
95         tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
96
97         if (!tps) {
98                 ast_test_status_update(test, "Unable to create test taskprocessor\n");
99                 return AST_TEST_FAIL;
100         }
101
102         start = ast_tvnow();
103
104         ts.tv_sec = start.tv_sec + 30;
105         ts.tv_nsec = start.tv_usec * 1000;
106
107         ast_cond_init(&task_data.cond, NULL);
108         ast_mutex_init(&task_data.lock);
109         task_data.task_complete = 0;
110
111         ast_taskprocessor_push(tps, task, &task_data);
112         ast_mutex_lock(&task_data.lock);
113         while (!task_data.task_complete) {
114                 timedwait_res = ast_cond_timedwait(&task_data.cond, &task_data.lock, &ts);
115                 if (timedwait_res == ETIMEDOUT) {
116                         break;
117                 }
118         }
119         ast_mutex_unlock(&task_data.lock);
120
121         if (!task_data.task_complete) {
122                 ast_test_status_update(test, "Queued task did not execute!\n");
123                 res = AST_TEST_FAIL;
124                 goto test_end;
125         }
126
127 test_end:
128         tps = ast_taskprocessor_unreference(tps);
129         ast_mutex_destroy(&task_data.lock);
130         ast_cond_destroy(&task_data.cond);
131         return res;
132 }
133
134 #define NUM_TASKS 20000
135
136 /*!
137  * \brief Relevant data associated with taskprocessor load test
138  */
139 static struct load_task_data {
140         /*! Condition used to indicate a task has completed executing */
141         ast_cond_t cond;
142         /*! Lock used to protect the condition */
143         ast_mutex_t lock;
144         /*! Counter of the number of completed tasks */
145         int tasks_completed;
146         /*! Storage for task-specific data */
147         int task_rand[NUM_TASKS];
148 } load_task_results;
149
150 /*!
151  * \brief a queued task to be used in the taskprocessor load test
152  *
153  * The task increments the number of tasks executed and puts the passed-in
154  * data into the next slot in the array of random data.
155  */
156 static int load_task(void *data)
157 {
158         int *randdata = data;
159         SCOPED_MUTEX(lock, &load_task_results.lock);
160         load_task_results.task_rand[load_task_results.tasks_completed++] = *randdata;
161         ast_cond_signal(&load_task_results.cond);
162         return 0;
163 }
164
165 /*!
166  * \brief Load test for taskprocessor with default listener
167  *
168  * This test queues a large number of tasks, each with random data associated.
169  * The test ensures that all of the tasks are run and that the tasks are executed
170  * in the same order that they were queued
171  */
172 AST_TEST_DEFINE(default_taskprocessor_load)
173 {
174         struct ast_taskprocessor *tps;
175         struct timeval start;
176         struct timespec ts;
177         enum ast_test_result_state res = AST_TEST_PASS;
178         int timedwait_res;
179         int i;
180         int rand_data[NUM_TASKS];
181
182         switch (cmd) {
183         case TEST_INIT:
184                 info->name = "default_taskprocessor_load";
185                 info->category = "/main/taskprocessor/";
186                 info->summary = "Load test of default taskproccesor";
187                 info->description =
188                         "Ensure that a large number of queued tasks are executed in the proper order.";
189                 return AST_TEST_NOT_RUN;
190         case TEST_EXECUTE:
191                 break;
192         }
193
194         tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
195
196         if (!tps) {
197                 ast_test_status_update(test, "Unable to create test taskprocessor\n");
198                 return AST_TEST_FAIL;
199         }
200
201         start = ast_tvnow();
202
203         ts.tv_sec = start.tv_sec + 60;
204         ts.tv_nsec = start.tv_usec * 1000;
205
206         ast_cond_init(&load_task_results.cond, NULL);
207         ast_mutex_init(&load_task_results.lock);
208         load_task_results.tasks_completed = 0;
209
210         for (i = 0; i < NUM_TASKS; ++i) {
211                 rand_data[i] = ast_random();
212                 ast_taskprocessor_push(tps, load_task, &rand_data[i]);
213         }
214
215         ast_mutex_lock(&load_task_results.lock);
216         while (load_task_results.tasks_completed < NUM_TASKS) {
217                 timedwait_res = ast_cond_timedwait(&load_task_results.cond, &load_task_results.lock, &ts);
218                 if (timedwait_res == ETIMEDOUT) {
219                         break;
220                 }
221         }
222         ast_mutex_unlock(&load_task_results.lock);
223
224         if (load_task_results.tasks_completed != NUM_TASKS) {
225                 ast_test_status_update(test, "Unexpected number of tasks executed. Expected %d but got %d\n",
226                                 NUM_TASKS, load_task_results.tasks_completed);
227                 res = AST_TEST_FAIL;
228                 goto test_end;
229         }
230
231         for (i = 0; i < NUM_TASKS; ++i) {
232                 if (rand_data[i] != load_task_results.task_rand[i]) {
233                         ast_test_status_update(test, "Queued tasks did not execute in order\n");
234                         res = AST_TEST_FAIL;
235                         goto test_end;
236                 }
237         }
238
239 test_end:
240         tps = ast_taskprocessor_unreference(tps);
241         ast_mutex_destroy(&load_task_results.lock);
242         ast_cond_destroy(&load_task_results.cond);
243         return res;
244 }
245
246 /*!
247  * \brief Private data for the test taskprocessor listener
248  */
249 struct test_listener_pvt {
250         /* Counter of number of tasks pushed to the queue */
251         int num_pushed;
252         /* Counter of number of times the queue was emptied */
253         int num_emptied;
254         /* Counter of number of times that a pushed task occurred on an empty queue */
255         int num_was_empty;
256         /* Boolean indicating whether the shutdown callback was called */
257         int shutdown;
258 };
259
260 /*!
261  * \brief test taskprocessor listener's alloc callback
262  */
263 static void *test_listener_pvt_alloc(void)
264 {
265         struct test_listener_pvt *pvt;
266
267         pvt = ast_calloc(1, sizeof(*pvt));
268         return pvt;
269 }
270
271 /*!
272  * \brief test taskprocessor listener's start callback
273  */
274 static int test_start(struct ast_taskprocessor_listener *listener)
275 {
276         return 0;
277 }
278
279 /*!
280  * \brief test taskprocessor listener's task_pushed callback
281  *
282  * Adjusts private data's stats as indicated by the parameters.
283  */
284 static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
285 {
286         struct test_listener_pvt *pvt = ast_taskprocessor_listener_get_user_data(listener);
287         ++pvt->num_pushed;
288         if (was_empty) {
289                 ++pvt->num_was_empty;
290         }
291 }
292
293 /*!
294  * \brief test taskprocessor listener's emptied callback.
295  */
296 static void test_emptied(struct ast_taskprocessor_listener *listener)
297 {
298         struct test_listener_pvt *pvt = ast_taskprocessor_listener_get_user_data(listener);
299         ++pvt->num_emptied;
300 }
301
302 /*!
303  * \brief test taskprocessor listener's shutdown callback.
304  */
305 static void test_shutdown(struct ast_taskprocessor_listener *listener)
306 {
307         struct test_listener_pvt *pvt = ast_taskprocessor_listener_get_user_data(listener);
308         pvt->shutdown = 1;
309 }
310
311 static const struct ast_taskprocessor_listener_callbacks test_callbacks = {
312         .start = test_start,
313         .task_pushed = test_task_pushed,
314         .emptied = test_emptied,
315         .shutdown = test_shutdown,
316 };
317
318 /*!
319  * \brief Queued task for taskprocessor listener test.
320  *
321  * Does nothing.
322  */
323 static int listener_test_task(void *ignore)
324 {
325         return 0;
326 }
327
328 /*!
329  * \brief helper to ensure that statistics the listener is keeping are what we expect
330  *
331  * \param test The currently-running test
332  * \param pvt The private data for the taskprocessor listener
333  * \param num_pushed The expected current number of tasks pushed to the processor
334  * \param num_emptied The expected current number of times the taskprocessor has become empty
335  * \param num_was_empty The expected current number of times that tasks were pushed to an empty taskprocessor
336  * \retval -1 Stats were not as expected
337  * \retval 0 Stats were as expected
338  */
339 static int check_stats(struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty)
340 {
341         if (pvt->num_pushed != num_pushed) {
342                 ast_test_status_update(test, "Unexpected number of tasks pushed. Expected %d but got %d\n",
343                                 num_pushed, pvt->num_pushed);
344                 return -1;
345         }
346
347         if (pvt->num_emptied != num_emptied) {
348                 ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
349                                 num_emptied, pvt->num_emptied);
350                 return -1;
351         }
352
353         if (pvt->num_was_empty != num_was_empty) {
354                 ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
355                                 num_was_empty, pvt->num_emptied);
356                 return -1;
357         }
358
359         return 0;
360 }
361
362 /*!
363  * \brief Test for a taskprocessor with custom listener.
364  *
365  * This test pushes tasks to a taskprocessor with a custom listener, executes the taskss,
366  * and destroys the taskprocessor.
367  *
368  * The test ensures that the listener's callbacks are called when expected and that the data
369  * being passed in is accurate.
370  */
371 AST_TEST_DEFINE(taskprocessor_listener)
372 {
373         struct ast_taskprocessor *tps = NULL;
374         struct ast_taskprocessor_listener *listener = NULL;
375         struct test_listener_pvt *pvt = NULL;
376         enum ast_test_result_state res = AST_TEST_PASS;
377
378         switch (cmd) {
379         case TEST_INIT:
380                 info->name = "taskprocessor_listener";
381                 info->category = "/main/taskprocessor/";
382                 info->summary = "Test of taskproccesor listeners";
383                 info->description =
384                         "Ensures that listener callbacks are called when expected.";
385                 return AST_TEST_NOT_RUN;
386         case TEST_EXECUTE:
387                 break;
388         }
389
390         pvt = test_listener_pvt_alloc();
391         if (!pvt) {
392                 ast_test_status_update(test, "Unable to allocate test taskprocessor listener user data\n");
393                 return AST_TEST_FAIL;
394         }
395
396         listener = ast_taskprocessor_listener_alloc(&test_callbacks, pvt);
397         if (!listener) {
398                 ast_test_status_update(test, "Unable to allocate test taskprocessor listener\n");
399                 res = AST_TEST_FAIL;
400                 goto test_exit;
401         }
402
403         tps = ast_taskprocessor_create_with_listener("test_listener", listener);
404         if (!tps) {
405                 ast_test_status_update(test, "Unable to allocate test taskprocessor\n");
406                 res = AST_TEST_FAIL;
407                 goto test_exit;
408         }
409
410         ast_taskprocessor_push(tps, listener_test_task, NULL);
411
412         if (check_stats(test, pvt, 1, 0, 1) < 0) {
413                 res = AST_TEST_FAIL;
414                 goto test_exit;
415         }
416
417         ast_taskprocessor_push(tps, listener_test_task, NULL);
418
419         if (check_stats(test, pvt, 2, 0, 1) < 0) {
420                 res = AST_TEST_FAIL;
421                 goto test_exit;
422         }
423
424         ast_taskprocessor_execute(tps);
425
426         if (check_stats(test, pvt, 2, 0, 1) < 0) {
427                 res = AST_TEST_FAIL;
428                 goto test_exit;
429         }
430
431         ast_taskprocessor_execute(tps);
432
433         if (check_stats(test, pvt, 2, 1, 1) < 0) {
434                 res = AST_TEST_FAIL;
435                 goto test_exit;
436         }
437
438         tps = ast_taskprocessor_unreference(tps);
439
440         if (!pvt->shutdown) {
441                 res = AST_TEST_FAIL;
442                 goto test_exit;
443         }
444
445 test_exit:
446         ao2_cleanup(listener);
447         /* This is safe even if tps is NULL */
448         ast_taskprocessor_unreference(tps);
449         ast_free(pvt);
450         return res;
451 }
452
453 struct shutdown_data {
454         ast_cond_t in;
455         ast_cond_t out;
456         ast_mutex_t lock;
457         int task_complete;
458         int task_started;
459         int task_stop_waiting;
460 };
461
462 static void shutdown_data_dtor(void *data)
463 {
464         struct shutdown_data *shutdown_data = data;
465         ast_mutex_destroy(&shutdown_data->lock);
466         ast_cond_destroy(&shutdown_data->in);
467         ast_cond_destroy(&shutdown_data->out);
468 }
469
470 static struct shutdown_data *shutdown_data_create(int dont_wait)
471 {
472         RAII_VAR(struct shutdown_data *, shutdown_data, NULL, ao2_cleanup);
473
474         shutdown_data = ao2_alloc(sizeof(*shutdown_data), shutdown_data_dtor);
475         if (!shutdown_data) {
476                 return NULL;
477         }
478
479         ast_mutex_init(&shutdown_data->lock);
480         ast_cond_init(&shutdown_data->in, NULL);
481         ast_cond_init(&shutdown_data->out, NULL);
482         shutdown_data->task_stop_waiting = dont_wait;
483         ao2_ref(shutdown_data, +1);
484         return shutdown_data;
485 }
486
487 static int shutdown_task_exec(void *data)
488 {
489         struct shutdown_data *shutdown_data = data;
490         SCOPED_MUTEX(lock, &shutdown_data->lock);
491         shutdown_data->task_started = 1;
492         ast_cond_signal(&shutdown_data->out);
493         while (!shutdown_data->task_stop_waiting) {
494                 ast_cond_wait(&shutdown_data->in, &shutdown_data->lock);
495         }
496         shutdown_data->task_complete = 1;
497         ast_cond_signal(&shutdown_data->out);
498         return 0;
499 }
500
501 static int shutdown_waitfor_completion(struct shutdown_data *shutdown_data)
502 {
503         struct timeval start = ast_tvnow();
504         struct timespec end = {
505                 .tv_sec = start.tv_sec + 5,
506                 .tv_nsec = start.tv_usec * 1000
507         };
508         SCOPED_MUTEX(lock, &shutdown_data->lock);
509
510         while (!shutdown_data->task_complete) {
511                 if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
512                         break;
513                 }
514         }
515
516         return shutdown_data->task_complete;
517 }
518
519 static int shutdown_has_completed(struct shutdown_data *shutdown_data)
520 {
521         SCOPED_MUTEX(lock, &shutdown_data->lock);
522         return shutdown_data->task_complete;
523 }
524
525 static int shutdown_waitfor_start(struct shutdown_data *shutdown_data)
526 {
527         struct timeval start = ast_tvnow();
528         struct timespec end = {
529                 .tv_sec = start.tv_sec + 5,
530                 .tv_nsec = start.tv_usec * 1000
531         };
532         SCOPED_MUTEX(lock, &shutdown_data->lock);
533
534         while (!shutdown_data->task_started) {
535                 if (ast_cond_timedwait(&shutdown_data->out, &shutdown_data->lock, &end) == ETIMEDOUT) {
536                         break;
537                 }
538         }
539
540         return shutdown_data->task_started;
541 }
542
543 static void shutdown_poke(struct shutdown_data *shutdown_data)
544 {
545         SCOPED_MUTEX(lock, &shutdown_data->lock);
546         shutdown_data->task_stop_waiting = 1;
547         ast_cond_signal(&shutdown_data->in);
548 }
549
550 static void *tps_shutdown_thread(void *data)
551 {
552         struct ast_taskprocessor *tps = data;
553         ast_taskprocessor_unreference(tps);
554         return NULL;
555 }
556
557 AST_TEST_DEFINE(taskprocessor_shutdown)
558 {
559         RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
560         RAII_VAR(struct shutdown_data *, task1, NULL, ao2_cleanup);
561         RAII_VAR(struct shutdown_data *, task2, NULL, ao2_cleanup);
562         int push_res;
563         int wait_res;
564         int pthread_res;
565         pthread_t shutdown_thread;
566
567         switch (cmd) {
568         case TEST_INIT:
569                 info->name = "taskprocessor_shutdown";
570                 info->category = "/main/taskprocessor/";
571                 info->summary = "Test of taskproccesor shutdown sequence";
572                 info->description =
573                         "Ensures that all tasks run to completion after the taskprocessor has been unref'ed.";
574                 return AST_TEST_NOT_RUN;
575         case TEST_EXECUTE:
576                 break;
577         }
578
579         tps = ast_taskprocessor_get("test_shutdown", TPS_REF_DEFAULT);
580         task1 = shutdown_data_create(0); /* task1 waits to be poked */
581         task2 = shutdown_data_create(1); /* task2 waits for nothing */
582
583         if (!tps || !task1 || !task2) {
584                 ast_test_status_update(test, "Allocation error\n");
585                 return AST_TEST_FAIL;
586         }
587
588         push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task1);
589         if (push_res != 0) {
590                 ast_test_status_update(test, "Could not push task1\n");
591                 return AST_TEST_FAIL;
592         }
593
594         push_res = ast_taskprocessor_push(tps, shutdown_task_exec, task2);
595         if (push_res != 0) {
596                 ast_test_status_update(test, "Could not push task2\n");
597                 return AST_TEST_FAIL;
598         }
599
600         wait_res = shutdown_waitfor_start(task1);
601         if (!wait_res) {
602                 ast_test_status_update(test, "Task1 didn't start\n");
603                 return AST_TEST_FAIL;
604         }
605
606         pthread_res = ast_pthread_create(&shutdown_thread, NULL, tps_shutdown_thread, tps);
607         if (pthread_res != 0) {
608                 ast_test_status_update(test, "Failed to create shutdown thread\n");
609                 return AST_TEST_FAIL;
610         }
611         tps = NULL;
612
613         /* Wakeup task1; it should complete */
614         shutdown_poke(task1);
615         wait_res = shutdown_waitfor_completion(task1);
616         if (!wait_res) {
617                 ast_test_status_update(test, "Task1 didn't complete\n");
618                 return AST_TEST_FAIL;
619         }
620
621         /* Wait for shutdown to complete */
622         pthread_join(shutdown_thread, NULL);
623
624         /* Should have also also completed task2 */
625         wait_res = shutdown_has_completed(task2);
626         if (!wait_res) {
627                 ast_test_status_update(test, "Task2 didn't finish\n");
628                 return AST_TEST_FAIL;
629         }
630
631         return AST_TEST_PASS;
632 }
633
634 static int unload_module(void)
635 {
636         ast_test_unregister(default_taskprocessor);
637         ast_test_unregister(default_taskprocessor_load);
638         ast_test_unregister(taskprocessor_listener);
639         ast_test_unregister(taskprocessor_shutdown);
640         return 0;
641 }
642
643 static int load_module(void)
644 {
645         ast_test_register(default_taskprocessor);
646         ast_test_register(default_taskprocessor_load);
647         ast_test_register(taskprocessor_listener);
648         ast_test_register(taskprocessor_shutdown);
649         return AST_MODULE_LOAD_SUCCESS;
650 }
651
652 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "taskprocessor test module");