Merge "Revert "PJSIP_CONTACT: add missing argument documentation""
[asterisk/asterisk.git] / main / sched.c
/*
 * Asterisk -- An open source telephony toolkit.
 *
 * Copyright (C) 1999 - 2010, Digium, Inc.
 *
 * Mark Spencer <markster@digium.com>
 * Russell Bryant <russell@digium.com>
 *
 * See http://www.asterisk.org for more information about
 * the Asterisk project. Please do not directly contact
 * any of the maintainers of this project for assistance;
 * the project provides a web site, mailing lists and IRC
 * channels for your use.
 *
 * This program is free software, distributed under the terms of
 * the GNU General Public License Version 2. See the LICENSE file
 * at the top of the source tree.
 */

/*! \file
 *
 * \brief Scheduler Routines (from cheops-NG)
 *
 * \author Mark Spencer <markster@digium.com>
 */
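
/*
 * A minimal usage sketch (illustration only; not part of the original
 * source): create a context, start its scheduler thread, schedule a
 * one-shot callback, then tear everything down.  The callback name
 * my_task is a hypothetical placeholder.  Returning 0 from the
 * callback tells the scheduler not to reschedule the task.
 *
 *	static int my_task(const void *data)
 *	{
 *		ast_log(LOG_NOTICE, "my_task ran\n");
 *		return 0;
 *	}
 *
 *	struct ast_sched_context *con = ast_sched_context_create();
 *	if (con && !ast_sched_start_thread(con)) {
 *		int id = ast_sched_add(con, 1000, my_task, NULL);
 *		if (id > -1) {
 *			AST_SCHED_DEL(con, id);
 *		}
 *	}
 *	if (con) {
 *		ast_sched_context_destroy(con);
 *	}
 */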

/*** MODULEINFO
	<support_level>core</support_level>
 ***/

#include "asterisk.h"

#ifdef DEBUG_SCHEDULER
#define DEBUG(a) a
#else
#define DEBUG(a)
#endif

#include <sys/time.h>

#include "asterisk/sched.h"
#include "asterisk/channel.h"
#include "asterisk/lock.h"
#include "asterisk/utils.h"
#include "asterisk/heap.h"
#include "asterisk/threadstorage.h"

/*!
 * \brief Max num of schedule structs
 *
 * \note The max number of schedule structs to keep around
 * for use.  Undefine to disable schedule structure
 * caching. (Only disable this on very low memory
 * machines)
 */
#define SCHED_MAX_CACHE 128

AST_THREADSTORAGE(last_del_id);

/*!
 * \brief Scheduler ID holder
 *
 * These form a queue on a scheduler context. When a new
 * scheduled item is created, a sched_id is popped off the
 * queue and its id is assigned to the new scheduled item.
 * When the scheduled task is complete, the sched_id on that
 * task is then pushed to the back of the queue to be re-used
 * on some future scheduled item.
 */
struct sched_id {
	/*! Immutable ID number that is copied onto the scheduled task */
	int id;
	AST_LIST_ENTRY(sched_id) list;
};

struct sched {
	AST_LIST_ENTRY(sched) list;
	/*! The ID that has been popped off the scheduler context's queue */
	struct sched_id *sched_id;
	struct timeval when;          /*!< Absolute time event should take place */
	/*!
	 * \brief Tie breaker in case the when is the same for multiple entries.
	 *
	 * \note The oldest expiring entry in the scheduler heap goes first.
	 * This is possible when multiple events are scheduled to expire at
	 * the same time by internal coding.
	 */
	unsigned int tie_breaker;
	int resched;                  /*!< When to reschedule */
	int variable;                 /*!< Use return value from callback to reschedule */
	const void *data;             /*!< Data */
	ast_sched_cb callback;        /*!< Callback */
	ssize_t __heap_index;
	/*!
	 * Used to synchronize between thread running a task and thread
	 * attempting to delete a task
	 */
	ast_cond_t cond;
	/*! Indication that a running task was deleted. */
	unsigned int deleted:1;
};

struct sched_thread {
	pthread_t thread;
	ast_cond_t cond;
	unsigned int stop:1;
};

struct ast_sched_context {
	ast_mutex_t lock;
	unsigned int eventcnt;                  /*!< Number of events processed */
	unsigned int highwater;                 /*!< Highest number of events in the heap so far */
	/*! Next tie breaker in case events expire at the same time. */
	unsigned int tie_breaker;
	struct ast_heap *sched_heap;
	struct sched_thread *sched_thread;
	/*! The scheduled task that is currently executing */
	struct sched *currently_executing;
	/*! Valid while currently_executing is not NULL */
	pthread_t executing_thread_id;

#ifdef SCHED_MAX_CACHE
	AST_LIST_HEAD_NOLOCK(, sched) schedc;   /*!< Cache of unused schedule structures and how many */
	unsigned int schedccnt;
#endif
	/*! Queue of scheduler task IDs to assign */
	AST_LIST_HEAD_NOLOCK(, sched_id) id_queue;
	/*! The number of IDs in the id_queue */
	int id_queue_size;
};

static void *sched_run(void *data)
{
	struct ast_sched_context *con = data;

	while (!con->sched_thread->stop) {
		int ms;
		struct timespec ts = {
			.tv_sec = 0,
		};

		ast_mutex_lock(&con->lock);

		if (con->sched_thread->stop) {
			ast_mutex_unlock(&con->lock);
			return NULL;
		}

		ms = ast_sched_wait(con);

		if (ms == -1) {
			ast_cond_wait(&con->sched_thread->cond, &con->lock);
		} else {
			struct timeval tv;
			tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000));
			ts.tv_sec = tv.tv_sec;
			ts.tv_nsec = tv.tv_usec * 1000;
			ast_cond_timedwait(&con->sched_thread->cond, &con->lock, &ts);
		}

		ast_mutex_unlock(&con->lock);

		if (con->sched_thread->stop) {
			return NULL;
		}

		ast_sched_runq(con);
	}

	return NULL;
}

static void sched_thread_destroy(struct ast_sched_context *con)
{
	if (!con->sched_thread) {
		return;
	}

	if (con->sched_thread->thread != AST_PTHREADT_NULL) {
		ast_mutex_lock(&con->lock);
		con->sched_thread->stop = 1;
		ast_cond_signal(&con->sched_thread->cond);
		ast_mutex_unlock(&con->lock);
		pthread_join(con->sched_thread->thread, NULL);
		con->sched_thread->thread = AST_PTHREADT_NULL;
	}

	ast_cond_destroy(&con->sched_thread->cond);

	ast_free(con->sched_thread);

	con->sched_thread = NULL;
}

int ast_sched_start_thread(struct ast_sched_context *con)
{
	struct sched_thread *st;

	if (con->sched_thread) {
		ast_log(LOG_ERROR, "Thread already started on this scheduler context\n");
		return -1;
	}

	if (!(st = ast_calloc(1, sizeof(*st)))) {
		return -1;
	}

	ast_cond_init(&st->cond, NULL);

	st->thread = AST_PTHREADT_NULL;

	con->sched_thread = st;

	if (ast_pthread_create_background(&st->thread, NULL, sched_run, con)) {
		ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
		sched_thread_destroy(con);
		return -1;
	}

	return 0;
}

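/*
 * Note on heap ordering (comment added for clarity; not in the
 * original source): ast_heap keeps the element that compares greatest
 * at the top, so the operands are compared in reverse (b vs. a).  The
 * entry with the earliest 'when' therefore surfaces first, and among
 * entries expiring at the same time, the lower (older) tie_breaker
 * wins.
 */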
static int sched_time_cmp(void *va, void *vb)
{
	struct sched *a = va;
	struct sched *b = vb;
	int cmp;

	cmp = ast_tvcmp(b->when, a->when);
	if (!cmp) {
		cmp = b->tie_breaker - a->tie_breaker;
	}
	return cmp;
}

struct ast_sched_context *ast_sched_context_create(void)
{
	struct ast_sched_context *tmp;

	if (!(tmp = ast_calloc(1, sizeof(*tmp)))) {
		return NULL;
	}

	ast_mutex_init(&tmp->lock);
	tmp->eventcnt = 1;

	AST_LIST_HEAD_INIT_NOLOCK(&tmp->id_queue);

	if (!(tmp->sched_heap = ast_heap_create(8, sched_time_cmp,
			offsetof(struct sched, __heap_index)))) {
		ast_sched_context_destroy(tmp);
		return NULL;
	}

	return tmp;
}

static void sched_free(struct sched *task)
{
	/* task->sched_id will be NULL most of the time, but when the
	 * scheduler context shuts down, it will free all scheduled
	 * tasks, and in that case, the task->sched_id will be non-NULL
	 */
	ast_free(task->sched_id);
	ast_cond_destroy(&task->cond);
	ast_free(task);
}

void ast_sched_context_destroy(struct ast_sched_context *con)
{
	struct sched *s;
	struct sched_id *sid;

	sched_thread_destroy(con);
	con->sched_thread = NULL;

	ast_mutex_lock(&con->lock);

#ifdef SCHED_MAX_CACHE
	while ((s = AST_LIST_REMOVE_HEAD(&con->schedc, list))) {
		sched_free(s);
	}
#endif

	if (con->sched_heap) {
		while ((s = ast_heap_pop(con->sched_heap))) {
			sched_free(s);
		}
		ast_heap_destroy(con->sched_heap);
		con->sched_heap = NULL;
	}

	while ((sid = AST_LIST_REMOVE_HEAD(&con->id_queue, list))) {
		ast_free(sid);
	}

	ast_mutex_unlock(&con->lock);
	ast_mutex_destroy(&con->lock);

	ast_free(con);
}

#define ID_QUEUE_INCREMENT 16

/*!
 * \brief Add new scheduler IDs to the queue.
 *
 * \retval The number of IDs added to the queue
 */
static int add_ids(struct ast_sched_context *con)
{
	int new_size;
	int original_size;
	int i;

	original_size = con->id_queue_size;
	/* So we don't go overboard with the mallocs here, we'll just up
	 * the size of the list by a fixed amount each time instead of
	 * multiplying the size by any particular factor
	 */
	new_size = original_size + ID_QUEUE_INCREMENT;
	if (new_size < 0) {
		/* Overflow. Cap it at INT_MAX. */
		new_size = INT_MAX;
	}
	for (i = original_size; i < new_size; ++i) {
		struct sched_id *new_id;

		new_id = ast_calloc(1, sizeof(*new_id));
		if (!new_id) {
			break;
		}

		/*
		 * According to the API doxygen a sched ID of 0 is valid.
		 * Unfortunately, 0 was never returned historically and
		 * several users incorrectly coded usage of the returned
		 * sched ID assuming that 0 was invalid.
		 */
		new_id->id = ++con->id_queue_size;

		AST_LIST_INSERT_TAIL(&con->id_queue, new_id, list);
	}

	return con->id_queue_size - original_size;
}

static int set_sched_id(struct ast_sched_context *con, struct sched *new_sched)
{
	if (AST_LIST_EMPTY(&con->id_queue) && (add_ids(con) == 0)) {
		return -1;
	}

	new_sched->sched_id = AST_LIST_REMOVE_HEAD(&con->id_queue, list);
	return 0;
}

static void sched_release(struct ast_sched_context *con, struct sched *tmp)
{
	if (tmp->sched_id) {
		AST_LIST_INSERT_TAIL(&con->id_queue, tmp->sched_id, list);
		tmp->sched_id = NULL;
	}

	/*
	 * Add to the cache, or just free() if we
	 * already have too many cache entries
	 */
#ifdef SCHED_MAX_CACHE
	if (con->schedccnt < SCHED_MAX_CACHE) {
		AST_LIST_INSERT_HEAD(&con->schedc, tmp, list);
		con->schedccnt++;
	} else
#endif
		sched_free(tmp);
}

static struct sched *sched_alloc(struct ast_sched_context *con)
{
	struct sched *tmp;

	/*
	 * We keep a small cache of schedule entries
	 * to minimize the number of necessary malloc()'s
	 */
#ifdef SCHED_MAX_CACHE
	if ((tmp = AST_LIST_REMOVE_HEAD(&con->schedc, list))) {
		con->schedccnt--;
	} else
#endif
	{
		tmp = ast_calloc(1, sizeof(*tmp));
		if (!tmp) {
			return NULL;
		}
		ast_cond_init(&tmp->cond, NULL);
	}

	if (set_sched_id(con, tmp)) {
		sched_release(con, tmp);
		return NULL;
	}

	return tmp;
}

void ast_sched_clean_by_callback(struct ast_sched_context *con, ast_sched_cb match, ast_sched_cb cleanup_cb)
{
	int i = 1;
	struct sched *current;

	ast_mutex_lock(&con->lock);
	while ((current = ast_heap_peek(con->sched_heap, i))) {
		if (current->callback != match) {
			i++;
			continue;
		}

		ast_heap_remove(con->sched_heap, current);

		cleanup_cb(current->data);
		sched_release(con, current);
	}
	ast_mutex_unlock(&con->lock);
}

/*! \brief
 * Return the number of milliseconds
 * until the next scheduled event
 */
int ast_sched_wait(struct ast_sched_context *con)
{
	int ms;
	struct sched *s;

	DEBUG(ast_debug(1, "ast_sched_wait()\n"));

	ast_mutex_lock(&con->lock);
	if ((s = ast_heap_peek(con->sched_heap, 1))) {
		ms = ast_tvdiff_ms(s->when, ast_tvnow());
		if (ms < 0) {
			ms = 0;
		}
	} else {
		ms = -1;
	}
	ast_mutex_unlock(&con->lock);

	return ms;
}


/*! \brief
 * Take a sched structure and put it in the
 * heap, such that the soonest event is
 * at the top.
 */
static void schedule(struct ast_sched_context *con, struct sched *s)
{
	size_t size;

	size = ast_heap_size(con->sched_heap);

	/* Record the largest the scheduler heap became for reporting purposes. */
	if (con->highwater <= size) {
		con->highwater = size + 1;
	}

	/* Determine the tie breaker value for the new entry. */
	if (size) {
		++con->tie_breaker;
	} else {
		/*
		 * Restart the sequence for the first entry to make integer
		 * roll over more unlikely.
		 */
		con->tie_breaker = 0;
	}
	s->tie_breaker = con->tie_breaker;

	ast_heap_push(con->sched_heap, s);
}

/*! \brief
 * Given the last event time *t and the offset in milliseconds 'when',
 * compute the next absolute event time.
 */
static void sched_settime(struct timeval *t, int when)
{
	struct timeval now = ast_tvnow();

	if (when < 0) {
		/*
		 * A negative when value is likely a bug as it
		 * represents a VERY large timeout time.
		 */
		ast_log(LOG_WARNING,
			"Bug likely: Negative time interval %d (interpreted as %u ms) requested!\n",
			when, (unsigned int) when);
		ast_assert(0);
	}

	/*ast_debug(1, "TV -> %lu,%lu\n", tv->tv_sec, tv->tv_usec);*/
	if (ast_tvzero(*t)) {	/* not supplied, default to now */
		*t = now;
	}
	*t = ast_tvadd(*t, ast_samp2tv(when, 1000));
	if (ast_tvcmp(*t, now) < 0) {
		*t = now;
	}
}

int ast_sched_replace_variable(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
{
	/* 0 means the schedule item is new; do not delete */
	if (old_id > 0) {
		AST_SCHED_DEL(con, old_id);
	}
	return ast_sched_add_variable(con, when, callback, data, variable);
}

/*! \brief
 * Schedule callback(data) to happen 'when' ms into the future
 */
int ast_sched_add_variable(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data, int variable)
{
	struct sched *tmp;
	int res = -1;

	DEBUG(ast_debug(1, "ast_sched_add()\n"));

	ast_mutex_lock(&con->lock);
	if ((tmp = sched_alloc(con))) {
		con->eventcnt++;
		tmp->callback = callback;
		tmp->data = data;
		tmp->resched = when;
		tmp->variable = variable;
		tmp->when = ast_tv(0, 0);
		tmp->deleted = 0;

		sched_settime(&tmp->when, when);
		schedule(con, tmp);
		res = tmp->sched_id->id;
	}
#ifdef DUMP_SCHEDULER
	/* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
	ast_sched_dump(con);
#endif
	if (con->sched_thread) {
		ast_cond_signal(&con->sched_thread->cond);
	}
	ast_mutex_unlock(&con->lock);

	return res;
}
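
/*
 * Illustration (not in the original source): with variable set, a
 * callback controls its own rescheduling by returning the next
 * interval in ms, or 0 to stop.  The names poll_task, work_remaining
 * and state are hypothetical placeholders.
 *
 *	static int poll_task(const void *data)
 *	{
 *		return work_remaining(data) ? 500 : 0;
 *	}
 *
 *	ast_sched_add_variable(con, 100, poll_task, state, 1);
 *
 * With variable == 0, a non-zero return simply repeats the original
 * 'when' interval.
 */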

int ast_sched_replace(int old_id, struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
{
	if (old_id > -1) {
		AST_SCHED_DEL(con, old_id);
	}
	return ast_sched_add(con, when, callback, data);
}

int ast_sched_add(struct ast_sched_context *con, int when, ast_sched_cb callback, const void *data)
{
	return ast_sched_add_variable(con, when, callback, data, 0);
}

static struct sched *sched_find(struct ast_sched_context *con, int id)
{
	int x;
	size_t heap_size;

	heap_size = ast_heap_size(con->sched_heap);
	for (x = 1; x <= heap_size; x++) {
		struct sched *cur = ast_heap_peek(con->sched_heap, x);

		if (cur->sched_id->id == id) {
			return cur;
		}
	}

	return NULL;
}

const void *ast_sched_find_data(struct ast_sched_context *con, int id)
{
	struct sched *s;
	const void *data = NULL;

	ast_mutex_lock(&con->lock);

	s = sched_find(con, id);
	if (s) {
		data = s->data;
	}

	ast_mutex_unlock(&con->lock);

	return data;
}

/*! \brief
 * Delete the schedule entry with the given "id".  IDs in use are
 * unique, so at most one entry can match.
 */
int ast_sched_del(struct ast_sched_context *con, int id)
{
	struct sched *s = NULL;
	int *last_id = ast_threadstorage_get(&last_del_id, sizeof(int));

	DEBUG(ast_debug(1, "ast_sched_del(%d)\n", id));

	if (id < 0) {
		return 0;
	}

	ast_mutex_lock(&con->lock);

	s = sched_find(con, id);
	if (s) {
		if (!ast_heap_remove(con->sched_heap, s)) {
			ast_log(LOG_WARNING, "sched entry %d not in the sched heap?\n", s->sched_id->id);
		}
		sched_release(con, s);
	} else if (con->currently_executing && (id == con->currently_executing->sched_id->id)) {
		if (con->executing_thread_id == pthread_self()) {
			/* The scheduled callback is trying to delete itself.
			 * Not good as that is a deadlock. */
			ast_log(LOG_ERROR,
				"BUG! Trying to delete sched %d from within the callback %p.  "
				"Ignoring so we don't deadlock\n",
				id, con->currently_executing->callback);
			ast_log_backtrace();
			/* We'll return -1 below because s is NULL.
			 * The caller will rightly assume that the unscheduling failed. */
		} else {
			s = con->currently_executing;
			s->deleted = 1;
			/* Wait for executing task to complete so that the caller of
			 * ast_sched_del() does not free memory out from under the task. */
			while (con->currently_executing && (id == con->currently_executing->sched_id->id)) {
				ast_cond_wait(&s->cond, &con->lock);
			}
			/* Do not sched_release() here because ast_sched_runq() will do it */
		}
	}

#ifdef DUMP_SCHEDULER
	/* Dump contents of the context while we have the lock so nothing gets screwed up by accident. */
	ast_sched_dump(con);
#endif
	if (con->sched_thread) {
		ast_cond_signal(&con->sched_thread->cond);
	}
	ast_mutex_unlock(&con->lock);

	if (!s && *last_id != id) {
		ast_debug(1, "Attempted to delete nonexistent schedule entry %d!\n", id);
		/* Removing a nonexistent schedule entry shouldn't trigger an assert
		 * (it was enabled in DEV_MODE) because in many places entries are
		 * deleted without having a valid id. */
		*last_id = id;
		return -1;
	} else if (!s) {
		return -1;
	}

	return 0;
}
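
/*
 * Illustration (not in the original source): a callback must not call
 * ast_sched_del() on its own id; the code above detects that case and
 * logs an error rather than deadlocking.  A callback that wants to
 * cancel itself should simply return 0 so the scheduler releases the
 * entry.  The names one_shot and do_work are hypothetical placeholders.
 *
 *	static int one_shot(const void *data)
 *	{
 *		do_work(data);
 *		return 0;
 *	}
 */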

void ast_sched_report(struct ast_sched_context *con, struct ast_str **buf, struct ast_cb_names *cbnames)
{
	int i, x;
	struct sched *cur;
	int countlist[cbnames->numassocs + 1];
	size_t heap_size;

	memset(countlist, 0, sizeof(countlist));
	ast_str_set(buf, 0, " Highwater = %u\n schedcnt = %zu\n", con->highwater, ast_heap_size(con->sched_heap));

	ast_mutex_lock(&con->lock);

	heap_size = ast_heap_size(con->sched_heap);
	for (x = 1; x <= heap_size; x++) {
		cur = ast_heap_peek(con->sched_heap, x);
		/* match the callback to the cblist */
		for (i = 0; i < cbnames->numassocs; i++) {
			if (cur->callback == cbnames->cblist[i]) {
				break;
			}
		}
		if (i < cbnames->numassocs) {
			countlist[i]++;
		} else {
			countlist[cbnames->numassocs]++;
		}
	}

	ast_mutex_unlock(&con->lock);

	for (i = 0; i < cbnames->numassocs; i++) {
		ast_str_append(buf, 0, "    %s : %d\n", cbnames->list[i], countlist[i]);
	}

	ast_str_append(buf, 0, "   <unknown> : %d\n", countlist[cbnames->numassocs]);
}

/*! \brief Dump the contents of the scheduler to LOG_DEBUG */
void ast_sched_dump(struct ast_sched_context *con)
{
	struct sched *q;
	struct timeval when;
	int x;
	size_t heap_size;

	if (!DEBUG_ATLEAST(1)) {
		return;
	}

	when = ast_tvnow();
#ifdef SCHED_MAX_CACHE
	ast_log(LOG_DEBUG, "Asterisk Schedule Dump (%zu in Q, %u Total, %u Cache, %u high-water)\n",
		ast_heap_size(con->sched_heap), con->eventcnt - 1, con->schedccnt, con->highwater);
#else
	ast_log(LOG_DEBUG, "Asterisk Schedule Dump (%zu in Q, %u Total, %u high-water)\n",
		ast_heap_size(con->sched_heap), con->eventcnt - 1, con->highwater);
#endif

	ast_log(LOG_DEBUG, "=============================================================\n");
	ast_log(LOG_DEBUG, "|ID    Callback          Data              Time  (sec:ms)   |\n");
	ast_log(LOG_DEBUG, "+-----+-----------------+-----------------+-----------------+\n");
	ast_mutex_lock(&con->lock);
	heap_size = ast_heap_size(con->sched_heap);
	for (x = 1; x <= heap_size; x++) {
		struct timeval delta;
		q = ast_heap_peek(con->sched_heap, x);
		delta = ast_tvsub(q->when, when);
		ast_log(LOG_DEBUG, "|%.4d | %-15p | %-15p | %.6ld : %.6ld |\n",
			q->sched_id->id,
			q->callback,
			q->data,
			(long)delta.tv_sec,
			(long int)delta.tv_usec);
	}
	ast_mutex_unlock(&con->lock);
	ast_log(LOG_DEBUG, "=============================================================\n");
}

/*! \brief
 * Launch all events which need to be run at this time.
 */
int ast_sched_runq(struct ast_sched_context *con)
{
	struct sched *current;
	struct timeval when;
	int numevents;
	int res;

	DEBUG(ast_debug(1, "ast_sched_runq()\n"));

	ast_mutex_lock(&con->lock);

	when = ast_tvadd(ast_tvnow(), ast_tv(0, 1000));
	for (numevents = 0; (current = ast_heap_peek(con->sched_heap, 1)); numevents++) {
		/* schedule all events which are going to expire within 1ms.
		 * We only care about millisecond accuracy anyway, so this will
		 * help us get more than one event at one time if they are very
		 * close together.
		 */
		if (ast_tvcmp(current->when, when) != -1) {
			break;
		}

		current = ast_heap_pop(con->sched_heap);

		/*
		 * At this point, the schedule queue is still intact.  We
		 * have removed the first event and the rest is still there,
		 * so it's permissible for the callback to add new events, but
		 * trying to delete itself won't work because it isn't in
		 * the schedule queue.  If that's what it wants to do, it
		 * should return 0.
		 */

		con->currently_executing = current;
		con->executing_thread_id = pthread_self();
		ast_mutex_unlock(&con->lock);
		res = current->callback(current->data);
		ast_mutex_lock(&con->lock);
		con->currently_executing = NULL;
		ast_cond_signal(&current->cond);

		if (res && !current->deleted) {
			/*
			 * If they return non-zero, we should schedule them to be
			 * run again.
			 */
			sched_settime(&current->when, current->variable ? res : current->resched);
			schedule(con, current);
		} else {
			/* No longer needed, so release it */
			sched_release(con, current);
		}
	}

	ast_mutex_unlock(&con->lock);

	return numevents;
}
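
/*
 * Illustration (not in the original source): a context without a
 * scheduler thread can be driven from the caller's own loop by
 * combining ast_sched_wait() and ast_sched_runq(), e.g. around a
 * poll() whose fds are application specific:
 *
 *	for (;;) {
 *		int ms = ast_sched_wait(con);
 *		poll(fds, nfds, ms);
 *		ast_sched_runq(con);
 *	}
 *
 * ms is -1 when nothing is scheduled, which makes poll() block
 * indefinitely until an fd becomes ready.
 */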

long ast_sched_when(struct ast_sched_context *con, int id)
{
	struct sched *s;
	long secs = -1;

	DEBUG(ast_debug(1, "ast_sched_when()\n"));

	ast_mutex_lock(&con->lock);

	s = sched_find(con, id);
	if (s) {
		struct timeval now = ast_tvnow();
		secs = s->when.tv_sec - now.tv_sec;
	}

	ast_mutex_unlock(&con->lock);

	return secs;
}