CI: Various updates to buildAsterisk.sh
[asterisk/asterisk.git] / main / sched.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 1999 - 2010, Digium, Inc.
5  *
6  * Mark Spencer <markster@digium.com>
7  * Russell Bryant <russell@digium.com>
8  *
9  * See http://www.asterisk.org for more information about
10  * the Asterisk project. Please do not directly contact
11  * any of the maintainers of this project for assistance;
12  * the project provides a web site, mailing lists and IRC
13  * channels for your use.
14  *
15  * This program is free software, distributed under the terms of
16  * the GNU General Public License Version 2. See the LICENSE file
17  * at the top of the source tree.
18  */
19
20 /*! \file
21  *
22  * \brief Scheduler Routines (from cheops-NG)
23  *
24  * \author Mark Spencer <markster@digium.com>
25  */
26
27 /*** MODULEINFO
28         <support_level>core</support_level>
29  ***/
30
31 #include "asterisk.h"
32
33 #ifdef DEBUG_SCHEDULER
34 #define DEBUG(a) a
35 #else
36 #define DEBUG(a)
37 #endif
38
39 #include <sys/time.h>
40
41 #include "asterisk/sched.h"
42 #include "asterisk/channel.h"
43 #include "asterisk/lock.h"
44 #include "asterisk/utils.h"
45 #include "asterisk/heap.h"
46 #include "asterisk/threadstorage.h"
47
48 /*!
49  * \brief Max num of schedule structs
50  *
51  * \note The max number of schedule structs to keep around
52  * for use.  Undefine to disable schedule structure
53  * caching. (Only disable this on very low memory
54  * machines)
55  */
56 #define SCHED_MAX_CACHE 128
57
58 AST_THREADSTORAGE(last_del_id);
59
60 /*!
61  * \brief Scheduler ID holder
62  *
63  * These form a queue on a scheduler context. When a new
64  * scheduled item is created, a sched_id is popped off the
65  * queue and its id is assigned to the new scheduled item.
66  * When the scheduled task is complete, the sched_id on that
67  * task is then pushed to the back of the queue to be re-used
68  * on some future scheduled item.
69  */
70 struct sched_id {
71         /*! Immutable ID number that is copied onto the scheduled task */
72         int id;
73         AST_LIST_ENTRY(sched_id) list;
74 };
75
76 struct sched {
77         AST_LIST_ENTRY(sched) list;
78         /*! The ID that has been popped off the scheduler context's queue */
79         struct sched_id *sched_id;
80         struct timeval when;          /*!< Absolute time event should take place */
81         /*!
82          * \brief Tie breaker in case the when is the same for multiple entries.
83          *
84          * \note The oldest expiring entry in the scheduler heap goes first.
85          * This is possible when multiple events are scheduled to expire at
86          * the same time by internal coding.
87          */
88         unsigned int tie_breaker;
89         int resched;                  /*!< When to reschedule */
90         int variable;                 /*!< Use return value from callback to reschedule */
91         const void *data;             /*!< Data */
92         ast_sched_cb callback;        /*!< Callback */
93         ssize_t __heap_index;
94         /*!
95          * Used to synchronize between thread running a task and thread
96          * attempting to delete a task
97          */
98         ast_cond_t cond;
99         /*! Indication that a running task was deleted. */
100         unsigned int deleted:1;
101 };
102
103 struct sched_thread {
104         pthread_t thread;
105         ast_cond_t cond;
106         unsigned int stop:1;
107 };
108
109 struct ast_sched_context {
110         ast_mutex_t lock;
111         unsigned int eventcnt;                  /*!< Number of events processed */
112         unsigned int highwater;                                 /*!< highest count so far */
113         /*! Next tie breaker in case events expire at the same time. */
114         unsigned int tie_breaker;
115         struct ast_heap *sched_heap;
116         struct sched_thread *sched_thread;
117         /*! The scheduled task that is currently executing */
118         struct sched *currently_executing;
119
120 #ifdef SCHED_MAX_CACHE
121         AST_LIST_HEAD_NOLOCK(, sched) schedc;   /*!< Cache of unused schedule structures and how many */
122         unsigned int schedccnt;
123 #endif
124         /*! Queue of scheduler task IDs to assign */
125         AST_LIST_HEAD_NOLOCK(, sched_id) id_queue;
126         /*! The number of IDs in the id_queue */
127         int id_queue_size;
128 };
129
130 static void *sched_run(void *data)
131 {
132         struct ast_sched_context *con = data;
133
134         while (!con->sched_thread->stop) {
135                 int ms;
136                 struct timespec ts = {
137                         .tv_sec = 0,
138                 };
139
140                 ast_mutex_lock(&con->lock);
141
142                 if (con->sched_thread->stop) {
143                         ast_mutex_unlock(&con->lock);
144                         return NULL;
145                 }
146
147                 ms = ast_sched_wait(con);
148
149                 if (ms == -1) {
150                         ast_cond_wait(&con->sched_thread->cond, &con->lock);
151                 } else {
152                         struct timeval tv;
153                         tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000));
154                         ts.tv_sec = tv.tv_sec;
155                         ts.tv_nsec = tv.tv_usec * 1000;
156                         ast_cond_timedwait(&con->sched_thread->cond, &con->lock, &ts);
157                 }
158
159                 ast_mutex_unlock(&con->lock);
160
161                 if (con->sched_thread->stop) {
162                         return NULL;
163                 }
164
165                 ast_sched_runq(con);
166         }
167
168         return NULL;
169 }
170
171 static void sched_thread_destroy(struct ast_sched_context *con)
172 {
173         if (!con->sched_thread) {
174                 return;
175         }
176
177         if (con->sched_thread->thread != AST_PTHREADT_NULL) {
178                 ast_mutex_lock(&con->lock);
179                 con->sched_thread->stop = 1;
180                 ast_cond_signal(&con->sched_thread->cond);
181                 ast_mutex_unlock(&con->lock);
182                 pthread_join(con->sched_thread->thread, NULL);
183                 con->sched_thread->thread = AST_PTHREADT_NULL;
184         }
185
186         ast_cond_destroy(&con->sched_thread->cond);
187
188         ast_free(con->sched_thread);
189
190         con->sched_thread = NULL;
191 }
192
193 int ast_sched_start_thread(struct ast_sched_context *con)
194 {
195         struct sched_thread *st;
196
197         if (con->sched_thread) {
198                 ast_log(LOG_ERROR, "Thread already started on this scheduler context\n");
199                 return -1;
200         }
201
202         if (!(st = ast_calloc(1, sizeof(*st)))) {
203                 return -1;
204         }
205
206         ast_cond_init(&st->cond, NULL);
207
208         st->thread = AST_PTHREADT_NULL;
209
210         con->sched_thread = st;
211
212         if (ast_pthread_create_background(&st->thread, NULL, sched_run, con)) {
213                 ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
214                 sched_thread_destroy(con);
215                 return -1;
216         }
217
218         return 0;
219 }
220
221 static int sched_time_cmp(void *va, void *vb)
222 {
223         struct sched *a = va;
224         struct sched *b = vb;
225         int cmp;
226
227         cmp = ast_tvcmp(b->when, a->when);
228         if (!cmp) {
229                 cmp = b->tie_breaker - a->tie_breaker;
230         }
231         return cmp;
232 }
233
234 struct ast_sched_context *ast_sched_context_create(void)
235 {
236         struct ast_sched_context *tmp;
237
238         if (!(tmp = ast_calloc(1, sizeof(*tmp)))) {
239                 return NULL;
240         }
241
242         ast_mutex_init(&tmp->lock);
243         tmp->eventcnt = 1;
244
245         AST_LIST_HEAD_INIT_NOLOCK(&tmp->id_queue);
246
247         if (!(tmp->sched_heap = ast_heap_create(8, sched_time_cmp,
248                         offsetof(struct sched, __heap_index)))) {
249                 ast_sched_context_destroy(tmp);
250                 return NULL;
251         }
252
253         return tmp;
254 }
255
256 static void sched_free(struct sched *task)
257 {
258         /* task->sched_id will be NULL most of the time, but when the
259          * scheduler context shuts down, it will free all scheduled
260          * tasks, and in that case, the task->sched_id will be non-NULL
261          */
262         ast_free(task->sched_id);
263         ast_cond_destroy(&task->cond);
264         ast_free(task);
265 }
266
267 void ast_sched_context_destroy(struct ast_sched_context *con)
268 {
269         struct sched *s;
270         struct sched_id *sid;
271
272         sched_thread_destroy(con);
273         con->sched_thread = NULL;
274
275         ast_mutex_lock(&con->lock);
276
277 #ifdef SCHED_MAX_CACHE
278         while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list))) {
279                 sched_free(s);
280         }
281 #endif
282
283         if (con->sched_heap) {
284                 while ((s = ast_heap_pop(con->sched_heap))) {
285                         sched_free(s);
286                 }
287                 ast_heap_destroy(con->sched_heap);
288                 con->sched_heap = NULL;
289         }
290
291         while ((sid = AST_LIST_REMOVE_HEAD(&con->id_queue, list))) {
292                 ast_free(sid);
293         }
294
295         ast_mutex_unlock(&con->lock);
296         ast_mutex_destroy(&con->lock);
297
298         ast_free(con);
299 }
300
301 #define ID_QUEUE_INCREMENT 16
302
303 /*!
304  * \brief Add new scheduler IDs to the queue.
305  *
306  * \retval The number of IDs added to the queue
307  */
308 static int add_ids(struct ast_sched_context *con)
309 {
310         int new_size;
311         int original_size;
312         int i;
313
314         original_size = con->id_queue_size;
315         /* So we don't go overboard with the mallocs here, we'll just up
316          * the size of the list by a fixed amount each time instead of
317          * multiplying the size by any particular factor
318          */
319         new_size = original_size + ID_QUEUE_INCREMENT;
320         if (new_size < 0) {
321                 /* Overflow. Cap it at INT_MAX. */
322                 new_size = INT_MAX;
323         }
324         for (i = original_size; i < new_size; ++i) {
325                 struct sched_id *new_id;
326
327                 new_id = ast_calloc(1, sizeof(*new_id));
328                 if (!new_id) {
329                         break;
330                 }
331
332                 /*
333                  * According to the API doxygen a sched ID of 0 is valid.
334                  * Unfortunately, 0 was never returned historically and
335                  * several users incorrectly coded usage of the returned
336                  * sched ID assuming that 0 was invalid.
337                  */
338                 new_id->id = ++con->id_queue_size;
339
340                 AST_LIST_INSERT_TAIL(&con->id_queue, new_id, list);
341         }
342
343         return con->id_queue_size - original_size;
344 }
345
346 static int set_sched_id(struct ast_sched_context *con, struct sched *new_sched)
347 {
348         if (AST_LIST_EMPTY(&con->id_queue) && (add_ids(con) == 0)) {
349                 return -1;
350         }
351
352         new_sched->sched_id = AST_LIST_REMOVE_HEAD(&con->id_queue, list);
353         return 0;
354 }
355
356 static void sched_release(struct ast_sched_context *con, struct sched *tmp)
357 {
358         if (tmp->sched_id) {
359                 AST_LIST_INSERT_TAIL(&con->id_queue, tmp->sched_id, list);
360                 tmp->sched_id = NULL;
361         }
362
363         /*
364          * Add to the cache, or just free() if we
365          * already have too many cache entries
366          */
367 #ifdef SCHED_MAX_CACHE
368         if (con->schedccnt < SCHED_MAX_CACHE) {
369                 AST_LIST_INSERT_HEAD(&con->schedc, tmp, list);
370                 con->schedccnt++;
371         } else
372 #endif
373                 sched_free(tmp);
374 }
375
376 static struct sched *sched_alloc(struct ast_sched_context *con)
377 {
378         struct sched *tmp;
379
380         /*
381          * We keep a small cache of schedule entries
382          * to minimize the number of necessary malloc()'s
383          */
384 #ifdef SCHED_MAX_CACHE
385         if ((tmp = AST_LIST_REMOVE_HEAD(&con->schedc, list))) {
386                 con->schedccnt--;
387         } else
388 #endif
389         {
390                 tmp = ast_calloc(1, sizeof(*tmp));
391                 if (!tmp) {
392                         return NULL;
393                 }
394                 ast_cond_init(&tmp->cond, NULL);
395         }
396
397         if (set_sched_id(con, tmp)) {
398                 sched_release(con, tmp);
399                 return NULL;
400         }
401
402         return tmp;
403 }
404
405 void ast_sched_clean_by_callback(struct ast_sched_context *con, ast_sched_cb match, ast_sched_cb cleanup_cb)
406 {
407         int i = 1;
408         struct sched *current;
409
410         ast_mutex_lock(&con->lock);
411         while ((current = ast_heap_peek(con->sched_heap, i))) {
412                 if (current->callback != match) {
413                         i++;
414                         continue;
415                 }
416
417                 ast_heap_remove(con->sched_heap, current);
418
419                 cleanup_cb(current->data);
420                 sched_release(con, current);
421         }
422         ast_mutex_unlock(&con->lock);
423 }
424
425 /*! \brief
426  * Return the number of milliseconds
427  * until the next scheduled event
428  */
429 int ast_sched_wait(struct ast_sched_context *con)
430 {
431         int ms;
432         struct sched *s;
433
434         DEBUG(ast_debug(1, "ast_sched_wait()\n"));
435
436         ast_mutex_lock(&con->lock);
437         if ((s = ast_heap_peek(con->sched_heap, 1))) {
438                 ms = ast_tvdiff_ms(s->when, ast_tvnow());
439                 if (ms < 0) {
440                         ms = 0;
441                 }
442         } else {
443                 ms = -1;
444         }
445         ast_mutex_unlock(&con->lock);
446
447         return ms;
448 }
449
450
451 /*! \brief
452  * Take a sched structure and put it in the
453  * queue, such that the soonest event is
454  * first in the list.
455  */
456 static void schedule(struct ast_sched_context *con, struct sched *s)
457 {
458         size_t size;
459
460         size = ast_heap_size(con->sched_heap);
461
462         /* Record the largest the scheduler heap became for reporting purposes. */
463         if (con->highwater <= size) {
464                 con->highwater = size + 1;
465         }
466
467         /* Determine the tie breaker value for the new entry. */
468         if (size) {
469                 ++con->tie_breaker;
470         } else {
471                 /*
472                  * Restart the sequence for the first entry to make integer
473                  * roll over more unlikely.
474                  */
475                 con->tie_breaker = 0;
476         }
477         s->tie_breaker = con->tie_breaker;
478
479         ast_heap_push(con->sched_heap, s);
480 }
481
482 /*! \brief
483  * given the last event *tv and the offset in milliseconds 'when',
484  * computes the next value,
485  */
486 static int sched_settime(struct timeval *t, int when)
487 {
488         struct timeval now = ast_tvnow();
489
490         if (when < 0) {
491                 /*
492                  * A negative when value is likely a bug as it
493                  * represents a VERY large timeout time.
494                  */
495                 ast_log(LOG_WARNING,
496                         "Bug likely: Negative time interval %d (interpreted as %u ms) requested!\n",
497                         when, (unsigned int) when);
498                 ast_assert(0);
499         }
500
501         /*ast_debug(1, "TV -> %lu,%lu\n", tv->tv_sec, tv->tv_usec);*/
502         if (ast_tvzero(*t))     /* not supplied, default to now */
503                 *t = now;
504         *t = ast_tvadd(*t, ast_samp2tv(when, 1000));
505         if (ast_tvcmp(*t, now) < 0) {
506                 *t = now;
507         }
508         return 0;
509 }
510
511 int ast_sched_replace_variable(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
512 {
513         /* 0 means the schedule item is new; do not delete */
514         if (old_id > 0) {
515                 AST_SCHED_DEL(con, old_id);
516         }
517         return ast_sched_add_variable(con, when, callback, data, variable);
518 }
519
520 /*! \brief
521  * Schedule callback(data) to happen when ms into the future
522  */
523 int ast_sched_add_variable(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
524 {
525         struct sched *tmp;
526         int res = -1;
527
528         DEBUG(ast_debug(1, "ast_sched_add()\n"));
529
530         ast_mutex_lock(&con->lock);
531         if ((tmp = sched_alloc(con))) {
532                 con->eventcnt++;
533                 tmp->callback = callback;
534                 tmp->data = data;
535                 tmp->resched = when;
536                 tmp->variable = variable;
537                 tmp->when = ast_tv(0, 0);
538                 tmp->deleted = 0;
539                 if (sched_settime(&tmp->when, when)) {
540                         sched_release(con, tmp);
541                 } else {
542                         schedule(con, tmp);
543                         res = tmp->sched_id->id;
544                 }
545         }
546 #ifdef DUMP_SCHEDULER
547         /* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
548         ast_sched_dump(con);
549 #endif
550         if (con->sched_thread) {
551                 ast_cond_signal(&con->sched_thread->cond);
552         }
553         ast_mutex_unlock(&con->lock);
554
555         return res;
556 }
557
558 int ast_sched_replace(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
559 {
560         if (old_id > -1) {
561                 AST_SCHED_DEL(con, old_id);
562         }
563         return ast_sched_add(con, when, callback, data);
564 }
565
566 int ast_sched_add(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
567 {
568         return ast_sched_add_variable(con, when, callback, data, 0);
569 }
570
571 static struct sched *sched_find(struct ast_sched_context *con, int id)
572 {
573         int x;
574         size_t heap_size;
575
576         heap_size = ast_heap_size(con->sched_heap);
577         for (x = 1; x <= heap_size; x++) {
578                 struct sched *cur = ast_heap_peek(con->sched_heap, x);
579
580                 if (cur->sched_id->id == id) {
581                         return cur;
582                 }
583         }
584
585         return NULL;
586 }
587
588 const void *ast_sched_find_data(struct ast_sched_context *con, int id)
589 {
590         struct sched *s;
591         const void *data = NULL;
592
593         ast_mutex_lock(&con->lock);
594
595         s = sched_find(con, id);
596         if (s) {
597                 data = s->data;
598         }
599
600         ast_mutex_unlock(&con->lock);
601
602         return data;
603 }
604
605 /*! \brief
606  * Delete the schedule entry with number
607  * "id".  It's nearly impossible that there
608  * would be two or more in the list with that
609  * id.
610  */
611 int ast_sched_del(struct ast_sched_context *con, int id)
612 {
613         struct sched *s = NULL;
614         int *last_id = ast_threadstorage_get(&last_del_id, sizeof(int));
615
616         DEBUG(ast_debug(1, "ast_sched_del(%d)\n", id));
617
618         if (id < 0) {
619                 return 0;
620         }
621
622         ast_mutex_lock(&con->lock);
623
624         s = sched_find(con, id);
625         if (s) {
626                 if (!ast_heap_remove(con->sched_heap, s)) {
627                         ast_log(LOG_WARNING,"sched entry %d not in the sched heap?\n", s->sched_id->id);
628                 }
629                 sched_release(con, s);
630         } else if (con->currently_executing && (id == con->currently_executing->sched_id->id)) {
631                 s = con->currently_executing;
632                 s->deleted = 1;
633                 /* Wait for executing task to complete so that caller of ast_sched_del() does not
634                  * free memory out from under the task.
635                  */
636                 while (con->currently_executing && (id == con->currently_executing->sched_id->id)) {
637                         ast_cond_wait(&s->cond, &con->lock);
638                 }
639                 /* Do not sched_release() here because ast_sched_runq() will do it */
640         }
641
642 #ifdef DUMP_SCHEDULER
643         /* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
644         ast_sched_dump(con);
645 #endif
646         if (con->sched_thread) {
647                 ast_cond_signal(&con->sched_thread->cond);
648         }
649         ast_mutex_unlock(&con->lock);
650
651         if (!s && *last_id != id) {
652                 ast_debug(1, "Attempted to delete nonexistent schedule entry %d!\n", id);
653                 /* Removing nonexistent schedule entry shouldn't trigger assert (it was enabled in DEV_MODE);
654                  * because in many places entries is deleted without having valid id. */
655                 *last_id = id;
656                 return -1;
657         } else if (!s) {
658                 return -1;
659         }
660
661         return 0;
662 }
663
664 void ast_sched_report(struct ast_sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames)
665 {
666         int i, x;
667         struct sched *cur;
668         int countlist[cbnames->numassocs + 1];
669         size_t heap_size;
670
671         memset(countlist, 0, sizeof(countlist));
672         ast_str_set(buf, 0, " Highwater = %u\n schedcnt = %zu\n", con->highwater, ast_heap_size(con->sched_heap));
673
674         ast_mutex_lock(&con->lock);
675
676         heap_size = ast_heap_size(con->sched_heap);
677         for (x = 1; x <= heap_size; x++) {
678                 cur = ast_heap_peek(con->sched_heap, x);
679                 /* match the callback to the cblist */
680                 for (i = 0; i < cbnames->numassocs; i++) {
681                         if (cur->callback == cbnames->cblist[i]) {
682                                 break;
683                         }
684                 }
685                 if (i < cbnames->numassocs) {
686                         countlist[i]++;
687                 } else {
688                         countlist[cbnames->numassocs]++;
689                 }
690         }
691
692         ast_mutex_unlock(&con->lock);
693
694         for (i = 0; i < cbnames->numassocs; i++) {
695                 ast_str_append(buf, 0, "    %s : %d\n", cbnames->list[i], countlist[i]);
696         }
697
698         ast_str_append(buf, 0, "   <unknown> : %d\n", countlist[cbnames->numassocs]);
699 }
700
701 /*! \brief Dump the contents of the scheduler to LOG_DEBUG */
702 void ast_sched_dump(struct ast_sched_context *con)
703 {
704         struct sched *q;
705         struct timeval when;
706         int x;
707         size_t heap_size;
708
709         if (!DEBUG_ATLEAST(1)) {
710                 return;
711         }
712
713         when = ast_tvnow();
714 #ifdef SCHED_MAX_CACHE
715         ast_log(LOG_DEBUG, "Asterisk Schedule Dump (%zu in Q, %u Total, %u Cache, %u high-water)\n",
716                 ast_heap_size(con->sched_heap), con->eventcnt - 1, con->schedccnt, con->highwater);
717 #else
718         ast_log(LOG_DEBUG, "Asterisk Schedule Dump (%zu in Q, %u Total, %u high-water)\n",
719                 ast_heap_size(con->sched_heap), con->eventcnt - 1, con->highwater);
720 #endif
721
722         ast_log(LOG_DEBUG, "=============================================================\n");
723         ast_log(LOG_DEBUG, "|ID    Callback          Data              Time  (sec:ms)   |\n");
724         ast_log(LOG_DEBUG, "+-----+-----------------+-----------------+-----------------+\n");
725         ast_mutex_lock(&con->lock);
726         heap_size = ast_heap_size(con->sched_heap);
727         for (x = 1; x <= heap_size; x++) {
728                 struct timeval delta;
729                 q = ast_heap_peek(con->sched_heap, x);
730                 delta = ast_tvsub(q->when, when);
731                 ast_log(LOG_DEBUG, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n",
732                         q->sched_id->id,
733                         q->callback,
734                         q->data,
735                         (long)delta.tv_sec,
736                         (long int)delta.tv_usec);
737         }
738         ast_mutex_unlock(&con->lock);
739         ast_log(LOG_DEBUG, "=============================================================\n");
740 }
741
742 /*! \brief
743  * Launch all events which need to be run at this time.
744  */
745 int ast_sched_runq(struct ast_sched_context *con)
746 {
747         struct sched *current;
748         struct timeval when;
749         int numevents;
750         int res;
751
752         DEBUG(ast_debug(1, "ast_sched_runq()\n"));
753
754         ast_mutex_lock(&con->lock);
755
756         when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
757         for (numevents = 0; (current = ast_heap_peek(con->sched_heap, 1)); numevents++) {
758                 /* schedule all events which are going to expire within 1ms.
759                  * We only care about millisecond accuracy anyway, so this will
760                  * help us get more than one event at one time if they are very
761                  * close together.
762                  */
763                 if (ast_tvcmp(current->when, when) != -1) {
764                         break;
765                 }
766
767                 current = ast_heap_pop(con->sched_heap);
768
769                 /*
770                  * At this point, the schedule queue is still intact.  We
771                  * have removed the first event and the rest is still there,
772                  * so it's permissible for the callback to add new events, but
773                  * trying to delete itself won't work because it isn't in
774                  * the schedule queue.  If that's what it wants to do, it
775                  * should return 0.
776                  */
777
778                 con->currently_executing = current;
779                 ast_mutex_unlock(&con->lock);
780                 res = current->callback(current->data);
781                 ast_mutex_lock(&con->lock);
782                 con->currently_executing = NULL;
783                 ast_cond_signal(&current->cond);
784
785                 if (res && !current->deleted) {
786                         /*
787                          * If they return non-zero, we should schedule them to be
788                          * run again.
789                          */
790                         if (sched_settime(&current->when, current->variable? res : current->resched)) {
791                                 sched_release(con, current);
792                         } else {
793                                 schedule(con, current);
794                         }
795                 } else {
796                         /* No longer needed, so release it */
797                         sched_release(con, current);
798                 }
799         }
800
801         ast_mutex_unlock(&con->lock);
802
803         return numevents;
804 }
805
806 long ast_sched_when(struct ast_sched_context *con,int id)
807 {
808         struct sched *s;
809         long secs = -1;
810         DEBUG(ast_debug(1, "ast_sched_when()\n"));
811
812         ast_mutex_lock(&con->lock);
813
814         s = sched_find(con, id);
815         if (s) {
816                 struct timeval now = ast_tvnow();
817                 secs = s->when.tv_sec - now.tv_sec;
818         }
819
820         ast_mutex_unlock(&con->lock);
821
822         return secs;
823 }