Merge "stasis_bridges: Remove silly usage of RAII_VAR."
[asterisk/asterisk.git] / main / sched.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 1999 - 2010, Digium, Inc.
5  *
6  * Mark Spencer <markster@digium.com>
7  * Russell Bryant <russell@digium.com>
8  *
9  * See http://www.asterisk.org for more information about
10  * the Asterisk project. Please do not directly contact
11  * any of the maintainers of this project for assistance;
12  * the project provides a web site, mailing lists and IRC
13  * channels for your use.
14  *
15  * This program is free software, distributed under the terms of
16  * the GNU General Public License Version 2. See the LICENSE file
17  * at the top of the source tree.
18  */
19
20 /*! \file
21  *
22  * \brief Scheduler Routines (from cheops-NG)
23  *
24  * \author Mark Spencer <markster@digium.com>
25  */
26
27 /*** MODULEINFO
28         <support_level>core</support_level>
29  ***/
30
31 #include "asterisk.h"
32
33 #ifdef DEBUG_SCHEDULER
34 #define DEBUG(a) do { \
35         if (option_debug) \
36                 DEBUG_M(a) \
37         } while (0)
38 #else
39 #define DEBUG(a)
40 #endif
41
42 #include <sys/time.h>
43
44 #include "asterisk/sched.h"
45 #include "asterisk/channel.h"
46 #include "asterisk/lock.h"
47 #include "asterisk/utils.h"
48 #include "asterisk/heap.h"
49 #include "asterisk/threadstorage.h"
50
51 /*!
52  * \brief Max num of schedule structs
53  *
54  * \note The max number of schedule structs to keep around
55  * for use.  Undefine to disable schedule structure
56  * caching. (Only disable this on very low memory
57  * machines)
58  */
59 #define SCHED_MAX_CACHE 128
60
61 AST_THREADSTORAGE(last_del_id);
62
63 /*!
64  * \brief Scheduler ID holder
65  *
66  * These form a queue on a scheduler context. When a new
67  * scheduled item is created, a sched_id is popped off the
68  * queue and its id is assigned to the new scheduled item.
69  * When the scheduled task is complete, the sched_id on that
70  * task is then pushed to the back of the queue to be re-used
71  * on some future scheduled item.
72  */
73 struct sched_id {
74         /*! Immutable ID number that is copied onto the scheduled task */
75         int id;
76         AST_LIST_ENTRY(sched_id) list;
77 };
78
79 struct sched {
80         AST_LIST_ENTRY(sched) list;
81         /*! The ID that has been popped off the scheduler context's queue */
82         struct sched_id *sched_id;
83         struct timeval when;          /*!< Absolute time event should take place */
84         /*!
85          * \brief Tie breaker in case the when is the same for multiple entries.
86          *
87          * \note The oldest expiring entry in the scheduler heap goes first.
88          * This is possible when multiple events are scheduled to expire at
89          * the same time by internal coding.
90          */
91         unsigned int tie_breaker;
92         int resched;                  /*!< When to reschedule */
93         int variable;                 /*!< Use return value from callback to reschedule */
94         const void *data;             /*!< Data */
95         ast_sched_cb callback;        /*!< Callback */
96         ssize_t __heap_index;
97         /*!
98          * Used to synchronize between thread running a task and thread
99          * attempting to delete a task
100          */
101         ast_cond_t cond;
102         /*! Indication that a running task was deleted. */
103         unsigned int deleted:1;
104 };
105
106 struct sched_thread {
107         pthread_t thread;
108         ast_cond_t cond;
109         unsigned int stop:1;
110 };
111
112 struct ast_sched_context {
113         ast_mutex_t lock;
114         unsigned int eventcnt;                  /*!< Number of events processed */
115         unsigned int highwater;                                 /*!< highest count so far */
116         /*! Next tie breaker in case events expire at the same time. */
117         unsigned int tie_breaker;
118         struct ast_heap *sched_heap;
119         struct sched_thread *sched_thread;
120         /*! The scheduled task that is currently executing */
121         struct sched *currently_executing;
122
123 #ifdef SCHED_MAX_CACHE
124         AST_LIST_HEAD_NOLOCK(, sched) schedc;   /*!< Cache of unused schedule structures and how many */
125         unsigned int schedccnt;
126 #endif
127         /*! Queue of scheduler task IDs to assign */
128         AST_LIST_HEAD_NOLOCK(, sched_id) id_queue;
129         /*! The number of IDs in the id_queue */
130         int id_queue_size;
131 };
132
133 static void *sched_run(void *data)
134 {
135         struct ast_sched_context *con = data;
136
137         while (!con->sched_thread->stop) {
138                 int ms;
139                 struct timespec ts = {
140                         .tv_sec = 0,
141                 };
142
143                 ast_mutex_lock(&con->lock);
144
145                 if (con->sched_thread->stop) {
146                         ast_mutex_unlock(&con->lock);
147                         return NULL;
148                 }
149
150                 ms = ast_sched_wait(con);
151
152                 if (ms == -1) {
153                         ast_cond_wait(&con->sched_thread->cond, &con->lock);
154                 } else {
155                         struct timeval tv;
156                         tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000));
157                         ts.tv_sec = tv.tv_sec;
158                         ts.tv_nsec = tv.tv_usec * 1000;
159                         ast_cond_timedwait(&con->sched_thread->cond, &con->lock, &ts);
160                 }
161
162                 ast_mutex_unlock(&con->lock);
163
164                 if (con->sched_thread->stop) {
165                         return NULL;
166                 }
167
168                 ast_sched_runq(con);
169         }
170
171         return NULL;
172 }
173
174 static void sched_thread_destroy(struct ast_sched_context *con)
175 {
176         if (!con->sched_thread) {
177                 return;
178         }
179
180         if (con->sched_thread->thread != AST_PTHREADT_NULL) {
181                 ast_mutex_lock(&con->lock);
182                 con->sched_thread->stop = 1;
183                 ast_cond_signal(&con->sched_thread->cond);
184                 ast_mutex_unlock(&con->lock);
185                 pthread_join(con->sched_thread->thread, NULL);
186                 con->sched_thread->thread = AST_PTHREADT_NULL;
187         }
188
189         ast_cond_destroy(&con->sched_thread->cond);
190
191         ast_free(con->sched_thread);
192
193         con->sched_thread = NULL;
194 }
195
196 int ast_sched_start_thread(struct ast_sched_context *con)
197 {
198         struct sched_thread *st;
199
200         if (con->sched_thread) {
201                 ast_log(LOG_ERROR, "Thread already started on this scheduler context\n");
202                 return -1;
203         }
204
205         if (!(st = ast_calloc(1, sizeof(*st)))) {
206                 return -1;
207         }
208
209         ast_cond_init(&st->cond, NULL);
210
211         st->thread = AST_PTHREADT_NULL;
212
213         con->sched_thread = st;
214
215         if (ast_pthread_create_background(&st->thread, NULL, sched_run, con)) {
216                 ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
217                 sched_thread_destroy(con);
218                 return -1;
219         }
220
221         return 0;
222 }
223
224 static int sched_time_cmp(void *va, void *vb)
225 {
226         struct sched *a = va;
227         struct sched *b = vb;
228         int cmp;
229
230         cmp = ast_tvcmp(b->when, a->when);
231         if (!cmp) {
232                 cmp = b->tie_breaker - a->tie_breaker;
233         }
234         return cmp;
235 }
236
237 struct ast_sched_context *ast_sched_context_create(void)
238 {
239         struct ast_sched_context *tmp;
240
241         if (!(tmp = ast_calloc(1, sizeof(*tmp)))) {
242                 return NULL;
243         }
244
245         ast_mutex_init(&tmp->lock);
246         tmp->eventcnt = 1;
247
248         AST_LIST_HEAD_INIT_NOLOCK(&tmp->id_queue);
249
250         if (!(tmp->sched_heap = ast_heap_create(8, sched_time_cmp,
251                         offsetof(struct sched, __heap_index)))) {
252                 ast_sched_context_destroy(tmp);
253                 return NULL;
254         }
255
256         return tmp;
257 }
258
259 static void sched_free(struct sched *task)
260 {
261         /* task->sched_id will be NULL most of the time, but when the
262          * scheduler context shuts down, it will free all scheduled
263          * tasks, and in that case, the task->sched_id will be non-NULL
264          */
265         ast_free(task->sched_id);
266         ast_cond_destroy(&task->cond);
267         ast_free(task);
268 }
269
270 void ast_sched_context_destroy(struct ast_sched_context *con)
271 {
272         struct sched *s;
273         struct sched_id *sid;
274
275         sched_thread_destroy(con);
276         con->sched_thread = NULL;
277
278         ast_mutex_lock(&con->lock);
279
280 #ifdef SCHED_MAX_CACHE
281         while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list))) {
282                 sched_free(s);
283         }
284 #endif
285
286         if (con->sched_heap) {
287                 while ((s = ast_heap_pop(con->sched_heap))) {
288                         sched_free(s);
289                 }
290                 ast_heap_destroy(con->sched_heap);
291                 con->sched_heap = NULL;
292         }
293
294         while ((sid = AST_LIST_REMOVE_HEAD(&con->id_queue, list))) {
295                 ast_free(sid);
296         }
297
298         ast_mutex_unlock(&con->lock);
299         ast_mutex_destroy(&con->lock);
300
301         ast_free(con);
302 }
303
304 #define ID_QUEUE_INCREMENT 16
305
306 /*!
307  * \brief Add new scheduler IDs to the queue.
308  *
309  * \retval The number of IDs added to the queue
310  */
311 static int add_ids(struct ast_sched_context *con)
312 {
313         int new_size;
314         int original_size;
315         int i;
316
317         original_size = con->id_queue_size;
318         /* So we don't go overboard with the mallocs here, we'll just up
319          * the size of the list by a fixed amount each time instead of
320          * multiplying the size by any particular factor
321          */
322         new_size = original_size + ID_QUEUE_INCREMENT;
323         if (new_size < 0) {
324                 /* Overflow. Cap it at INT_MAX. */
325                 new_size = INT_MAX;
326         }
327         for (i = original_size; i < new_size; ++i) {
328                 struct sched_id *new_id;
329
330                 new_id = ast_calloc(1, sizeof(*new_id));
331                 if (!new_id) {
332                         break;
333                 }
334
335                 /*
336                  * According to the API doxygen a sched ID of 0 is valid.
337                  * Unfortunately, 0 was never returned historically and
338                  * several users incorrectly coded usage of the returned
339                  * sched ID assuming that 0 was invalid.
340                  */
341                 new_id->id = ++con->id_queue_size;
342
343                 AST_LIST_INSERT_TAIL(&con->id_queue, new_id, list);
344         }
345
346         return con->id_queue_size - original_size;
347 }
348
349 static int set_sched_id(struct ast_sched_context *con, struct sched *new_sched)
350 {
351         if (AST_LIST_EMPTY(&con->id_queue) && (add_ids(con) == 0)) {
352                 return -1;
353         }
354
355         new_sched->sched_id = AST_LIST_REMOVE_HEAD(&con->id_queue, list);
356         return 0;
357 }
358
359 static void sched_release(struct ast_sched_context *con, struct sched *tmp)
360 {
361         if (tmp->sched_id) {
362                 AST_LIST_INSERT_TAIL(&con->id_queue, tmp->sched_id, list);
363                 tmp->sched_id = NULL;
364         }
365
366         /*
367          * Add to the cache, or just free() if we
368          * already have too many cache entries
369          */
370 #ifdef SCHED_MAX_CACHE
371         if (con->schedccnt < SCHED_MAX_CACHE) {
372                 AST_LIST_INSERT_HEAD(&con->schedc, tmp, list);
373                 con->schedccnt++;
374         } else
375 #endif
376                 sched_free(tmp);
377 }
378
379 static struct sched *sched_alloc(struct ast_sched_context *con)
380 {
381         struct sched *tmp;
382
383         /*
384          * We keep a small cache of schedule entries
385          * to minimize the number of necessary malloc()'s
386          */
387 #ifdef SCHED_MAX_CACHE
388         if ((tmp = AST_LIST_REMOVE_HEAD(&con->schedc, list))) {
389                 con->schedccnt--;
390         } else
391 #endif
392         {
393                 tmp = ast_calloc(1, sizeof(*tmp));
394                 if (!tmp) {
395                         return NULL;
396                 }
397                 ast_cond_init(&tmp->cond, NULL);
398         }
399
400         if (set_sched_id(con, tmp)) {
401                 sched_release(con, tmp);
402                 return NULL;
403         }
404
405         return tmp;
406 }
407
408 void ast_sched_clean_by_callback(struct ast_sched_context *con, ast_sched_cb match, ast_sched_cb cleanup_cb)
409 {
410         int i = 1;
411         struct sched *current;
412
413         ast_mutex_lock(&con->lock);
414         while ((current = ast_heap_peek(con->sched_heap, i))) {
415                 if (current->callback != match) {
416                         i++;
417                         continue;
418                 }
419
420                 ast_heap_remove(con->sched_heap, current);
421
422                 cleanup_cb(current->data);
423                 sched_release(con, current);
424         }
425         ast_mutex_unlock(&con->lock);
426 }
427
428 /*! \brief
429  * Return the number of milliseconds
430  * until the next scheduled event
431  */
432 int ast_sched_wait(struct ast_sched_context *con)
433 {
434         int ms;
435         struct sched *s;
436
437         DEBUG(ast_debug(1, "ast_sched_wait()\n"));
438
439         ast_mutex_lock(&con->lock);
440         if ((s = ast_heap_peek(con->sched_heap, 1))) {
441                 ms = ast_tvdiff_ms(s->when, ast_tvnow());
442                 if (ms < 0) {
443                         ms = 0;
444                 }
445         } else {
446                 ms = -1;
447         }
448         ast_mutex_unlock(&con->lock);
449
450         return ms;
451 }
452
453
454 /*! \brief
455  * Take a sched structure and put it in the
456  * queue, such that the soonest event is
457  * first in the list.
458  */
459 static void schedule(struct ast_sched_context *con, struct sched *s)
460 {
461         size_t size;
462
463         size = ast_heap_size(con->sched_heap);
464
465         /* Record the largest the scheduler heap became for reporting purposes. */
466         if (con->highwater <= size) {
467                 con->highwater = size + 1;
468         }
469
470         /* Determine the tie breaker value for the new entry. */
471         if (size) {
472                 ++con->tie_breaker;
473         } else {
474                 /*
475                  * Restart the sequence for the first entry to make integer
476                  * roll over more unlikely.
477                  */
478                 con->tie_breaker = 0;
479         }
480         s->tie_breaker = con->tie_breaker;
481
482         ast_heap_push(con->sched_heap, s);
483 }
484
485 /*! \brief
486  * given the last event *tv and the offset in milliseconds 'when',
487  * computes the next value,
488  */
489 static int sched_settime(struct timeval *t, int when)
490 {
491         struct timeval now = ast_tvnow();
492
493         if (when < 0) {
494                 /*
495                  * A negative when value is likely a bug as it
496                  * represents a VERY large timeout time.
497                  */
498                 ast_log(LOG_WARNING,
499                         "Bug likely: Negative time interval %d (interpreted as %u ms) requested!\n",
500                         when, (unsigned int) when);
501                 ast_assert(0);
502         }
503
504         /*ast_debug(1, "TV -> %lu,%lu\n", tv->tv_sec, tv->tv_usec);*/
505         if (ast_tvzero(*t))     /* not supplied, default to now */
506                 *t = now;
507         *t = ast_tvadd(*t, ast_samp2tv(when, 1000));
508         if (ast_tvcmp(*t, now) < 0) {
509                 *t = now;
510         }
511         return 0;
512 }
513
514 int ast_sched_replace_variable(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
515 {
516         /* 0 means the schedule item is new; do not delete */
517         if (old_id > 0) {
518                 AST_SCHED_DEL(con, old_id);
519         }
520         return ast_sched_add_variable(con, when, callback, data, variable);
521 }
522
523 /*! \brief
524  * Schedule callback(data) to happen when ms into the future
525  */
526 int ast_sched_add_variable(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
527 {
528         struct sched *tmp;
529         int res = -1;
530
531         DEBUG(ast_debug(1, "ast_sched_add()\n"));
532
533         ast_mutex_lock(&con->lock);
534         if ((tmp = sched_alloc(con))) {
535                 con->eventcnt++;
536                 tmp->callback = callback;
537                 tmp->data = data;
538                 tmp->resched = when;
539                 tmp->variable = variable;
540                 tmp->when = ast_tv(0, 0);
541                 tmp->deleted = 0;
542                 if (sched_settime(&tmp->when, when)) {
543                         sched_release(con, tmp);
544                 } else {
545                         schedule(con, tmp);
546                         res = tmp->sched_id->id;
547                 }
548         }
549 #ifdef DUMP_SCHEDULER
550         /* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
551         if (option_debug)
552                 ast_sched_dump(con);
553 #endif
554         if (con->sched_thread) {
555                 ast_cond_signal(&con->sched_thread->cond);
556         }
557         ast_mutex_unlock(&con->lock);
558
559         return res;
560 }
561
562 int ast_sched_replace(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
563 {
564         if (old_id > -1) {
565                 AST_SCHED_DEL(con, old_id);
566         }
567         return ast_sched_add(con, when, callback, data);
568 }
569
570 int ast_sched_add(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
571 {
572         return ast_sched_add_variable(con, when, callback, data, 0);
573 }
574
575 static struct sched *sched_find(struct ast_sched_context *con, int id)
576 {
577         int x;
578         size_t heap_size;
579
580         heap_size = ast_heap_size(con->sched_heap);
581         for (x = 1; x <= heap_size; x++) {
582                 struct sched *cur = ast_heap_peek(con->sched_heap, x);
583
584                 if (cur->sched_id->id == id) {
585                         return cur;
586                 }
587         }
588
589         return NULL;
590 }
591
592 const void *ast_sched_find_data(struct ast_sched_context *con, int id)
593 {
594         struct sched *s;
595         const void *data = NULL;
596
597         ast_mutex_lock(&con->lock);
598
599         s = sched_find(con, id);
600         if (s) {
601                 data = s->data;
602         }
603
604         ast_mutex_unlock(&con->lock);
605
606         return data;
607 }
608
609 /*! \brief
610  * Delete the schedule entry with number
611  * "id".  It's nearly impossible that there
612  * would be two or more in the list with that
613  * id.
614  */
615 #ifndef AST_DEVMODE
616 int ast_sched_del(struct ast_sched_context *con, int id)
617 #else
618 int _ast_sched_del(struct ast_sched_context *con, int id, const char *file, int line, const char *function)
619 #endif
620 {
621         struct sched *s = NULL;
622         int *last_id = ast_threadstorage_get(&last_del_id, sizeof(int));
623
624         DEBUG(ast_debug(1, "ast_sched_del(%d)\n", id));
625
626         if (id < 0) {
627                 return 0;
628         }
629
630         ast_mutex_lock(&con->lock);
631
632         s = sched_find(con, id);
633         if (s) {
634                 if (!ast_heap_remove(con->sched_heap, s)) {
635                         ast_log(LOG_WARNING,"sched entry %d not in the sched heap?\n", s->sched_id->id);
636                 }
637                 sched_release(con, s);
638         } else if (con->currently_executing && (id == con->currently_executing->sched_id->id)) {
639                 s = con->currently_executing;
640                 s->deleted = 1;
641                 /* Wait for executing task to complete so that caller of ast_sched_del() does not
642                  * free memory out from under the task.
643                  */
644                 while (con->currently_executing && (id == con->currently_executing->sched_id->id)) {
645                         ast_cond_wait(&s->cond, &con->lock);
646                 }
647                 /* Do not sched_release() here because ast_sched_runq() will do it */
648         }
649
650 #ifdef DUMP_SCHEDULER
651         /* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
652         if (option_debug)
653                 ast_sched_dump(con);
654 #endif
655         if (con->sched_thread) {
656                 ast_cond_signal(&con->sched_thread->cond);
657         }
658         ast_mutex_unlock(&con->lock);
659
660         if (!s && *last_id != id) {
661                 ast_debug(1, "Attempted to delete nonexistent schedule entry %d!\n", id);
662                 /* Removing nonexistent schedule entry shouldn't trigger assert (it was enabled in DEV_MODE);
663                  * because in many places entries is deleted without having valid id. */
664                 *last_id = id;
665                 return -1;
666         } else if (!s) {
667                 return -1;
668         }
669
670         return 0;
671 }
672
673 void ast_sched_report(struct ast_sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames)
674 {
675         int i, x;
676         struct sched *cur;
677         int countlist[cbnames->numassocs + 1];
678         size_t heap_size;
679
680         memset(countlist, 0, sizeof(countlist));
681         ast_str_set(buf, 0, " Highwater = %u\n schedcnt = %zu\n", con->highwater, ast_heap_size(con->sched_heap));
682
683         ast_mutex_lock(&con->lock);
684
685         heap_size = ast_heap_size(con->sched_heap);
686         for (x = 1; x <= heap_size; x++) {
687                 cur = ast_heap_peek(con->sched_heap, x);
688                 /* match the callback to the cblist */
689                 for (i = 0; i < cbnames->numassocs; i++) {
690                         if (cur->callback == cbnames->cblist[i]) {
691                                 break;
692                         }
693                 }
694                 if (i < cbnames->numassocs) {
695                         countlist[i]++;
696                 } else {
697                         countlist[cbnames->numassocs]++;
698                 }
699         }
700
701         ast_mutex_unlock(&con->lock);
702
703         for (i = 0; i < cbnames->numassocs; i++) {
704                 ast_str_append(buf, 0, "    %s : %d\n", cbnames->list[i], countlist[i]);
705         }
706
707         ast_str_append(buf, 0, "   <unknown> : %d\n", countlist[cbnames->numassocs]);
708 }
709
710 /*! \brief Dump the contents of the scheduler to LOG_DEBUG */
711 void ast_sched_dump(struct ast_sched_context *con)
712 {
713         struct sched *q;
714         struct timeval when = ast_tvnow();
715         int x;
716         size_t heap_size;
717 #ifdef SCHED_MAX_CACHE
718         ast_debug(1, "Asterisk Schedule Dump (%zu in Q, %u Total, %u Cache, %u high-water)\n", ast_heap_size(con->sched_heap), con->eventcnt - 1, con->schedccnt, con->highwater);
719 #else
720         ast_debug(1, "Asterisk Schedule Dump (%zu in Q, %u Total, %u high-water)\n", ast_heap_size(con->sched_heap), con->eventcnt - 1, con->highwater);
721 #endif
722
723         ast_debug(1, "=============================================================\n");
724         ast_debug(1, "|ID    Callback          Data              Time  (sec:ms)   |\n");
725         ast_debug(1, "+-----+-----------------+-----------------+-----------------+\n");
726         ast_mutex_lock(&con->lock);
727         heap_size = ast_heap_size(con->sched_heap);
728         for (x = 1; x <= heap_size; x++) {
729                 struct timeval delta;
730                 q = ast_heap_peek(con->sched_heap, x);
731                 delta = ast_tvsub(q->when, when);
732                 ast_debug(1, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n",
733                         q->sched_id->id,
734                         q->callback,
735                         q->data,
736                         (long)delta.tv_sec,
737                         (long int)delta.tv_usec);
738         }
739         ast_mutex_unlock(&con->lock);
740         ast_debug(1, "=============================================================\n");
741 }
742
743 /*! \brief
744  * Launch all events which need to be run at this time.
745  */
746 int ast_sched_runq(struct ast_sched_context *con)
747 {
748         struct sched *current;
749         struct timeval when;
750         int numevents;
751         int res;
752
753         DEBUG(ast_debug(1, "ast_sched_runq()\n"));
754
755         ast_mutex_lock(&con->lock);
756
757         when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
758         for (numevents = 0; (current = ast_heap_peek(con->sched_heap, 1)); numevents++) {
759                 /* schedule all events which are going to expire within 1ms.
760                  * We only care about millisecond accuracy anyway, so this will
761                  * help us get more than one event at one time if they are very
762                  * close together.
763                  */
764                 if (ast_tvcmp(current->when, when) != -1) {
765                         break;
766                 }
767
768                 current = ast_heap_pop(con->sched_heap);
769
770                 /*
771                  * At this point, the schedule queue is still intact.  We
772                  * have removed the first event and the rest is still there,
773                  * so it's permissible for the callback to add new events, but
774                  * trying to delete itself won't work because it isn't in
775                  * the schedule queue.  If that's what it wants to do, it
776                  * should return 0.
777                  */
778
779                 con->currently_executing = current;
780                 ast_mutex_unlock(&con->lock);
781                 res = current->callback(current->data);
782                 ast_mutex_lock(&con->lock);
783                 con->currently_executing = NULL;
784                 ast_cond_signal(&current->cond);
785
786                 if (res && !current->deleted) {
787                         /*
788                          * If they return non-zero, we should schedule them to be
789                          * run again.
790                          */
791                         if (sched_settime(&current->when, current->variable? res : current->resched)) {
792                                 sched_release(con, current);
793                         } else {
794                                 schedule(con, current);
795                         }
796                 } else {
797                         /* No longer needed, so release it */
798                         sched_release(con, current);
799                 }
800         }
801
802         ast_mutex_unlock(&con->lock);
803
804         return numevents;
805 }
806
807 long ast_sched_when(struct ast_sched_context *con,int id)
808 {
809         struct sched *s;
810         long secs = -1;
811         DEBUG(ast_debug(1, "ast_sched_when()\n"));
812
813         ast_mutex_lock(&con->lock);
814
815         s = sched_find(con, id);
816         if (s) {
817                 struct timeval now = ast_tvnow();
818                 secs = s->when.tv_sec - now.tv_sec;
819         }
820
821         ast_mutex_unlock(&con->lock);
822
823         return secs;
824 }