8699cfbabc7d62154f64a2d14a965c5ade41c910
[asterisk/asterisk.git] / main / sched.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 1999 - 2008, Digium, Inc.
5  *
6  * Mark Spencer <markster@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Scheduler Routines (from cheops-NG)
22  *
23  * \author Mark Spencer <markster@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
29
/*
 * DEBUG(a): scheduler trace hook.
 * Without DEBUG_SCHEDULER the macro expands to nothing; with it, 'a' is
 * handed to DEBUG_M() whenever option_debug is non-zero.
 */
#ifndef DEBUG_SCHEDULER
#define DEBUG(a) 
#else
#define DEBUG(a) do { \
        if (option_debug) \
                DEBUG_M(a) \
        } while (0)
#endif
38
39 #include <sys/time.h>
40
41 #include "asterisk/sched.h"
42 #include "asterisk/channel.h"
43 #include "asterisk/lock.h"
44 #include "asterisk/utils.h"
45 #include "asterisk/linkedlists.h"
46 #include "asterisk/dlinkedlists.h"
47 #include "asterisk/hashtab.h"
48 #include "asterisk/heap.h"
49
50 struct sched {
51         AST_LIST_ENTRY(sched) list;
52         int id;                       /*!< ID number of event */
53         struct timeval when;          /*!< Absolute time event should take place */
54         int resched;                  /*!< When to reschedule */
55         int variable;                 /*!< Use return value from callback to reschedule */
56         const void *data;             /*!< Data */
57         ast_sched_cb callback;        /*!< Callback */
58         ssize_t __heap_index;
59 };
60
61 struct sched_context {
62         ast_mutex_t lock;
63         unsigned int eventcnt;                  /*!< Number of events processed */
64         unsigned int schedcnt;                  /*!< Number of outstanding schedule events */
65         unsigned int highwater;                                 /*!< highest count so far */
66         struct ast_hashtab *schedq_ht;             /*!< hash table for fast searching */
67         struct ast_heap *sched_heap;
68
69 #ifdef SCHED_MAX_CACHE
70         AST_LIST_HEAD_NOLOCK(, sched) schedc;   /*!< Cache of unused schedule structures and how many */
71         unsigned int schedccnt;
72 #endif
73 };
74
75 struct ast_sched_thread {
76         pthread_t thread;
77         ast_mutex_t lock;
78         ast_cond_t cond;
79         struct sched_context *context;
80         unsigned int stop:1;
81 };
82
83 static void *sched_run(void *data)
84 {
85         struct ast_sched_thread *st = data;
86
87         while (!st->stop) {
88                 int ms;
89                 struct timespec ts = {
90                         .tv_sec = 0,    
91                 };
92
93                 ast_mutex_lock(&st->lock);
94
95                 if (st->stop) {
96                         ast_mutex_unlock(&st->lock);
97                         return NULL;
98                 }
99
100                 ms = ast_sched_wait(st->context);
101
102                 if (ms == -1) {
103                         ast_cond_wait(&st->cond, &st->lock);
104                 } else {        
105                         struct timeval tv;
106                         tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000));
107                         ts.tv_sec = tv.tv_sec;
108                         ts.tv_nsec = tv.tv_usec * 1000;
109                         ast_cond_timedwait(&st->cond, &st->lock, &ts);
110                 }
111
112                 ast_mutex_unlock(&st->lock);
113
114                 if (st->stop) {
115                         return NULL;
116                 }
117
118                 ast_sched_runq(st->context);
119         }
120
121         return NULL;
122 }
123
124 void ast_sched_thread_poke(struct ast_sched_thread *st)
125 {
126         ast_mutex_lock(&st->lock);
127         ast_cond_signal(&st->cond);
128         ast_mutex_unlock(&st->lock);
129 }
130
131 struct sched_context *ast_sched_thread_get_context(struct ast_sched_thread *st)
132 {
133         return st->context;
134 }
135
136 struct ast_sched_thread *ast_sched_thread_destroy(struct ast_sched_thread *st)
137 {
138         if (st->thread != AST_PTHREADT_NULL) {
139                 ast_mutex_lock(&st->lock);
140                 st->stop = 1;
141                 ast_cond_signal(&st->cond);
142                 ast_mutex_unlock(&st->lock);
143                 pthread_join(st->thread, NULL);
144                 st->thread = AST_PTHREADT_NULL;
145         }
146
147         ast_mutex_destroy(&st->lock);
148         ast_cond_destroy(&st->cond);
149
150         if (st->context) {
151                 sched_context_destroy(st->context);
152                 st->context = NULL;
153         }
154
155         ast_free(st);
156
157         return NULL;
158 }
159
160 struct ast_sched_thread *ast_sched_thread_create(void)
161 {
162         struct ast_sched_thread *st;
163
164         if (!(st = ast_calloc(1, sizeof(*st)))) {
165                 return NULL;
166         }
167
168         ast_mutex_init(&st->lock);
169         ast_cond_init(&st->cond, NULL);
170
171         st->thread = AST_PTHREADT_NULL;
172
173         if (!(st->context = sched_context_create())) {
174                 ast_log(LOG_ERROR, "Failed to create scheduler\n");
175                 ast_sched_thread_destroy(st);
176                 return NULL;
177         }
178         
179         if (ast_pthread_create_background(&st->thread, NULL, sched_run, st)) {
180                 ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
181                 ast_sched_thread_destroy(st);
182                 return NULL;
183         }
184
185         return st;
186 }
187
188 int ast_sched_thread_add_variable(struct ast_sched_thread *st, int when, ast_sched_cb cb,
189                 const void *data, int variable)
190 {
191         int res;
192
193         ast_mutex_lock(&st->lock);
194         res = ast_sched_add_variable(st->context, when, cb, data, variable);
195         if (res != -1) {
196                 ast_cond_signal(&st->cond);
197         }
198         ast_mutex_unlock(&st->lock);
199
200         return res;
201 }
202
203 int ast_sched_thread_add(struct ast_sched_thread *st, int when, ast_sched_cb cb,
204                 const void *data)
205 {
206         int res;
207
208         ast_mutex_lock(&st->lock);
209         res = ast_sched_add(st->context, when, cb, data);
210         if (res != -1) {
211                 ast_cond_signal(&st->cond);
212         }
213         ast_mutex_unlock(&st->lock);
214
215         return res;
216 }
217
218 /* hash routines for sched */
219
220 static int sched_cmp(const void *a, const void *b)
221 {
222         const struct sched *as = a;
223         const struct sched *bs = b;
224         return as->id != bs->id; /* return 0 on a match like strcmp would */
225 }
226
227 static unsigned int sched_hash(const void *obj)
228 {
229         const struct sched *s = obj;
230         unsigned int h = s->id;
231         return h;
232 }
233
234 static int sched_time_cmp(void *a, void *b)
235 {
236         return ast_tvcmp(((struct sched *) b)->when, ((struct sched *) a)->when);
237 }
238
239 struct sched_context *sched_context_create(void)
240 {
241         struct sched_context *tmp;
242
243         if (!(tmp = ast_calloc(1, sizeof(*tmp))))
244                 return NULL;
245
246         ast_mutex_init(&tmp->lock);
247         tmp->eventcnt = 1;
248
249         tmp->schedq_ht = ast_hashtab_create(23, sched_cmp, ast_hashtab_resize_java, ast_hashtab_newsize_java, sched_hash, 1);
250
251         if (!(tmp->sched_heap = ast_heap_create(8, sched_time_cmp,
252                         offsetof(struct sched, __heap_index)))) {
253                 sched_context_destroy(tmp);
254                 return NULL;
255         }
256
257         return tmp;
258 }
259
260 void sched_context_destroy(struct sched_context *con)
261 {
262         struct sched *s;
263
264         ast_mutex_lock(&con->lock);
265
266 #ifdef SCHED_MAX_CACHE
267         /* Eliminate the cache */
268         while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list)))
269                 ast_free(s);
270 #endif
271
272         if (con->sched_heap) {
273                 while ((s = ast_heap_pop(con->sched_heap))) {
274                         ast_free(s);
275                 }
276                 ast_heap_destroy(con->sched_heap);
277                 con->sched_heap = NULL;
278         }
279
280         ast_hashtab_destroy(con->schedq_ht, NULL);
281         con->schedq_ht = NULL;
282         
283         /* And the context */
284         ast_mutex_unlock(&con->lock);
285         ast_mutex_destroy(&con->lock);
286         ast_free(con);
287 }
288
289 static struct sched *sched_alloc(struct sched_context *con)
290 {
291         struct sched *tmp;
292
293         /*
294          * We keep a small cache of schedule entries
295          * to minimize the number of necessary malloc()'s
296          */
297 #ifdef SCHED_MAX_CACHE
298         if ((tmp = AST_LIST_REMOVE_HEAD(&con->schedc, list)))
299                 con->schedccnt--;
300         else
301 #endif
302                 tmp = ast_calloc(1, sizeof(*tmp));
303
304         return tmp;
305 }
306
/*! \brief
 * Give back a schedule entry.
 *
 * With SCHED_MAX_CACHE defined the entry is pushed onto the context's
 * cache as long as fewer than SCHED_MAX_CACHE entries are cached;
 * in every other case it is freed.
 */
