377a2b3e3ce7a4a3b5b8e3e167056b2dd9c37cb1
[asterisk/asterisk.git] / tests / test_taskprocessor.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \brief taskprocessor unit tests
22  *
23  * \author Mark Michelson <mmichelson@digium.com>
24  *
25  */
26
27 /*** MODULEINFO
28         <depend>TEST_FRAMEWORK</depend>
29         <support_level>core</support_level>
30  ***/
31
32 #include "asterisk.h"
33
34 #include "asterisk/test.h"
35 #include "asterisk/taskprocessor.h"
36 #include "asterisk/module.h"
37 #include "asterisk/astobj2.h"
38
39 /*!
40  * \brief userdata associated with baseline taskprocessor test
41  */
42 struct task_data {
43         /* Condition used to signal to queuing thread that task was executed */
44         ast_cond_t cond;
45         /* Lock protecting the condition */
46         ast_mutex_t lock;
47         /*! Boolean indicating that the task was run */
48         int task_complete;
49 };
50
51 /*!
52  * \brief Queued task for baseline test.
53  *
54  * The task simply sets a boolean to indicate the
55  * task has been run and then signals a condition
56  * saying it's complete
57  */
58 static int task(void *data)
59 {
60         struct task_data *task_data = data;
61         SCOPED_MUTEX(lock, &task_data->lock);
62         task_data->task_complete = 1;
63         ast_cond_signal(&task_data->cond);
64         return 0;
65 }
66
67 /*!
68  * \brief Baseline test for default taskprocessor
69  *
70  * This test ensures that when a task is added to a taskprocessor that
71  * has been allocated with a default listener that the task gets executed
72  * as expected
73  */
74 AST_TEST_DEFINE(default_taskprocessor)
75 {
76         struct ast_taskprocessor *tps;
77         struct task_data task_data;
78         struct timeval start;
79         struct timespec ts;
80         enum ast_test_result_state res = AST_TEST_PASS;
81         int timedwait_res;
82
83         switch (cmd) {
84         case TEST_INIT:
85                 info->name = "default_taskprocessor";
86                 info->category = "/main/taskprocessor/";
87                 info->summary = "Test of default taskproccesor";
88                 info->description =
89                         "Ensures that a queued task gets executed.";
90                 return AST_TEST_NOT_RUN;
91         case TEST_EXECUTE:
92                 break;
93         }
94
95         tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
96
97         if (!tps) {
98                 ast_test_status_update(test, "Unable to create test taskprocessor\n");
99                 return AST_TEST_FAIL;
100         }
101
102         start = ast_tvnow();
103
104         ts.tv_sec = start.tv_sec + 30;
105         ts.tv_nsec = start.tv_usec * 1000;
106
107         ast_cond_init(&task_data.cond, NULL);
108         ast_mutex_init(&task_data.lock);
109         task_data.task_complete = 0;
110
111         ast_taskprocessor_push(tps, task, &task_data);
112         ast_mutex_lock(&task_data.lock);
113         while (!task_data.task_complete) {
114                 timedwait_res = ast_cond_timedwait(&task_data.cond, &task_data.lock, &ts);
115                 if (timedwait_res == ETIMEDOUT) {
116                         break;
117                 }
118         }
119         ast_mutex_unlock(&task_data.lock);
120
121         if (!task_data.task_complete) {
122                 ast_test_status_update(test, "Queued task did not execute!\n");
123                 res = AST_TEST_FAIL;
124                 goto test_end;
125         }
126
127 test_end:
128         tps = ast_taskprocessor_unreference(tps);
129         ast_mutex_destroy(&task_data.lock);
130         ast_cond_destroy(&task_data.cond);
131         return res;
132 }
133
134 #define NUM_TASKS 20000
135
136 /*!
137  * \brief Relevant data associated with taskprocessor load test
138  */
139 static struct load_task_data {
140         /*! Condition used to indicate a task has completed executing */
141         ast_cond_t cond;
142         /*! Lock used to protect the condition */
143         ast_mutex_t lock;
144         /*! Counter of the number of completed tasks */
145         int tasks_completed;
146         /*! Storage for task-specific data */
147         int task_rand[NUM_TASKS];
148 } load_task_results;
149
150 /*!
151  * \brief a queued task to be used in the taskprocessor load test
152  *
153  * The task increments the number of tasks executed and puts the passed-in
154  * data into the next slot in the array of random data.
155  */
156 static int load_task(void *data)
157 {
158         int *randdata = data;
159         SCOPED_MUTEX(lock, &load_task_results.lock);
160         load_task_results.task_rand[load_task_results.tasks_completed++] = *randdata;
161         ast_cond_signal(&load_task_results.cond);
162         return 0;
163 }
164
165 /*!
166  * \brief Load test for taskprocessor with default listener
167  *
168  * This test queues a large number of tasks, each with random data associated.
169  * The test ensures that all of the tasks are run and that the tasks are executed
170  * in the same order that they were queued
171  */
172 AST_TEST_DEFINE(default_taskprocessor_load)
173 {
174         struct ast_taskprocessor *tps;
175         struct timeval start;
176         struct timespec ts;
177         enum ast_test_result_state res = AST_TEST_PASS;
178         int timedwait_res;
179         int i;
180         int rand_data[NUM_TASKS];
181
182         switch (cmd) {
183         case TEST_INIT:
184                 info->name = "default_taskprocessor_load";
185                 info->category = "/main/taskprocessor/";
186                 info->summary = "Load test of default taskproccesor";
187                 info->description =
188                         "Ensure that a large number of queued tasks are executed in the proper order.";
189                 return AST_TEST_NOT_RUN;
190         case TEST_EXECUTE:
191                 break;
192         }
193
194         tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
195
196         if (!tps) {
197                 ast_test_status_update(test, "Unable to create test taskprocessor\n");
198                 return AST_TEST_FAIL;
199         }
200
201         start = ast_tvnow();
202
203         ts.tv_sec = start.tv_sec + 60;
204         ts.tv_nsec = start.tv_usec * 1000;
205
206         ast_cond_init(&load_task_results.cond, NULL);
207         ast_mutex_init(&load_task_results.lock);
208         load_task_results.tasks_completed = 0;
209
210         for (i = 0; i < NUM_TASKS; ++i) {
211                 rand_data[i] = ast_random();
212                 ast_taskprocessor_push(tps, load_task, &rand_data[i]);
213         }
214
215         ast_mutex_lock(&load_task_results.lock);
216         while (load_task_results.tasks_completed < NUM_TASKS) {
217                 timedwait_res = ast_cond_timedwait(&load_task_results.cond, &load_task_results.lock, &ts);
218                 if (timedwait_res == ETIMEDOUT) {
219                         break;
220                 }
221         }
222         ast_mutex_unlock(&load_task_results.lock);
223
224         if (load_task_results.tasks_completed != NUM_TASKS) {
225                 ast_test_status_update(test, "Unexpected number of tasks executed. Expected %d but got %d\n",
226                                 NUM_TASKS, load_task_results.tasks_completed);
227                 res = AST_TEST_FAIL;
228                 goto test_end;
229         }
230
231         for (i = 0; i < NUM_TASKS; ++i) {
232                 if (rand_data[i] != load_task_results.task_rand[i]) {
233                         ast_test_status_update(test, "Queued tasks did not execute in order\n");
234                         res = AST_TEST_FAIL;
235                         goto test_end;
236                 }
237         }
238
239 test_end:
240         tps = ast_taskprocessor_unreference(tps);
241         ast_mutex_destroy(&load_task_results.lock);
242         ast_cond_destroy(&load_task_results.cond);
243         return res;
244 }
245
246 /*!
247  * \brief Private data for the test taskprocessor listener
248  */
249 struct test_listener_pvt {
250         /* Counter of number of tasks pushed to the queue */
251         int num_pushed;
252         /* Counter of number of times the queue was emptied */
253         int num_emptied;
254         /* Counter of number of times that a pushed task occurred on an empty queue */
255         int num_was_empty;
256         /* Boolean indicating whether the shutdown callback was called */
257         int shutdown;
258 };
259
260 /*!
261  * \brief test taskprocessor listener's alloc callback
262  */
263 static void *test_alloc(struct ast_taskprocessor_listener *listener)
264 {
265         struct test_listener_pvt *pvt;
266
267         pvt = ast_calloc(1, sizeof(*pvt));
268         return pvt;
269 }
270
271 /*!
272  * \brief test taskprocessor listener's start callback
273  */
274 static int test_start(struct ast_taskprocessor_listener *listener)
275 {
276         return 0;
277 }
278
279 /*!
280  * \brief test taskprocessor listener's task_pushed callback
281  *
282  * Adjusts private data's stats as indicated by the parameters.
283  */
284 static void test_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
285 {
286         struct test_listener_pvt *pvt = listener->private_data;
287         ++pvt->num_pushed;
288         if (was_empty) {
289                 ++pvt->num_was_empty;
290         }
291 }
292
293 /*!
294  * \brief test taskprocessor listener's emptied callback.
295  */
296 static void test_emptied(struct ast_taskprocessor_listener *listener)
297 {
298         struct test_listener_pvt *pvt = listener->private_data;
299         ++pvt->num_emptied;
300 }
301
302 /*!
303  * \brief test taskprocessor listener's shutdown callback.
304  */
305 static void test_shutdown(struct ast_taskprocessor_listener *listener)
306 {
307         struct test_listener_pvt *pvt = listener->private_data;
308         pvt->shutdown = 1;
309 }
310
311 /*!
312  * \brief test taskprocessor listener's destroy callback.
313  */
314 static void test_destroy(void *private_data)
315 {
316         struct test_listener_pvt *pvt = private_data;
317         ast_free(pvt);
318 }
319
320 static const struct ast_taskprocessor_listener_callbacks test_callbacks = {
321         .alloc = test_alloc,
322         .start = test_start,
323         .task_pushed = test_task_pushed,
324         .emptied = test_emptied,
325         .shutdown = test_shutdown,
326         .destroy = test_destroy,
327 };
328
329 /*!
330  * \brief Queued task for taskprocessor listener test.
331  *
332  * Does nothing.
333  */
334 static int listener_test_task(void *ignore)
335 {
336         return 0;
337 }
338
339 /*!
340  * \brief helper to ensure that statistics the listener is keeping are what we expect
341  *
342  * \param test The currently-running test
343  * \param pvt The private data for the taskprocessor listener
344  * \param num_pushed The expected current number of tasks pushed to the processor
345  * \param num_emptied The expected current number of times the taskprocessor has become empty
346  * \param num_was_empty The expected current number of times that tasks were pushed to an empty taskprocessor
347  * \retval -1 Stats were not as expected
348  * \retval 0 Stats were as expected
349  */
350 static int check_stats(struct ast_test *test, const struct test_listener_pvt *pvt, int num_pushed, int num_emptied, int num_was_empty)
351 {
352         if (pvt->num_pushed != num_pushed) {
353                 ast_test_status_update(test, "Unexpected number of tasks pushed. Expected %d but got %d\n",
354                                 num_pushed, pvt->num_pushed);
355                 return -1;
356         }
357
358         if (pvt->num_emptied != num_emptied) {
359                 ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
360                                 num_emptied, pvt->num_emptied);
361                 return -1;
362         }
363
364         if (pvt->num_was_empty != num_was_empty) {
365                 ast_test_status_update(test, "Unexpected number of empties. Expected %d but got %d\n",
366                                 num_was_empty, pvt->num_emptied);
367                 return -1;
368         }
369
370         return 0;
371 }
372
373 /*!
374  * \brief Test for a taskprocessor with custom listener.
375  *
376  * This test pushes tasks to a taskprocessor with a custom listener, executes the taskss,
377  * and destroys the taskprocessor.
378  *
379  * The test ensures that the listener's callbacks are called when expected and that the data
380  * being passed in is accurate.
381  */
382 AST_TEST_DEFINE(taskprocessor_listener)
383 {
384         struct ast_taskprocessor *tps;
385         struct ast_taskprocessor_listener *listener;
386         struct test_listener_pvt *pvt;
387         enum ast_test_result_state res = AST_TEST_PASS;
388
389         switch (cmd) {
390         case TEST_INIT:
391                 info->name = "taskprocessor_listener";
392                 info->category = "/main/taskprocessor/";
393                 info->summary = "Test of taskproccesor listeners";
394                 info->description =
395                         "Ensures that listener callbacks are called when expected.";
396                 return AST_TEST_NOT_RUN;
397         case TEST_EXECUTE:
398                 break;
399         }
400
401         listener = ast_taskprocessor_listener_alloc(&test_callbacks);
402         if (!listener) {
403                 ast_test_status_update(test, "Unable to allocate test taskprocessor listener\n");
404                 return AST_TEST_FAIL;
405         }
406
407         tps = ast_taskprocessor_create_with_listener("test_listener", listener);
408         if (!tps) {
409                 ast_test_status_update(test, "Unable to allocate test taskprocessor\n");
410                 res = AST_TEST_FAIL;
411                 goto test_exit;
412         }
413
414         pvt = listener->private_data;
415
416         ast_taskprocessor_push(tps, listener_test_task, NULL);
417
418         if (check_stats(test, pvt, 1, 0, 1) < 0) {
419                 res = AST_TEST_FAIL;
420                 goto test_exit;
421         }
422
423         ast_taskprocessor_push(tps, listener_test_task, NULL);
424
425         if (check_stats(test, pvt, 2, 0, 1) < 0) {
426                 res = AST_TEST_FAIL;
427                 goto test_exit;
428         }
429
430         ast_taskprocessor_execute(tps);
431
432         if (check_stats(test, pvt, 2, 0, 1) < 0) {
433                 res = AST_TEST_FAIL;
434                 goto test_exit;
435         }
436
437         ast_taskprocessor_execute(tps);
438
439         if (check_stats(test, pvt, 2, 1, 1) < 0) {
440                 res = AST_TEST_FAIL;
441                 goto test_exit;
442         }
443
444         tps = ast_taskprocessor_unreference(tps);
445
446         if (!pvt->shutdown) {
447                 res = AST_TEST_FAIL;
448                 goto test_exit;
449         }
450
451 test_exit:
452         ao2_ref(listener, -1);
453         /* This is safe even if tps is NULL */
454         ast_taskprocessor_unreference(tps);
455         return res;
456 }
457
458 static int unload_module(void)
459 {
460         ast_test_unregister(default_taskprocessor);
461         ast_test_unregister(default_taskprocessor_load);
462         ast_test_unregister(taskprocessor_listener);
463         return 0;
464 }
465
466 static int load_module(void)
467 {
468         ast_test_register(default_taskprocessor);
469         ast_test_register(default_taskprocessor_load);
470         ast_test_register(taskprocessor_listener);
471         return AST_MODULE_LOAD_SUCCESS;
472 }
473
474 AST_MODULE_INFO_STANDARD(ASTERISK_GPL_KEY, "taskprocessor test module");