0b6a2a817ec10d7d2e095bbb7665c1d35674c2c5
[asterisk/asterisk.git] / main / taskprocessor.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2007-2013, Digium, Inc.
5  *
6  * Dwayne M. Hubbard <dhubbard@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \brief Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.
22  *
23  * \author Dwayne Hubbard <dhubbard@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/_private.h"
35 #include "asterisk/module.h"
36 #include "asterisk/time.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/cli.h"
39 #include "asterisk/taskprocessor.h"
40
41 /*!
42  * \brief tps_task structure is queued to a taskprocessor
43  *
44  * tps_tasks are processed in FIFO order and freed by the taskprocessing
45  * thread after the task handler returns.  The callback function that is assigned
46  * to the execute() function pointer is responsible for releasing datap resources if necessary.
47  */
48 struct tps_task {
49         /*! \brief The execute() task callback function pointer */
50         int (*execute)(void *datap);
51         /*! \brief The data pointer for the task execute() function */
52         void *datap;
53         /*! \brief AST_LIST_ENTRY overhead */
54         AST_LIST_ENTRY(tps_task) list;
55 };
56
57 /*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
58 struct tps_taskprocessor_stats {
59         /*! \brief This is the maximum number of tasks queued at any one time */
60         unsigned long max_qsize;
61         /*! \brief This is the current number of tasks processed */
62         unsigned long _tasks_processed_count;
63 };
64
65 /*! \brief A ast_taskprocessor structure is a singleton by name */
66 struct ast_taskprocessor {
67         /*! \brief Friendly name of the taskprocessor */
68         const char *name;
69         /*! \brief Taskprocessor statistics */
70         struct tps_taskprocessor_stats *stats;
71         /*! \brief Taskprocessor current queue size */
72         long tps_queue_size;
73         /*! \brief Taskprocessor queue */
74         AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
75         /*! \brief Taskprocessor singleton list entry */
76         AST_LIST_ENTRY(ast_taskprocessor) list;
77         struct ast_taskprocessor_listener *listener;
78         /*! Indicates if the taskprocessor is in the process of shuting down */
79         unsigned int shutting_down:1;
80 };
81
82 /*!
83  * \brief A listener for taskprocessors
84  *
85  * \since 12.0.0
86  *
87  * When a taskprocessor's state changes, the listener
88  * is notified of the change. This allows for tasks
89  * to be addressed in whatever way is appropriate for
90  * the module using the taskprocessor.
91  */
92 struct ast_taskprocessor_listener {
93         /*! The callbacks the taskprocessor calls into to notify of state changes */
94         const struct ast_taskprocessor_listener_callbacks *callbacks;
95         /*! The taskprocessor that the listener is listening to */
96         struct ast_taskprocessor *tps;
97         /*! Data private to the listener */
98         void *user_data;
99 };
100
101 #define TPS_MAX_BUCKETS 7
102 /*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
103 static struct ao2_container *tps_singletons;
104
105 /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition */
106 static ast_cond_t cli_ping_cond;
107
108 /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition lock */
109 AST_MUTEX_DEFINE_STATIC(cli_ping_cond_lock);
110
111 /*! \brief The astobj2 hash callback for taskprocessors */
112 static int tps_hash_cb(const void *obj, const int flags);
113 /*! \brief The astobj2 compare callback for taskprocessors */
114 static int tps_cmp_cb(void *obj, void *arg, int flags);
115
116 /*! \brief The task processing function executed by a taskprocessor */
117 static void *tps_processing_function(void *data);
118
119 /*! \brief Destroy the taskprocessor when its refcount reaches zero */
120 static void tps_taskprocessor_destroy(void *tps);
121
122 /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> handler function */
123 static int tps_ping_handler(void *datap);
124
125 /*! \brief Remove the front task off the taskprocessor queue */
126 static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps);
127
128 /*! \brief Return the size of the taskprocessor queue */
129 static int tps_taskprocessor_depth(struct ast_taskprocessor *tps);
130
131 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
132 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
133
134 static struct ast_cli_entry taskprocessor_clis[] = {
135         AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
136         AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
137 };
138
139 struct default_taskprocessor_listener_pvt {
140         pthread_t poll_thread;
141         ast_mutex_t lock;
142         ast_cond_t cond;
143         int wake_up;
144         int dead;
145 };
146
147
148 static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, int should_die)
149 {
150         SCOPED_MUTEX(lock, &pvt->lock);
151         pvt->wake_up = 1;
152         pvt->dead = should_die;
153         ast_cond_signal(&pvt->cond);
154 }
155
156 static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt)
157 {
158         SCOPED_MUTEX(lock, &pvt->lock);
159         while (!pvt->wake_up) {
160                 ast_cond_wait(&pvt->cond, lock);
161         }
162         pvt->wake_up = 0;
163         return pvt->dead;
164 }
165
166 /*!
167  * \brief Function that processes tasks in the taskprocessor
168  * \internal
169  */
170 static void *tps_processing_function(void *data)
171 {
172         struct ast_taskprocessor_listener *listener = data;
173         struct ast_taskprocessor *tps = listener->tps;
174         struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
175         int dead = 0;
176
177         while (!dead) {
178                 if (!ast_taskprocessor_execute(tps)) {
179                         dead = default_tps_idle(pvt);
180                 }
181         }
182         return NULL;
183 }
184
185 static int default_listener_start(struct ast_taskprocessor_listener *listener)
186 {
187         struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
188
189         if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) {
190                 return -1;
191         }
192
193         return 0;
194 }
195
196 static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
197 {
198         struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
199
200         if (was_empty) {
201                 default_tps_wake_up(pvt, 0);
202         }
203 }
204
205 static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
206 {
207         ast_mutex_destroy(&pvt->lock);
208         ast_cond_destroy(&pvt->cond);
209         ast_free(pvt);
210 }
211
212 static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
213 {
214         struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
215         default_tps_wake_up(pvt, 1);
216         pthread_join(pvt->poll_thread, NULL);
217         pvt->poll_thread = AST_PTHREADT_NULL;
218         default_listener_pvt_destroy(pvt);
219 }
220
221 static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
222         .start = default_listener_start,
223         .task_pushed = default_task_pushed,
224         .shutdown = default_listener_shutdown,
225 };
226
227 /*!
228  * \internal
229  * \brief Clean up resources on Asterisk shutdown
230  */
231 static void tps_shutdown(void)
232 {
233         ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
234         ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
235         tps_singletons = NULL;
236 }
237
238 /* initialize the taskprocessor container and register CLI operations */
239 int ast_tps_init(void)
240 {
241         if (!(tps_singletons = ao2_container_alloc(TPS_MAX_BUCKETS, tps_hash_cb, tps_cmp_cb))) {
242                 ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
243                 return -1;
244         }
245
246         ast_cond_init(&cli_ping_cond, NULL);
247
248         ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
249
250         ast_register_atexit(tps_shutdown);
251
252         return 0;
253 }
254
255 /* allocate resources for the task */
256 static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
257 {
258         struct tps_task *t;
259         if ((t = ast_calloc(1, sizeof(*t)))) {
260                 t->execute = task_exe;
261                 t->datap = datap;
262         }
263         return t;
264 }
265
266 /* release task resources */
267 static void *tps_task_free(struct tps_task *task)
268 {
269         if (task) {
270                 ast_free(task);
271         }
272         return NULL;
273 }
274
275 /* taskprocessor tab completion */
276 static char *tps_taskprocessor_tab_complete(struct ast_taskprocessor *p, struct ast_cli_args *a)
277 {
278         int tklen;
279         int wordnum = 0;
280         char *name = NULL;
281         struct ao2_iterator i;
282
283         if (a->pos != 3)
284                 return NULL;
285
286         tklen = strlen(a->word);
287         i = ao2_iterator_init(tps_singletons, 0);
288         while ((p = ao2_iterator_next(&i))) {
289                 if (!strncasecmp(a->word, p->name, tklen) && ++wordnum > a->n) {
290                         name = ast_strdup(p->name);
291                         ao2_ref(p, -1);
292                         break;
293                 }
294                 ao2_ref(p, -1);
295         }
296         ao2_iterator_destroy(&i);
297         return name;
298 }
299
300 /* ping task handling function */
301 static int tps_ping_handler(void *datap)
302 {
303         ast_mutex_lock(&cli_ping_cond_lock);
304         ast_cond_signal(&cli_ping_cond);
305         ast_mutex_unlock(&cli_ping_cond_lock);
306         return 0;
307 }
308
309 /* ping the specified taskprocessor and display the ping time on the CLI */
310 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
311 {
312         struct timeval begin, end, delta;
313         const char *name;
314         struct timeval when;
315         struct timespec ts;
316         struct ast_taskprocessor *tps = NULL;
317
318         switch (cmd) {
319         case CLI_INIT:
320                 e->command = "core ping taskprocessor";
321                 e->usage =
322                         "Usage: core ping taskprocessor <taskprocessor>\n"
323                         "       Displays the time required for a task to be processed\n";
324                 return NULL;
325         case CLI_GENERATE:
326                 return tps_taskprocessor_tab_complete(tps, a);
327         }
328
329         if (a->argc != 4)
330                 return CLI_SHOWUSAGE;
331
332         name = a->argv[3];
333         if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
334                 ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
335                 return CLI_SUCCESS;
336         }
337         ast_cli(a->fd, "\npinging %s ...", name);
338         when = ast_tvadd((begin = ast_tvnow()), ast_samp2tv(1000, 1000));
339         ts.tv_sec = when.tv_sec;
340         ts.tv_nsec = when.tv_usec * 1000;
341         ast_mutex_lock(&cli_ping_cond_lock);
342         if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
343                 ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
344                 ao2_ref(tps, -1);
345                 return CLI_FAILURE;
346         }
347         ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
348         ast_mutex_unlock(&cli_ping_cond_lock);
349         end = ast_tvnow();
350         delta = ast_tvsub(end, begin);
351         ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
352         ao2_ref(tps, -1);
353         return CLI_SUCCESS;
354 }
355
356 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
357 {
358         char name[256];
359         int tcount;
360         unsigned long qsize;
361         unsigned long maxqsize;
362         unsigned long processed;
363         struct ast_taskprocessor *p;
364         struct ao2_iterator i;
365
366         switch (cmd) {
367         case CLI_INIT:
368                 e->command = "core show taskprocessors";
369                 e->usage =
370                         "Usage: core show taskprocessors\n"
371                         "       Shows a list of instantiated task processors and their statistics\n";
372                 return NULL;
373         case CLI_GENERATE:
374                 return NULL;
375         }
376
377         if (a->argc != e->args)
378                 return CLI_SHOWUSAGE;
379
380         ast_cli(a->fd, "\n\t+----- Processor -----+--- Processed ---+- In Queue -+- Max Depth -+");
381         i = ao2_iterator_init(tps_singletons, 0);
382         while ((p = ao2_iterator_next(&i))) {
383                 ast_copy_string(name, p->name, sizeof(name));
384                 qsize = p->tps_queue_size;
385                 maxqsize = p->stats->max_qsize;
386                 processed = p->stats->_tasks_processed_count;
387                 ast_cli(a->fd, "\n%24s   %17ld %12ld %12ld", name, processed, qsize, maxqsize);
388                 ao2_ref(p, -1);
389         }
390         ao2_iterator_destroy(&i);
391         tcount = ao2_container_count(tps_singletons);
392         ast_cli(a->fd, "\n\t+---------------------+-----------------+------------+-------------+\n\t%d taskprocessors\n\n", tcount);
393         return CLI_SUCCESS;
394 }
395
396 /* hash callback for astobj2 */
397 static int tps_hash_cb(const void *obj, const int flags)
398 {
399         const struct ast_taskprocessor *tps = obj;
400         const char *name = flags & OBJ_KEY ? obj : tps->name;
401
402         return ast_str_case_hash(name);
403 }
404
405 /* compare callback for astobj2 */
406 static int tps_cmp_cb(void *obj, void *arg, int flags)
407 {
408         struct ast_taskprocessor *lhs = obj, *rhs = arg;
409         const char *rhsname = flags & OBJ_KEY ? arg : rhs->name;
410
411         return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
412 }
413
414 /* destroy the taskprocessor */
415 static void tps_taskprocessor_destroy(void *tps)
416 {
417         struct ast_taskprocessor *t = tps;
418         struct tps_task *task;
419
420         if (!tps) {
421                 ast_log(LOG_ERROR, "missing taskprocessor\n");
422                 return;
423         }
424         ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
425         /* free it */
426         if (t->stats) {
427                 ast_free(t->stats);
428                 t->stats = NULL;
429         }
430         ast_free((char *) t->name);
431         if (t->listener) {
432                 /* This code should not be reached since the listener
433                  * should have been destroyed before the taskprocessor could
434                  * be destroyed
435                  */
436                 ao2_ref(t->listener, -1);
437                 t->listener = NULL;
438         }
439         while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
440                 tps_task_free(task);
441         }
442 }
443
444 /* pop the front task and return it */
445 static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
446 {
447         struct tps_task *task;
448         SCOPED_AO2LOCK(lock, tps);
449
450         if (tps->shutting_down) {
451                 return NULL;
452         }
453
454         if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
455                 tps->tps_queue_size--;
456         }
457         return task;
458 }
459
460 static int tps_taskprocessor_depth(struct ast_taskprocessor *tps)
461 {
462         return (tps) ? tps->tps_queue_size : -1;
463 }
464
465 /* taskprocessor name accessor */
466 const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
467 {
468         if (!tps) {
469                 ast_log(LOG_ERROR, "no taskprocessor specified!\n");
470                 return NULL;
471         }
472         return tps->name;
473 }
474
475 static void listener_shutdown(struct ast_taskprocessor_listener *listener)
476 {
477         listener->callbacks->shutdown(listener);
478         ao2_ref(listener->tps, -1);
479 }
480
481 struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
482 {
483         RAII_VAR(struct ast_taskprocessor_listener *, listener,
484                         ao2_alloc(sizeof(*listener), NULL), ao2_cleanup);
485
486         if (!listener) {
487                 return NULL;
488         }
489         listener->callbacks = callbacks;
490         listener->user_data = user_data;
491
492         ao2_ref(listener, +1);
493         return listener;
494 }
495
496 struct ast_taskprocessor *ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
497 {
498         return listener->tps;
499 }
500
501 void *ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
502 {
503         return listener->user_data;
504 }
505
506 static void *default_listener_pvt_alloc(void)
507 {
508         struct default_taskprocessor_listener_pvt *pvt;
509
510         pvt = ast_calloc(1, sizeof(*pvt));
511         if (!pvt) {
512                 return NULL;
513         }
514         ast_cond_init(&pvt->cond, NULL);
515         ast_mutex_init(&pvt->lock);
516         pvt->poll_thread = AST_PTHREADT_NULL;
517         return pvt;
518 }
519
520 static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
521 {
522         RAII_VAR(struct ast_taskprocessor *, p,
523                         ao2_alloc(sizeof(*p), tps_taskprocessor_destroy), ao2_cleanup);
524
525         if (!p) {
526                 ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
527                 return NULL;
528         }
529
530         if (!(p->stats = ast_calloc(1, sizeof(*p->stats)))) {
531                 ast_log(LOG_WARNING, "failed to create taskprocessor stats for '%s'\n", name);
532                 return NULL;
533         }
534         if (!(p->name = ast_strdup(name))) {
535                 ao2_ref(p, -1);
536                 return NULL;
537         }
538
539         ao2_ref(listener, +1);
540         p->listener = listener;
541
542         ao2_ref(p, +1);
543         listener->tps = p;
544
545         if (!(ao2_link(tps_singletons, p))) {
546                 ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
547                 return NULL;
548         }
549
550         if (p->listener->callbacks->start(p->listener)) {
551                 ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n", p->name);
552                 ast_taskprocessor_unreference(p);
553                 return NULL;
554         }
555
556         /* RAII_VAR will decrement the refcount at the end of the function.
557          * Since we want to pass back a reference to p, we bump the refcount
558          */
559         ao2_ref(p, +1);
560         return p;
561
562 }
563
564 /* Provide a reference to a taskprocessor.  Create the taskprocessor if necessary, but don't
565  * create the taskprocessor if we were told via ast_tps_options to return a reference only
566  * if it already exists */
567 struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create)
568 {
569         struct ast_taskprocessor *p;
570         struct ast_taskprocessor_listener *listener;
571         struct default_taskprocessor_listener_pvt *pvt;
572
573         if (ast_strlen_zero(name)) {
574                 ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
575                 return NULL;
576         }
577         p = ao2_find(tps_singletons, name, OBJ_KEY);
578         if (p) {
579                 return p;
580         }
581         if (create & TPS_REF_IF_EXISTS) {
582                 /* calling function does not want a new taskprocessor to be created if it doesn't already exist */
583                 return NULL;
584         }
585         /* Create a new taskprocessor. Start by creating a default listener */
586         pvt = default_listener_pvt_alloc();
587         if (!pvt) {
588                 return NULL;
589         }
590         listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks, pvt);
591         if (!listener) {
592                 default_listener_pvt_destroy(pvt);
593                 return NULL;
594         }
595
596         p = __allocate_taskprocessor(name, listener);
597         if (!p) {
598                 default_listener_pvt_destroy(pvt);
599                 ao2_ref(listener, -1);
600                 return NULL;
601         }
602
603         /* Unref listener here since the taskprocessor has gained a reference to the listener */
604         ao2_ref(listener, -1);
605         return p;
606
607 }
608
609 struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
610 {
611         struct ast_taskprocessor *p = ao2_find(tps_singletons, name, OBJ_KEY);
612
613         if (p) {
614                 ast_taskprocessor_unreference(p);
615                 return NULL;
616         }
617         return __allocate_taskprocessor(name, listener);
618 }
619
620 /* decrement the taskprocessor reference count and unlink from the container if necessary */
621 void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
622 {
623         if (!tps) {
624                 return NULL;
625         }
626
627         if (ao2_ref(tps, -1) > 3) {
628                 return NULL;
629         }
630         /* If we're down to 3 references, then those must be:
631          * 1. The reference we just got rid of
632          * 2. The container
633          * 3. The listener
634          */
635         ao2_unlink(tps_singletons, tps);
636         listener_shutdown(tps->listener);
637         return NULL;
638 }
639
640 /* push the task into the taskprocessor queue */
641 int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
642 {
643         struct tps_task *t;
644         int previous_size;
645
646         if (!tps || !task_exe) {
647                 ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
648                 return -1;
649         }
650         if (!(t = tps_task_alloc(task_exe, datap))) {
651                 ast_log(LOG_ERROR, "failed to allocate task!  Can't push to '%s'\n", tps->name);
652                 return -1;
653         }
654         ao2_lock(tps);
655         AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
656         previous_size = tps->tps_queue_size++;
657         ao2_unlock(tps);
658         tps->listener->callbacks->task_pushed(tps->listener, previous_size ? 0 : 1);
659         return 0;
660 }
661
662 int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
663 {
664         struct tps_task *t;
665         int size;
666
667         if (!(t = tps_taskprocessor_pop(tps))) {
668                 return 0;
669         }
670
671         t->execute(t->datap);
672
673         tps_task_free(t);
674
675         ao2_lock(tps);
676         size = tps_taskprocessor_depth(tps);
677         if (tps->stats) {
678                 tps->stats->_tasks_processed_count++;
679                 if (size > tps->stats->max_qsize) {
680                         tps->stats->max_qsize = size;
681                 }
682         }
683         ao2_unlock(tps);
684
685         if (size == 0 && tps->listener->callbacks->emptied) {
686                 tps->listener->callbacks->emptied(tps->listener);
687                 return 0;
688         }
689         return 1;
690 }