cbab754a9406cd718e318f1604eebd18672c85f6
[asterisk/asterisk.git] / tests / test_taskprocessor.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \brief taskprocessor unit tests
22  *
23  * \author Mark Michelson <mmichelson@digium.com>
24  *
25  */
26
27 /*** MODULEINFO
28         <depend>TEST_FRAMEWORK</depend>
29         <support_level>core</support_level>
30  ***/
31
32 #include "asterisk.h"
33
34 #include "asterisk/test.h"
35 #include "asterisk/taskprocessor.h"
36 #include "asterisk/module.h"
37 #include "asterisk/astobj2.h"
38
39 /*!
40  * \brief userdata associated with baseline taskprocessor test
41  */
42 struct task_data {
43         /* Condition used to signal to queuing thread that task was executed */
44         ast_cond_t cond;
45         /* Lock protecting the condition */
46         ast_mutex_t lock;
47         /*! Boolean indicating that the task was run */
48         int task_complete;
49 };
50
51 /*!
52  * \brief Queued task for baseline test.
53  *
54  * The task simply sets a boolean to indicate the
55  * task has been run and then signals a condition
56  * saying it's complete
57  */
58 static int task(void *data)
59 {
60         struct task_data *task_data = data;
61         SCOPED_MUTEX(lock, &task_data->lock);
62         task_data->task_complete = 1;
63         ast_cond_signal(&task_data->cond);
64         return 0;
65 }
66
67 /*!
68  * \brief Baseline test for default taskprocessor
69  *
70  * This test ensures that when a task is added to a taskprocessor that
71  * has been allocated with a default listener that the task gets executed
72  * as expected
73  */
74 AST_TEST_DEFINE(default_taskprocessor)
75 {
76         struct ast_taskprocessor *tps;
77         struct task_data task_data;
78         struct timeval start;
79         struct timespec ts;
80         enum ast_test_result_state res = AST_TEST_PASS;
81         int timedwait_res;
82
83         switch (cmd) {
84         case TEST_INIT:
85                 info->name = "default_taskprocessor";
86                 info->category = "/main/taskprocessor/";
87                 info->summary = "Test of default taskproccesor";
88                 info->description =
89                         "Ensures that a queued task gets executed.";
90                 return AST_TEST_NOT_RUN;
91         case TEST_EXECUTE:
92                 break;
93         }
94
95         tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
96
97         if (!tps) {
98                 ast_test_status_update(test, "Unable to create test taskprocessor\n");
99                 return AST_TEST_FAIL;
100         }
101
102         start = ast_tvnow();
103
104         ts.tv_sec = start.tv_sec + 30;
105         ts.tv_nsec = start.tv_usec * 1000;
106
107         ast_cond_init(&task_data.cond, NULL);
108         ast_mutex_init(&task_data.lock);
109         task_data.task_complete = 0;
110
111         ast_taskprocessor_push(tps, task, &task_data);
112         ast_mutex_lock(&task_data.lock);
113         while (!task_data.task_complete) {
114                 timedwait_res = ast_cond_timedwait(&task_data.cond, &task_data.lock, &ts);
115                 if (timedwait_res == ETIMEDOUT) {
116                         break;
117                 }
118         }
119
120         if (!task_data.task_complete) {
121                 ast_test_status_update(test, "Queued task did not execute!\n");
122                 res = AST_TEST_FAIL;
123                 goto test_end;
124         }
125
126 test_end:
127         tps = ast_taskprocessor_unreference(tps);
128         ast_mutex_destroy(&task_data.lock);
129         ast_cond_destroy(&task_data.cond);
130         return res;
131 }
132
133 #define NUM_TASKS 20000
134
135 /*!
136  * \brief Relevant data associated with taskprocessor load test
137  */
138 static struct load_task_data {
139         /*! Condition used to indicate a task has completed executing */
140         ast_cond_t cond;
141         /*! Lock used to protect the condition */
142         ast_mutex_t lock;
143         /*! Counter of the number of completed tasks */
144         int tasks_completed;
145         /*! Storage for task-specific data */
146         int task_rand[NUM_TASKS];
147 } load_task_results;
148
149 /*!
150  * \brief a queued task to be used in the taskprocessor load test
151  *
152  * The task increments the number of tasks executed and puts the passed-in
153  * data into the next slot in the array of random data.
154  */
155 static int load_task(void *data)
156 {
157         int *randdata = data;
158         SCOPED_MUTEX(lock, &load_task_results.lock);
159         load_task_results.task_rand[load_task_results.tasks_completed++] = *randdata;
160         ast_cond_signal(&load_task_results.cond);
161         return 0;
162 }
163
164 /*!
165  * \brief Load test for taskprocessor with default listener
166  *
167  * This test queues a large number of tasks, each with random data associated.
168  * The test ensures that all of the tasks are run and that the tasks are executed
169  * in the same order that they were queued
170  */
171 AST_TEST_DEFINE(default_taskprocessor_load)
172 {
173         struct ast_taskprocessor *tps;
174         struct timeval start;
175         struct timespec ts;
176         enum ast_test_result_state res = AST_TEST_PASS;
177         int timedwait_res;
178         int i;
179         int rand_data[NUM_TASKS];
180
181         switch (cmd) {
182         case TEST_INIT:
183                 info->name = "default_taskprocessor_load";
184                 info->category = "/main/taskprocessor/";
185                 info->summary = "Load test of default taskproccesor";
186                 info->description =
187                         "Ensure that a large number of queued tasks are executed in the proper order.";
188                 return AST_TEST_NOT_RUN;
189         case TEST_EXECUTE:
190                 break;
191         }
192
193         tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
194
195         if (!tps) {
196                 ast_test_status_update(test, "Unable to create test taskprocessor\n");
197                 return AST_TEST_FAIL;
198         }
199
200         start = ast_tvnow();
201
202         ts.tv_sec = start.tv_sec + 60;
203         ts.tv_nsec = start.tv_usec * 1000;
204
205         ast_cond_init(&load_task_results.cond, NULL);
206         ast_mutex_init(&load_task_results.lock);
207         load_task_results.tasks_completed = 0;
208
209         for (i = 0; i < NUM_TASKS; ++i) {
210                 rand_data[i] = ast_random();
211                 ast_taskprocessor_push(tps, load_task, &rand_data[i]);
212         }
213
214         ast_mutex_lock(&load_task_results.lock);
215         while (load_task_results.tasks_completed < NUM_TASKS) {
216                 timedwait_res = ast_cond_timedwait(&load_task_results.cond, &load_task_results.lock, &ts);
217                 if (timedwait_res == ETIMEDOUT) {
218                         break;
219                 }
220         }
221
222         if (load_task_results.tasks_completed != NUM_TASKS) {
223                 ast_test_status_update(test, "Unexpected number of tasks executed. Expected %d but got %d\n",
224                                 NUM_TASKS, load_task_results.tasks_completed);
225                 res = AST_TEST_FAIL;
226                 goto test_end;
227         }
228
229         for (i = 0; i < NUM_TASKS; ++i) {
230                 if (rand_data[i] != load_task_results.task_rand[i]) {
231                         ast_test_status_update(test, "Queued tasks did not execute in order\n");
232                         res = AST_TEST_FAIL;
233                         goto test_end;
234                 }
235         }
236
237 test_end:
238         tps = ast_taskprocessor_unreference(tps);
239         ast_mutex_destroy(&load_task_results.lock);
240         ast_cond_destroy(&load_task_results.cond);
241         return res;
242 }
243
244 /*!
245  * \brief Private data for the test taskprocessor listener
246  */
247 struct test_listener_pvt {
248         /* Counter of number of tasks pushed to the queue */
249         int num_pushed;
250         /* Counter of number of times the queue was emptied */
251         int num_emptied;
252         /* Counter of number of times that a pushed task occurred on an empty queue */
253         int num_was_empty;
254         /* Boolean indicating whether the shutdown callback was called */
255         int shutdown;
256 };
257
258 /*!
259  * \brief test taskprocessor listener's alloc callback
260  */
261 static void *test_alloc(struct ast_taskprocessor_listener *listener)
262 {
263         struct test_listener_pvt *pvt;
264
265         pvt = ast_calloc(1, sizeof(*pvt));
266         return pvt;
267 }
268
269 /*!
270  * \brief test taskprocessor listener's task_pushed callback
271  *
272  * Adjusts private data's stats as indicated by the parameters.
273  */
274 static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
275 {
276         struct test_listener_pvt *pvt = listener->private_data;
277         ++pvt->num_pushed;
278         if (was_empty) {
279                 ++pvt->num_was_empty;
280         }
281 }
282
283 /*!
284  * \brief test taskprocessor listener's emptied callback.
285  */
286 static void test_emptied(struct ast_taskprocessor_listener *listener)
287 {
288         struct test_listener_pvt *pvt = listener->private_data;
289         ++pvt->num_emptied;
290 }
291
292 /*!
293  * \brief test taskprocessor listener's shutdown callback.
294  */
295 static void test_shutdown(struct ast_taskprocessor_listener *listener)
296 {
297         struct test_listener_pvt *pvt = listener->private_data;
298         pvt->shutdown = 1;
299 }
300
301 /*!
302  * \brief test taskprocessor listener's destroy callback.
303  */
304 static void test_destroy(void *private_data)
305 {
306         struct test_listener_pvt *pvt = private_data;
307         ast_free(pvt);
308 }
309
310 static const struct ast_taskprocessor_listener_callbacks test_callbacks = {
311         .alloc = test_alloc,
312         .task_pushed = test_task_pushed,
313         .emptied = test_emptied,
314         .shutdown = test_shutdown,
315         .destroy = test_destroy,
316 };
317
318 /*!
319  * \brief Queued task for taskprocessor listener test.
320  *
321  * Does nothing.
322  */
323 static int listener_test_task(void *ignore)
324 {
325         return 0;
326 }
327
328 /*!
329  * \brief helper to ensure that statistics the listener is keeping are what we expect
330  *
331  * \param test The currently-running test
332  * \param pvt The private data for the taskprocessor listener
333  * \param num_pushed The expected current number of tasks pushed to the processor
334  * \param num_emptied The expected current number of times the taskprocessor has become empty
335  * \param num_was_empty The expected current number of times that tasks were pushed to an empty taskprocessor
336  * \retval -1 Stats were not as expected
337  * \retval 0 Stats were as expected
338  */
339 static int check_stats(struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty)
340 {
341         if (pvt->num_pushed != num_pushed) {
342                 ast_test_status_update(test, "Unexpected number of tasks pushed. Expected %d but got %d\n",
343                                 num_pushed, pvt->num_pushed);
344                 return -1;
345         }
346
347         if (pvt->num_emptied != num_emptied) {
348                 ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
349                                 num_emptied, pvt->num_emptied);
350                 return -1;
351         }
352
353         if (pvt->num_was_empty != num_was_empty) {
354                 ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
355                                 num_was_empty, pvt->num_emptied);
356                 return -1;
357         }
358
359         return 0;
360 }
361
362 /*!
363  * \brief Test for a taskprocessor with custom listener.
364  *
365  * This test pushes tasks to a taskprocessor with a custom listener, executes the taskss,
366  * and destroys the taskprocessor.
367  *
368  * The test ensures that the listener's callbacks are called when expected and that the data
369  * being passed in is accurate.
370  */
371 AST_TEST_DEFINE(taskprocessor_listener)
372 {
373         struct ast_taskprocessor *tps;
374         struct ast_taskprocessor_listener *listener;
375         struct test_listener_pvt *pvt;
376         enum ast_test_result_state res = AST_TEST_PASS;
377
378         switch (cmd) {
379         case TEST_INIT:
380                 info->name = "taskprocessor_listener";
381                 info->category = "/main/taskprocessor/";
382                 info->summary = "Test of taskproccesor listeners";
383                 info->description =
384                         "Ensures that listener callbacks are called when expected.";
385                 return AST_TEST_NOT_RUN;
386         case TEST_EXECUTE:
387                 break;
388         }
389
390         listener = ast_taskprocessor_listener_alloc(&test_callbacks);
391         if (!listener) {
392                 ast_test_status_update(test, "Unable to allocate test taskprocessor listener\n");
393                 return AST_TEST_FAIL;
394         }
395
396         tps = ast_taskprocessor_create_with_listener("test_listener", listener);
397         if (!tps) {
398                 ast_test_status_update(test, "Unable to allocate test taskprocessor\n");
399                 res = AST_TEST_FAIL;
400                 goto test_exit;
401         }
402
403         pvt = listener->private_data;
404
405         ast_taskprocessor_push(tps, listener_test_task, NULL);
406
407         if (check_stats(test, pvt, 1, 0, 1) < 0) {
408                 res = AST_TEST_FAIL;
409                 goto test_exit;
410         }
411
412         ast_taskprocessor_push(tps, listener_test_task, NULL);
413
414         if (check_stats(test, pvt, 2, 0, 1) < 0) {
415                 res = AST_TEST_FAIL;
416                 goto test_exit;
417         }
418
419         ast_taskprocessor_execute(tps);
420
421         if (check_stats(test, pvt, 2, 0, 1) < 0) {
422                 res = AST_TEST_FAIL;
423                 goto test_exit;
424         }
425
426         ast_taskprocessor_execute(tps);
427
428         if (check_stats(test, pvt, 2, 1, 1) < 0) {
429                 res = AST_TEST_FAIL;
430                 goto test_exit;
431         }
432
433         tps = ast_taskprocessor_unreference(tps);
434
435         if (!pvt->shutdown) {
436                 res = AST_TEST_FAIL;
437                 goto test_exit;
438         }
439
440 test_exit:
441         ao2_ref(listener, -1);
442         /* This is safe even if tps is NULL */
443         ast_taskprocessor_unreference(tps);
444         return res;
445 }
446
447 static int unload_module(void)
448 {
449         ast_test_unregister(default_taskprocessor);
450         ast_test_unregister(default_taskprocessor_load);
451         ast_test_unregister(taskprocessor_listener);
452         return 0;
453 }
454
455 static int load_module(void)
456 {
457         ast_test_register(default_taskprocessor);
458         ast_test_register(default_taskprocessor_load);
459         ast_test_register(taskprocessor_listener);
460         return AST_MODULE_LOAD_SUCCESS;
461 }
462
463 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "taskprocessor test module");