taskprocessor.c: Add CLI "core ping taskprocessor" missing unlock.
[asterisk/asterisk.git] / main / taskprocessor.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2007-2013, Digium, Inc.
5  *
6  * Dwayne M. Hubbard <dhubbard@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \brief Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.
22  *
23  * \author Dwayne Hubbard <dhubbard@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_REGISTER_FILE()
33
34 #include "asterisk/_private.h"
35 #include "asterisk/module.h"
36 #include "asterisk/time.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/cli.h"
39 #include "asterisk/taskprocessor.h"
40 #include "asterisk/sem.h"
41
42 /*!
43  * \brief tps_task structure is queued to a taskprocessor
44  *
45  * tps_tasks are processed in FIFO order and freed by the taskprocessing
46  * thread after the task handler returns.  The callback function that is assigned
47  * to the execute() function pointer is responsible for releasing datap resources if necessary.
48  */
49 struct tps_task {
50         /*! \brief The execute() task callback function pointer */
51         union {
52                 int (*execute)(void *datap);
53                 int (*execute_local)(struct ast_taskprocessor_local *local);
54         } callback;
55         /*! \brief The data pointer for the task execute() function */
56         void *datap;
57         /*! \brief AST_LIST_ENTRY overhead */
58         AST_LIST_ENTRY(tps_task) list;
59         unsigned int wants_local:1;
60 };
61
62 /*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
63 struct tps_taskprocessor_stats {
64         /*! \brief This is the maximum number of tasks queued at any one time */
65         unsigned long max_qsize;
66         /*! \brief This is the current number of tasks processed */
67         unsigned long _tasks_processed_count;
68 };
69
70 /*! \brief A ast_taskprocessor structure is a singleton by name */
71 struct ast_taskprocessor {
72         /*! \brief Friendly name of the taskprocessor */
73         const char *name;
74         /*! \brief Taskprocessor statistics */
75         struct tps_taskprocessor_stats *stats;
76         void *local_data;
77         /*! \brief Taskprocessor current queue size */
78         long tps_queue_size;
79         /*! \brief Taskprocessor queue */
80         AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
81         struct ast_taskprocessor_listener *listener;
82         /*! Current thread executing the tasks */
83         pthread_t thread;
84         /*! Indicates if the taskprocessor is currently executing a task */
85         unsigned int executing:1;
86         /*! Indicates that a high water warning has been issued on this task processor */
87         unsigned int high_water_warned:1;
88 };
89
90 /*!
91  * \brief A listener for taskprocessors
92  *
93  * \since 12.0.0
94  *
95  * When a taskprocessor's state changes, the listener
96  * is notified of the change. This allows for tasks
97  * to be addressed in whatever way is appropriate for
98  * the module using the taskprocessor.
99  */
100 struct ast_taskprocessor_listener {
101         /*! The callbacks the taskprocessor calls into to notify of state changes */
102         const struct ast_taskprocessor_listener_callbacks *callbacks;
103         /*! The taskprocessor that the listener is listening to */
104         struct ast_taskprocessor *tps;
105         /*! Data private to the listener */
106         void *user_data;
107 };
108
109 #define TPS_MAX_BUCKETS 7
110 /*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
111 static struct ao2_container *tps_singletons;
112
113 /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition */
114 static ast_cond_t cli_ping_cond;
115
116 /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition lock */
117 AST_MUTEX_DEFINE_STATIC(cli_ping_cond_lock);
118
119 /*! \brief The astobj2 hash callback for taskprocessors */
120 static int tps_hash_cb(const void *obj, const int flags);
121 /*! \brief The astobj2 compare callback for taskprocessors */
122 static int tps_cmp_cb(void *obj, void *arg, int flags);
123
124 /*! \brief Destroy the taskprocessor when its refcount reaches zero */
125 static void tps_taskprocessor_destroy(void *tps);
126
127 /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> handler function */
128 static int tps_ping_handler(void *datap);
129
130 /*! \brief Remove the front task off the taskprocessor queue */
131 static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
132
133 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
134 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
135
136 static struct ast_cli_entry taskprocessor_clis[] = {
137         AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
138         AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
139 };
140
141 struct default_taskprocessor_listener_pvt {
142         pthread_t poll_thread;
143         int dead;
144         struct ast_sem sem;
145 };
146
147 static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
148 {
149         ast_assert(pvt->dead);
150         ast_sem_destroy(&pvt->sem);
151         ast_free(pvt);
152 }
153
154 static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
155 {
156         struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
157
158         default_listener_pvt_destroy(pvt);
159
160         listener->user_data = NULL;
161 }
162
163 /*!
164  * \brief Function that processes tasks in the taskprocessor
165  * \internal
166  */
167 static void *default_tps_processing_function(void *data)
168 {
169         struct ast_taskprocessor_listener *listener = data;
170         struct ast_taskprocessor *tps = listener->tps;
171         struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
172         int sem_value;
173         int res;
174
175         while (!pvt->dead) {
176                 res = ast_sem_wait(&pvt->sem);
177                 if (res != 0 && errno != EINTR) {
178                         ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
179                                 strerror(errno));
180                         /* Just give up */
181                         break;
182                 }
183                 ast_taskprocessor_execute(tps);
184         }
185
186         /* No posting to a dead taskprocessor! */
187         res = ast_sem_getvalue(&pvt->sem, &sem_value);
188         ast_assert(res == 0 && sem_value == 0);
189
190         /* Free the shutdown reference (see default_listener_shutdown) */
191         ao2_t_ref(listener->tps, -1, "tps-shutdown");
192
193         return NULL;
194 }
195
196 static int default_listener_start(struct ast_taskprocessor_listener *listener)
197 {
198         struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
199
200         if (ast_pthread_create(&pvt->poll_thread, NULL, default_tps_processing_function, listener)) {
201                 return -1;
202         }
203
204         return 0;
205 }
206
207 static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
208 {
209         struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
210
211         if (ast_sem_post(&pvt->sem) != 0) {
212                 ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n",
213                         strerror(errno));
214         }
215 }
216
217 static int default_listener_die(void *data)
218 {
219         struct default_taskprocessor_listener_pvt *pvt = data;
220         pvt->dead = 1;
221         return 0;
222 }
223
224 static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
225 {
226         struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
227         int res;
228
229         /* Hold a reference during shutdown */
230         ao2_t_ref(listener->tps, +1, "tps-shutdown");
231
232         ast_taskprocessor_push(listener->tps, default_listener_die, pvt);
233
234         ast_assert(pvt->poll_thread != AST_PTHREADT_NULL);
235
236         if (pthread_equal(pthread_self(), pvt->poll_thread)) {
237                 res = pthread_detach(pvt->poll_thread);
238                 if (res != 0) {
239                         ast_log(LOG_ERROR, "pthread_detach(): %s\n", strerror(errno));
240                 }
241         } else {
242                 res = pthread_join(pvt->poll_thread, NULL);
243                 if (res != 0) {
244                         ast_log(LOG_ERROR, "pthread_join(): %s\n", strerror(errno));
245                 }
246         }
247         pvt->poll_thread = AST_PTHREADT_NULL;
248 }
249
250 static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
251         .start = default_listener_start,
252         .task_pushed = default_task_pushed,
253         .shutdown = default_listener_shutdown,
254         .dtor = default_listener_pvt_dtor,
255 };
256
257 /*!
258  * \internal
259  * \brief Clean up resources on Asterisk shutdown
260  */
261 static void tps_shutdown(void)
262 {
263         ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
264         ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
265         tps_singletons = NULL;
266 }
267
268 /* initialize the taskprocessor container and register CLI operations */
269 int ast_tps_init(void)
270 {
271         if (!(tps_singletons = ao2_container_alloc(TPS_MAX_BUCKETS, tps_hash_cb, tps_cmp_cb))) {
272                 ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
273                 return -1;
274         }
275
276         ast_cond_init(&cli_ping_cond, NULL);
277
278         ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
279
280         ast_register_cleanup(tps_shutdown);
281
282         return 0;
283 }
284
285 /* allocate resources for the task */
286 static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
287 {
288         struct tps_task *t;
289         if (!task_exe) {
290                 ast_log(LOG_ERROR, "task_exe is NULL!\n");
291                 return NULL;
292         }
293
294         t = ast_calloc(1, sizeof(*t));
295         if (!t) {
296                 ast_log(LOG_ERROR, "failed to allocate task!\n");
297                 return NULL;
298         }
299
300         t->callback.execute = task_exe;
301         t->datap = datap;
302
303         return t;
304 }
305
306 static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
307 {
308         struct tps_task *t;
309         if (!task_exe) {
310                 ast_log(LOG_ERROR, "task_exe is NULL!\n");
311                 return NULL;
312         }
313
314         t = ast_calloc(1, sizeof(*t));
315         if (!t) {
316                 ast_log(LOG_ERROR, "failed to allocate task!\n");
317                 return NULL;
318         }
319
320         t->callback.execute_local = task_exe;
321         t->datap = datap;
322         t->wants_local = 1;
323
324         return t;
325 }
326
327 /* release task resources */
328 static void *tps_task_free(struct tps_task *task)
329 {
330         ast_free(task);
331         return NULL;
332 }
333
334 /* taskprocessor tab completion */
335 static char *tps_taskprocessor_tab_complete(struct ast_taskprocessor *p, struct ast_cli_args *a)
336 {
337         int tklen;
338         int wordnum = 0;
339         char *name = NULL;
340         struct ao2_iterator i;
341
342         if (a->pos != 3)
343                 return NULL;
344
345         tklen = strlen(a->word);
346         i = ao2_iterator_init(tps_singletons, 0);
347         while ((p = ao2_iterator_next(&i))) {
348                 if (!strncasecmp(a->word, p->name, tklen) && ++wordnum > a->n) {
349                         name = ast_strdup(p->name);
350                         ao2_ref(p, -1);
351                         break;
352                 }
353                 ao2_ref(p, -1);
354         }
355         ao2_iterator_destroy(&i);
356         return name;
357 }
358
359 /* ping task handling function */
360 static int tps_ping_handler(void *datap)
361 {
362         ast_mutex_lock(&cli_ping_cond_lock);
363         ast_cond_signal(&cli_ping_cond);
364         ast_mutex_unlock(&cli_ping_cond_lock);
365         return 0;
366 }
367
368 /* ping the specified taskprocessor and display the ping time on the CLI */
369 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
370 {
371         struct timeval begin, end, delta;
372         const char *name;
373         struct timeval when;
374         struct timespec ts;
375         struct ast_taskprocessor *tps = NULL;
376
377         switch (cmd) {
378         case CLI_INIT:
379                 e->command = "core ping taskprocessor";
380                 e->usage =
381                         "Usage: core ping taskprocessor <taskprocessor>\n"
382                         "       Displays the time required for a task to be processed\n";
383                 return NULL;
384         case CLI_GENERATE:
385                 return tps_taskprocessor_tab_complete(tps, a);
386         }
387
388         if (a->argc != 4)
389                 return CLI_SHOWUSAGE;
390
391         name = a->argv[3];
392         if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
393                 ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
394                 return CLI_SUCCESS;
395         }
396         ast_cli(a->fd, "\npinging %s ...", name);
397         when = ast_tvadd((begin = ast_tvnow()), ast_samp2tv(1000, 1000));
398         ts.tv_sec = when.tv_sec;
399         ts.tv_nsec = when.tv_usec * 1000;
400         ast_mutex_lock(&cli_ping_cond_lock);
401         if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
402                 ast_mutex_unlock(&cli_ping_cond_lock);
403                 ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
404                 ao2_ref(tps, -1);
405                 return CLI_FAILURE;
406         }
407         ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
408         ast_mutex_unlock(&cli_ping_cond_lock);
409         end = ast_tvnow();
410         delta = ast_tvsub(end, begin);
411         ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
412         ao2_ref(tps, -1);
413         return CLI_SUCCESS;
414 }
415
416 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
417 {
418         char name[256];
419         int tcount;
420         unsigned long qsize;
421         unsigned long maxqsize;
422         unsigned long processed;
423         struct ast_taskprocessor *p;
424         struct ao2_iterator i;
425
426         switch (cmd) {
427         case CLI_INIT:
428                 e->command = "core show taskprocessors";
429                 e->usage =
430                         "Usage: core show taskprocessors\n"
431                         "       Shows a list of instantiated task processors and their statistics\n";
432                 return NULL;
433         case CLI_GENERATE:
434                 return NULL;
435         }
436
437         if (a->argc != e->args)
438                 return CLI_SHOWUSAGE;
439
440         ast_cli(a->fd, "\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
441         i = ao2_iterator_init(tps_singletons, 0);
442         while ((p = ao2_iterator_next(&i))) {
443                 ast_copy_string(name, p->name, sizeof(name));
444                 qsize = p->tps_queue_size;
445                 maxqsize = p->stats->max_qsize;
446                 processed = p->stats->_tasks_processed_count;
447                 ast_cli(a->fd, "\n%24s   %17lu %12lu %12lu", name, processed, qsize, maxqsize);
448                 ao2_ref(p, -1);
449         }
450         ao2_iterator_destroy(&i);
451         tcount = ao2_container_count(tps_singletons);
452         ast_cli(a->fd, "\n\t+---------------------+-----------------+------------+-------------+\n\t%d taskprocessors\n\n", tcount);
453         return CLI_SUCCESS;
454 }
455
456 /* hash callback for astobj2 */
457 static int tps_hash_cb(const void *obj, const int flags)
458 {
459         const struct ast_taskprocessor *tps = obj;
460         const char *name = flags & OBJ_KEY ? obj : tps->name;
461
462         return ast_str_case_hash(name);
463 }
464
465 /* compare callback for astobj2 */
466 static int tps_cmp_cb(void *obj, void *arg, int flags)
467 {
468         struct ast_taskprocessor *lhs = obj, *rhs = arg;
469         const char *rhsname = flags & OBJ_KEY ? arg : rhs->name;
470
471         return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
472 }
473
474 /* destroy the taskprocessor */
475 static void tps_taskprocessor_destroy(void *tps)
476 {
477         struct ast_taskprocessor *t = tps;
478         struct tps_task *task;
479
480         if (!tps) {
481                 ast_log(LOG_ERROR, "missing taskprocessor\n");
482                 return;
483         }
484         ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
485         /* free it */
486         ast_free(t->stats);
487         t->stats = NULL;
488         ast_free((char *) t->name);
489         if (t->listener) {
490                 ao2_ref(t->listener, -1);
491                 t->listener = NULL;
492         }
493         while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
494                 tps_task_free(task);
495         }
496 }
497
498 /* pop the front task and return it */
499 static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
500 {
501         struct tps_task *task;
502
503         if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
504                 tps->tps_queue_size--;
505         }
506         return task;
507 }
508
509 long ast_taskprocessor_size(struct ast_taskprocessor *tps)
510 {
511         return (tps) ? tps->tps_queue_size : -1;
512 }
513
514 /* taskprocessor name accessor */
515 const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
516 {
517         if (!tps) {
518                 ast_log(LOG_ERROR, "no taskprocessor specified!\n");
519                 return NULL;
520         }
521         return tps->name;
522 }
523
524 static void listener_shutdown(struct ast_taskprocessor_listener *listener)
525 {
526         listener->callbacks->shutdown(listener);
527         ao2_ref(listener->tps, -1);
528 }
529
530 static void taskprocessor_listener_dtor(void *obj)
531 {
532         struct ast_taskprocessor_listener *listener = obj;
533
534         if (listener->callbacks->dtor) {
535                 listener->callbacks->dtor(listener);
536         }
537 }
538
539 struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
540 {
541         struct ast_taskprocessor_listener *listener;
542
543         listener = ao2_alloc(sizeof(*listener), taskprocessor_listener_dtor);
544         if (!listener) {
545                 return NULL;
546         }
547         listener->callbacks = callbacks;
548         listener->user_data = user_data;
549
550         return listener;
551 }
552
553 struct ast_taskprocessor *ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
554 {
555         ao2_ref(listener->tps, +1);
556         return listener->tps;
557 }
558
559 void *ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
560 {
561         return listener->user_data;
562 }
563
564 static void *default_listener_pvt_alloc(void)
565 {
566         struct default_taskprocessor_listener_pvt *pvt;
567
568         pvt = ast_calloc(1, sizeof(*pvt));
569         if (!pvt) {
570                 return NULL;
571         }
572         pvt->poll_thread = AST_PTHREADT_NULL;
573         if (ast_sem_init(&pvt->sem, 0, 0) != 0) {
574                 ast_log(LOG_ERROR, "ast_sem_init(): %s\n", strerror(errno));
575                 ast_free(pvt);
576                 return NULL;
577         }
578         return pvt;
579 }
580
581 static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
582 {
583         RAII_VAR(struct ast_taskprocessor *, p,
584                         ao2_alloc(sizeof(*p), tps_taskprocessor_destroy), ao2_cleanup);
585
586         if (!p) {
587                 ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
588                 return NULL;
589         }
590
591         if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
592                 ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
593                 return NULL;
594         }
595         if (!(p->name = ast_strdup(name))) {
596                 return NULL;
597         }
598
599         ao2_ref(listener, +1);
600         p->listener = listener;
601
602         p->thread = AST_PTHREADT_NULL;
603
604         ao2_ref(p, +1);
605         listener->tps = p;
606
607         if (!(ao2_link(tps_singletons, p))) {
608                 ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
609                 return NULL;
610         }
611
612         if (p->listener->callbacks->start(p->listener)) {
613                 ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", p->name);
614                 ast_taskprocessor_unreference(p);
615                 return NULL;
616         }
617
618         /* RAII_VAR will decrement the refcount at the end of the function.
619          * Since we want to pass back a reference to p, we bump the refcount
620          */
621         ao2_ref(p, +1);
622         return p;
623
624 }
625
626 /* Provide a reference to a taskprocessor.  Create the taskprocessor if necessary, but don't
627  * create the taskprocessor if we were told via ast_tps_options to return a reference only
628  * if it already exists */
629 struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create)
630 {
631         struct ast_taskprocessor *p;
632         struct ast_taskprocessor_listener *listener;
633         struct default_taskprocessor_listener_pvt *pvt;
634
635         if (ast_strlen_zero(name)) {
636                 ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
637                 return NULL;
638         }
639         p = ao2_find(tps_singletons, name, OBJ_KEY);
640         if (p) {
641                 return p;
642         }
643         if (create & TPS_REF_IF_EXISTS) {
644                 /* calling function does not want a new taskprocessor to be created if it doesn't already exist */
645                 return NULL;
646         }
647         /* Create a new taskprocessor. Start by creating a default listener */
648         pvt = default_listener_pvt_alloc();
649         if (!pvt) {
650                 return NULL;
651         }
652         listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks, pvt);
653         if (!listener) {
654                 default_listener_pvt_destroy(pvt);
655                 return NULL;
656         }
657
658         p = __allocate_taskprocessor(name, listener);
659         if (!p) {
660                 ao2_ref(listener, -1);
661                 return NULL;
662         }
663
664         /* Unref listener here since the taskprocessor has gained a reference to the listener */
665         ao2_ref(listener, -1);
666         return p;
667 }
668
669 struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
670 {
671         struct ast_taskprocessor *p = ao2_find(tps_singletons, name, OBJ_KEY);
672
673         if (p) {
674                 ast_taskprocessor_unreference(p);
675                 return NULL;
676         }
677         return __allocate_taskprocessor(name, listener);
678 }
679
680 void ast_taskprocessor_set_local(struct ast_taskprocessor *tps,
681         void *local_data)
682 {
683         SCOPED_AO2LOCK(lock, tps);
684         tps->local_data = local_data;
685 }
686
687 /* decrement the taskprocessor reference count and unlink from the container if necessary */
688 void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
689 {
690         if (!tps) {
691                 return NULL;
692         }
693
694         /* To prevent another thread from finding and getting a reference to this
695          * taskprocessor we hold the singletons lock. If we didn't do this then
696          * they may acquire it and find that the listener has been shut down.
697          */
698         ao2_lock(tps_singletons);
699
700         if (ao2_ref(tps, -1) > 3) {
701                 ao2_unlock(tps_singletons);
702                 return NULL;
703         }
704
705         /* If we're down to 3 references, then those must be:
706          * 1. The reference we just got rid of
707          * 2. The container
708          * 3. The listener
709          */
710         ao2_unlink_flags(tps_singletons, tps, OBJ_NOLOCK);
711         ao2_unlock(tps_singletons);
712
713         listener_shutdown(tps->listener);
714         return NULL;
715 }
716
717 /* push the task into the taskprocessor queue */
718 static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
719 {
720         int previous_size;
721         int was_empty;
722
723         if (!tps) {
724                 ast_log(LOG_ERROR, "tps is NULL!\n");
725                 return -1;
726         }
727
728         if (!t) {
729                 ast_log(LOG_ERROR, "t is NULL!\n");
730                 return -1;
731         }
732
733         ao2_lock(tps);
734         AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
735         previous_size = tps->tps_queue_size++;
736
737         if (previous_size >= AST_TASKPROCESSOR_HIGH_WATER_LEVEL && !tps->high_water_warned) {
738                 ast_log(LOG_WARNING, "The '%s' task processor queue reached %d scheduled tasks.\n",
739                         tps->name, previous_size);
740                 tps->high_water_warned = 1;
741         }
742
743         /* The currently executing task counts as still in queue */
744         was_empty = tps->executing ? 0 : previous_size == 0;
745         ao2_unlock(tps);
746         tps->listener->callbacks->task_pushed(tps->listener, was_empty);
747         return 0;
748 }
749
750 int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
751 {
752         return taskprocessor_push(tps, tps_task_alloc(task_exe, datap));
753 }
754
755 int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
756 {
757         return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap));
758 }
759
760 int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
761 {
762         struct ast_taskprocessor_local local;
763         struct tps_task *t;
764         long size;
765
766         ao2_lock(tps);
767         t = tps_taskprocessor_pop(tps);
768         if (!t) {
769                 ao2_unlock(tps);
770                 return 0;
771         }
772
773         tps->thread = pthread_self();
774         tps->executing = 1;
775
776         if (t->wants_local) {
777                 local.local_data = tps->local_data;
778                 local.data = t->datap;
779         }
780         ao2_unlock(tps);
781
782         if (t->wants_local) {
783                 t->callback.execute_local(&local);
784         } else {
785                 t->callback.execute(t->datap);
786         }
787         tps_task_free(t);
788
789         ao2_lock(tps);
790         tps->thread = AST_PTHREADT_NULL;
791         /* We need to check size in the same critical section where we reset the
792          * executing bit. Avoids a race condition where a task is pushed right
793          * after we pop an empty stack.
794          */
795         tps->executing = 0;
796         size = ast_taskprocessor_size(tps);
797         /* If we executed a task, bump the stats */
798         if (tps->stats) {
799                 tps->stats->_tasks_processed_count++;
800                 if (size > tps->stats->max_qsize) {
801                         tps->stats->max_qsize = size;
802                 }
803         }
804         ao2_unlock(tps);
805
806         /* If we executed a task, check for the transition to empty */
807         if (size == 0 && tps->listener->callbacks->emptied) {
808                 tps->listener->callbacks->emptied(tps->listener);
809         }
810         return size > 0;
811 }
812
813 int ast_taskprocessor_is_task(struct ast_taskprocessor *tps)
814 {
815         int is_task;
816
817         ao2_lock(tps);
818         is_task = pthread_equal(tps->thread, pthread_self());
819         ao2_unlock(tps);
820         return is_task;
821 }