pjsip_scheduler.c: Fix some corner cases.
[asterisk/asterisk.git] / res / res_pjsip / pjsip_scheduler.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2016, Fairview 5 Engineering, LLC
5  *
6  * George Joseph <george.joseph@fairview5.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief res_pjsip Scheduler
22  *
23  * \author George Joseph <george.joseph@fairview5.com>
24  */
25
26 #include "asterisk.h"
27
28 #include "asterisk/res_pjsip.h"
29 #include "include/res_pjsip_private.h"
30 #include "asterisk/res_pjsip_cli.h"
31 #include "asterisk/taskprocessor.h"
32
33 #define TASK_BUCKETS 53
34
35 static struct ast_sched_context *scheduler_context;
36 static struct ao2_container *tasks;
37 static int task_count;
38
39 struct ast_sip_sched_task {
40         /*! The serializer to be used (if any) (Holds a ref) */
41         struct ast_taskprocessor *serializer;
42         /*! task data */
43         void *task_data;
44         /*! task function */
45         ast_sip_task task;
46         /*! the time the task was originally scheduled/queued */
47         struct timeval when_queued;
48         /*! the last time the task was started */
49         struct timeval last_start;
50         /*! the last time the task was ended */
51         struct timeval last_end;
52         /*! When the periodic task is next expected to run */
53         struct timeval next_periodic;
54         /*! reschedule interval in milliseconds */
55         int interval;
56         /*! ast_sched scheudler id */
57         int current_scheduler_id;
58         /*! task is currently running */
59         int is_running;
60         /*! times run */
61         int run_count;
62         /*! the task reschedule, cleanup and policy flags */
63         enum ast_sip_scheduler_task_flags flags;
64         /*! A name to be associated with the task */
65         char name[0];
66 };
67
68 AO2_STRING_FIELD_HASH_FN(ast_sip_sched_task, name);
69 AO2_STRING_FIELD_CMP_FN(ast_sip_sched_task, name);
70 AO2_STRING_FIELD_SORT_FN(ast_sip_sched_task, name);
71
72 static int push_to_serializer(const void *data);
73
74 /*
75  * This function is run in the context of the serializer.
76  * It runs the task with a simple call and reschedules based on the result.
77  */
78 static int run_task(void *data)
79 {
80         RAII_VAR(struct ast_sip_sched_task *, schtd, data, ao2_cleanup);
81         int res;
82         int delay;
83
84         if (!schtd->interval) {
85                 /* Task was cancelled while waiting to be executed by the serializer */
86                 return -1;
87         }
88
89         ao2_lock(schtd);
90         schtd->last_start = ast_tvnow();
91         schtd->is_running = 1;
92         ++schtd->run_count;
93         ao2_unlock(schtd);
94
95         res = schtd->task(schtd->task_data);
96
97         ao2_lock(schtd);
98         schtd->is_running = 0;
99         schtd->last_end = ast_tvnow();
100
101         /*
102          * Don't restart if the task returned <= 0 or if the interval
103          * was set to 0 while the task was running
104          */
105         if (res <= 0 || !schtd->interval) {
106                 schtd->interval = 0;
107                 ao2_unlock(schtd);
108                 ao2_unlink(tasks, schtd);
109                 return -1;
110         }
111
112         if (schtd->flags & AST_SIP_SCHED_TASK_VARIABLE) {
113                 schtd->interval = res;
114         }
115
116         if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) {
117                 delay = schtd->interval;
118         } else {
119                 int64_t diff;
120
121                 /* Determine next periodic interval we need to expire. */
122                 do {
123                         schtd->next_periodic = ast_tvadd(schtd->next_periodic,
124                                 ast_samp2tv(schtd->interval, 1000));
125                         diff = ast_tvdiff_ms(schtd->next_periodic, schtd->last_end);
126                 } while (diff <= 0);
127                 delay = diff;
128         }
129
130         schtd->current_scheduler_id = ast_sched_add(scheduler_context, delay, push_to_serializer, schtd);
131         if (schtd->current_scheduler_id < 0) {
132                 schtd->interval = 0;
133                 ao2_unlock(schtd);
134                 ast_log(LOG_ERROR, "Sched %p: Failed to reschedule task %s\n", schtd, schtd->name);
135                 ao2_unlink(tasks, schtd);
136                 return -1;
137         }
138
139         ao2_unlock(schtd);
140
141         return 0;
142 }
143
144 /*
145  * This function is run by the scheduler thread.  Its only job is to push the task
146  * to the serialize and return.  It returns 0 so it's not rescheduled.
147  */
148 static int push_to_serializer(const void *data)
149 {
150         struct ast_sip_sched_task *schtd = (struct ast_sip_sched_task *)data;
151         int sched_id;
152
153         ao2_lock(schtd);
154         sched_id = schtd->current_scheduler_id;
155         schtd->current_scheduler_id = -1;
156         ao2_unlock(schtd);
157         if (sched_id < 0) {
158                 /* Task was cancelled while waiting on the lock */
159                 return 0;
160         }
161
162         ao2_t_ref(schtd, +1, "Give ref to run_task()");
163         if (ast_sip_push_task(schtd->serializer, run_task, schtd)) {
164                 /*
165                  * Oh my.  Have to cancel the scheduled item because we
166                  * unexpectedly cannot run it anymore.
167                  */
168                 ao2_unlink(tasks, schtd);
169                 ao2_lock(schtd);
170                 schtd->interval = 0;
171                 ao2_unlock(schtd);
172
173                 ao2_t_ref(schtd, -1, "Failed so release run_task() ref");
174         }
175
176         return 0;
177 }
178
179 int ast_sip_sched_task_cancel(struct ast_sip_sched_task *schtd)
180 {
181         int res;
182         int sched_id;
183
184         /*
185          * Prevent any tasks in the serializer queue from
186          * running and restarting the scheduled item on us
187          * first.
188          */
189         ao2_lock(schtd);
190         schtd->interval = 0;
191
192         sched_id = schtd->current_scheduler_id;
193         schtd->current_scheduler_id = -1;
194         ao2_unlock(schtd);
195         res = ast_sched_del(scheduler_context, sched_id);
196
197         ao2_unlink(tasks, schtd);
198
199         return res;
200 }
201
202 int ast_sip_sched_task_cancel_by_name(const char *name)
203 {
204         int res;
205         struct ast_sip_sched_task *schtd;
206
207         if (ast_strlen_zero(name)) {
208                 return -1;
209         }
210
211         schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY);
212         if (!schtd) {
213                 return -1;
214         }
215
216         res = ast_sip_sched_task_cancel(schtd);
217         ao2_ref(schtd, -1);
218         return res;
219 }
220
221
222 int ast_sip_sched_task_get_times(struct ast_sip_sched_task *schtd,
223         struct timeval *queued, struct timeval *last_start, struct timeval *last_end)
224 {
225         ao2_lock(schtd);
226         if (queued) {
227                 memcpy(queued, &schtd->when_queued, sizeof(struct timeval));
228         }
229         if (last_start) {
230                 memcpy(last_start, &schtd->last_start, sizeof(struct timeval));
231         }
232         if (last_end) {
233                 memcpy(last_end, &schtd->last_end, sizeof(struct timeval));
234         }
235         ao2_unlock(schtd);
236
237         return 0;
238 }
239
240 int ast_sip_sched_task_get_times_by_name(const char *name,
241         struct timeval *queued, struct timeval *last_start, struct timeval *last_end)
242 {
243         int res;
244         struct ast_sip_sched_task *schtd;
245
246         if (ast_strlen_zero(name)) {
247                 return -1;
248         }
249
250         schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY);
251         if (!schtd) {
252                 return -1;
253         }
254
255         res = ast_sip_sched_task_get_times(schtd, queued, last_start, last_end);
256         ao2_ref(schtd, -1);
257         return res;
258 }
259
260 int ast_sip_sched_task_get_name(struct ast_sip_sched_task *schtd, char *name, size_t maxlen)
261 {
262         if (maxlen <= 0) {
263                 return -1;
264         }
265
266         ao2_lock(schtd);
267         ast_copy_string(name, schtd->name, maxlen);
268         ao2_unlock(schtd);
269
270         return 0;
271 }
272
273 int ast_sip_sched_task_get_next_run(struct ast_sip_sched_task *schtd)
274 {
275         int delay;
276         struct timeval since_when;
277         struct timeval now;
278
279         ao2_lock(schtd);
280
281         if (schtd->interval) {
282                 delay = schtd->interval;
283                 now = ast_tvnow();
284
285                 if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) {
286                         since_when = schtd->is_running ? now : schtd->last_end;
287                 } else {
288                         since_when = schtd->last_start.tv_sec ? schtd->last_start : schtd->when_queued;
289                 }
290
291                 delay -= ast_tvdiff_ms(now, since_when);
292
293                 delay = delay < 0 ? 0 : delay;
294         } else {
295                 delay = -1;
296         }
297
298         ao2_unlock(schtd);
299
300         return delay;
301 }
302
303 int ast_sip_sched_task_get_next_run_by_name(const char *name)
304 {
305         int next_run;
306         struct ast_sip_sched_task *schtd;
307
308         if (ast_strlen_zero(name)) {
309                 return -1;
310         }
311
312         schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY);
313         if (!schtd) {
314                 return -1;
315         }
316
317         next_run = ast_sip_sched_task_get_next_run(schtd);
318         ao2_ref(schtd, -1);
319         return next_run;
320 }
321
322 int ast_sip_sched_is_task_running(struct ast_sip_sched_task *schtd)
323 {
324         return schtd ? schtd->is_running : 0;
325 }
326
327 int ast_sip_sched_is_task_running_by_name(const char *name)
328 {
329         int is_running;
330         struct ast_sip_sched_task *schtd;
331
332         if (ast_strlen_zero(name)) {
333                 return 0;
334         }
335
336         schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY);
337         if (!schtd) {
338                 return 0;
339         }
340
341         is_running = schtd->is_running;
342         ao2_ref(schtd, -1);
343         return is_running;
344 }
345
346 static void schtd_dtor(void *data)
347 {
348         struct ast_sip_sched_task *schtd = data;
349
350         if (schtd->flags & AST_SIP_SCHED_TASK_DATA_AO2) {
351                 /* release our own ref, then release the callers if asked to do so */
352                 ao2_ref(schtd->task_data, (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE) ? -2 : -1);
353         } else if (schtd->task_data && (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE)) {
354                 ast_free(schtd->task_data);
355         }
356         ast_taskprocessor_unreference(schtd->serializer);
357 }
358
359 struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *serializer,
360         int interval, ast_sip_task sip_task, const char *name, void *task_data,
361         enum ast_sip_scheduler_task_flags flags)
362 {
363 #define ID_LEN 13 /* task_deadbeef */
364         struct ast_sip_sched_task *schtd;
365         int res;
366
367         if (interval <= 0) {
368                 return NULL;
369         }
370
371         schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1),
372                 schtd_dtor);
373         if (!schtd) {
374                 return NULL;
375         }
376
377         schtd->serializer = ao2_bump(serializer);
378         schtd->task_data = task_data;
379         schtd->task = sip_task;
380         schtd->interval = interval;
381         schtd->flags = flags;
382         if (!ast_strlen_zero(name)) {
383                 strcpy(schtd->name, name); /* Safe */
384         } else {
385                 uint32_t task_id;
386
387                 task_id = ast_atomic_fetchadd_int(&task_count, 1);
388                 sprintf(schtd->name, "task_%08x", task_id);
389         }
390         schtd->when_queued = ast_tvnow();
391         if (!(schtd->flags & AST_SIP_SCHED_TASK_DELAY)) {
392                 schtd->next_periodic = ast_tvadd(schtd->when_queued,
393                         ast_samp2tv(schtd->interval, 1000));
394         }
395
396         if (flags & AST_SIP_SCHED_TASK_DATA_AO2) {
397                 ao2_ref(task_data, +1);
398         }
399
400         /*
401          * We must put it in the 'tasks' container before scheduling
402          * the task because we don't want the push_to_serializer()
403          * sched task to "remove" it on failure before we even put
404          * it in.  If this happens then nothing would remove it from
405          * the 'tasks' container.
406          */
407         ao2_link(tasks, schtd);
408
409         /*
410          * Lock so we are guaranteed to get the sched id set before
411          * the push_to_serializer() sched task can clear it.
412          */
413         ao2_lock(schtd);
414         res = ast_sched_add(scheduler_context, interval, push_to_serializer, schtd);
415         schtd->current_scheduler_id = res;
416         ao2_unlock(schtd);
417         if (res < 0) {
418                 ao2_unlink(tasks, schtd);
419                 ao2_ref(schtd, -1);
420                 return NULL;
421         }
422
423         return schtd;
424 #undef ID_LEN
425 }
426
427 static char *cli_show_tasks(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
428 {
429         struct ao2_iterator i;
430         struct ast_sip_sched_task *schtd;
431         const char *log_format;
432         struct ast_tm tm;
433         char queued[32];
434         char last_start[32];
435         char next_start[32];
436         int datelen;
437         struct timeval now;
438         static const char separator[] = "======================================";
439
440         switch (cmd) {
441         case CLI_INIT:
442                 e->command = "pjsip show scheduled_tasks";
443                 e->usage = "Usage: pjsip show scheduled_tasks\n"
444                             "      Show all scheduled tasks\n";
445                 return NULL;
446         case CLI_GENERATE:
447                 return NULL;
448         }
449
450         if (a->argc != 3) {
451                 return CLI_SHOWUSAGE;
452         }
453
454         now = ast_tvnow();
455         log_format = ast_logger_get_dateformat();
456
457         ast_localtime(&now, &tm, NULL);
458         datelen = ast_strftime(queued, sizeof(queued), log_format, &tm);
459
460         ast_cli(a->fd, "PJSIP Scheduled Tasks:\n\n");
461
462         ast_cli(a->fd, " %1$-24s %2$-9s %3$-9s %4$-5s  %6$-*5$s  %7$-*5$s  %8$-*5$s %9$7s\n",
463                 "Task Name", "Interval", "Times Run", "State",
464                 datelen, "Queued", "Last Started", "Next Start", "( secs)");
465
466         ast_cli(a->fd, " %1$-24.24s %2$-9.9s %3$-9.9s %4$-5.5s  %6$-*5$.*5$s  %7$-*5$.*5$s  %9$-*8$.*8$s\n",
467                 separator, separator, separator, separator,
468                 datelen, separator, separator, datelen + 8, separator);
469
470
471         ao2_rdlock(tasks);
472         i = ao2_iterator_init(tasks, AO2_ITERATOR_DONTLOCK);
473         while ((schtd = ao2_iterator_next(&i))) {
474                 int next_run_sec;
475                 struct timeval next;
476
477                 ao2_lock(schtd);
478
479                 next_run_sec = ast_sip_sched_task_get_next_run(schtd) / 1000;
480                 next = ast_tvadd(now, (struct timeval) {next_run_sec, 0});
481
482                 ast_localtime(&schtd->when_queued, &tm, NULL);
483                 ast_strftime(queued, sizeof(queued), log_format, &tm);
484
485                 if (ast_tvzero(schtd->last_start)) {
486                         strcpy(last_start, "not yet started");
487                 } else {
488                         ast_localtime(&schtd->last_start, &tm, NULL);
489                         ast_strftime(last_start, sizeof(last_start), log_format, &tm);
490                 }
491
492                 ast_localtime(&next, &tm, NULL);
493                 ast_strftime(next_start, sizeof(next_start), log_format, &tm);
494
495                 ast_cli(a->fd, " %1$-24.24s %2$9.3f %3$9d %4$-5s  %6$-*5$s  %7$-*5$s  %8$-*5$s (%9$5d)\n",
496                         schtd->name,
497                         schtd->interval / 1000.0,
498                         schtd->run_count,
499                         schtd->is_running ? "run" : "wait",
500                         datelen, queued, last_start,
501                         next_start,
502                         next_run_sec);
503                 ao2_unlock(schtd);
504
505                 ao2_cleanup(schtd);
506         }
507         ao2_iterator_destroy(&i);
508         ao2_unlock(tasks);
509         ast_cli(a->fd, "\n");
510
511         return CLI_SUCCESS;
512 }
513
514 static struct ast_cli_entry cli_commands[] = {
515         AST_CLI_DEFINE(cli_show_tasks, "Show all scheduled tasks"),
516 };
517
518 int ast_sip_initialize_scheduler(void)
519 {
520         scheduler_context = ast_sched_context_create();
521         if (!scheduler_context) {
522                 ast_log(LOG_ERROR, "Failed to create scheduler. Aborting load\n");
523                 return -1;
524         }
525
526         if (ast_sched_start_thread(scheduler_context)) {
527                 ast_log(LOG_ERROR, "Failed to start scheduler. Aborting load\n");
528                 ast_sched_context_destroy(scheduler_context);
529                 return -1;
530         }
531
532         tasks = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK,
533                 AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT, TASK_BUCKETS, ast_sip_sched_task_hash_fn,
534                 ast_sip_sched_task_sort_fn, ast_sip_sched_task_cmp_fn);
535         if (!tasks) {
536                 ast_log(LOG_ERROR, "Failed to allocate task container. Aborting load\n");
537                 ast_sched_context_destroy(scheduler_context);
538                 return -1;
539         }
540
541         ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands));
542
543         return 0;
544 }
545
546 int ast_sip_destroy_scheduler(void)
547 {
548         ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
549
550         if (scheduler_context) {
551                 if (tasks) {
552                         struct ao2_iterator iter;
553                         struct ast_sip_sched_task *schtd;
554
555                         /* Cancel all scheduled tasks */
556                         iter = ao2_iterator_init(tasks, 0);
557                         while ((schtd = ao2_iterator_next(&iter))) {
558                                 ast_sip_sched_task_cancel(schtd);
559                                 ao2_ref(schtd, -1);
560                         }
561                         ao2_iterator_destroy(&iter);
562                 }
563
564                 ast_sched_context_destroy(scheduler_context);
565                 scheduler_context = NULL;
566         }
567
568         ao2_cleanup(tasks);
569         tasks = NULL;
570
571         return 0;
572 }