Finally, a method that really fixes the assertions in chan_iax2.c related to cancelli...
[asterisk/asterisk.git] / main / sched.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 1999 - 2008, Digium, Inc.
5  *
6  * Mark Spencer <markster@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Scheduler Routines (from cheops-NG)
22  *
23  * \author Mark Spencer <markster@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
29
/*
 * DEBUG(a): scheduler trace hook.
 *
 * With DEBUG_SCHEDULER defined, the wrapped statement is handed to DEBUG_M()
 * whenever option_debug is non-zero.  Without it the macro expands to nothing,
 * so the argument is never evaluated.
 *
 * NOTE(review): DEBUG_M is not defined in this file; presumably an included
 * header provides it and it supplies its own statement terminator -- confirm.
 */
#ifdef DEBUG_SCHEDULER
#define DEBUG(a) do { \
        if (option_debug) \
                DEBUG_M(a) \
        } while (0)
#else
#define DEBUG(a) 
#endif
38
39 #include <sys/time.h>
40
41 #include "asterisk/sched.h"
42 #include "asterisk/channel.h"
43 #include "asterisk/lock.h"
44 #include "asterisk/utils.h"
45 #include "asterisk/linkedlists.h"
46 #include "asterisk/dlinkedlists.h"
47 #include "asterisk/hashtab.h"
48 #include "asterisk/heap.h"
49 #include "asterisk/threadstorage.h"
50
/*!
 * \brief Thread storage remembering the last ID that the delete routine
 * reported as nonexistent, so a repeated attempt on the same ID is not
 * reported a second time.
 */
