main/taskprocessor: Increase max name length of taskprocessors
[asterisk/asterisk.git] / main / taskprocessor.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2007-2013, Digium, Inc.
5  *
6  * Dwayne M. Hubbard <dhubbard@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \brief Maintain a container of uniquely-named taskprocessor threads that can be shared across modules.
22  *
23  * \author Dwayne Hubbard <dhubbard@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 #include "asterisk/_private.h"
33 #include "asterisk/module.h"
34 #include "asterisk/time.h"
35 #include "asterisk/astobj2.h"
36 #include "asterisk/cli.h"
37 #include "asterisk/taskprocessor.h"
38 #include "asterisk/sem.h"
39
40 /*!
41  * \brief tps_task structure is queued to a taskprocessor
42  *
43  * tps_tasks are processed in FIFO order and freed by the taskprocessing
44  * thread after the task handler returns.  The callback function that is assigned
45  * to the execute() function pointer is responsible for releasing datap resources if necessary.
46  */
47 struct tps_task {
48         /*! \brief The execute() task callback function pointer */
49         union {
50                 int (*execute)(void *datap);
51                 int (*execute_local)(struct ast_taskprocessor_local *local);
52         } callback;
53         /*! \brief The data pointer for the task execute() function */
54         void *datap;
55         /*! \brief AST_LIST_ENTRY overhead */
56         AST_LIST_ENTRY(tps_task) list;
57         unsigned int wants_local:1;
58 };
59
60 /*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
61 struct tps_taskprocessor_stats {
62         /*! \brief This is the maximum number of tasks queued at any one time */
63         unsigned long max_qsize;
64         /*! \brief This is the current number of tasks processed */
65         unsigned long _tasks_processed_count;
66 };
67
68 /*! \brief A ast_taskprocessor structure is a singleton by name */
69 struct ast_taskprocessor {
70         /*! \brief Taskprocessor statistics */
71         struct tps_taskprocessor_stats stats;
72         void *local_data;
73         /*! \brief Taskprocessor current queue size */
74         long tps_queue_size;
75         /*! \brief Taskprocessor low water clear alert level */
76         long tps_queue_low;
77         /*! \brief Taskprocessor high water alert trigger level */
78         long tps_queue_high;
79         /*! \brief Taskprocessor queue */
80         AST_LIST_HEAD_NOLOCK(tps_queue, tps_task) tps_queue;
81         struct ast_taskprocessor_listener *listener;
82         /*! Current thread executing the tasks */
83         pthread_t thread;
84         /*! Indicates if the taskprocessor is currently executing a task */
85         unsigned int executing:1;
86         /*! Indicates that a high water warning has been issued on this task processor */
87         unsigned int high_water_warned:1;
88         /*! Indicates that a high water alert is active on this taskprocessor */
89         unsigned int high_water_alert:1;
90         /*! Indicates if the taskprocessor is currently suspended */
91         unsigned int suspended:1;
92         /*! \brief Anything before the first '/' in the name (if there is one) */
93         char *subsystem;
94         /*! \brief Friendly name of the taskprocessor.
95          * Subsystem is appended after the name's NULL terminator.
96          */
97         char name[0];
98 };
99
100 /*!
101  * \brief A listener for taskprocessors
102  *
103  * \since 12.0.0
104  *
105  * When a taskprocessor's state changes, the listener
106  * is notified of the change. This allows for tasks
107  * to be addressed in whatever way is appropriate for
108  * the module using the taskprocessor.
109  */
110 struct ast_taskprocessor_listener {
111         /*! The callbacks the taskprocessor calls into to notify of state changes */
112         const struct ast_taskprocessor_listener_callbacks *callbacks;
113         /*! The taskprocessor that the listener is listening to */
114         struct ast_taskprocessor *tps;
115         /*! Data private to the listener */
116         void *user_data;
117 };
118
119 /*!
120  * Keep track of which subsystems are in alert
121  * and how many of their taskprocessors are overloaded.
122  */
123 struct subsystem_alert {
124         unsigned int alert_count;
125         char subsystem[0];
126 };
127 static AST_VECTOR_RW(subsystem_alert_vector, struct subsystem_alert *) overloaded_subsystems;
128
129 #ifdef LOW_MEMORY
130 #define TPS_MAX_BUCKETS 61
131 #else
132 /*! \brief Number of buckets in the tps_singletons container. */
133 #define TPS_MAX_BUCKETS 1567
134 #endif
135
136 /*! \brief tps_singletons is the astobj2 container for taskprocessor singletons */
137 static struct ao2_container *tps_singletons;
138
139 /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition */
140 static ast_cond_t cli_ping_cond;
141
142 /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> operation requires a ping condition lock */
143 AST_MUTEX_DEFINE_STATIC(cli_ping_cond_lock);
144
145 /*! \brief The astobj2 hash callback for taskprocessors */
146 static int tps_hash_cb(const void *obj, const int flags);
147 /*! \brief The astobj2 compare callback for taskprocessors */
148 static int tps_cmp_cb(void *obj, void *arg, int flags);
149
150 /*! \brief CLI <example>taskprocessor ping &lt;blah&gt;</example> handler function */
151 static int tps_ping_handler(void *datap);
152
153 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
154 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
155 static char *cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
156
157 static struct ast_cli_entry taskprocessor_clis[] = {
158         AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
159         AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
160         AST_CLI_DEFINE(cli_subsystem_alert_report, "List task processor subsystems in alert"),
161 };
162
163 struct default_taskprocessor_listener_pvt {
164         pthread_t poll_thread;
165         int dead;
166         struct ast_sem sem;
167 };
168
169 static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
170 {
171         ast_assert(pvt->dead);
172         ast_sem_destroy(&pvt->sem);
173         ast_free(pvt);
174 }
175
176 static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
177 {
178         struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
179
180         default_listener_pvt_destroy(pvt);
181
182         listener->user_data = NULL;
183 }
184
185 /*!
186  * \brief Function that processes tasks in the taskprocessor
187  * \internal
188  */
189 static void *default_tps_processing_function(void *data)
190 {
191         struct ast_taskprocessor_listener *listener = data;
192         struct ast_taskprocessor *tps = listener->tps;
193         struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
194         int sem_value;
195         int res;
196
197         while (!pvt->dead) {
198                 res = ast_sem_wait(&pvt->sem);
199                 if (res != 0 && errno != EINTR) {
200                         ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
201                                 strerror(errno));
202                         /* Just give up */
203                         break;
204                 }
205                 ast_taskprocessor_execute(tps);
206         }
207
208         /* No posting to a dead taskprocessor! */
209         res = ast_sem_getvalue(&pvt->sem, &sem_value);
210         ast_assert(res == 0 && sem_value == 0);
211
212         /* Free the shutdown reference (see default_listener_shutdown) */
213         ao2_t_ref(listener->tps, -1, "tps-shutdown");
214
215         return NULL;
216 }
217
218 static int default_listener_start(struct ast_taskprocessor_listener *listener)
219 {
220         struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
221
222         if (ast_pthread_create(&pvt->poll_thread, NULL, default_tps_processing_function, listener)) {
223                 return -1;
224         }
225
226         return 0;
227 }
228
229 static void default_task_pushed(struct ast_taskprocessor_listener *listener, int was_empty)
230 {
231         struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
232
233         if (ast_sem_post(&pvt->sem) != 0) {
234                 ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n",
235                         strerror(errno));
236         }
237 }
238
239 static int default_listener_die(void *data)
240 {
241         struct default_taskprocessor_listener_pvt *pvt = data;
242         pvt->dead = 1;
243         return 0;
244 }
245
246 static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
247 {
248         struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
249         int res;
250
251         /* Hold a reference during shutdown */
252         ao2_t_ref(listener->tps, +1, "tps-shutdown");
253
254         if (ast_taskprocessor_push(listener->tps, default_listener_die, pvt)) {
255                 /* This will cause the thread to exit early without completing tasks already
256                  * in the queue.  This is probably the least bad option in this situation. */
257                 default_listener_die(pvt);
258         }
259
260         ast_assert(pvt->poll_thread != AST_PTHREADT_NULL);
261
262         if (pthread_equal(pthread_self(), pvt->poll_thread)) {
263                 res = pthread_detach(pvt->poll_thread);
264                 if (res != 0) {
265                         ast_log(LOG_ERROR, "pthread_detach(): %s\n", strerror(errno));
266                 }
267         } else {
268                 res = pthread_join(pvt->poll_thread, NULL);
269                 if (res != 0) {
270                         ast_log(LOG_ERROR, "pthread_join(): %s\n", strerror(errno));
271                 }
272         }
273         pvt->poll_thread = AST_PTHREADT_NULL;
274 }
275
276 static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
277         .start = default_listener_start,
278         .task_pushed = default_task_pushed,
279         .shutdown = default_listener_shutdown,
280         .dtor = default_listener_pvt_dtor,
281 };
282
283 /*!
284  * \internal
285  * \brief Clean up resources on Asterisk shutdown
286  */
287 static void tps_shutdown(void)
288 {
289         ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
290         AST_VECTOR_CALLBACK_VOID(&overloaded_subsystems, ast_free);
291         AST_VECTOR_RW_FREE(&overloaded_subsystems);
292         ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
293         tps_singletons = NULL;
294 }
295
296 /* initialize the taskprocessor container and register CLI operations */
297 int ast_tps_init(void)
298 {
299         tps_singletons = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
300                 TPS_MAX_BUCKETS, tps_hash_cb, NULL, tps_cmp_cb);
301         if (!tps_singletons) {
302                 ast_log(LOG_ERROR, "taskprocessor container failed to initialize!\n");
303                 return -1;
304         }
305
306         if (AST_VECTOR_RW_INIT(&overloaded_subsystems, 10)) {
307                 ao2_ref(tps_singletons, -1);
308                 ast_log(LOG_ERROR, "taskprocessor subsystems vector failed to initialize!\n");
309                 return -1;
310         }
311
312         ast_cond_init(&cli_ping_cond, NULL);
313
314         ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
315
316         ast_register_cleanup(tps_shutdown);
317
318         return 0;
319 }
320
321 /* allocate resources for the task */
322 static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
323 {
324         struct tps_task *t;
325         if (!task_exe) {
326                 ast_log(LOG_ERROR, "task_exe is NULL!\n");
327                 return NULL;
328         }
329
330         t = ast_calloc(1, sizeof(*t));
331         if (!t) {
332                 ast_log(LOG_ERROR, "failed to allocate task!\n");
333                 return NULL;
334         }
335
336         t->callback.execute = task_exe;
337         t->datap = datap;
338
339         return t;
340 }
341
342 static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
343 {
344         struct tps_task *t;
345         if (!task_exe) {
346                 ast_log(LOG_ERROR, "task_exe is NULL!\n");
347                 return NULL;
348         }
349
350         t = ast_calloc(1, sizeof(*t));
351         if (!t) {
352                 ast_log(LOG_ERROR, "failed to allocate task!\n");
353                 return NULL;
354         }
355
356         t->callback.execute_local = task_exe;
357         t->datap = datap;
358         t->wants_local = 1;
359
360         return t;
361 }
362
363 /* release task resources */
364 static void *tps_task_free(struct tps_task *task)
365 {
366         ast_free(task);
367         return NULL;
368 }
369
370 /* taskprocessor tab completion */
371 static char *tps_taskprocessor_tab_complete(struct ast_cli_args *a)
372 {
373         int tklen;
374         struct ast_taskprocessor *p;
375         struct ao2_iterator i;
376
377         if (a->pos != 3) {
378                 return NULL;
379         }
380
381         tklen = strlen(a->word);
382         i = ao2_iterator_init(tps_singletons, 0);
383         while ((p = ao2_iterator_next(&i))) {
384                 if (!strncasecmp(a->word, p->name, tklen)) {
385                         if (ast_cli_completion_add(ast_strdup(p->name))) {
386                                 ast_taskprocessor_unreference(p);
387                                 break;
388                         }
389                 }
390                 ast_taskprocessor_unreference(p);
391         }
392         ao2_iterator_destroy(&i);
393
394         return NULL;
395 }
396
397 /* ping task handling function */
398 static int tps_ping_handler(void *datap)
399 {
400         ast_mutex_lock(&cli_ping_cond_lock);
401         ast_cond_signal(&cli_ping_cond);
402         ast_mutex_unlock(&cli_ping_cond_lock);
403         return 0;
404 }
405
406 /* ping the specified taskprocessor and display the ping time on the CLI */
407 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
408 {
409         struct timeval begin, end, delta;
410         const char *name;
411         struct timeval when;
412         struct timespec ts;
413         struct ast_taskprocessor *tps;
414
415         switch (cmd) {
416         case CLI_INIT:
417                 e->command = "core ping taskprocessor";
418                 e->usage =
419                         "Usage: core ping taskprocessor <taskprocessor>\n"
420                         "       Displays the time required for a task to be processed\n";
421                 return NULL;
422         case CLI_GENERATE:
423                 return tps_taskprocessor_tab_complete(a);
424         }
425
426         if (a->argc != 4)
427                 return CLI_SHOWUSAGE;
428
429         name = a->argv[3];
430         if (!(tps = ast_taskprocessor_get(name, TPS_REF_IF_EXISTS))) {
431                 ast_cli(a->fd, "\nping failed: %s not found\n\n", name);
432                 return CLI_SUCCESS;
433         }
434         ast_cli(a->fd, "\npinging %s ...", name);
435
436         /*
437          * Wait up to 5 seconds for a ping reply.
438          *
439          * On a very busy system it could take awhile to get a
440          * ping response from some taskprocessors.
441          */
442         begin = ast_tvnow();
443         when = ast_tvadd(begin, ast_samp2tv(5000, 1000));
444         ts.tv_sec = when.tv_sec;
445         ts.tv_nsec = when.tv_usec * 1000;
446
447         ast_mutex_lock(&cli_ping_cond_lock);
448         if (ast_taskprocessor_push(tps, tps_ping_handler, 0) < 0) {
449                 ast_mutex_unlock(&cli_ping_cond_lock);
450                 ast_cli(a->fd, "\nping failed: could not push task to %s\n\n", name);
451                 ast_taskprocessor_unreference(tps);
452                 return CLI_FAILURE;
453         }
454         ast_cond_timedwait(&cli_ping_cond, &cli_ping_cond_lock, &ts);
455         ast_mutex_unlock(&cli_ping_cond_lock);
456
457         end = ast_tvnow();
458         delta = ast_tvsub(end, begin);
459         ast_cli(a->fd, "\n\t%24s ping time: %.1ld.%.6ld sec\n\n", name, (long)delta.tv_sec, (long int)delta.tv_usec);
460         ast_taskprocessor_unreference(tps);
461         return CLI_SUCCESS;
462 }
463
464 /*!
465  * \internal
466  * \brief Taskprocessor ao2 container sort function.
467  * \since 13.8.0
468  *
469  * \param obj_left pointer to the (user-defined part) of an object.
470  * \param obj_right pointer to the (user-defined part) of an object.
471  * \param flags flags from ao2_callback()
472  *   OBJ_SEARCH_OBJECT - if set, 'obj_right', is an object.
473  *   OBJ_SEARCH_KEY - if set, 'obj_right', is a search key item that is not an object.
474  *   OBJ_SEARCH_PARTIAL_KEY - if set, 'obj_right', is a partial search key item that is not an object.
475  *
476  * \retval <0 if obj_left < obj_right
477  * \retval =0 if obj_left == obj_right
478  * \retval >0 if obj_left > obj_right
479  */
480 static int tps_sort_cb(const void *obj_left, const void *obj_right, int flags)
481 {
482         const struct ast_taskprocessor *tps_left = obj_left;
483         const struct ast_taskprocessor *tps_right = obj_right;
484         const char *right_key = obj_right;
485         int cmp;
486
487         switch (flags & OBJ_SEARCH_MASK) {
488         default:
489         case OBJ_SEARCH_OBJECT:
490                 right_key = tps_right->name;
491                 /* Fall through */
492         case OBJ_SEARCH_KEY:
493                 cmp = strcasecmp(tps_left->name, right_key);
494                 break;
495         case OBJ_SEARCH_PARTIAL_KEY:
496                 cmp = strncasecmp(tps_left->name, right_key, strlen(right_key));
497                 break;
498         }
499         return cmp;
500 }
501
502 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
503 {
504         char name[256];
505         int tcount;
506         unsigned long qsize;
507         unsigned long maxqsize;
508         unsigned long processed;
509         struct ao2_container *sorted_tps;
510         struct ast_taskprocessor *tps;
511         struct ao2_iterator iter;
512 #define FMT_HEADERS             "%-70s %10s %10s %10s %10s %10s\n"
513 #define FMT_FIELDS              "%-70s %10lu %10lu %10lu %10lu %10lu\n"
514
515         switch (cmd) {
516         case CLI_INIT:
517                 e->command = "core show taskprocessors";
518                 e->usage =
519                         "Usage: core show taskprocessors\n"
520                         "       Shows a list of instantiated task processors and their statistics\n";
521                 return NULL;
522         case CLI_GENERATE:
523                 return NULL;
524         }
525
526         if (a->argc != e->args) {
527                 return CLI_SHOWUSAGE;
528         }
529
530         sorted_tps = ao2_container_alloc_rbtree(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, tps_sort_cb,
531                 NULL);
532         if (!sorted_tps
533                 || ao2_container_dup(sorted_tps, tps_singletons, 0)) {
534                 ao2_cleanup(sorted_tps);
535                 return CLI_FAILURE;
536         }
537
538         ast_cli(a->fd, "\n" FMT_HEADERS, "Processor", "Processed", "In Queue", "Max Depth", "Low water", "High water");
539         tcount = 0;
540         iter = ao2_iterator_init(sorted_tps, AO2_ITERATOR_UNLINK);
541         while ((tps = ao2_iterator_next(&iter))) {
542                 ast_copy_string(name, tps->name, sizeof(name));
543                 qsize = tps->tps_queue_size;
544                 maxqsize = tps->stats.max_qsize;
545                 processed = tps->stats._tasks_processed_count;
546                 ast_cli(a->fd, FMT_FIELDS, name, processed, qsize, maxqsize,
547                         tps->tps_queue_low, tps->tps_queue_high);
548                 ast_taskprocessor_unreference(tps);
549                 ++tcount;
550         }
551         ao2_iterator_destroy(&iter);
552         ast_cli(a->fd, "\n%d taskprocessors\n\n", tcount);
553         ao2_ref(sorted_tps, -1);
554         return CLI_SUCCESS;
555 }
556
557 /* hash callback for astobj2 */
558 static int tps_hash_cb(const void *obj, const int flags)
559 {
560         const struct ast_taskprocessor *tps = obj;
561         const char *name = flags & OBJ_KEY ? obj : tps->name;
562
563         return ast_str_case_hash(name);
564 }
565
566 /* compare callback for astobj2 */
567 static int tps_cmp_cb(void *obj, void *arg, int flags)
568 {
569         struct ast_taskprocessor *lhs = obj, *rhs = arg;
570         const char *rhsname = flags & OBJ_KEY ? arg : rhs->name;
571
572         return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
573 }
574
575 static int subsystem_match(struct subsystem_alert *alert, const char *subsystem)
576 {
577         return !strcmp(alert->subsystem, subsystem);
578 }
579
580 static int subsystem_cmp(struct subsystem_alert *a, struct subsystem_alert *b)
581 {
582         return strcmp(a->subsystem, b->subsystem);
583 }
584
585 unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem)
586 {
587         struct subsystem_alert *alert;
588         unsigned int count = 0;
589         int idx;
590
591         AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
592         idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
593         if (idx >= 0) {
594                 alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
595                 count = alert->alert_count;
596         }
597         AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
598
599         return count;
600 }
601
602 static void subsystem_alert_increment(const char *subsystem)
603 {
604         struct subsystem_alert *alert;
605         int idx;
606
607         if (ast_strlen_zero(subsystem)) {
608                 return;
609         }
610
611         AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
612         idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
613         if (idx >= 0) {
614                 alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
615                 alert->alert_count++;
616                 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
617                 return;
618         }
619
620         alert = ast_malloc(sizeof(*alert) + strlen(subsystem) + 1);
621         if (!alert) {
622                 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
623                 return;
624         }
625         alert->alert_count = 1;
626         strcpy(alert->subsystem, subsystem); /* Safe */
627
628         if (AST_VECTOR_APPEND(&overloaded_subsystems, alert)) {
629                 ast_free(alert);
630         }
631         AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
632 }
633
634 static void subsystem_alert_decrement(const char *subsystem)
635 {
636         struct subsystem_alert *alert;
637         int idx;
638
639         if (ast_strlen_zero(subsystem)) {
640                 return;
641         }
642
643         AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
644         idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
645         if (idx < 0) {
646                 ast_log(LOG_ERROR,
647                         "Can't decrement alert count for subsystem '%s' as it wasn't in alert\n", subsystem);
648                 AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
649                 return;
650         }
651         alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
652
653         alert->alert_count--;
654         if (alert->alert_count <= 0) {
655                 AST_VECTOR_REMOVE(&overloaded_subsystems, idx, 0);
656                 ast_free(alert);
657         }
658
659         AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
660 }
661
662 static void subsystem_copy(struct subsystem_alert *alert,
663         struct subsystem_alert_vector *vector)
664 {
665         struct subsystem_alert *alert_copy;
666         alert_copy = ast_malloc(sizeof(*alert_copy) + strlen(alert->subsystem) + 1);
667         if (!alert_copy) {
668                 return;
669         }
670         alert_copy->alert_count = alert->alert_count;
671         strcpy(alert_copy->subsystem, alert->subsystem); /* Safe */
672         if (AST_VECTOR_ADD_SORTED(vector, alert_copy, subsystem_cmp)) {
673                 ast_free(alert_copy);
674         }
675 }
676
677 static char *cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
678 {
679         struct subsystem_alert_vector sorted_subsystems;
680         int i;
681
682 #define FMT_HEADERS_SUBSYSTEM           "%-32s %12s\n"
683 #define FMT_FIELDS_SUBSYSTEM            "%-32s %12u\n"
684
685         switch (cmd) {
686         case CLI_INIT:
687                 e->command = "core show taskprocessor alerted subsystems";
688                 e->usage =
689                         "Usage: core show taskprocessor alerted subsystems\n"
690                         "       Shows a list of task processor subsystems that are currently alerted\n";
691                 return NULL;
692         case CLI_GENERATE:
693                 return NULL;
694         }
695
696         if (a->argc != e->args) {
697                 return CLI_SHOWUSAGE;
698         }
699
700         if (AST_VECTOR_INIT(&sorted_subsystems, AST_VECTOR_SIZE(&overloaded_subsystems))) {
701                 return CLI_FAILURE;
702         }
703
704         AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
705         for (i = 0; i < AST_VECTOR_SIZE(&overloaded_subsystems); i++) {
706                 subsystem_copy(AST_VECTOR_GET(&overloaded_subsystems, i), &sorted_subsystems);
707         }
708         AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
709
710         ast_cli(a->fd, "\n" FMT_HEADERS_SUBSYSTEM, "Subsystem", "Alert Count");
711
712         for (i = 0; i < AST_VECTOR_SIZE(&sorted_subsystems); i++) {
713                 struct subsystem_alert *alert = AST_VECTOR_GET(&sorted_subsystems, i);
714                 ast_cli(a->fd, FMT_FIELDS_SUBSYSTEM, alert->subsystem, alert->alert_count);
715         }
716
717         ast_cli(a->fd, "\n%zu subsystems\n\n", AST_VECTOR_SIZE(&sorted_subsystems));
718
719         AST_VECTOR_CALLBACK_VOID(&sorted_subsystems, ast_free);
720         AST_VECTOR_FREE(&sorted_subsystems);
721
722         return CLI_SUCCESS;
723 }
724
725
726 /*! Count of the number of taskprocessors in high water alert. */
727 static unsigned int tps_alert_count;
728
729 /*! Access protection for tps_alert_count */
730 AST_RWLOCK_DEFINE_STATIC(tps_alert_lock);
731
732 /*!
733  * \internal
734  * \brief Add a delta to tps_alert_count with protection.
735  * \since 13.10.0
736  *
737  * \param tps Taskprocessor updating queue water mark alert trigger.
738  * \param delta The amount to add to tps_alert_count.
739  *
740  * \return Nothing
741  */
742 static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
743 {
744         unsigned int old;
745
746         ast_rwlock_wrlock(&tps_alert_lock);
747         old = tps_alert_count;
748         tps_alert_count += delta;
749         if (DEBUG_ATLEAST(3)
750                 /* and tps_alert_count becomes zero or non-zero */
751                 && !old != !tps_alert_count) {
752                 ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
753                         tps->name, tps_alert_count ? "triggered" : "cleared");
754         }
755
756         if (tps->subsystem[0] != '\0') {
757                 if (delta > 0) {
758                         subsystem_alert_increment(tps->subsystem);
759                 } else {
760                         subsystem_alert_decrement(tps->subsystem);
761                 }
762         }
763
764         ast_rwlock_unlock(&tps_alert_lock);
765 }
766
767 unsigned int ast_taskprocessor_alert_get(void)
768 {
769         unsigned int count;
770
771         ast_rwlock_rdlock(&tps_alert_lock);
772         count = tps_alert_count;
773         ast_rwlock_unlock(&tps_alert_lock);
774
775         return count;
776 }
777
778 int ast_taskprocessor_alert_set_levels(struct ast_taskprocessor *tps, long low_water, long high_water)
779 {
780         if (!tps || high_water < 0 || high_water < low_water) {
781                 return -1;
782         }
783
784         if (low_water < 0) {
785                 /* Set low water level to 90% of high water level */
786                 low_water = (high_water * 9) / 10;
787         }
788
789         ao2_lock(tps);
790
791         tps->tps_queue_low = low_water;
792         tps->tps_queue_high = high_water;
793
794         if (tps->high_water_alert) {
795                 if (!tps->tps_queue_size || tps->tps_queue_size < low_water) {
796                         /* Update water mark alert immediately */
797                         tps->high_water_alert = 0;
798                         tps_alert_add(tps, -1);
799                 }
800         } else {
801                 if (high_water < tps->tps_queue_size) {
802                         /* Update water mark alert immediately */
803                         tps->high_water_alert = 1;
804                         tps_alert_add(tps, +1);
805                 }
806         }
807
808         ao2_unlock(tps);
809
810         return 0;
811 }
812
813 /* destroy the taskprocessor */
814 static void tps_taskprocessor_dtor(void *tps)
815 {
816         struct ast_taskprocessor *t = tps;
817         struct tps_task *task;
818
819         while ((task = AST_LIST_REMOVE_HEAD(&t->tps_queue, list))) {
820                 tps_task_free(task);
821         }
822         t->tps_queue_size = 0;
823
824         if (t->high_water_alert) {
825                 t->high_water_alert = 0;
826                 tps_alert_add(t, -1);
827         }
828
829         ao2_cleanup(t->listener);
830         t->listener = NULL;
831 }
832
833 /* pop the front task and return it */
834 static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
835 {
836         struct tps_task *task;
837
838         if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
839                 --tps->tps_queue_size;
840                 if (tps->high_water_alert && tps->tps_queue_size <= tps->tps_queue_low) {
841                         tps->high_water_alert = 0;
842                         tps_alert_add(tps, -1);
843                 }
844         }
845         return task;
846 }
847
848 long ast_taskprocessor_size(struct ast_taskprocessor *tps)
849 {
850         return (tps) ? tps->tps_queue_size : -1;
851 }
852
853 /* taskprocessor name accessor */
854 const char *ast_taskprocessor_name(struct ast_taskprocessor *tps)
855 {
856         if (!tps) {
857                 ast_log(LOG_ERROR, "no taskprocessor specified!\n");
858                 return NULL;
859         }
860         return tps->name;
861 }
862
863 static void listener_shutdown(struct ast_taskprocessor_listener *listener)
864 {
865         listener->callbacks->shutdown(listener);
866         ao2_ref(listener->tps, -1);
867 }
868
869 static void taskprocessor_listener_dtor(void *obj)
870 {
871         struct ast_taskprocessor_listener *listener = obj;
872
873         if (listener->callbacks->dtor) {
874                 listener->callbacks->dtor(listener);
875         }
876 }
877
878 struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
879 {
880         struct ast_taskprocessor_listener *listener;
881
882         listener = ao2_alloc(sizeof(*listener), taskprocessor_listener_dtor);
883         if (!listener) {
884                 return NULL;
885         }
886         listener->callbacks = callbacks;
887         listener->user_data = user_data;
888
889         return listener;
890 }
891
892 struct ast_taskprocessor *ast_taskprocessor_listener_get_tps(const struct ast_taskprocessor_listener *listener)
893 {
894         ao2_ref(listener->tps, +1);
895         return listener->tps;
896 }
897
898 void *ast_taskprocessor_listener_get_user_data(const struct ast_taskprocessor_listener *listener)
899 {
900         return listener->user_data;
901 }
902
903 static void *default_listener_pvt_alloc(void)
904 {
905         struct default_taskprocessor_listener_pvt *pvt;
906
907         pvt = ast_calloc(1, sizeof(*pvt));
908         if (!pvt) {
909                 return NULL;
910         }
911         pvt->poll_thread = AST_PTHREADT_NULL;
912         if (ast_sem_init(&pvt->sem, 0, 0) != 0) {
913                 ast_log(LOG_ERROR, "ast_sem_init(): %s\n", strerror(errno));
914                 ast_free(pvt);
915                 return NULL;
916         }
917         return pvt;
918 }
919
920 /*!
921  * \internal
922  * \brief Allocate a task processor structure
923  *
924  * \param name Name of the task processor.
925  * \param listener Listener to associate with the task processor.
926  *
927  * \return The newly allocated task processor.
928  *
929  * \pre tps_singletons must be locked by the caller.
930  */
931 static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
932 {
933         struct ast_taskprocessor *p;
934         char *subsystem_separator;
935         size_t subsystem_length = 0;
936         size_t name_length;
937
938         name_length = strlen(name);
939         subsystem_separator = strchr(name, '/');
940         if (subsystem_separator) {
941                 subsystem_length = subsystem_separator - name;
942         }
943
944         p = ao2_alloc(sizeof(*p) + name_length + subsystem_length + 2, tps_taskprocessor_dtor);
945         if (!p) {
946                 ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
947                 return NULL;
948         }
949
950         /* Set default congestion water level alert triggers. */
951         p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10;
952         p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
953
954         strcpy(p->name, name); /* Safe */
955         p->subsystem = p->name + name_length + 1;
956         ast_copy_string(p->subsystem, name, subsystem_length + 1);
957
958         ao2_ref(listener, +1);
959         p->listener = listener;
960
961         p->thread = AST_PTHREADT_NULL;
962
963         ao2_ref(p, +1);
964         listener->tps = p;
965
966         if (!(ao2_link_flags(tps_singletons, p, OBJ_NOLOCK))) {
967                 ast_log(LOG_ERROR, "Failed to add taskprocessor '%s' to container\n", p->name);
968                 listener->tps = NULL;
969                 ao2_ref(p, -2);
970                 return NULL;
971         }
972
973         return p;
974 }
975
976 static struct ast_taskprocessor *__start_taskprocessor(struct ast_taskprocessor *p)
977 {
978         if (p && p->listener->callbacks->start(p->listener)) {
979                 ast_log(LOG_ERROR, "Unable to start taskprocessor listener for taskprocessor %s\n",
980                         p->name);
981                 ast_taskprocessor_unreference(p);
982
983                 return NULL;
984         }
985
986         return p;
987 }
988
989 /* Provide a reference to a taskprocessor.  Create the taskprocessor if necessary, but don't
990  * create the taskprocessor if we were told via ast_tps_options to return a reference only
991  * if it already exists */
992 struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_options create)
993 {
994         struct ast_taskprocessor *p;
995         struct ast_taskprocessor_listener *listener;
996         struct default_taskprocessor_listener_pvt *pvt;
997
998         if (ast_strlen_zero(name)) {
999                 ast_log(LOG_ERROR, "requesting a nameless taskprocessor!!!\n");
1000                 return NULL;
1001         }
1002         ao2_lock(tps_singletons);
1003         p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
1004         if (p || (create & TPS_REF_IF_EXISTS)) {
1005                 /* calling function does not want a new taskprocessor to be created if it doesn't already exist */
1006                 ao2_unlock(tps_singletons);
1007                 return p;
1008         }
1009
1010         /* Create a new taskprocessor. Start by creating a default listener */
1011         pvt = default_listener_pvt_alloc();
1012         if (!pvt) {
1013                 ao2_unlock(tps_singletons);
1014                 return NULL;
1015         }
1016         listener = ast_taskprocessor_listener_alloc(&default_listener_callbacks, pvt);
1017         if (!listener) {
1018                 ao2_unlock(tps_singletons);
1019                 default_listener_pvt_destroy(pvt);
1020                 return NULL;
1021         }
1022
1023         p = __allocate_taskprocessor(name, listener);
1024         ao2_unlock(tps_singletons);
1025         p = __start_taskprocessor(p);
1026         ao2_ref(listener, -1);
1027
1028         return p;
1029 }
1030
1031 struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener)
1032 {
1033         struct ast_taskprocessor *p;
1034
1035         ao2_lock(tps_singletons);
1036         p = ao2_find(tps_singletons, name, OBJ_KEY | OBJ_NOLOCK);
1037         if (p) {
1038                 ao2_unlock(tps_singletons);
1039                 ast_taskprocessor_unreference(p);
1040                 return NULL;
1041         }
1042
1043         p = __allocate_taskprocessor(name, listener);
1044         ao2_unlock(tps_singletons);
1045
1046         return __start_taskprocessor(p);
1047 }
1048
1049 void ast_taskprocessor_set_local(struct ast_taskprocessor *tps,
1050         void *local_data)
1051 {
1052         SCOPED_AO2LOCK(lock, tps);
1053         tps->local_data = local_data;
1054 }
1055
1056 /* decrement the taskprocessor reference count and unlink from the container if necessary */
1057 void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
1058 {
1059         if (!tps) {
1060                 return NULL;
1061         }
1062
1063         /* To prevent another thread from finding and getting a reference to this
1064          * taskprocessor we hold the singletons lock. If we didn't do this then
1065          * they may acquire it and find that the listener has been shut down.
1066          */
1067         ao2_lock(tps_singletons);
1068
1069         if (ao2_ref(tps, -1) > 3) {
1070                 ao2_unlock(tps_singletons);
1071                 return NULL;
1072         }
1073
1074         /* If we're down to 3 references, then those must be:
1075          * 1. The reference we just got rid of
1076          * 2. The container
1077          * 3. The listener
1078          */
1079         ao2_unlink_flags(tps_singletons, tps, OBJ_NOLOCK);
1080         ao2_unlock(tps_singletons);
1081
1082         listener_shutdown(tps->listener);
1083         return NULL;
1084 }
1085
1086 /* push the task into the taskprocessor queue */
1087 static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
1088 {
1089         int previous_size;
1090         int was_empty;
1091
1092         if (!tps) {
1093                 ast_log(LOG_ERROR, "tps is NULL!\n");
1094                 return -1;
1095         }
1096
1097         if (!t) {
1098                 ast_log(LOG_ERROR, "t is NULL!\n");
1099                 return -1;
1100         }
1101
1102         ao2_lock(tps);
1103         AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
1104         previous_size = tps->tps_queue_size++;
1105
1106         if (tps->tps_queue_high <= tps->tps_queue_size) {
1107                 if (!tps->high_water_alert) {
1108                         ast_log(LOG_WARNING, "The '%s' task processor queue reached %ld scheduled tasks%s.\n",
1109                                 tps->name, tps->tps_queue_size, tps->high_water_warned ? " again" : "");
1110                         tps->high_water_warned = 1;
1111                         tps->high_water_alert = 1;
1112                         tps_alert_add(tps, +1);
1113                 }
1114         }
1115
1116         /* The currently executing task counts as still in queue */
1117         was_empty = tps->executing ? 0 : previous_size == 0;
1118         ao2_unlock(tps);
1119         tps->listener->callbacks->task_pushed(tps->listener, was_empty);
1120         return 0;
1121 }
1122
1123 int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
1124 {
1125         return taskprocessor_push(tps, tps_task_alloc(task_exe, datap));
1126 }
1127
1128 int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
1129 {
1130         return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap));
1131 }
1132
1133 int ast_taskprocessor_suspend(struct ast_taskprocessor *tps)
1134 {
1135         if (tps) {
1136                 ao2_lock(tps);
1137                 tps->suspended = 1;
1138                 ao2_unlock(tps);
1139                 return 0;
1140         }
1141         return -1;
1142 }
1143
1144 int ast_taskprocessor_unsuspend(struct ast_taskprocessor *tps)
1145 {
1146         if (tps) {
1147                 ao2_lock(tps);
1148                 tps->suspended = 0;
1149                 ao2_unlock(tps);
1150                 return 0;
1151         }
1152         return -1;
1153 }
1154
1155 int ast_taskprocessor_is_suspended(struct ast_taskprocessor *tps)
1156 {
1157         return tps ? tps->suspended : -1;
1158 }
1159
1160 int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
1161 {
1162         struct ast_taskprocessor_local local;
1163         struct tps_task *t;
1164         long size;
1165
1166         ao2_lock(tps);
1167         t = tps_taskprocessor_pop(tps);
1168         if (!t) {
1169                 ao2_unlock(tps);
1170                 return 0;
1171         }
1172
1173         tps->thread = pthread_self();
1174         tps->executing = 1;
1175
1176         if (t->wants_local) {
1177                 local.local_data = tps->local_data;
1178                 local.data = t->datap;
1179         }
1180         ao2_unlock(tps);
1181
1182         if (t->wants_local) {
1183                 t->callback.execute_local(&local);
1184         } else {
1185                 t->callback.execute(t->datap);
1186         }
1187         tps_task_free(t);
1188
1189         ao2_lock(tps);
1190         tps->thread = AST_PTHREADT_NULL;
1191         /* We need to check size in the same critical section where we reset the
1192          * executing bit. Avoids a race condition where a task is pushed right
1193          * after we pop an empty stack.
1194          */
1195         tps->executing = 0;
1196         size = ast_taskprocessor_size(tps);
1197
1198         /* Update the stats */
1199         ++tps->stats._tasks_processed_count;
1200
1201         /* Include the task we just executed as part of the queue size. */
1202         if (size >= tps->stats.max_qsize) {
1203                 tps->stats.max_qsize = size + 1;
1204         }
1205         ao2_unlock(tps);
1206
1207         /* If we executed a task, check for the transition to empty */
1208         if (size == 0 && tps->listener->callbacks->emptied) {
1209                 tps->listener->callbacks->emptied(tps->listener);
1210         }
1211         return size > 0;
1212 }
1213
1214 int ast_taskprocessor_is_task(struct ast_taskprocessor *tps)
1215 {
1216         int is_task;
1217
1218         ao2_lock(tps);
1219         is_task = pthread_equal(tps->thread, pthread_self());
1220         ao2_unlock(tps);
1221         return is_task;
1222 }
1223
1224 unsigned int ast_taskprocessor_seq_num(void)
1225 {
1226         static int seq_num;
1227
1228         return (unsigned int) ast_atomic_fetchadd_int(&seq_num, +1);
1229 }
1230
1231 void ast_taskprocessor_build_name(char *buf, unsigned int size, const char *format, ...)
1232 {
1233         va_list ap;
1234         int user_size;
1235 #define SEQ_STR_SIZE (1 + 8 + 1)        /* Dash plus 8 hex digits plus null terminator */
1236
1237         ast_assert(buf != NULL);
1238         ast_assert(SEQ_STR_SIZE <= size);
1239
1240         va_start(ap, format);
1241         user_size = vsnprintf(buf, size - (SEQ_STR_SIZE - 1), format, ap);
1242         va_end(ap);
1243         if (user_size < 0) {
1244                 /*
1245                  * Wow!  We got an output error to a memory buffer.
1246                  * Assume no user part of name written.
1247                  */
1248                 user_size = 0;
1249         } else if (size < user_size + SEQ_STR_SIZE) {
1250                 /* Truncate user part of name to make sequence number fit. */
1251                 user_size = size - SEQ_STR_SIZE;
1252         }
1253
1254         /* Append sequence number to end of user name. */
1255         snprintf(buf + user_size, SEQ_STR_SIZE, "-%08x", ast_taskprocessor_seq_num());
1256 }