Fix a number of problems with ast_sched_report().
[asterisk/asterisk.git] / main / sched.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 1999 - 2008, Digium, Inc.
5  *
6  * Mark Spencer <markster@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Scheduler Routines (from cheops-NG)
22  *
23  * \author Mark Spencer <markster@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
29
30 #ifdef DEBUG_SCHEDULER
31 #define DEBUG(a) do { \
32         if (option_debug) \
33                 DEBUG_M(a) \
34         } while (0)
35 #else
36 #define DEBUG(a) 
37 #endif
38
39 #include <sys/time.h>
40
41 #include "asterisk/sched.h"
42 #include "asterisk/channel.h"
43 #include "asterisk/lock.h"
44 #include "asterisk/utils.h"
45 #include "asterisk/linkedlists.h"
46 #include "asterisk/dlinkedlists.h"
47 #include "asterisk/hashtab.h"
48
49 struct sched {
50         AST_DLLIST_ENTRY(sched) list;
51         int id;                       /*!< ID number of event */
52         struct timeval when;          /*!< Absolute time event should take place */
53         int resched;                  /*!< When to reschedule */
54         int variable;                 /*!< Use return value from callback to reschedule */
55         const void *data;             /*!< Data */
56         ast_sched_cb callback;        /*!< Callback */
57 };
58
59 struct sched_context {
60         ast_mutex_t lock;
61         unsigned int eventcnt;                  /*!< Number of events processed */
62         unsigned int schedcnt;                  /*!< Number of outstanding schedule events */
63         unsigned int highwater;                                 /*!< highest count so far */
64         AST_DLLIST_HEAD_NOLOCK(, sched) schedq;   /*!< Schedule entry and main queue */
65         struct ast_hashtab *schedq_ht;             /*!< hash table for fast searching */
66
67 #ifdef SCHED_MAX_CACHE
68         AST_LIST_HEAD_NOLOCK(, sched) schedc;   /*!< Cache of unused schedule structures and how many */
69         unsigned int schedccnt;
70 #endif
71 };
72
73 struct ast_sched_thread {
74         pthread_t thread;
75         ast_mutex_t lock;
76         ast_cond_t cond;
77         struct sched_context *context;
78         unsigned int stop:1;
79 };
80
81 static void *sched_run(void *data)
82 {
83         struct ast_sched_thread *st = data;
84
85         while (!st->stop) {
86                 int ms;
87                 struct timespec ts = {
88                         .tv_sec = 0,    
89                 };
90
91                 ast_mutex_lock(&st->lock);
92
93                 if (st->stop) {
94                         ast_mutex_unlock(&st->lock);
95                         return NULL;
96                 }
97
98                 ms = ast_sched_wait(st->context);
99
100                 if (ms == -1) {
101                         ast_cond_wait(&st->cond, &st->lock);
102                 } else {        
103                         struct timeval tv;
104                         tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000));
105                         ts.tv_sec = tv.tv_sec;
106                         ts.tv_nsec = tv.tv_usec * 1000;
107                         ast_cond_timedwait(&st->cond, &st->lock, &ts);
108                 }
109
110                 ast_mutex_unlock(&st->lock);
111
112                 if (st->stop) {
113                         return NULL;
114                 }
115
116                 ast_sched_runq(st->context);
117         }
118
119         return NULL;
120 }
121
122 void ast_sched_thread_poke(struct ast_sched_thread *st)
123 {
124         ast_mutex_lock(&st->lock);
125         ast_cond_signal(&st->cond);
126         ast_mutex_unlock(&st->lock);
127 }
128
129 struct sched_context *ast_sched_thread_get_context(struct ast_sched_thread *st)
130 {
131         return st->context;
132 }
133
134 struct ast_sched_thread *ast_sched_thread_destroy(struct ast_sched_thread *st)
135 {
136         if (st->thread != AST_PTHREADT_NULL) {
137                 ast_mutex_lock(&st->lock);
138                 st->stop = 1;
139                 ast_cond_signal(&st->cond);
140                 ast_mutex_unlock(&st->lock);
141                 pthread_join(st->thread, NULL);
142                 st->thread = AST_PTHREADT_NULL;
143         }
144
145         ast_mutex_destroy(&st->lock);
146         ast_cond_destroy(&st->cond);
147
148         if (st->context) {
149                 sched_context_destroy(st->context);
150                 st->context = NULL;
151         }
152
153         ast_free(st);
154
155         return NULL;
156 }
157
158 struct ast_sched_thread *ast_sched_thread_create(void)
159 {
160         struct ast_sched_thread *st;
161
162         if (!(st = ast_calloc(1, sizeof(*st)))) {
163                 return NULL;
164         }
165
166         ast_mutex_init(&st->lock);
167         ast_cond_init(&st->cond, NULL);
168
169         st->thread = AST_PTHREADT_NULL;
170
171         if (!(st->context = sched_context_create())) {
172                 ast_log(LOG_ERROR, "Failed to create scheduler\n");
173                 ast_sched_thread_destroy(st);
174                 return NULL;
175         }
176         
177         if (ast_pthread_create_background(&st->thread, NULL, sched_run, st)) {
178                 ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
179                 ast_sched_thread_destroy(st);
180                 return NULL;
181         }
182
183         return st;
184 }
185
186 int ast_sched_thread_add_variable(struct ast_sched_thread *st, int when, ast_sched_cb cb,
187                 const void *data, int variable)
188 {
189         int res;
190
191         ast_mutex_lock(&st->lock);
192         res = ast_sched_add_variable(st->context, when, cb, data, variable);
193         if (res != -1) {
194                 ast_cond_signal(&st->cond);
195         }
196         ast_mutex_unlock(&st->lock);
197
198         return res;
199 }
200
201 int ast_sched_thread_add(struct ast_sched_thread *st, int when, ast_sched_cb cb,
202                 const void *data)
203 {
204         int res;
205
206         ast_mutex_lock(&st->lock);
207         res = ast_sched_add(st->context, when, cb, data);
208         if (res != -1) {
209                 ast_cond_signal(&st->cond);
210         }
211         ast_mutex_unlock(&st->lock);
212
213         return res;
214 }
215
216 /* hash routines for sched */
217
218 static int sched_cmp(const void *a, const void *b)
219 {
220         const struct sched *as = a;
221         const struct sched *bs = b;
222         return as->id != bs->id; /* return 0 on a match like strcmp would */
223 }
224
225 static unsigned int sched_hash(const void *obj)
226 {
227         const struct sched *s = obj;
228         unsigned int h = s->id;
229         return h;
230 }
231
232 struct sched_context *sched_context_create(void)
233 {
234         struct sched_context *tmp;
235
236         if (!(tmp = ast_calloc(1, sizeof(*tmp))))
237                 return NULL;
238
239         ast_mutex_init(&tmp->lock);
240         tmp->eventcnt = 1;
241         
242         tmp->schedq_ht = ast_hashtab_create(23, sched_cmp, ast_hashtab_resize_java, ast_hashtab_newsize_java, sched_hash, 1);
243         
244         return tmp;
245 }
246
247 void sched_context_destroy(struct sched_context *con)
248 {
249         struct sched *s;
250
251         ast_mutex_lock(&con->lock);
252
253 #ifdef SCHED_MAX_CACHE
254         /* Eliminate the cache */
255         while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list)))
256                 ast_free(s);
257 #endif
258
259         /* And the queue */
260         while ((s = AST_DLLIST_REMOVE_HEAD(&con->schedq, list)))
261                 ast_free(s);
262
263         ast_hashtab_destroy(con->schedq_ht, NULL);
264         con->schedq_ht = NULL;
265         
266         /* And the context */
267         ast_mutex_unlock(&con->lock);
268         ast_mutex_destroy(&con->lock);
269         ast_free(con);
270 }
271
272 static struct sched *sched_alloc(struct sched_context *con)
273 {
274         struct sched *tmp;
275
276         /*
277          * We keep a small cache of schedule entries
278          * to minimize the number of necessary malloc()'s
279          */
280 #ifdef SCHED_MAX_CACHE
281         if ((tmp = AST_LIST_REMOVE_HEAD(&con->schedc, list)))
282                 con->schedccnt--;
283         else
284 #endif
285                 tmp = ast_calloc(1, sizeof(*tmp));
286
287         return tmp;
288 }
289
290 static void sched_release(struct sched_context *con, struct sched *tmp)
291 {
292         /*
293          * Add to the cache, or just free() if we
294          * already have too many cache entries
295          */
296
297 #ifdef SCHED_MAX_CACHE   
298         if (con->schedccnt < SCHED_MAX_CACHE) {
299                 AST_LIST_INSERT_HEAD(&con->schedc, tmp, list);
300                 con->schedccnt++;
301         } else
302 #endif
303                 ast_free(tmp);
304 }
305
306 /*! \brief
307  * Return the number of milliseconds 
308  * until the next scheduled event
309  */
310 int ast_sched_wait(struct sched_context *con)
311 {
312         int ms;
313
314         DEBUG(ast_debug(1, "ast_sched_wait()\n"));
315
316         ast_mutex_lock(&con->lock);
317         if (AST_DLLIST_EMPTY(&con->schedq)) {
318                 ms = -1;
319         } else {
320                 ms = ast_tvdiff_ms(AST_DLLIST_FIRST(&con->schedq)->when, ast_tvnow());
321                 if (ms < 0)
322                         ms = 0;
323         }
324         ast_mutex_unlock(&con->lock);
325
326         return ms;
327 }
328
329
330 /*! \brief
331  * Take a sched structure and put it in the
332  * queue, such that the soonest event is
333  * first in the list. 
334  */
335 static void schedule(struct sched_context *con, struct sched *s)
336 {
337         struct sched *cur = NULL;
338         int ret;
339         int df = 0;
340         int de = 0;
341         struct sched *first = AST_DLLIST_FIRST(&con->schedq);
342         struct sched *last = AST_DLLIST_LAST(&con->schedq);
343
344         if (first)
345                 df = ast_tvdiff_us(s->when, first->when);
346         if (last)
347                 de = ast_tvdiff_us(s->when, last->when);
348
349         if (df < 0)
350                 df = -df;
351         if (de < 0)
352                 de = -de;
353
354         if (df < de) {
355                 AST_DLLIST_TRAVERSE(&con->schedq, cur, list) {
356                         if (ast_tvcmp(s->when, cur->when) == -1) {
357                                 AST_DLLIST_INSERT_BEFORE(&con->schedq, cur, s, list);
358                                 break;
359                         }
360                 }
361                 if (!cur) {
362                         AST_DLLIST_INSERT_TAIL(&con->schedq, s, list);
363                 }
364         } else {
365                 AST_DLLIST_TRAVERSE_BACKWARDS(&con->schedq, cur, list) {
366                         if (ast_tvcmp(s->when, cur->when) == 1) {
367                                 AST_DLLIST_INSERT_AFTER(&con->schedq, cur, s, list);
368                                 break;
369                         }
370                 }
371                 if (!cur) {
372                         AST_DLLIST_INSERT_HEAD(&con->schedq, s, list);
373                 }
374         }
375
376         ret = ast_hashtab_insert_safe(con->schedq_ht, s);
377         if (!ret)
378                 ast_log(LOG_WARNING,"Schedule Queue entry %d is already in table!\n",s->id);
379
380         con->schedcnt++;
381
382         if (con->schedcnt > con->highwater)
383                 con->highwater = con->schedcnt;
384 }
385
386 /*! \brief
387  * given the last event *tv and the offset in milliseconds 'when',
388  * computes the next value,
389  */
390 static int sched_settime(struct timeval *t, int when)
391 {
392         struct timeval now = ast_tvnow();
393
394         /*ast_debug(1, "TV -> %lu,%lu\n", tv->tv_sec, tv->tv_usec);*/
395         if (ast_tvzero(*t))     /* not supplied, default to now */
396                 *t = now;
397         *t = ast_tvadd(*t, ast_samp2tv(when, 1000));
398         if (ast_tvcmp(*t, now) < 0) {
399                 *t = now;
400         }
401         return 0;
402 }
403
404 int ast_sched_replace_variable(int old_id, struct sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
405 {
406         /* 0 means the schedule item is new; do not delete */
407         if (old_id > 0) {
408                 AST_SCHED_DEL(con, old_id);
409         }
410         return ast_sched_add_variable(con, when, callback, data, variable);
411 }
412
413 /*! \brief
414  * Schedule callback(data) to happen when ms into the future
415  */
416 int ast_sched_add_variable(struct sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
417 {
418         struct sched *tmp;
419         int res = -1;
420
421         DEBUG(ast_debug(1, "ast_sched_add()\n"));
422
423         ast_mutex_lock(&con->lock);
424         if ((tmp = sched_alloc(con))) {
425                 tmp->id = con->eventcnt++;
426                 tmp->callback = callback;
427                 tmp->data = data;
428                 tmp->resched = when;
429                 tmp->variable = variable;
430                 tmp->when = ast_tv(0, 0);
431                 if (sched_settime(&tmp->when, when)) {
432                         sched_release(con, tmp);
433                 } else {
434                         schedule(con, tmp);
435                         res = tmp->id;
436                 }
437         }
438 #ifdef DUMP_SCHEDULER
439         /* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
440         if (option_debug)
441                 ast_sched_dump(con);
442 #endif
443         ast_mutex_unlock(&con->lock);
444
445         return res;
446 }
447
448 int ast_sched_replace(int old_id, struct sched_context *con, int when, ast_sched_cb callback, const void *data)
449 {
450         if (old_id > -1) {
451                 AST_SCHED_DEL(con, old_id);
452         }
453         return ast_sched_add(con, when, callback, data);
454 }
455
456 int ast_sched_add(struct sched_context *con, int when, ast_sched_cb callback, const void *data)
457 {
458         return ast_sched_add_variable(con, when, callback, data, 0);
459 }
460
461 const void *ast_sched_find_data(struct sched_context *con, int id)
462 {
463         struct sched tmp,*res;
464         tmp.id = id;
465         res = ast_hashtab_lookup(con->schedq_ht, &tmp);
466         if (res)
467                 return res->data;
468         return NULL;
469 }
470         
471 /*! \brief
472  * Delete the schedule entry with number
473  * "id".  It's nearly impossible that there
474  * would be two or more in the list with that
475  * id.
476  */
477 #ifndef AST_DEVMODE
478 int ast_sched_del(struct sched_context *con, int id)
479 #else
480 int _ast_sched_del(struct sched_context *con, int id, const char *file, int line, const char *function)
481 #endif
482 {
483         struct sched *s, tmp;
484
485         DEBUG(ast_debug(1, "ast_sched_del(%d)\n", id));
486         
487         ast_mutex_lock(&con->lock);
488
489         /* OK, this is the heart of the sched performance upgrade.
490            If we have 4700 peers, we can have 4700+ entries in the
491            schedq list. searching this would take time. So, I add a 
492            hashtab to the context to keep track of each entry, by id.
493            I also leave the linked list alone, almost, --  I implement
494        a doubly-linked list instead, because it would do little good
495            to look up the id in a hashtab, and then have to run thru 
496            a couple thousand entries to remove it from the schedq list! */
497         tmp.id = id;
498         s = ast_hashtab_lookup(con->schedq_ht, &tmp);
499         if (s) {
500                 struct sched *x = AST_DLLIST_REMOVE(&con->schedq, s, list);
501                 
502                 if (!x)
503                         ast_log(LOG_WARNING,"sched entry %d not in the schedq list?\n", s->id);
504
505                 if (!ast_hashtab_remove_this_object(con->schedq_ht, s))
506                         ast_log(LOG_WARNING,"Found sched entry %d, then couldn't remove it?\n", s->id);
507                 con->schedcnt--;
508                 sched_release(con, s);
509         }
510         
511 #ifdef DUMP_SCHEDULER
512         /* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
513         if (option_debug)
514                 ast_sched_dump(con);
515 #endif
516         ast_mutex_unlock(&con->lock);
517
518         if (!s) {
519                 ast_debug(1, "Attempted to delete nonexistent schedule entry %d!\n", id);
520 #ifndef AST_DEVMODE
521                 ast_assert(s != NULL);
522 #else
523                 _ast_assert(0, "s != NULL", file, line, function);
524 #endif
525                 return -1;
526         }
527         
528         return 0;
529 }
530
531 void ast_sched_report(struct sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames)
532 {
533         int i;
534         struct sched *cur;
535         int countlist[cbnames->numassocs + 1];
536         
537         ast_str_set(buf, 0, " Highwater = %d\n schedcnt = %d\n", con->highwater, con->schedcnt);
538
539         AST_DLLIST_TRAVERSE(&con->schedq, cur, list) {
540                 /* match the callback to the cblist */
541                 for (i = 0; i < cbnames->numassocs; i++) {
542                         if (cur->callback == cbnames->cblist[i]) {
543                                 break;
544                         }
545                 }
546                 if (i < cbnames->numassocs) {
547                         countlist[i]++;
548                 } else {
549                         countlist[cbnames->numassocs]++;
550                 }
551         }
552
553         for (i = 0; i < cbnames->numassocs; i++) {
554                 ast_str_append(buf, 0, "    %s : %d\n", cbnames->list[i], countlist[i]);
555         }
556
557         ast_str_append(buf, 0, "   <unknown> : %d\n", countlist[cbnames->numassocs]);
558 }
559         
560 /*! \brief Dump the contents of the scheduler to LOG_DEBUG */
561 void ast_sched_dump(const struct sched_context *con)
562 {
563         struct sched *q;
564         struct timeval when = ast_tvnow();
565 #ifdef SCHED_MAX_CACHE
566         ast_debug(1, "Asterisk Schedule Dump (%d in Q, %d Total, %d Cache, %d high-water)\n", con->schedcnt, con->eventcnt - 1, con->schedccnt, con->highwater);
567 #else
568         ast_debug(1, "Asterisk Schedule Dump (%d in Q, %d Total, %d high-water)\n", con->schedcnt, con->eventcnt - 1, con->highwater);
569 #endif
570
571         ast_debug(1, "=============================================================\n");
572         ast_debug(1, "|ID    Callback          Data              Time  (sec:ms)   |\n");
573         ast_debug(1, "+-----+-----------------+-----------------+-----------------+\n");
574         AST_DLLIST_TRAVERSE(&con->schedq, q, list) {
575                 struct timeval delta = ast_tvsub(q->when, when);
576
577                 ast_debug(1, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n", 
578                         q->id,
579                         q->callback,
580                         q->data,
581                         (long)delta.tv_sec,
582                         (long int)delta.tv_usec);
583         }
584         ast_debug(1, "=============================================================\n");
585         
586 }
587
588 /*! \brief
589  * Launch all events which need to be run at this time.
590  */
591 int ast_sched_runq(struct sched_context *con)
592 {
593         struct sched *current;
594         struct timeval when;
595         int numevents;
596         int res;
597
598         DEBUG(ast_debug(1, "ast_sched_runq()\n"));
599                 
600         ast_mutex_lock(&con->lock);
601
602         for (numevents = 0; !AST_DLLIST_EMPTY(&con->schedq); numevents++) {
603                 /* schedule all events which are going to expire within 1ms.
604                  * We only care about millisecond accuracy anyway, so this will
605                  * help us get more than one event at one time if they are very
606                  * close together.
607                  */
608                 when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
609                 if (ast_tvcmp(AST_DLLIST_FIRST(&con->schedq)->when, when) != -1)
610                         break;
611                 
612                 current = AST_DLLIST_REMOVE_HEAD(&con->schedq, list);
613                 if (!ast_hashtab_remove_this_object(con->schedq_ht, current))
614                         ast_log(LOG_ERROR,"Sched entry %d was in the schedq list but not in the hashtab???\n", current->id);
615
616                 con->schedcnt--;
617
618                 /*
619                  * At this point, the schedule queue is still intact.  We
620                  * have removed the first event and the rest is still there,
621                  * so it's permissible for the callback to add new events, but
622                  * trying to delete itself won't work because it isn't in
623                  * the schedule queue.  If that's what it wants to do, it 
624                  * should return 0.
625                  */
626                         
627                 ast_mutex_unlock(&con->lock);
628                 res = current->callback(current->data);
629                 ast_mutex_lock(&con->lock);
630                         
631                 if (res) {
632                         /*
633                          * If they return non-zero, we should schedule them to be
634                          * run again.
635                          */
636                         if (sched_settime(&current->when, current->variable? res : current->resched)) {
637                                 sched_release(con, current);
638                         } else
639                                 schedule(con, current);
640                 } else {
641                         /* No longer needed, so release it */
642                         sched_release(con, current);
643                 }
644         }
645
646         ast_mutex_unlock(&con->lock);
647         
648         return numevents;
649 }
650
651 long ast_sched_when(struct sched_context *con,int id)
652 {
653         struct sched *s, tmp;
654         long secs = -1;
655         DEBUG(ast_debug(1, "ast_sched_when()\n"));
656
657         ast_mutex_lock(&con->lock);
658         
659         /* these next 2 lines replace a lookup loop */
660         tmp.id = id;
661         s = ast_hashtab_lookup(con->schedq_ht, &tmp);
662         
663         if (s) {
664                 struct timeval now = ast_tvnow();
665                 secs = s->when.tv_sec - now.tv_sec;
666         }
667         ast_mutex_unlock(&con->lock);
668         
669         return secs;
670 }