AST_THREADSTORAGE(last_del_id);
52
/*! \brief One scheduled event. */
struct sched {
        AST_LIST_ENTRY(sched) list;   /*!< Linkage used by the cache of unused entries (SCHED_MAX_CACHE builds) */
        int id;                       /*!< ID number of event */
        struct timeval when;          /*!< Absolute time event should take place */
        int resched;                  /*!< When to reschedule (interval in ms given when the event was added) */
        int variable;                 /*!< Use return value from callback to reschedule */
        const void *data;             /*!< Data */
        ast_sched_cb callback;        /*!< Callback */
        ssize_t __heap_index;         /*!< Field whose offset is passed to ast_heap_create(); presumably heap bookkeeping */
};
63
/*! \brief A scheduler: the set of pending events plus its bookkeeping. */
struct sched_context {
        ast_mutex_t lock;                       /*!< Taken around accesses to the heap, hash table and counters */
        unsigned int eventcnt;                  /*!< Number of events processed (starts at 1; also the source of new event IDs) */
        unsigned int schedcnt;                  /*!< Number of outstanding schedule events */
        unsigned int highwater;                                 /*!< highest count so far */
        struct ast_hashtab *schedq_ht;             /*!< hash table for fast searching (hashed and compared by event ID) */
        struct ast_heap *sched_heap;            /*!< Pending events, ordered by their 'when' time */

#ifdef SCHED_MAX_CACHE
        AST_LIST_HEAD_NOLOCK(, sched) schedc;   /*!< Cache of unused schedule structures and how many */
        unsigned int schedccnt;
#endif
};
77
/*! \brief A scheduler context bundled with the thread that services it. */
struct ast_sched_thread {
        pthread_t thread;               /*!< Worker thread, or AST_PTHREADT_NULL when none is running */
        ast_mutex_t lock;               /*!< Protects 'stop' and pairs with 'cond' */
        ast_cond_t cond;                /*!< Signalled when an event is added, on poke, and on shutdown */
        struct sched_context *context;  /*!< Scheduler serviced by the worker thread */
        unsigned int stop:1;            /*!< Set to ask the worker thread to exit */
};
85
86 static void *sched_run(void *data)
87 {
88         struct ast_sched_thread *st = data;
89
90         while (!st->stop) {
91                 int ms;
92                 struct timespec ts = {
93                         .tv_sec = 0,    
94                 };
95
96                 ast_mutex_lock(&st->lock);
97
98                 if (st->stop) {
99                         ast_mutex_unlock(&st->lock);
100                         return NULL;
101                 }
102
103                 ms = ast_sched_wait(st->context);
104
105                 if (ms == -1) {
106                         ast_cond_wait(&st->cond, &st->lock);
107                 } else {        
108                         struct timeval tv;
109                         tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000));
110                         ts.tv_sec = tv.tv_sec;
111                         ts.tv_nsec = tv.tv_usec * 1000;
112                         ast_cond_timedwait(&st->cond, &st->lock, &ts);
113                 }
114
115                 ast_mutex_unlock(&st->lock);
116
117                 if (st->stop) {
118                         return NULL;
119                 }
120
121                 ast_sched_runq(st->context);
122         }
123
124         return NULL;
125 }
126
127 void ast_sched_thread_poke(struct ast_sched_thread *st)
128 {
129         ast_mutex_lock(&st->lock);
130         ast_cond_signal(&st->cond);
131         ast_mutex_unlock(&st->lock);
132 }
133
134 struct sched_context *ast_sched_thread_get_context(struct ast_sched_thread *st)
135 {
136         return st->context;
137 }
138
139 struct ast_sched_thread *ast_sched_thread_destroy(struct ast_sched_thread *st)
140 {
141         if (st->thread != AST_PTHREADT_NULL) {
142                 ast_mutex_lock(&st->lock);
143                 st->stop = 1;
144                 ast_cond_signal(&st->cond);
145                 ast_mutex_unlock(&st->lock);
146                 pthread_join(st->thread, NULL);
147                 st->thread = AST_PTHREADT_NULL;
148         }
149
150         ast_mutex_destroy(&st->lock);
151         ast_cond_destroy(&st->cond);
152
153         if (st->context) {
154                 sched_context_destroy(st->context);
155                 st->context = NULL;
156         }
157
158         ast_free(st);
159
160         return NULL;
161 }
162
163 struct ast_sched_thread *ast_sched_thread_create(void)
164 {
165         struct ast_sched_thread *st;
166
167         if (!(st = ast_calloc(1, sizeof(*st)))) {
168                 return NULL;
169         }
170
171         ast_mutex_init(&st->lock);
172         ast_cond_init(&st->cond, NULL);
173
174         st->thread = AST_PTHREADT_NULL;
175
176         if (!(st->context = sched_context_create())) {
177                 ast_log(LOG_ERROR, "Failed to create scheduler\n");
178                 ast_sched_thread_destroy(st);
179                 return NULL;
180         }
181         
182         if (ast_pthread_create_background(&st->thread, NULL, sched_run, st)) {
183                 ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
184                 ast_sched_thread_destroy(st);
185                 return NULL;
186         }
187
188         return st;
189 }
190
191 int ast_sched_thread_add_variable(struct ast_sched_thread *st, int when, ast_sched_cb cb,
192                 const void *data, int variable)
193 {
194         int res;
195
196         ast_mutex_lock(&st->lock);
197         res = ast_sched_add_variable(st->context, when, cb, data, variable);
198         if (res != -1) {
199                 ast_cond_signal(&st->cond);
200         }
201         ast_mutex_unlock(&st->lock);
202
203         return res;
204 }
205
206 int ast_sched_thread_add(struct ast_sched_thread *st, int when, ast_sched_cb cb,
207                 const void *data)
208 {
209         int res;
210
211         ast_mutex_lock(&st->lock);
212         res = ast_sched_add(st->context, when, cb, data);
213         if (res != -1) {
214                 ast_cond_signal(&st->cond);
215         }
216         ast_mutex_unlock(&st->lock);
217
218         return res;
219 }
220
221 /* hash routines for sched */
222
223 static int sched_cmp(const void *a, const void *b)
224 {
225         const struct sched *as = a;
226         const struct sched *bs = b;
227         return as->id != bs->id; /* return 0 on a match like strcmp would */
228 }
229
230 static unsigned int sched_hash(const void *obj)
231 {
232         const struct sched *s = obj;
233         unsigned int h = s->id;
234         return h;
235 }
236
237 static int sched_time_cmp(void *a, void *b)
238 {
239         return ast_tvcmp(((struct sched *) b)->when, ((struct sched *) a)->when);
240 }
241
242 struct sched_context *sched_context_create(void)
243 {
244         struct sched_context *tmp;
245
246         if (!(tmp = ast_calloc(1, sizeof(*tmp))))
247                 return NULL;
248
249         ast_mutex_init(&tmp->lock);
250         tmp->eventcnt = 1;
251
252         tmp->schedq_ht = ast_hashtab_create(23, sched_cmp, ast_hashtab_resize_java, ast_hashtab_newsize_java, sched_hash, 1);
253
254         if (!(tmp->sched_heap = ast_heap_create(8, sched_time_cmp,
255                         offsetof(struct sched, __heap_index)))) {
256                 sched_context_destroy(tmp);
257                 return NULL;
258         }
259
260         return tmp;
261 }
262
263 void sched_context_destroy(struct sched_context *con)
264 {
265         struct sched *s;
266
267         ast_mutex_lock(&con->lock);
268
269 #ifdef SCHED_MAX_CACHE
270         /* Eliminate the cache */
271         while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list)))
272                 ast_free(s);
273 #endif
274
275         if (con->sched_heap) {
276                 while ((s = ast_heap_pop(con->sched_heap))) {
277                         ast_free(s);
278                 }
279                 ast_heap_destroy(con->sched_heap);
280                 con->sched_heap = NULL;
281         }
282
283         ast_hashtab_destroy(con->schedq_ht, NULL);
284         con->schedq_ht = NULL;
285         
286         /* And the context */
287         ast_mutex_unlock(&con->lock);
288         ast_mutex_destroy(&con->lock);
289         ast_free(con);
290 }
291
292 static struct sched *sched_alloc(struct sched_context *con)
293 {
294         struct sched *tmp;
295
296         /*
297          * We keep a small cache of schedule entries
298          * to minimize the number of necessary malloc()'s
299          */
300 #ifdef SCHED_MAX_CACHE
301         if ((tmp = AST_LIST_REMOVE_HEAD(&con->schedc, list)))
302                 con->schedccnt--;
303         else
304 #endif
305                 tmp = ast_calloc(1, sizeof(*tmp));
306
307         return tmp;
308 }
309
/*!
 * \brief Give back a schedule entry that is no longer needed.
 *
 * The entry goes onto the cache of unused entries while the cache holds
 * fewer than SCHED_MAX_CACHE of them; otherwise (or when caching is not
 * compiled in) it is freed.
 */
