Fix infinite looping and crash problem.
[asterisk/asterisk.git] / main / taskprocessor.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2007-2008, Digium, Inc.
5  *
6  * Dwayne M. Hubbard <dhubbard@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \brief Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.
22  *
23  * \author Dwayne Hubbard <dhubbard@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/_private.h"
35 #include "asterisk/module.h"
36 #include "asterisk/time.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/cli.h"
39 #include "asterisk/taskprocessor.h"
40
41
42 /*!
43  * \brief tps_task structure is queued to a taskprocessor
44  *
45  * tps_tasks are processed in FIFO order and freed by the taskprocessing
46  * thread after the task handler returns.  The callback function that is assigned
47  * to the execute() function pointer is responsible for releasing datap resources if necessary.
48  */
49 struct tps_task {
50         /*! \brief The execute() task callback function pointer */
51         int (*execute)(void *datap);
52         /*! \brief The data pointer for the task execute() function */
53         void *datap;
54         /*! \brief AST_LIST_ENTRY overhead */
55         AST_LIST_ENTRY(tps_task) list;
56 };
57
58 /*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
59 struct tps_taskprocessor_stats {
60         /*! \brief This is the maximum number of tasks queued at any one time */
61         unsigned long max_qsize;
62         /*! \brief This is the current number of tasks processed */
63         unsigned long _tasks_processed_count;
64 };
65
66 /*! \brief A ast_taskprocessor structure is a singleton by name */
67 struct ast_taskprocessor {
68         /*! \brief Friendly name of the taskprocessor */
69         const char *name;
70         /*! \brief Taskprocessor statistics */
71         struct tps_taskprocessor_stats *stats;
72         /*! \brief Taskprocessor current queue size */
73         long tps_queue_size;
74         /*! \brief Taskprocessor queue */
75         AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
76         /*! \brief Taskprocessor singleton list entry */
77         AST_LIST_ENTRY(ast_taskprocessor) list;
78         struct ast_taskprocessor_listener *listener;
79         /*! Indicates if the taskprocessor is in the process of shuting down */
80         unsigned int shutting_down:1;
81 };
82 #define TPS_MAX_BUCKETS 7
83 /*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
84 static struct ao2_container *tps_singletons;
85
86 /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition */
87 static ast_cond_t cli_ping_cond;
88
89 /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition lock */
90 AST_MUTEX_DEFINE_STATIC(cli_ping_cond_lock);
91
92 /*! \brief The astobj2 hash callback for taskprocessors */
93 static int tps_hash_cb(const void *obj, const int flags);
94 /*! \brief The astobj2 compare callback for taskprocessors */
95 static int tps_cmp_cb(void *obj, void *arg, int flags);
96
97 /*! \brief The task processing function executed by a taskprocessor */
98 static void *tps_processing_function(void *data);
99
100 /*! \brief Destroy the taskprocessor when its refcount reaches zero */
101 static void tps_taskprocessor_destroy(void *tps);
102
103 /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> handler function */
104 static int tps_ping_handler(void *datap);
105
106 /*! \brief Remove the front task off the taskprocessor queue */
107 static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
108
109 /*! \brief Return the size of the taskprocessor queue */
110 static int tps_taskprocessor_depth(struct ast_taskprocessor *tps);
111
112 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
113 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
114
115 static struct ast_cli_entry taskprocessor_clis[] = {
116         AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
117         AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
118 };
119
120 struct default_taskprocessor_listener_pvt {
121         pthread_t poll_thread;
122         ast_mutex_t lock;
123         ast_cond_t cond;
124         int wake_up;
125         int dead;
126 };
127
128
129 static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, int should_die)
130 {
131         SCOPED_MUTEX(lock, &pvt->lock);
132         pvt->wake_up = 1;
133         pvt->dead = should_die;
134         ast_cond_signal(&pvt->cond);
135 }
136
137 static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt)
138 {
139         SCOPED_MUTEX(lock, &pvt->lock);
140         while (!pvt->wake_up) {
141                 ast_cond_wait(&pvt->cond, lock);
142         }
143         pvt->wake_up = 0;
144         return pvt->dead;
145 }
146
147 /* this is the task processing worker function */
148 static void *tps_processing_function(void *data)
149 {
150         struct ast_taskprocessor_listener *listener = data;
151         struct ast_taskprocessor *tps = listener->tps;
152         struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
153         int dead = 0;
154
155         while (!dead) {
156                 if (!ast_taskprocessor_execute(tps)) {
157                         dead = default_tps_idle(pvt);
158                 }
159         }
160         return NULL;
161 }
162
163 static void *default_listener_alloc(struct ast_taskprocessor_listener *listener)
164 {
165         struct default_taskprocessor_listener_pvt *pvt;
166
167         pvt = ast_calloc(1, sizeof(*pvt));
168         if (!pvt) {
169                 return NULL;
170         }
171         ast_cond_init(&pvt->cond, NULL);
172         ast_mutex_init(&pvt->lock);
173         pvt->poll_thread = AST_PTHREADT_NULL;
174         return pvt;
175 }
176
177 static int default_listener_start(struct ast_taskprocessor_listener *listener)
178 {
179         struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
180
181         if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) {
182                 return -1;
183         }
184
185         return 0;
186 }
187
188 static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
189 {
190         struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
191
192         if (was_empty) {
193                 default_tps_wake_up(pvt, 0);
194         }
195 }
196
197 static void default_emptied(struct ast_taskprocessor_listener *listener)
198 {
199         /* No-op */
200 }
201
202 static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
203 {
204         struct default_taskprocessor_listener_pvt *pvt = listener->private_data;
205         default_tps_wake_up(pvt, 1);
206         pthread_join(pvt->poll_thread, NULL);
207         pvt->poll_thread = AST_PTHREADT_NULL;
208 }
209
210 static void default_listener_destroy(void *obj)
211 {
212         struct default_taskprocessor_listener_pvt *pvt = obj;
213         ast_mutex_destroy(&pvt->lock);
214         ast_cond_destroy(&pvt->cond);
215         ast_free(pvt);
216 }
217
218 static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
219         .alloc = default_listener_alloc,
220         .start = default_listener_start,
221         .task_pushed = default_task_pushed,
222         .emptied = default_emptied,
223         .shutdown = default_listener_shutdown,
224         .destroy = default_listener_destroy,
225 };
226
227 /*! \internal \brief Clean up resources on Asterisk shutdown */
228 static void tps_shutdown(void)
229 {
230         ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
231 }
232
233 /* initialize the taskprocessor container and register CLI operations */
234 int ast_tps_init(void)
235 {
236         if (!(tps_singletons = ao2_container_alloc(TPS_MAX_BUCKETS, tps_hash_cb, tps_cmp_cb))) {
237                 ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
238                 return -1;
239         }
240
241         ast_cond_init(&cli_ping_cond, NULL);
242
243         ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
244
245         ast_register_atexit(tps_shutdown);
246
247         return 0;
248 }
249
250 /* allocate resources for the task */
251 static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
252 {
253         struct tps_task *t;
254         if ((t = ast_calloc(1, sizeof(*t)))) {
255                 t->execute = task_exe;
256                 t->datap = datap;
257         }
258         return t;
259 }
260
261 /* release task resources */
262 static void *tps_task_free(struct tps_task *task)
263 {
264         if (task) {
265                 ast_free(task);
266         }
267         return NULL;
268 }
269
270 /* taskprocessor tab completion */
271 static char *tps_taskprocessor_tab_complete(struct ast_taskprocessor *p, struct ast_cli_args *a)
272 {
273         int tklen;
274         int wordnum = 0;
275         char *name = NULL;
276         struct ao2_iterator i;
277
278         if (a->pos != 3)
279                 return NULL;
280
281         tklen = strlen(a->word);
282         i = ao2_iterator_init(tps_singletons, 0);
283         while ((p = ao2_iterator_next(&i))) {
284                 if (!strncasecmp(a->word, p->name, tklen) && ++wordnum > a->n) {
285                         name = ast_strdup(p->name);
286                         ao2_ref(p, -1);
287                         break;
288                 }
289                 ao2_ref(p, -1);
290         }
291         ao2_iterator_destroy(&i);
292         return name;
293 }
294
295 /* ping task handling function */
296 static int tps_ping_handler(void *datap)
297 {
298         ast_mutex_lock(&cli_ping_cond_lock);
299         ast_cond_signal(&cli_ping_cond);
300         ast_mutex_unlock(&cli_ping_cond_lock);
301         return 0;
302 }
303
304 /* ping the specified taskprocessor and display the ping time on the CLI */
305 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
306 {
307         struct timeval begin, end, delta;
308         const char *name;
309         struct timeval when;
310         struct timespec ts;
311         struct ast_taskprocessor *tps = NULL;
312
313         switch (cmd) {
314         case CLI_INIT:
315                 e->command = "core ping taskprocessor";
316                 e->usage =
317                         "Usage: core ping taskprocessor <taskprocessor>\n"
318                         "       Displays the time required for a task to be processed\n";
319                 return NULL;
320         case CLI_GENERATE:
321                 return tps_taskprocessor_tab_complete(tps, a);
322         }
323
324         if (a->argc != 4)
325                 return CLI_SHOWUSAGE;
326
327         name = a->argv[3];
328         if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
329                 ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
330                 return CLI_SUCCESS;
331         }
332         ast_cli(a->fd, "\npinging %s ...", name);
333         when = ast_tvadd((begin = ast_tvnow()), ast_samp2tv(1000, 1000));
334         ts.tv_sec = when.tv_sec;
335         ts.tv_nsec = when.tv_usec * 1000;
336         ast_mutex_lock(&cli_ping_cond_lock);
337         if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
338                 ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
339                 ao2_ref(tps, -1);
340                 return CLI_FAILURE;
341         }
342         ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
343         ast_mutex_unlock(&cli_ping_cond_lock);
344         end = ast_tvnow();
345         delta = ast_tvsub(end, begin);
346         ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
347         ao2_ref(tps, -1);
348         return CLI_SUCCESS;
349 }
350
351 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
352 {
353         char name[256];
354         int tcount;
355         unsigned long qsize;
356         unsigned long maxqsize;
357         unsigned long processed;
358         struct ast_taskprocessor *p;
359         struct ao2_iterator i;
360
361         switch (cmd) {
362         case CLI_INIT:
363                 e->command = "core show taskprocessors";
364                 e->usage =
365                         "Usage: core show taskprocessors\n"
366                         "       Shows a list of instantiated task processors and their statistics\n";
367                 return NULL;
368         case CLI_GENERATE:
369                 return NULL;
370         }
371
372         if (a->argc != e->args)
373                 return CLI_SHOWUSAGE;
374
375         ast_cli(a->fd, "\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
376         i = ao2_iterator_init(tps_singletons, 0);
377         while ((p = ao2_iterator_next(&i))) {
378                 ast_copy_string(name, p->name, sizeof(name));
379                 qsize = p->tps_queue_size;
380                 maxqsize = p->stats->max_qsize;
381                 processed = p->stats->_tasks_processed_count;
382                 ast_cli(a->fd, "\n%24s   %17ld %12ld %12ld", name, processed, qsize, maxqsize);
383                 ao2_ref(p, -1);
384         }
385         ao2_iterator_destroy(&i);
386         tcount = ao2_container_count(tps_singletons);
387         ast_cli(a->fd, "\n\t+---------------------+-----------------+------------+-------------+\n\t%d taskprocessors\n\n", tcount);
388         return CLI_SUCCESS;
389 }
390
391 /* hash callback for astobj2 */
392 static int tps_hash_cb(const void *obj, const int flags)
393 {
394         const struct ast_taskprocessor *tps = obj;
395         const char *name = flags & OBJ_KEY ? obj : tps->name;
396
397         return ast_str_case_hash(name);
398 }
399
400 /* compare callback for astobj2 */
401 static int tps_cmp_cb(void *obj, void *arg, int flags)
402 {
403         struct ast_taskprocessor *lhs = obj, *rhs = arg;
404         const char *rhsname = flags & OBJ_KEY ? arg : rhs->name;
405
406         return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
407 }
408
409 /* destroy the taskprocessor */
410 static void tps_taskprocessor_destroy(void *tps)
411 {
412         struct ast_taskprocessor *t = tps;
413
414         if (!tps) {
415                 ast_log(LOG_ERROR, "missing taskprocessor\n");
416                 return;
417         }
418         ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
419         /* free it */
420         if (t->stats) {
421                 ast_free(t->stats);
422                 t->stats = NULL;
423         }
424         ast_free((char *) t->name);
425         if (t->listener) {
426                 /* This code should not be reached since the listener
427                  * should have been destroyed before the taskprocessor could
428                  * be destroyed
429                  */
430                 ao2_ref(t->listener, -1);
431                 t->listener = NULL;
432         }
433 }
434
435 /* pop the front task and return it */
436 static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
437 {
438         struct tps_task *task;
439         SCOPED_AO2LOCK(lock, tps);
440
441         if (tps->shutting_down) {
442                 return NULL;
443         }
444
445         if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
446                 tps->tps_queue_size--;
447         }
448         return task;
449 }
450
451 static int tps_taskprocessor_depth(struct ast_taskprocessor *tps)
452 {
453         return (tps) ? tps->tps_queue_size : -1;
454 }
455
456 /* taskprocessor name accessor */
457 const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
458 {
459         if (!tps) {
460                 ast_log(LOG_ERROR, "no taskprocessor specified!\n");
461                 return NULL;
462         }
463         return tps->name;
464 }
465
466 static void listener_destroy(void *obj)
467 {
468         struct ast_taskprocessor_listener *listener = obj;
469
470         listener->callbacks->destroy(listener->private_data);
471 }
472
473 static void listener_shutdown(struct ast_taskprocessor_listener *listener)
474 {
475         listener->callbacks->shutdown(listener);
476         ao2_ref(listener->tps, -1);
477         listener->tps = NULL;
478 }
479
480 struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks)
481 {
482         RAII_VAR(struct ast_taskprocessor_listener *, listener,
483                         ao2_alloc(sizeof(*listener), listener_destroy), ao2_cleanup);
484
485         if (!listener) {
486                 return NULL;
487         }
488         listener->callbacks = callbacks;
489         listener->private_data = listener->callbacks->alloc(listener);
490         if (!listener->private_data) {
491                 return NULL;
492         }
493
494         ao2_ref(listener, +1);
495         return listener;
496 }
497
498 /* Provide a reference to a taskprocessor.  Create the taskprocessor if necessary, but don't
499  * create the taskprocessor if we were told via ast_tps_options to return a reference only
500  * if it already exists */
501 struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create)
502 {
503         struct ast_taskprocessor *p;
504         struct ast_taskprocessor_listener *listener;
505
506         if (ast_strlen_zero(name)) {
507                 ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
508                 return NULL;
509         }
510         p = ao2_find(tps_singletons, name, OBJ_KEY);
511         if (p) {
512                 ao2_unlock(tps_singletons);
513                 return p;
514         }
515         if (create & TPS_REF_IF_EXISTS) {
516                 /* calling function does not want a new taskprocessor to be created if it doesn't already exist */
517                 return NULL;
518         }
519         /* Create a new taskprocessor. Start by creating a default listener */
520         listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks);
521         if (!listener) {
522                 return NULL;
523         }
524
525         p = ast_taskprocessor_create_with_listener(name, listener);
526         if (!p) {
527                 ao2_ref(listener, -1);
528                 return NULL;
529         }
530
531         /* Unref listener here since the taskprocessor has gained a reference to the listener */
532         ao2_ref(listener, -1);
533         return p;
534
535 }
536
537 struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
538 {
539         RAII_VAR(struct ast_taskprocessor *, p,
540                         ao2_alloc(sizeof(*p), tps_taskprocessor_destroy),
541                         ao2_cleanup);
542
543         if (!p) {
544                 ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
545                 return NULL;
546         }
547
548         if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
549                 ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
550                 return NULL;
551         }
552         if (!(p->name = ast_strdup(name))) {
553                 ao2_ref(p, -1);
554                 return NULL;
555         }
556
557         ao2_ref(listener, +1);
558         p->listener = listener;
559
560         ao2_ref(p, +1);
561         listener->tps = p;
562
563         if (!(ao2_link(tps_singletons, p))) {
564                 ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
565                 return NULL;
566         }
567
568         if (p->listener->callbacks->start(p->listener)) {
569                 ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", p->name);
570                 ast_taskprocessor_unreference(p);
571                 return NULL;
572         }
573
574         /* RAII_VAR will decrement the refcount at the end of the function.
575          * Since we want to pass back a reference to p, we bump the refcount
576          */
577         ao2_ref(p, +1);
578         return p;
579 }
580
581 /* decrement the taskprocessor reference count and unlink from the container if necessary */
582 void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
583 {
584         if (!tps) {
585                 return NULL;
586         }
587
588         if (ao2_ref(tps, -1) > 3) {
589                 return NULL;
590         }
591         /* If we're down to 3 references, then those must be:
592          * 1. The reference we just got rid of
593          * 2. The container
594          * 3. The listener
595          */
596         ao2_unlink(tps_singletons, tps);
597         listener_shutdown(tps->listener);
598         return NULL;
599 }
600
601 /* push the task into the taskprocessor queue */
602 int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
603 {
604         struct tps_task *t;
605         int previous_size;
606
607         if (!tps || !task_exe) {
608                 ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
609                 return -1;
610         }
611         if (!(t = tps_task_alloc(task_exe, datap))) {
612                 ast_log(LOG_ERROR, "failed to allocate task!  Can't push to '%s'\n", tps->name);
613                 return -1;
614         }
615         ao2_lock(tps);
616         AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
617         previous_size = tps->tps_queue_size++;
618         ao2_unlock(tps);
619         tps->listener->callbacks->task_pushed(tps->listener, previous_size ? 0 : 1);
620         return 0;
621 }
622
623 int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
624 {
625         struct tps_task *t;
626         int size;
627
628         if (!(t = tps_taskprocessor_pop(tps))) {
629                 return 0;
630         }
631
632         t->execute(t->datap);
633
634         tps_task_free(t);
635
636         ao2_lock(tps);
637         size = tps_taskprocessor_depth(tps);
638         if (tps->stats) {
639                 tps->stats->_tasks_processed_count++;
640                 if (size > tps->stats->max_qsize) {
641                         tps->stats->max_qsize = size;
642                 }
643         }
644         ao2_unlock(tps);
645
646         if (size == 0) {
647                 tps->listener->callbacks->emptied(tps->listener);
648                 return 0;
649         }
650         return 1;
651 }