pjsip_transport_events.c: Fix crash using stale transport pointer.
[asterisk/asterisk.git] / res / res_pjsip / pjsip_scheduler.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2016, Fairview 5 Engineering, LLC
5  *
6  * George Joseph <george.joseph@fairview5.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief res_pjsip Scheduler
22  *
23  * \author George Joseph <george.joseph@fairview5.com>
24  */
25
26 #include "asterisk.h"
27
28 #include "asterisk/res_pjsip.h"
29 #include "include/res_pjsip_private.h"
30 #include "asterisk/res_pjsip_cli.h"
31
32 #define TASK_BUCKETS 53
33
34 static struct ast_sched_context *scheduler_context;
35 static struct ao2_container *tasks;
36 static int task_count;
37
38 struct ast_sip_sched_task {
39         /*! ast_sip_sched task id */
40         uint32_t task_id;
41         /*! ast_sched scheudler id */
42         int current_scheduler_id;
43         /*! task is currently running */
44         int is_running;
45         /*! task */
46         ast_sip_task task;
47         /*! task data */
48         void *task_data;
49         /*! reschedule interval in milliseconds */
50         int interval;
51         /*! the time the task was queued */
52         struct timeval when_queued;
53         /*! the last time the task was started */
54         struct timeval last_start;
55         /*! the last time the task was ended */
56         struct timeval last_end;
57         /*! times run */
58         int run_count;
59         /*! the task reschedule, cleanup and policy flags */
60         enum ast_sip_scheduler_task_flags flags;
61         /*! the serializer to be used (if any) */
62         struct ast_taskprocessor *serializer;
63         /* A name to be associated with the task */
64         char name[0];
65 };
66
67 AO2_STRING_FIELD_HASH_FN(ast_sip_sched_task, name);
68 AO2_STRING_FIELD_CMP_FN(ast_sip_sched_task, name);
69 AO2_STRING_FIELD_SORT_FN(ast_sip_sched_task, name);
70
71 static int push_to_serializer(const void *data);
72
73 /*
74  * This function is run in the context of the serializer.
75  * It runs the task with a simple call and reschedules based on the result.
76  */
77 static int run_task(void *data)
78 {
79         RAII_VAR(struct ast_sip_sched_task *, schtd, ao2_bump(data), ao2_cleanup);
80         int res;
81         int delay;
82
83         ao2_lock(schtd);
84         schtd->last_start = ast_tvnow();
85         schtd->is_running = 1;
86         schtd->run_count++;
87         ao2_unlock(schtd);
88
89         res = schtd->task(schtd->task_data);
90
91         ao2_lock(schtd);
92         schtd->is_running = 0;
93         schtd->last_end = ast_tvnow();
94
95         /*
96          * Don't restart if the task returned 0 or if the interval
97          * was set to 0 while the task was running
98          */
99         if (!res || !schtd->interval) {
100                 schtd->interval = 0;
101                 ao2_unlock(schtd);
102                 ao2_unlink(tasks, schtd);
103                 return -1;
104         }
105
106         if (schtd->flags & AST_SIP_SCHED_TASK_VARIABLE) {
107                 schtd->interval = res;
108         }
109
110         if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) {
111                 delay = schtd->interval;
112         } else {
113                 delay = schtd->interval - (ast_tvdiff_ms(schtd->last_end, schtd->last_start) % schtd->interval);
114         }
115
116         schtd->current_scheduler_id = ast_sched_add(scheduler_context, delay, push_to_serializer, (const void *)schtd);
117         if (schtd->current_scheduler_id < 0) {
118                 schtd->interval = 0;
119                 ao2_unlock(schtd);
120                 ao2_unlink(tasks, schtd);
121                 return -1;
122         }
123
124         ao2_unlock(schtd);
125
126         return 0;
127 }
128
129 /*
130  * This function is run by the scheduler thread.  Its only job is to push the task
131  * to the serialize and return.  It returns 0 so it's not rescheduled.
132  */
133 static int push_to_serializer(const void *data)
134 {
135         struct ast_sip_sched_task *schtd = (struct ast_sip_sched_task *)data;
136
137         if (ast_sip_push_task(schtd->serializer, run_task, schtd)) {
138                 ao2_ref(schtd, -1);
139         }
140
141         return 0;
142 }
143
144 int ast_sip_sched_task_cancel(struct ast_sip_sched_task *schtd)
145 {
146         int res;
147
148         if (!ao2_ref_and_lock(schtd)) {
149                 return -1;
150         }
151
152         if (schtd->current_scheduler_id < 0 || schtd->interval <= 0) {
153                 ao2_unlock_and_unref(schtd);
154                 return 0;
155         }
156
157         schtd->interval = 0;
158         ao2_unlock_and_unref(schtd);
159         ao2_unlink(tasks, schtd);
160         res = ast_sched_del(scheduler_context, schtd->current_scheduler_id);
161
162         return res;
163 }
164
165 int ast_sip_sched_task_cancel_by_name(const char *name)
166 {
167         RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);
168
169         if (ast_strlen_zero(name)) {
170                 return -1;
171         }
172
173         schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
174         if (!schtd) {
175                 return -1;
176         }
177
178         return ast_sip_sched_task_cancel(schtd);
179 }
180
181
182 int ast_sip_sched_task_get_times(struct ast_sip_sched_task *schtd,
183         struct timeval *queued, struct timeval *last_start, struct timeval *last_end)
184 {
185         if (!ao2_ref_and_lock(schtd)) {
186                 return -1;
187         }
188
189         if (queued) {
190                 memcpy(queued, &schtd->when_queued, sizeof(struct timeval));
191         }
192         if (last_start) {
193                 memcpy(last_start, &schtd->last_start, sizeof(struct timeval));
194         }
195         if (last_end) {
196                 memcpy(last_end, &schtd->last_end, sizeof(struct timeval));
197         }
198
199         ao2_unlock_and_unref(schtd);
200
201         return 0;
202 }
203
204 int ast_sip_sched_task_get_times_by_name(const char *name,
205         struct timeval *queued, struct timeval *last_start, struct timeval *last_end)
206 {
207         RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);
208
209         if (ast_strlen_zero(name)) {
210                 return -1;
211         }
212
213         schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
214         if (!schtd) {
215                 return -1;
216         }
217
218         return ast_sip_sched_task_get_times(schtd, queued, last_start, last_end);
219 }
220
221 int ast_sip_sched_task_get_name(struct ast_sip_sched_task *schtd, char *name, size_t maxlen)
222 {
223         if (maxlen <= 0) {
224                 return -1;
225         }
226
227         if (!ao2_ref_and_lock(schtd)) {
228                 return -1;
229         }
230
231         ast_copy_string(name, schtd->name, maxlen);
232
233         ao2_unlock_and_unref(schtd);
234
235         return 0;
236 }
237
238 int ast_sip_sched_task_get_next_run(struct ast_sip_sched_task *schtd)
239 {
240         int delay;
241         struct timeval since_when;
242         struct timeval now;
243
244         if (!ao2_ref_and_lock(schtd)) {
245                 return -1;
246         }
247
248         if (schtd->interval) {
249                 delay = schtd->interval;
250                 now = ast_tvnow();
251
252                 if (schtd->flags & AST_SIP_SCHED_TASK_DELAY) {
253                         since_when = schtd->is_running ? now : schtd->last_end;
254                 } else {
255                         since_when = schtd->last_start.tv_sec ? schtd->last_start : schtd->when_queued;
256                 }
257
258                 delay -= ast_tvdiff_ms(now, since_when);
259
260                 delay = delay < 0 ? 0 : delay;
261         } else {
262                 delay = -1;
263         }
264
265         ao2_unlock_and_unref(schtd);
266
267         return delay;
268 }
269
270 int ast_sip_sched_task_get_next_run_by_name(const char *name)
271 {
272         RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);
273
274         if (ast_strlen_zero(name)) {
275                 return -1;
276         }
277
278         schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
279         if (!schtd) {
280                 return -1;
281         }
282
283         return ast_sip_sched_task_get_next_run(schtd);
284 }
285
286 int ast_sip_sched_is_task_running(struct ast_sip_sched_task *schtd)
287 {
288         if (!schtd) {
289                 return 0;
290         }
291
292         return schtd->is_running;
293 }
294
295 int ast_sip_sched_is_task_running_by_name(const char *name)
296 {
297         RAII_VAR(struct ast_sip_sched_task *, schtd, NULL, ao2_cleanup);
298
299         if (ast_strlen_zero(name)) {
300                 return 0;
301         }
302
303         schtd = ao2_find(tasks, name, OBJ_SEARCH_KEY | OBJ_NOLOCK);
304         if (!schtd) {
305                 return 0;
306         }
307
308         return schtd->is_running;
309 }
310
311 static void schtd_destructor(void *data)
312 {
313         struct ast_sip_sched_task *schtd = data;
314
315         if (schtd->flags & AST_SIP_SCHED_TASK_DATA_AO2) {
316                 /* release our own ref, then release the callers if asked to do so */
317                 ao2_ref(schtd->task_data, (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE) ? -2 : -1);
318         } else if (schtd->task_data && (schtd->flags & AST_SIP_SCHED_TASK_DATA_FREE)) {
319                 ast_free(schtd->task_data);
320         }
321 }
322
323 struct ast_sip_sched_task *ast_sip_schedule_task(struct ast_taskprocessor *serializer,
324         int interval, ast_sip_task sip_task, char *name, void *task_data, enum ast_sip_scheduler_task_flags flags)
325 {
326 #define ID_LEN 13 /* task_deadbeef */
327         struct ast_sip_sched_task *schtd;
328         int res;
329
330         if (interval < 0) {
331                 return NULL;
332         }
333
334         schtd = ao2_alloc((sizeof(*schtd) + (!ast_strlen_zero(name) ? strlen(name) : ID_LEN) + 1), schtd_destructor);
335         if (!schtd) {
336                 return NULL;
337         }
338
339         schtd->task_id = ast_atomic_fetchadd_int(&task_count, 1);
340         schtd->serializer = serializer;
341         schtd->task = sip_task;
342         if (!ast_strlen_zero(name)) {
343                 strcpy(schtd->name, name); /* Safe */
344         } else {
345                 sprintf(schtd->name, "task_%08x", schtd->task_id);
346         }
347         schtd->task_data = task_data;
348         schtd->flags = flags;
349         schtd->interval = interval;
350         schtd->when_queued = ast_tvnow();
351
352         if (flags & AST_SIP_SCHED_TASK_DATA_AO2) {
353                 ao2_ref(task_data, +1);
354         }
355         res = ast_sched_add(scheduler_context, interval, push_to_serializer, (const void *)schtd);
356         if (res < 0) {
357                 ao2_ref(schtd, -1);
358                 return NULL;
359         } else {
360                 schtd->current_scheduler_id = res;
361                 ao2_link(tasks, schtd);
362         }
363
364         return schtd;
365 #undef ID_LEN
366 }
367
368 static char *cli_show_tasks(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
369 {
370         struct ao2_iterator i;
371         struct ast_sip_sched_task *schtd;
372         const char *log_format = ast_logger_get_dateformat();
373         struct ast_tm tm;
374         char queued[32];
375         char last_start[32];
376         char next_start[32];
377         int datelen;
378         struct timeval now = ast_tvnow();
379         const char *separator = "======================================";
380
381         switch (cmd) {
382         case CLI_INIT:
383                 e->command = "pjsip show scheduled_tasks";
384                 e->usage = "Usage: pjsip show scheduled_tasks\n"
385                             "      Show all scheduled tasks\n";
386                 return NULL;
387         case CLI_GENERATE:
388                 return NULL;
389         }
390
391         if (a->argc != 3) {
392                 return CLI_SHOWUSAGE;
393         }
394
395         ast_localtime(&now, &tm, NULL);
396         datelen = ast_strftime(queued, sizeof(queued), log_format, &tm);
397
398         ast_cli(a->fd, "PJSIP Scheduled Tasks:\n\n");
399
400         ast_cli(a->fd, " %1$-24s %2$-9s %3$-9s %4$-5s  %6$-*5$s  %7$-*5$s  %8$-*5$s %9$7s\n",
401                 "Task Name", "Interval", "Times Run", "State",
402                 datelen, "Queued", "Last Started", "Next Start", "( secs)");
403
404         ast_cli(a->fd, " %1$-24.24s %2$-9.9s %3$-9.9s %4$-5.5s  %6$-*5$.*5$s  %7$-*5$.*5$s  %9$-*8$.*8$s\n",
405                 separator, separator, separator, separator,
406                 datelen, separator, separator, datelen + 8, separator);
407
408
409         ao2_ref(tasks, +1);
410         ao2_rdlock(tasks);
411         i = ao2_iterator_init(tasks, 0);
412         while ((schtd = ao2_iterator_next(&i))) {
413                 int next_run_sec = ast_sip_sched_task_get_next_run(schtd) / 1000;
414                 struct timeval next = ast_tvadd(now, (struct timeval) {next_run_sec, 0});
415
416                 ast_localtime(&schtd->when_queued, &tm, NULL);
417                 ast_strftime(queued, sizeof(queued), log_format, &tm);
418
419                 if (ast_tvzero(schtd->last_start)) {
420                         strcpy(last_start, "not yet started");
421                 } else {
422                         ast_localtime(&schtd->last_start, &tm, NULL);
423                         ast_strftime(last_start, sizeof(last_start), log_format, &tm);
424                 }
425
426                 ast_localtime(&next, &tm, NULL);
427                 ast_strftime(next_start, sizeof(next_start), log_format, &tm);
428
429                 ast_cli(a->fd, " %1$-24.24s %2$9.3f %3$9d %4$-5s  %6$-*5$s  %7$-*5$s  %8$-*5$s (%9$5d)\n",
430                         schtd->name,
431                         schtd->interval / 1000.0,
432                         schtd->run_count,
433                         schtd->is_running ? "run" : "wait",
434                         datelen, queued, last_start,
435                         next_start,
436                         next_run_sec);
437                 ao2_cleanup(schtd);
438         }
439         ao2_iterator_destroy(&i);
440         ao2_unlock(tasks);
441         ao2_ref(tasks, -1);
442         ast_cli(a->fd, "\n");
443
444         return CLI_SUCCESS;
445 }
446
447 static struct ast_cli_entry cli_commands[] = {
448         AST_CLI_DEFINE(cli_show_tasks, "Show all scheduled tasks"),
449 };
450
451 int ast_sip_initialize_scheduler(void)
452 {
453         if (!(scheduler_context = ast_sched_context_create())) {
454                 ast_log(LOG_ERROR, "Failed to create scheduler. Aborting load\n");
455                 return -1;
456         }
457
458         if (ast_sched_start_thread(scheduler_context)) {
459                 ast_log(LOG_ERROR, "Failed to start scheduler. Aborting load\n");
460                 ast_sched_context_destroy(scheduler_context);
461                 return -1;
462         }
463
464         tasks = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, AO2_CONTAINER_ALLOC_OPT_DUPS_REJECT,
465                 TASK_BUCKETS, ast_sip_sched_task_hash_fn, ast_sip_sched_task_sort_fn, ast_sip_sched_task_cmp_fn);
466         if (!tasks) {
467                 ast_log(LOG_ERROR, "Failed to allocate task container. Aborting load\n");
468                 ast_sched_context_destroy(scheduler_context);
469                 return -1;
470         }
471
472         ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands));
473
474         return 0;
475 }
476
477 int ast_sip_destroy_scheduler(void)
478 {
479         ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
480
481         if (scheduler_context) {
482                 ast_sched_context_destroy(scheduler_context);
483         }
484
485         ao2_cleanup(tasks);
486         tasks = NULL;
487
488         return 0;
489 }