static void sched_release(struct sched_context *con, struct sched *tmp)
{
#ifdef SCHED_MAX_CACHE
        if (con->schedccnt < SCHED_MAX_CACHE) {
                AST_LIST_INSERT_HEAD(&con->schedc, tmp, list);
                con->schedccnt++;
                return;
        }
#endif

        ast_free(tmp);
}
325
326 /*! \brief
327  * Return the number of milliseconds 
328  * until the next scheduled event
329  */
330 int ast_sched_wait(struct sched_context *con)
331 {
332         int ms;
333         struct sched *s;
334
335         DEBUG(ast_debug(1, "ast_sched_wait()\n"));
336
337         ast_mutex_lock(&con->lock);
338         if ((s = ast_heap_peek(con->sched_heap, 1))) {
339                 ms = ast_tvdiff_ms(s->when, ast_tvnow());
340                 if (ms < 0) {
341                         ms = 0;
342                 }
343         } else {
344                 ms = -1;
345         }
346         ast_mutex_unlock(&con->lock);
347
348         return ms;
349 }
350
351
352 /*! \brief
353  * Take a sched structure and put it in the
354  * queue, such that the soonest event is
355  * first in the list. 
356  */
357 static void schedule(struct sched_context *con, struct sched *s)
358 {
359         ast_heap_push(con->sched_heap, s);
360
361         if (!ast_hashtab_insert_safe(con->schedq_ht, s)) {
362                 ast_log(LOG_WARNING,"Schedule Queue entry %d is already in table!\n", s->id);
363         }
364
365         con->schedcnt++;
366
367         if (con->schedcnt > con->highwater) {
368                 con->highwater = con->schedcnt;
369         }
370 }
371
372 /*! \brief
373  * given the last event *tv and the offset in milliseconds 'when',
374  * computes the next value,
375  */
376 static int sched_settime(struct timeval *t, int when)
377 {
378         struct timeval now = ast_tvnow();
379
380         /*ast_debug(1, "TV -> %lu,%lu\n", tv->tv_sec, tv->tv_usec);*/
381         if (ast_tvzero(*t))     /* not supplied, default to now */
382                 *t = now;
383         *t = ast_tvadd(*t, ast_samp2tv(when, 1000));
384         if (ast_tvcmp(*t, now) < 0) {
385                 *t = now;
386         }
387         return 0;
388 }
389
390 int ast_sched_replace_variable(int old_id, struct sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
391 {
392         /* 0 means the schedule item is new; do not delete */
393         if (old_id > 0) {
394                 AST_SCHED_DEL(con, old_id);
395         }
396         return ast_sched_add_variable(con, when, callback, data, variable);
397 }
398
399 /*! \brief
400  * Schedule callback(data) to happen when ms into the future
401  */
402 int ast_sched_add_variable(struct sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
403 {
404         struct sched *tmp;
405         int res = -1;
406
407         DEBUG(ast_debug(1, "ast_sched_add()\n"));
408
409         ast_mutex_lock(&con->lock);
410         if ((tmp = sched_alloc(con))) {
411                 tmp->id = con->eventcnt++;
412                 tmp->callback = callback;
413                 tmp->data = data;
414                 tmp->resched = when;
415                 tmp->variable = variable;
416                 tmp->when = ast_tv(0, 0);
417                 if (sched_settime(&tmp->when, when)) {
418                         sched_release(con, tmp);
419                 } else {
420                         schedule(con, tmp);
421                         res = tmp->id;
422                 }
423         }
424 #ifdef DUMP_SCHEDULER
425         /* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
426         if (option_debug)
427                 ast_sched_dump(con);
428 #endif
429         ast_mutex_unlock(&con->lock);
430
431         return res;
432 }
433
434 int ast_sched_replace(int old_id, struct sched_context *con, int when, ast_sched_cb callback, const void *data)
435 {
436         if (old_id > -1) {
437                 AST_SCHED_DEL(con, old_id);
438         }
439         return ast_sched_add(con, when, callback, data);
440 }
441
442 int ast_sched_add(struct sched_context *con, int when, ast_sched_cb callback, const void *data)
443 {
444         return ast_sched_add_variable(con, when, callback, data, 0);
445 }
446
447 const void *ast_sched_find_data(struct sched_context *con, int id)
448 {
449         struct sched tmp,*res;
450         tmp.id = id;
451         res = ast_hashtab_lookup(con->schedq_ht, &tmp);
452         if (res)
453                 return res->data;
454         return NULL;
455 }
456
457 /*! \brief
458  * Delete the schedule entry with number
459  * "id".  It's nearly impossible that there
460  * would be two or more in the list with that
461  * id.
462  */
463 #ifndef AST_DEVMODE
464 int ast_sched_del(struct sched_context *con, int id)
465 #else
466 int _ast_sched_del(struct sched_context *con, int id, const char *file, int line, const char *function)
467 #endif
468 {
469         struct sched *s, tmp = {
470                 .id = id,
471         };
472         int *last_id = ast_threadstorage_get(&last_del_id, sizeof(int *));
473
474         DEBUG(ast_debug(1, "ast_sched_del(%d)\n", id));
475
476         if (id < 0) {
477                 return 0;
478         }
479
480         ast_mutex_lock(&con->lock);
481         s = ast_hashtab_lookup(con->schedq_ht, &tmp);
482         if (s) {
483                 if (!ast_heap_remove(con->sched_heap, s)) {
484                         ast_log(LOG_WARNING,"sched entry %d not in the sched heap?\n", s->id);
485                 }
486
487                 if (!ast_hashtab_remove_this_object(con->schedq_ht, s)) {
488                         ast_log(LOG_WARNING,"Found sched entry %d, then couldn't remove it?\n", s->id);
489                 }
490
491                 con->schedcnt--;
492
493                 sched_release(con, s);
494         }
495
496 #ifdef DUMP_SCHEDULER
497         /* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
498         if (option_debug)
499                 ast_sched_dump(con);
500 #endif
501         ast_mutex_unlock(&con->lock);
502
503         if (!s && *last_id != id) {
504                 ast_debug(1, "Attempted to delete nonexistent schedule entry %d!\n", id);
505 #ifndef AST_DEVMODE
506                 ast_assert(s != NULL);
507 #else
508                 {
509                 char buf[100];
510                 snprintf(buf, sizeof(buf), "s != NULL, id=%d", id);
511                 _ast_assert(0, buf, file, line, function);
512                 }
513 #endif
514                 *last_id = id;
515                 return -1;
516         } else if (!s) {
517                 return -1;
518         }
519
520         return 0;
521 }
522
523 void ast_sched_report(struct sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames)
524 {
525         int i, x;
526         struct sched *cur;
527         int countlist[cbnames->numassocs + 1];
528         size_t heap_size;
529         
530         memset(countlist, 0, sizeof(countlist));
531         ast_str_set(buf, 0, " Highwater = %d\n schedcnt = %d\n", con->highwater, con->schedcnt);
532
533         ast_mutex_lock(&con->lock);
534
535         heap_size = ast_heap_size(con->sched_heap);
536         for (x = 1; x <= heap_size; x++) {
537                 cur = ast_heap_peek(con->sched_heap, x);
538                 /* match the callback to the cblist */
539                 for (i = 0; i < cbnames->numassocs; i++) {
540                         if (cur->callback == cbnames->cblist[i]) {
541                                 break;
542                         }
543                 }
544                 if (i < cbnames->numassocs) {
545                         countlist[i]++;
546                 } else {
547                         countlist[cbnames->numassocs]++;
548                 }
549         }
550
551         ast_mutex_unlock(&con->lock);
552
553         for (i = 0; i < cbnames->numassocs; i++) {
554                 ast_str_append(buf, 0, "    %s : %d\n", cbnames->list[i], countlist[i]);
555         }
556
557         ast_str_append(buf, 0, "   <unknown> : %d\n", countlist[cbnames->numassocs]);
558 }
559         
560 /*! \brief Dump the contents of the scheduler to LOG_DEBUG */
561 void ast_sched_dump(struct sched_context *con)
562 {
563         struct sched *q;
564         struct timeval when = ast_tvnow();
565         int x;
566         size_t heap_size;
567 #ifdef SCHED_MAX_CACHE
568         ast_debug(1, "Asterisk Schedule Dump (%d in Q, %d Total, %d Cache, %d high-water)\n", con->schedcnt, con->eventcnt - 1, con->schedccnt, con->highwater);
569 #else
570         ast_debug(1, "Asterisk Schedule Dump (%d in Q, %d Total, %d high-water)\n", con->schedcnt, con->eventcnt - 1, con->highwater);
571 #endif
572
573         ast_debug(1, "=============================================================\n");
574         ast_debug(1, "|ID    Callback          Data              Time  (sec:ms)   |\n");
575         ast_debug(1, "+-----+-----------------+-----------------+-----------------+\n");
576         ast_mutex_lock(&con->lock);
577         heap_size = ast_heap_size(con->sched_heap);
578         for (x = 1; x <= heap_size; x++) {
579                 struct timeval delta;
580                 q = ast_heap_peek(con->sched_heap, x);
581                 delta = ast_tvsub(q->when, when);
582                 ast_debug(1, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n", 
583                         q->id,
584                         q->callback,
585                         q->data,
586                         (long)delta.tv_sec,
587                         (long int)delta.tv_usec);
588         }
589         ast_mutex_unlock(&con->lock);
590         ast_debug(1, "=============================================================\n");
591 }
592
/*! \brief
 * Launch all events which need to be run at this time.
 *
 * Each due event is taken off the heap and out of the hash table, and its
 * callback is invoked with con->lock released.  A non-zero return from the
 * callback puts the same entry (same ID) back on the schedule; a zero
 * return releases it.
 *
 * \param con scheduler context to service
 *
 * \return the number of events that were run
 */
int ast_sched_runq(struct sched_context *con)
{
        struct sched *current;
        struct timeval when;
        int numevents;
        int res;

        DEBUG(ast_debug(1, "ast_sched_runq()\n"));
                
        ast_mutex_lock(&con->lock);

        /* Keep taking the top of the heap until it is empty or its top event is not due yet */
        for (numevents = 0; (current = ast_heap_peek(con->sched_heap, 1)); numevents++) {
                /* schedule all events which are going to expire within 1ms.
                 * We only care about millisecond accuracy anyway, so this will
                 * help us get more than one event at one time if they are very
                 * close together.
                 */
                when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
                if (ast_tvcmp(current->when, when) != -1) {
                        break;
                }
                
                current = ast_heap_pop(con->sched_heap);

                if (!ast_hashtab_remove_this_object(con->schedq_ht, current)) {
                        ast_log(LOG_ERROR,"Sched entry %d was in the schedq list but not in the hashtab???\n", current->id);
                }

                con->schedcnt--;

                /*
                 * At this point, the schedule queue is still intact.  We
                 * have removed the first event and the rest is still there,
                 * so it's permissible for the callback to add new events, but
                 * trying to delete itself won't work because it isn't in
                 * the schedule queue.  If that's what it wants to do, it 
                 * should return 0.
                 */
                        
                /* The lock is dropped for the duration of the callback */
                ast_mutex_unlock(&con->lock);
                res = current->callback(current->data);
                ast_mutex_lock(&con->lock);
                        
                if (res) {
                        /*
                         * If they return non-zero, we should schedule them to be
                         * run again.  The next interval is the callback's return
                         * value for 'variable' events, else the stored interval.
                         */
                        if (sched_settime(&current->when, current->variable? res : current->resched)) {
                                sched_release(con, current);
                        } else {
                                schedule(con, current);
                        }
                } else {
                        /* No longer needed, so release it */
                        sched_release(con, current);
                }
        }

        ast_mutex_unlock(&con->lock);
        
        return numevents;
}
659
660 long ast_sched_when(struct sched_context *con,int id)
661 {
662         struct sched *s, tmp;
663         long secs = -1;
664         DEBUG(ast_debug(1, "ast_sched_when()\n"));
665
666         ast_mutex_lock(&con->lock);
667         
668         /* these next 2 lines replace a lookup loop */
669         tmp.id = id;
670         s = ast_hashtab_lookup(con->schedq_ht, &tmp);
671         
672         if (s) {
673                 struct timeval now = ast_tvnow();
674                 secs = s->when.tv_sec - now.tv_sec;
675         }
676         ast_mutex_unlock(&con->lock);
677         
678         return secs;
679 }