static void sched_release(struct sched_context *con, struct sched *tmp)
{
#ifdef SCHED_MAX_CACHE   
        if (con->schedccnt < SCHED_MAX_CACHE) {
                /* Room left in the cache: keep the entry for reuse */
                AST_LIST_INSERT_HEAD(&con->schedc, tmp, list);
                con->schedccnt++;
                return;
        }
#endif
        ast_free(tmp);
}
322
323 /*! \brief
324  * Return the number of milliseconds 
325  * until the next scheduled event
326  */
327 int ast_sched_wait(struct sched_context *con)
328 {
329         int ms;
330         struct sched *s;
331
332         DEBUG(ast_debug(1, "ast_sched_wait()\n"));
333
334         ast_mutex_lock(&con->lock);
335         if ((s = ast_heap_peek(con->sched_heap, 1))) {
336                 ms = ast_tvdiff_ms(s->when, ast_tvnow());
337                 if (ms < 0) {
338                         ms = 0;
339                 }
340         } else {
341                 ms = -1;
342         }
343         ast_mutex_unlock(&con->lock);
344
345         return ms;
346 }
347
348
349 /*! \brief
350  * Take a sched structure and put it in the
351  * queue, such that the soonest event is
352  * first in the list. 
353  */
354 static void schedule(struct sched_context *con, struct sched *s)
355 {
356         ast_heap_push(con->sched_heap, s);
357
358         if (!ast_hashtab_insert_safe(con->schedq_ht, s)) {
359                 ast_log(LOG_WARNING,"Schedule Queue entry %d is already in table!\n", s->id);
360         }
361
362         con->schedcnt++;
363
364         if (con->schedcnt > con->highwater) {
365                 con->highwater = con->schedcnt;
366         }
367 }
368
369 /*! \brief
370  * given the last event *tv and the offset in milliseconds 'when',
371  * computes the next value,
372  */
373 static int sched_settime(struct timeval *t, int when)
374 {
375         struct timeval now = ast_tvnow();
376
377         /*ast_debug(1, "TV -> %lu,%lu\n", tv->tv_sec, tv->tv_usec);*/
378         if (ast_tvzero(*t))     /* not supplied, default to now */
379                 *t = now;
380         *t = ast_tvadd(*t, ast_samp2tv(when, 1000));
381         if (ast_tvcmp(*t, now) < 0) {
382                 *t = now;
383         }
384         return 0;
385 }
386
387 int ast_sched_replace_variable(int old_id, struct sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
388 {
389         /* 0 means the schedule item is new; do not delete */
390         if (old_id > 0) {
391                 AST_SCHED_DEL(con, old_id);
392         }
393         return ast_sched_add_variable(con, when, callback, data, variable);
394 }
395
396 /*! \brief
397  * Schedule callback(data) to happen when ms into the future
398  */
399 int ast_sched_add_variable(struct sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
400 {
401         struct sched *tmp;
402         int res = -1;
403
404         DEBUG(ast_debug(1, "ast_sched_add()\n"));
405
406         ast_mutex_lock(&con->lock);
407         if ((tmp = sched_alloc(con))) {
408                 tmp->id = con->eventcnt++;
409                 tmp->callback = callback;
410                 tmp->data = data;
411                 tmp->resched = when;
412                 tmp->variable = variable;
413                 tmp->when = ast_tv(0, 0);
414                 if (sched_settime(&tmp->when, when)) {
415                         sched_release(con, tmp);
416                 } else {
417                         schedule(con, tmp);
418                         res = tmp->id;
419                 }
420         }
421 #ifdef DUMP_SCHEDULER
422         /* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
423         if (option_debug)
424                 ast_sched_dump(con);
425 #endif
426         ast_mutex_unlock(&con->lock);
427
428         return res;
429 }
430
431 int ast_sched_replace(int old_id, struct sched_context *con, int when, ast_sched_cb callback, const void *data)
432 {
433         if (old_id > -1) {
434                 AST_SCHED_DEL(con, old_id);
435         }
436         return ast_sched_add(con, when, callback, data);
437 }
438
439 int ast_sched_add(struct sched_context *con, int when, ast_sched_cb callback, const void *data)
440 {
441         return ast_sched_add_variable(con, when, callback, data, 0);
442 }
443
444 const void *ast_sched_find_data(struct sched_context *con, int id)
445 {
446         struct sched tmp,*res;
447         tmp.id = id;
448         res = ast_hashtab_lookup(con->schedq_ht, &tmp);
449         if (res)
450                 return res->data;
451         return NULL;
452 }
453         
454 /*! \brief
455  * Delete the schedule entry with number
456  * "id".  It's nearly impossible that there
457  * would be two or more in the list with that
458  * id.
459  */
460 #ifndef AST_DEVMODE
461 int ast_sched_del(struct sched_context *con, int id)
462 #else
463 int _ast_sched_del(struct sched_context *con, int id, const char *file, int line, const char *function)
464 #endif
465 {
466         struct sched *s, tmp = {
467                 .id = id,
468         };
469
470         DEBUG(ast_debug(1, "ast_sched_del(%d)\n", id));
471         
472         ast_mutex_lock(&con->lock);
473         s = ast_hashtab_lookup(con->schedq_ht, &tmp);
474         if (s) {
475                 if (!ast_heap_remove(con->sched_heap, s)) {
476                         ast_log(LOG_WARNING,"sched entry %d not in the sched heap?\n", s->id);
477                 }
478
479                 if (!ast_hashtab_remove_this_object(con->schedq_ht, s)) {
480                         ast_log(LOG_WARNING,"Found sched entry %d, then couldn't remove it?\n", s->id);
481                 }
482
483                 con->schedcnt--;
484
485                 sched_release(con, s);
486         }
487         
488 #ifdef DUMP_SCHEDULER
489         /* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
490         if (option_debug)
491                 ast_sched_dump(con);
492 #endif
493         ast_mutex_unlock(&con->lock);
494
495         if (!s) {
496                 ast_debug(1, "Attempted to delete nonexistent schedule entry %d!\n", id);
497 #ifndef AST_DEVMODE
498                 ast_assert(s != NULL);
499 #else
500                 _ast_assert(0, "s != NULL", file, line, function);
501 #endif
502                 return -1;
503         }
504         
505         return 0;
506 }
507
508 void ast_sched_report(struct sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames)
509 {
510         int i, x;
511         struct sched *cur;
512         int countlist[cbnames->numassocs + 1];
513         size_t heap_size;
514         
515         memset(countlist, 0, sizeof(countlist));
516         ast_str_set(buf, 0, " Highwater = %d\n schedcnt = %d\n", con->highwater, con->schedcnt);
517
518         ast_mutex_lock(&con->lock);
519
520         heap_size = ast_heap_size(con->sched_heap);
521         for (x = 1; x <= heap_size; x++) {
522                 cur = ast_heap_peek(con->sched_heap, x);
523                 /* match the callback to the cblist */
524                 for (i = 0; i < cbnames->numassocs; i++) {
525                         if (cur->callback == cbnames->cblist[i]) {
526                                 break;
527                         }
528                 }
529                 if (i < cbnames->numassocs) {
530                         countlist[i]++;
531                 } else {
532                         countlist[cbnames->numassocs]++;
533                 }
534         }
535
536         ast_mutex_unlock(&con->lock);
537
538         for (i = 0; i < cbnames->numassocs; i++) {
539                 ast_str_append(buf, 0, "    %s : %d\n", cbnames->list[i], countlist[i]);
540         }
541
542         ast_str_append(buf, 0, "   <unknown> : %d\n", countlist[cbnames->numassocs]);
543 }
544         
545 /*! \brief Dump the contents of the scheduler to LOG_DEBUG */
546 void ast_sched_dump(struct sched_context *con)
547 {
548         struct sched *q;
549         struct timeval when = ast_tvnow();
550         int x;
551         size_t heap_size;
552 #ifdef SCHED_MAX_CACHE
553         ast_debug(1, "Asterisk Schedule Dump (%d in Q, %d Total, %d Cache, %d high-water)\n", con->schedcnt, con->eventcnt - 1, con->schedccnt, con->highwater);
554 #else
555         ast_debug(1, "Asterisk Schedule Dump (%d in Q, %d Total, %d high-water)\n", con->schedcnt, con->eventcnt - 1, con->highwater);
556 #endif
557
558         ast_debug(1, "=============================================================\n");
559         ast_debug(1, "|ID    Callback          Data              Time  (sec:ms)   |\n");
560         ast_debug(1, "+-----+-----------------+-----------------+-----------------+\n");
561         ast_mutex_lock(&con->lock);
562         heap_size = ast_heap_size(con->sched_heap);
563         for (x = 1; x <= heap_size; x++) {
564                 struct timeval delta;
565                 q = ast_heap_peek(con->sched_heap, x);
566                 delta = ast_tvsub(q->when, when);
567                 ast_debug(1, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n", 
568                         q->id,
569                         q->callback,
570                         q->data,
571                         (long)delta.tv_sec,
572                         (long int)delta.tv_usec);
573         }
574         ast_mutex_unlock(&con->lock);
575         ast_debug(1, "=============================================================\n");
576 }
577
578 /*! \brief
579  * Launch all events which need to be run at this time.
580  */
581 int ast_sched_runq(struct sched_context *con)
582 {
583         struct sched *current;
584         struct timeval when;
585         int numevents;
586         int res;
587
588         DEBUG(ast_debug(1, "ast_sched_runq()\n"));
589                 
590         ast_mutex_lock(&con->lock);
591
592         for (numevents = 0; (current = ast_heap_peek(con->sched_heap, 1)); numevents++) {
593                 /* schedule all events which are going to expire within 1ms.
594                  * We only care about millisecond accuracy anyway, so this will
595                  * help us get more than one event at one time if they are very
596                  * close together.
597                  */
598                 when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
599                 if (ast_tvcmp(current->when, when) != -1) {
600                         break;
601                 }
602                 
603                 current = ast_heap_pop(con->sched_heap);
604
605                 if (!ast_hashtab_remove_this_object(con->schedq_ht, current)) {
606                         ast_log(LOG_ERROR,"Sched entry %d was in the schedq list but not in the hashtab???\n", current->id);
607                 }
608
609                 con->schedcnt--;
610
611                 /*
612                  * At this point, the schedule queue is still intact.  We
613                  * have removed the first event and the rest is still there,
614                  * so it's permissible for the callback to add new events, but
615                  * trying to delete itself won't work because it isn't in
616                  * the schedule queue.  If that's what it wants to do, it 
617                  * should return 0.
618                  */
619                         
620                 ast_mutex_unlock(&con->lock);
621                 res = current->callback(current->data);
622                 ast_mutex_lock(&con->lock);
623                         
624                 if (res) {
625                         /*
626                          * If they return non-zero, we should schedule them to be
627                          * run again.
628                          */
629                         if (sched_settime(&current->when, current->variable? res : current->resched)) {
630                                 sched_release(con, current);
631                         } else {
632                                 schedule(con, current);
633                         }
634                 } else {
635                         /* No longer needed, so release it */
636                         sched_release(con, current);
637                 }
638         }
639
640         ast_mutex_unlock(&con->lock);
641         
642         return numevents;
643 }
644
645 long ast_sched_when(struct sched_context *con,int id)
646 {
647         struct sched *s, tmp;
648         long secs = -1;
649         DEBUG(ast_debug(1, "ast_sched_when()\n"));
650
651         ast_mutex_lock(&con->lock);
652         
653         /* these next 2 lines replace a lookup loop */
654         tmp.id = id;
655         s = ast_hashtab_lookup(con->schedq_ht, &tmp);
656         
657         if (s) {
658                 struct timeval now = ast_tvnow();
659                 secs = s->when.tv_sec - now.tv_sec;
660         }
661         ast_mutex_unlock(&con->lock);
662         
663         return secs;
664 }