/*
 * Asterisk -- An open source telephony toolkit.
 *
 * Copyright (C) 1999 - 2010, Digium, Inc.
 *
 * Mark Spencer <markster@digium.com>
 * Russell Bryant <russell@digium.com>
 *
 * See http://www.asterisk.org for more information about
 * the Asterisk project. Please do not directly contact
 * any of the maintainers of this project for assistance;
 * the project provides a web site, mailing lists and IRC
 * channels for your use.
 *
 * This program is free software, distributed under the terms of
 * the GNU General Public License Version 2. See the LICENSE file
 * at the top of the source tree.
 */

/*! \file
 *
 * \brief Scheduler Routines (from cheops-NG)
 *
 * \author Mark Spencer <markster@digium.com>
 */

/*** MODULEINFO
	<support_level>core</support_level>
 ***/
#include "asterisk.h"

ASTERISK_REGISTER_FILE()

#ifdef DEBUG_SCHEDULER
#define DEBUG(a) do { \
	if (option_debug) \
		DEBUG_M(a) \
	} while (0)
#else
#define DEBUG(a)
#endif

#include <sys/time.h>

#include "asterisk/sched.h"
#include "asterisk/channel.h"
#include "asterisk/lock.h"
#include "asterisk/utils.h"
#include "asterisk/heap.h"
#include "asterisk/threadstorage.h"
/*!
 * \brief Max num of schedule structs
 *
 * \note The max number of schedule structs to keep around
 * for use. Undefine to disable schedule structure
 * caching. (Only disable this on very low memory
 * machines.)
 */
#define SCHED_MAX_CACHE 128

AST_THREADSTORAGE(last_del_id);
/*!
 * \brief Scheduler ID holder
 *
 * These form a queue on a scheduler context. When a new
 * scheduled item is created, a sched_id is popped off the
 * queue and its id is assigned to the new scheduled item.
 * When the scheduled task is complete, the sched_id on that
 * task is then pushed to the back of the queue to be re-used
 * on some future scheduled item.
 */
struct sched_id {
	/*! Immutable ID number that is copied onto the scheduled task */
	int id;
	AST_LIST_ENTRY(sched_id) list;
};
struct sched {
	AST_LIST_ENTRY(sched) list;
	/*! The ID that has been popped off the scheduler context's queue */
	struct sched_id *sched_id;
	struct timeval when;          /*!< Absolute time event should take place */
	int resched;                  /*!< When to reschedule */
	int variable;                 /*!< Use return value from callback to reschedule */
	const void *data;             /*!< Data */
	ast_sched_cb callback;        /*!< Callback */
	ssize_t __heap_index;
	/*!
	 * Used to synchronize between thread running a task and thread
	 * attempting to delete a task
	 */
	ast_cond_t cond;
	/*! Indication that a running task was deleted. */
	unsigned int deleted:1;
};
struct sched_thread {
	pthread_t thread;
	ast_cond_t cond;
	unsigned int stop:1;
};
struct ast_sched_context {
	ast_mutex_t lock;
	unsigned int eventcnt;                  /*!< Number of events processed */
	unsigned int highwater;                 /*!< Highest count so far */
	struct ast_heap *sched_heap;
	struct sched_thread *sched_thread;
	/*! The scheduled task that is currently executing */
	struct sched *currently_executing;

#ifdef SCHED_MAX_CACHE
	AST_LIST_HEAD_NOLOCK(, sched) schedc;   /*!< Cache of unused schedule structures and how many */
	unsigned int schedccnt;
#endif

	/*! Queue of scheduler task IDs to assign */
	AST_LIST_HEAD_NOLOCK(, sched_id) id_queue;
	/*! The number of IDs in the id_queue */
	int id_queue_size;
};
static void *sched_run(void *data)
{
	struct ast_sched_context *con = data;

	while (!con->sched_thread->stop) {
		int ms;
		struct timespec ts = {
			.tv_sec = 0,
		};

		ast_mutex_lock(&con->lock);

		if (con->sched_thread->stop) {
			ast_mutex_unlock(&con->lock);
			return NULL;
		}

		ms = ast_sched_wait(con);
		if (ms == -1) {
			ast_cond_wait(&con->sched_thread->cond, &con->lock);
		} else {
			struct timeval tv;
			tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000));
			ts.tv_sec = tv.tv_sec;
			ts.tv_nsec = tv.tv_usec * 1000;
			ast_cond_timedwait(&con->sched_thread->cond, &con->lock, &ts);
		}

		ast_mutex_unlock(&con->lock);

		if (con->sched_thread->stop) {
			return NULL;
		}

		ast_sched_runq(con);
	}

	return NULL;
}
static void sched_thread_destroy(struct ast_sched_context *con)
{
	if (!con->sched_thread) {
		return;
	}

	if (con->sched_thread->thread != AST_PTHREADT_NULL) {
		ast_mutex_lock(&con->lock);
		con->sched_thread->stop = 1;
		ast_cond_signal(&con->sched_thread->cond);
		ast_mutex_unlock(&con->lock);
		pthread_join(con->sched_thread->thread, NULL);
		con->sched_thread->thread = AST_PTHREADT_NULL;
	}

	ast_cond_destroy(&con->sched_thread->cond);
	ast_free(con->sched_thread);
	con->sched_thread = NULL;
}
int ast_sched_start_thread(struct ast_sched_context *con)
{
	struct sched_thread *st;

	if (con->sched_thread) {
		ast_log(LOG_ERROR, "Thread already started on this scheduler context\n");
		return -1;
	}

	if (!(st = ast_calloc(1, sizeof(*st)))) {
		return -1;
	}

	ast_cond_init(&st->cond, NULL);
	st->thread = AST_PTHREADT_NULL;
	con->sched_thread = st;

	if (ast_pthread_create_background(&st->thread, NULL, sched_run, con)) {
		ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
		sched_thread_destroy(con);
		return -1;
	}

	return 0;
}
static int sched_time_cmp(void *a, void *b)
{
	return ast_tvcmp(((struct sched *) b)->when, ((struct sched *) a)->when);
}
struct ast_sched_context *ast_sched_context_create(void)
{
	struct ast_sched_context *tmp;

	if (!(tmp = ast_calloc(1, sizeof(*tmp)))) {
		return NULL;
	}

	ast_mutex_init(&tmp->lock);
	tmp->eventcnt = 1;

	AST_LIST_HEAD_INIT_NOLOCK(&tmp->id_queue);

	if (!(tmp->sched_heap = ast_heap_create(8, sched_time_cmp,
			offsetof(struct sched, __heap_index)))) {
		ast_sched_context_destroy(tmp);
		return NULL;
	}

	return tmp;
}
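
/*
 * Illustrative sketch (not part of the original file): the typical lifecycle of a
 * scheduler context that uses the built-in scheduler thread.  The callback name
 * my_task and its data pointer are hypothetical.
 *
 *	static int my_task(const void *data)
 *	{
 *		return 0;	// one-shot: returning 0 means "do not reschedule"
 *	}
 *
 *	struct ast_sched_context *con = ast_sched_context_create();
 *	if (con && !ast_sched_start_thread(con)) {
 *		int id = ast_sched_add(con, 1000, my_task, NULL);	// fire in ~1000 ms
 *		...
 *		AST_SCHED_DEL(con, id);	// cancel if it has not fired yet
 *	}
 *	if (con) {
 *		ast_sched_context_destroy(con);
 *	}
 */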
static void sched_free(struct sched *task)
{
	/* task->sched_id will be NULL most of the time, but when the
	 * scheduler context shuts down, it will free all scheduled
	 * tasks, and in that case, the task->sched_id will be non-NULL
	 */
	ast_free(task->sched_id);
	ast_cond_destroy(&task->cond);
	ast_free(task);
}
void ast_sched_context_destroy(struct ast_sched_context *con)
{
	struct sched *s;
	struct sched_id *sid;

	sched_thread_destroy(con);
	con->sched_thread = NULL;

	ast_mutex_lock(&con->lock);

#ifdef SCHED_MAX_CACHE
	while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list))) {
		sched_free(s);
	}
#endif

	if (con->sched_heap) {
		while ((s = ast_heap_pop(con->sched_heap))) {
			sched_free(s);
		}
		ast_heap_destroy(con->sched_heap);
		con->sched_heap = NULL;
	}

	while ((sid = AST_LIST_REMOVE_HEAD(&con->id_queue, list))) {
		ast_free(sid);
	}

	ast_mutex_unlock(&con->lock);
	ast_mutex_destroy(&con->lock);

	ast_free(con);
}
#define ID_QUEUE_INCREMENT 16

/*!
 * \brief Add new scheduler IDs to the queue.
 *
 * \retval The number of IDs added to the queue
 */
static int add_ids(struct ast_sched_context *con)
{
	int new_size;
	int original_size;
	int i;

	original_size = con->id_queue_size;
	/* So we don't go overboard with the mallocs here, we'll just up
	 * the size of the list by a fixed amount each time instead of
	 * multiplying the size by any particular factor
	 */
	new_size = original_size + ID_QUEUE_INCREMENT;
	if (new_size < 0) {
		/* Overflow. Cap it at INT_MAX. */
		new_size = INT_MAX;
	}

	for (i = original_size; i < new_size; ++i) {
		struct sched_id *new_id;

		new_id = ast_calloc(1, sizeof(*new_id));
		if (!new_id) {
			break;
		}
		new_id->id = i;
		AST_LIST_INSERT_TAIL(&con->id_queue, new_id, list);
		++con->id_queue_size;
	}

	return con->id_queue_size - original_size;
}
static int set_sched_id(struct ast_sched_context *con, struct sched *new_sched)
{
	if (AST_LIST_EMPTY(&con->id_queue) && (add_ids(con) == 0)) {
		return -1;
	}

	new_sched->sched_id = AST_LIST_REMOVE_HEAD(&con->id_queue, list);
	return 0;
}
static void sched_release(struct ast_sched_context *con, struct sched *tmp)
{
	if (tmp->sched_id) {
		AST_LIST_INSERT_TAIL(&con->id_queue, tmp->sched_id, list);
		tmp->sched_id = NULL;
	}

	/*
	 * Add to the cache, or just free() if we
	 * already have too many cache entries
	 */
#ifdef SCHED_MAX_CACHE
	if (con->schedccnt < SCHED_MAX_CACHE) {
		AST_LIST_INSERT_HEAD(&con->schedc, tmp, list);
		con->schedccnt++;
	} else
#endif
		sched_free(tmp);
}
static struct sched *sched_alloc(struct ast_sched_context *con)
{
	struct sched *tmp;

	/*
	 * We keep a small cache of schedule entries
	 * to minimize the number of necessary malloc()'s
	 */
#ifdef SCHED_MAX_CACHE
	if ((tmp = AST_LIST_REMOVE_HEAD(&con->schedc, list))) {
		con->schedccnt--;
	} else
#endif
	{
		tmp = ast_calloc(1, sizeof(*tmp));
		if (!tmp) {
			return NULL;
		}
		ast_cond_init(&tmp->cond, NULL);
	}

	if (set_sched_id(con, tmp)) {
		sched_release(con, tmp);
		return NULL;
	}

	return tmp;
}
void ast_sched_clean_by_callback(struct ast_sched_context *con, ast_sched_cb match, ast_sched_cb cleanup_cb)
{
	int i = 1;
	struct sched *current;

	ast_mutex_lock(&con->lock);
	while ((current = ast_heap_peek(con->sched_heap, i))) {
		if (current->callback != match) {
			i++;
			continue;
		}

		ast_heap_remove(con->sched_heap, current);

		cleanup_cb(current->data);
		sched_release(con, current);
	}
	ast_mutex_unlock(&con->lock);
}
/*! \brief
 * Return the number of milliseconds
 * until the next scheduled event
 */
int ast_sched_wait(struct ast_sched_context *con)
{
	int ms;
	struct sched *s;

	DEBUG(ast_debug(1, "ast_sched_wait()\n"));

	ast_mutex_lock(&con->lock);
	if ((s = ast_heap_peek(con->sched_heap, 1))) {
		ms = ast_tvdiff_ms(s->when, ast_tvnow());
		if (ms < 0) {
			ms = 0;
		}
	} else {
		ms = -1;
	}
	ast_mutex_unlock(&con->lock);

	return ms;
}
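
/*
 * Illustrative sketch (not part of the original file): a module that does not use
 * ast_sched_start_thread() can drive the scheduler from its own loop by combining
 * ast_sched_wait() with ast_sched_runq().  The wait in the middle is a placeholder
 * for whatever the caller normally blocks on (poll(2), ast_io_wait(), ...).
 *
 *	for (;;) {
 *		int ms = ast_sched_wait(con);	// ms until the next task, or -1 if none pending
 *		if (ms < 0) {
 *			ms = 1000;	// nothing scheduled; pick any idle timeout
 *		}
 *		// ... block here for up to `ms` milliseconds on file descriptors ...
 *		ast_sched_runq(con);	// run everything that has come due
 *	}
 */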
/*! \brief
 * Take a sched structure and put it in the
 * queue, such that the soonest event is
 * first in the list.
 */
static void schedule(struct ast_sched_context *con, struct sched *s)
{
	ast_heap_push(con->sched_heap, s);

	if (ast_heap_size(con->sched_heap) > con->highwater) {
		con->highwater = ast_heap_size(con->sched_heap);
	}
}
/*! \brief
 * Given the last event time *t and the offset in milliseconds 'when',
 * compute the next time the event should fire.
 */
static int sched_settime(struct timeval *t, int when)
{
	struct timeval now = ast_tvnow();

	/*ast_debug(1, "TV -> %lu,%lu\n", tv->tv_sec, tv->tv_usec);*/
	if (ast_tvzero(*t)) {	/* not supplied, default to now */
		*t = now;
	}
	*t = ast_tvadd(*t, ast_samp2tv(when, 1000));
	if (ast_tvcmp(*t, now) < 0) {
		*t = now;
	}

	return 0;
}
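
/*
 * Worked example (illustrative, not part of the original file): if *t is zero and
 * when is 500, the task is stamped to fire at now + 500 ms.  On a reschedule, *t is
 * the previous fire time, so the next fire time is previous + 500 ms; if the system
 * fell behind and that value is already in the past, it is clamped to "now" so the
 * task fires on the next run of the queue rather than trying to catch up on missed
 * intervals.
 */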
int ast_sched_replace_variable(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
{
	/* 0 means the schedule item is new; do not delete */
	if (old_id > 0) {
		AST_SCHED_DEL(con, old_id);
	}

	return ast_sched_add_variable(con, when, callback, data, variable);
}
/*! \brief
 * Schedule callback(data) to happen when ms into the future
 */
int ast_sched_add_variable(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
{
	struct sched *tmp;
	int res = -1;

	DEBUG(ast_debug(1, "ast_sched_add()\n"));

	ast_mutex_lock(&con->lock);
	if ((tmp = sched_alloc(con))) {
		con->eventcnt++;
		tmp->callback = callback;
		tmp->data = data;
		tmp->resched = when;
		tmp->variable = variable;
		tmp->when = ast_tv(0, 0);
		tmp->deleted = 0;

		if (sched_settime(&tmp->when, when)) {
			sched_release(con, tmp);
		} else {
			schedule(con, tmp);
			res = tmp->sched_id->id;
		}
	}
#ifdef DUMP_SCHEDULER
	/* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
	ast_sched_dump(con);
#endif
	if (con->sched_thread) {
		ast_cond_signal(&con->sched_thread->cond);
	}
	ast_mutex_unlock(&con->lock);

	return res;
}
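
/*
 * Illustrative sketch (not part of the original file): a periodic task driven by the
 * "variable" reschedule mode.  When variable is non-zero, a non-zero return value
 * from the callback is used as the next interval in milliseconds; returning 0 stops
 * the task.  The callback name and interval below are hypothetical.
 *
 *	static int poke(const void *data)
 *	{
 *		// do the periodic work here
 *		return 5000;	// run again in ~5 seconds; return 0 to stop
 *	}
 *
 *	int id = ast_sched_add_variable(con, 5000, poke, NULL, 1);
 *	if (id < 0) {
 *		// scheduling failed
 *	}
 */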
int ast_sched_replace(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
{
	if (old_id > -1) {
		AST_SCHED_DEL(con, old_id);
	}

	return ast_sched_add(con, when, callback, data);
}

int ast_sched_add(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
{
	return ast_sched_add_variable(con, when, callback, data, 0);
}
static struct sched *sched_find(struct ast_sched_context *con, int id)
{
	int x;
	size_t heap_size;

	heap_size = ast_heap_size(con->sched_heap);
	for (x = 1; x <= heap_size; x++) {
		struct sched *cur = ast_heap_peek(con->sched_heap, x);

		if (cur->sched_id->id == id) {
			return cur;
		}
	}

	return NULL;
}
const void *ast_sched_find_data(struct ast_sched_context *con, int id)
{
	struct sched *s;
	const void *data = NULL;

	ast_mutex_lock(&con->lock);

	s = sched_find(con, id);
	if (s) {
		data = s->data;
	}

	ast_mutex_unlock(&con->lock);

	return data;
}
/*! \brief
 * Delete the schedule entry with number
 * "id".  It's nearly impossible that there
 * would be two or more in the list with that
 * id.
 */
#ifndef AST_DEVMODE
int ast_sched_del(struct ast_sched_context *con, int id)
#else
int _ast_sched_del(struct ast_sched_context *con, int id, const char *file, int line, const char *function)
#endif
{
	struct sched *s = NULL;
	int *last_id = ast_threadstorage_get(&last_del_id, sizeof(int));

	DEBUG(ast_debug(1, "ast_sched_del(%d)\n", id));

	ast_mutex_lock(&con->lock);

	s = sched_find(con, id);
	if (s) {
		if (!ast_heap_remove(con->sched_heap, s)) {
			ast_log(LOG_WARNING, "sched entry %d not in the sched heap?\n", s->sched_id->id);
		}
		sched_release(con, s);
	} else if (con->currently_executing && (id == con->currently_executing->sched_id->id)) {
		s = con->currently_executing;
		s->deleted = 1;
		/* Wait for the executing task to complete so that the caller of ast_sched_del()
		 * does not free memory out from under the task.
		 */
		while (con->currently_executing && (id == con->currently_executing->sched_id->id)) {
			ast_cond_wait(&s->cond, &con->lock);
		}
		/* Do not sched_release() here because ast_sched_runq() will do it */
	}

#ifdef DUMP_SCHEDULER
	/* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
	ast_sched_dump(con);
#endif

	if (con->sched_thread) {
		ast_cond_signal(&con->sched_thread->cond);
	}
	ast_mutex_unlock(&con->lock);

	if (!s && *last_id != id) {
		ast_debug(1, "Attempted to delete nonexistent schedule entry %d!\n", id);
		/* Removing a nonexistent schedule entry shouldn't trigger an assert (it was
		 * enabled in DEV_MODE), because in many places entries are deleted without
		 * having a valid id. */
		*last_id = id;
		return -1;
	} else if (!s) {
		return -1;
	}

	return 0;
}
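
/*
 * Illustrative sketch (not part of the original file): callers typically keep the
 * returned scheduler id in a variable initialized to -1 and cancel through the
 * AST_SCHED_DEL() macro from sched.h, which retries the deletion and resets the id.
 * If the task is running on another thread at that moment, the delete blocks until
 * the callback returns, so the callback's data can be freed safely afterwards.
 * The names my_task and my_data below are hypothetical.
 *
 *	int id = ast_sched_add(con, 1000, my_task, my_data);
 *	...
 *	AST_SCHED_DEL(con, id);		// id is -1 afterwards
 *	ast_free(my_data);		// safe: the task is no longer queued or running
 */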
void ast_sched_report(struct ast_sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames)
{
	int i, x;
	struct sched *cur;
	int countlist[cbnames->numassocs + 1];
	size_t heap_size;

	memset(countlist, 0, sizeof(countlist));
	ast_str_set(buf, 0, " Highwater = %u\n schedcnt = %zu\n", con->highwater, ast_heap_size(con->sched_heap));

	ast_mutex_lock(&con->lock);

	heap_size = ast_heap_size(con->sched_heap);
	for (x = 1; x <= heap_size; x++) {
		cur = ast_heap_peek(con->sched_heap, x);
		/* match the callback to the cblist */
		for (i = 0; i < cbnames->numassocs; i++) {
			if (cur->callback == cbnames->cblist[i]) {
				break;
			}
		}

		if (i < cbnames->numassocs) {
			countlist[i]++;
		} else {
			countlist[cbnames->numassocs]++;
		}
	}

	ast_mutex_unlock(&con->lock);

	for (i = 0; i < cbnames->numassocs; i++) {
		ast_str_append(buf, 0, "    %s : %d\n", cbnames->list[i], countlist[i]);
	}

	ast_str_append(buf, 0, "    <unknown> : %d\n", countlist[cbnames->numassocs]);
}
/*! \brief Dump the contents of the scheduler to LOG_DEBUG */
void ast_sched_dump(struct ast_sched_context *con)
{
	struct sched *q;
	struct timeval when = ast_tvnow();
	int x;
	size_t heap_size;

#ifdef SCHED_MAX_CACHE
	ast_debug(1, "Asterisk Schedule Dump (%zu in Q, %u Total, %u Cache, %u high-water)\n", ast_heap_size(con->sched_heap), con->eventcnt - 1, con->schedccnt, con->highwater);
#else
	ast_debug(1, "Asterisk Schedule Dump (%zu in Q, %u Total, %u high-water)\n", ast_heap_size(con->sched_heap), con->eventcnt - 1, con->highwater);
#endif

	ast_debug(1, "=============================================================\n");
	ast_debug(1, "|ID    Callback          Data              Time  (sec:ms)   |\n");
	ast_debug(1, "+-----+-----------------+-----------------+-----------------+\n");
	ast_mutex_lock(&con->lock);
	heap_size = ast_heap_size(con->sched_heap);
	for (x = 1; x <= heap_size; x++) {
		struct timeval delta;

		q = ast_heap_peek(con->sched_heap, x);
		delta = ast_tvsub(q->when, when);
		ast_debug(1, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n",
			q->sched_id->id,
			q->callback,
			q->data,
			(long)delta.tv_sec,
			(long int)delta.tv_usec);
	}
	ast_mutex_unlock(&con->lock);
	ast_debug(1, "=============================================================\n");
}
/*! \brief
 * Launch all events which need to be run at this time.
 */
int ast_sched_runq(struct ast_sched_context *con)
{
	struct sched *current;
	struct timeval when;
	int numevents;
	int res;

	DEBUG(ast_debug(1, "ast_sched_runq()\n"));

	ast_mutex_lock(&con->lock);

	when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
	for (numevents = 0; (current = ast_heap_peek(con->sched_heap, 1)); numevents++) {
		/* schedule all events which are going to expire within 1ms.
		 * We only care about millisecond accuracy anyway, so this will
		 * help us get more than one event at one time if they are very
		 * close together.
		 */
		if (ast_tvcmp(current->when, when) != -1) {
			break;
		}

		current = ast_heap_pop(con->sched_heap);

		/*
		 * At this point, the schedule queue is still intact.  We
		 * have removed the first event and the rest is still there,
		 * so it's permissible for the callback to add new events, but
		 * trying to delete itself won't work because it isn't in
		 * the schedule queue.  If that's what it wants to do, it
		 * should return 0.
		 */

		con->currently_executing = current;
		ast_mutex_unlock(&con->lock);
		res = current->callback(current->data);
		ast_mutex_lock(&con->lock);
		con->currently_executing = NULL;
		ast_cond_signal(&current->cond);

		if (res && !current->deleted) {
			/*
			 * If they return non-zero, we should schedule them to be
			 * run again.
			 */
			if (sched_settime(&current->when, current->variable ? res : current->resched)) {
				sched_release(con, current);
			} else {
				schedule(con, current);
			}
		} else {
			/* No longer needed, so release it */
			sched_release(con, current);
		}
	}

	ast_mutex_unlock(&con->lock);

	return numevents;
}
long ast_sched_when(struct ast_sched_context *con, int id)
{
	struct sched *s;
	long secs = -1;

	DEBUG(ast_debug(1, "ast_sched_when()\n"));

	ast_mutex_lock(&con->lock);

	s = sched_find(con, id);
	if (s) {
		struct timeval now = ast_tvnow();

		secs = s->when.tv_sec - now.tv_sec;
	}

	ast_mutex_unlock(&con->lock);

	return secs;
}