Add a common implementation of a scheduler context with a dedicated thread.
authorRussell Bryant <russell@russellbryant.com>
Fri, 6 Feb 2009 10:55:35 +0000 (10:55 +0000)
committerRussell Bryant <russell@russellbryant.com>
Fri, 6 Feb 2009 10:55:35 +0000 (10:55 +0000)
This commit expands the Asterisk scheduler API to include a common implementation
of a scheduler context being processed by a dedicated thread.  chan_iax2 has been
updated to use this new code.  Also, as a result, this resolves some race
conditions related to the previous chan_iax2 scheduler handling.

Related to rev 171452 which resolved the same issues in 1.4.

Code from team/russell/sched_thread2

Review: http://reviewboard.digium.com/r/129/

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@173858 65c4cc65-6c06-0410-ace0-fbb531ad65f3

channels/chan_iax2.c
include/asterisk/sched.h
main/sched.c

index c7ff9fd..f263dcc 100644 (file)
@@ -305,7 +305,7 @@ int (*iax2_regfunk)(const char *username, int onoff) = NULL;
        } while(0)
 
 static struct io_context *io;
-static struct sched_context *sched;
+static struct ast_sched_thread *sched;
 
 static int iax2_capability = IAX_CAPABILITY_FULLBANDWIDTH;
 
@@ -332,9 +332,6 @@ static int iax2_encryption = 0;
 static struct ast_flags globalflags = { 0 };
 
 static pthread_t netthreadid = AST_PTHREADT_NULL;
-static pthread_t schedthreadid = AST_PTHREADT_NULL;
-AST_MUTEX_DEFINE_STATIC(sched_lock);
-static ast_cond_t sched_cond;
 
 enum iax2_state {
        IAX_STATE_STARTED =             (1 << 0),
@@ -1261,22 +1258,18 @@ static int __schedule_action(void (*func)(const void *data), const void *data, c
 #define schedule_action(func, data) __schedule_action(func, data, __PRETTY_FUNCTION__)
 #endif
 
-static int iax2_sched_replace(int id, struct sched_context *con, int when, ast_sched_cb callback, const void *data)
+static int iax2_sched_replace(int id, struct ast_sched_thread *st, int when, 
+               ast_sched_cb callback, const void *data)
 {
-       AST_SCHED_REPLACE(id, con, when, callback, data);
-       signal_condition(&sched_lock, &sched_cond);
+       ast_sched_thread_del(st, id);
 
-       return id;
+       return ast_sched_thread_add(st, when, callback, data);
 }
 
-static int iax2_sched_add(struct sched_context *con, int when, ast_sched_cb callback, const void *data)
+static int iax2_sched_add(struct ast_sched_thread *st, int when, 
+               ast_sched_cb callback, const void *data)
 {
-       int res;
-
-       res = ast_sched_add(con, when, callback, data);
-       signal_condition(&sched_lock, &sched_cond);
-
-       return res;
+       return ast_sched_thread_add(st, when, callback, data);
 }
 
 static int send_ping(const void *data);
@@ -1525,18 +1518,18 @@ static void iax2_destroy_helper(struct chan_iax2_pvt *pvt)
                ast_clear_flag(pvt, IAX_MAXAUTHREQ);
        }
        /* No more pings or lagrq's */
-       AST_SCHED_DEL_SPINLOCK(sched, pvt->pingid, &iaxsl[pvt->callno]);
-       AST_SCHED_DEL_SPINLOCK(sched, pvt->lagid, &iaxsl[pvt->callno]);
-       AST_SCHED_DEL(sched, pvt->autoid);
-       AST_SCHED_DEL(sched, pvt->authid);
-       AST_SCHED_DEL(sched, pvt->initid);
-       AST_SCHED_DEL(sched, pvt->jbid);
-       AST_SCHED_DEL(sched, pvt->keyrotateid);
+       AST_SCHED_DEL_SPINLOCK(ast_sched_thread_get_context(sched), pvt->pingid, &iaxsl[pvt->callno]);
+       AST_SCHED_DEL_SPINLOCK(ast_sched_thread_get_context(sched), pvt->lagid, &iaxsl[pvt->callno]);
+       ast_sched_thread_del(sched, pvt->autoid);
+       ast_sched_thread_del(sched, pvt->authid);
+       ast_sched_thread_del(sched, pvt->initid);
+       ast_sched_thread_del(sched, pvt->jbid);
+       ast_sched_thread_del(sched, pvt->keyrotateid);
 }
 
 static void iax2_frame_free(struct iax_frame *fr)
 {
-       AST_SCHED_DEL(sched, fr->retrans);
+       ast_sched_thread_del(sched, fr->retrans);
        iax_frame_free(fr);
 }
 
@@ -1725,8 +1718,8 @@ static int make_trunk(unsigned short callno, int locked)
                         * \note We delete these before switching the slot, because if
                         * they fire in the meantime, they will generate a warning.
                         */
-                       AST_SCHED_DEL(sched, iaxs[callno]->pingid);
-                       AST_SCHED_DEL(sched, iaxs[callno]->lagid);
+                       ast_sched_thread_del(sched, iaxs[callno]->pingid);
+                       ast_sched_thread_del(sched, iaxs[callno]->lagid);
                        iaxs[x] = iaxs[callno];
                        iaxs[x]->callno = x;
                        iaxs[callno] = NULL;
@@ -3214,7 +3207,7 @@ static int schedule_delivery(struct iax_frame *fr, int updatehistory, int fromtr
 
                jb_reset(iaxs[fr->callno]->jb);
 
-               AST_SCHED_DEL(sched, iaxs[fr->callno]->jbid);
+               ast_sched_thread_del(sched, iaxs[fr->callno]->jbid);
 
                /* deliver this frame now */
                if (tsout)
@@ -3254,7 +3247,7 @@ static int iax2_transmit(struct iax_frame *fr)
        /* Wake up the network and scheduler thread */
        if (netthreadid != AST_PTHREADT_NULL)
                pthread_kill(netthreadid, SIGURG);
-       signal_condition(&sched_lock, &sched_cond);
+       ast_sched_thread_poke(sched);
        return 0;
 }
 
@@ -3390,7 +3383,7 @@ static struct iax2_peer *realtime_peer(const char *peername, struct sockaddr_in
                ast_copy_flags(peer, &globalflags, IAX_RTAUTOCLEAR|IAX_RTCACHEFRIENDS);
                if (ast_test_flag(peer, IAX_RTAUTOCLEAR)) {
                        if (peer->expire > -1) {
-                               if (!ast_sched_del(sched, peer->expire)) {
+                               if (!ast_sched_thread_del(sched, peer->expire)) {
                                        peer->expire = -1;
                                        peer_unref(peer);
                                }
@@ -3948,7 +3941,7 @@ static int iax2_hangup(struct ast_channel *c)
                        ast_debug(1, "Really destroying %s now...\n", c->name);
                        iax2_destroy(callno);
                } else if (iaxs[callno]) {
-                       if (ast_sched_add(sched, 10000, scheduled_destroy, CALLNO_TO_PTR(callno)) < 0) {
+                       if (ast_sched_thread_add(sched, 10000, scheduled_destroy, CALLNO_TO_PTR(callno)) < 0) {
                                ast_log(LOG_ERROR, "Unable to schedule iax2 callno %d destruction?!!  Destroying immediately.\n", callno);
                                iax2_destroy(callno);
                        }
@@ -4051,7 +4044,7 @@ static int iax2_key_rotate(const void *vpvt)
        ast_mutex_lock(&iaxsl[pvt->callno]);
 
        pvt->keyrotateid = 
-               ast_sched_add(sched, 120000 + (ast_random() % 180001), iax2_key_rotate, vpvt);
+               ast_sched_thread_add(sched, 120000 + (ast_random() % 180001), iax2_key_rotate, vpvt);
 
        snprintf(key, sizeof(key), "%lX", ast_random());
 
@@ -7072,14 +7065,14 @@ static void prune_peers(void);
 static void unlink_peer(struct iax2_peer *peer)
 {
        if (peer->expire > -1) {
-               if (!ast_sched_del(sched, peer->expire)) {
+               if (!ast_sched_thread_del(sched, peer->expire)) {
                        peer->expire = -1;
                        peer_unref(peer);
                }
        }
 
        if (peer->pokeexpire > -1) {
-               if (!ast_sched_del(sched, peer->pokeexpire)) {
+               if (!ast_sched_thread_del(sched, peer->pokeexpire)) {
                        peer->pokeexpire = -1;
                        peer_unref(peer);
                }
@@ -7153,7 +7146,7 @@ static void reg_source_db(struct iax2_peer *p)
                                        p->addr.sin_addr = in;
                                        p->addr.sin_port = htons(atoi(c));
                                        if (p->expire > -1) {
-                                               if (!ast_sched_del(sched, p->expire)) {
+                                               if (!ast_sched_thread_del(sched, p->expire)) {
                                                        p->expire = -1;
                                                        peer_unref(p);
                                                }
@@ -7249,7 +7242,7 @@ static int update_registry(struct sockaddr_in *sin, int callno, char *devtype, i
        p->sockfd = fd;
        /* Setup the expiry */
        if (p->expire > -1) {
-               if (!ast_sched_del(sched, p->expire)) {
+               if (!ast_sched_thread_del(sched, p->expire)) {
                        p->expire = -1;
                        peer_unref(p);
                }
@@ -8733,7 +8726,7 @@ retryowner:
                        }
                }
                if (f.frametype == AST_FRAME_IAX) {
-                       AST_SCHED_DEL(sched, iaxs[fr->callno]->initid);
+                       ast_sched_thread_del(sched, iaxs[fr->callno]->initid);
                        /* Handle the IAX pseudo frame itself */
                        if (iaxdebug)
                                ast_debug(1, "IAX subclass %d received\n", f.subclass);
@@ -9215,7 +9208,7 @@ retryowner2:
 
                                        /* Remove scheduled iax2_poke_noanswer */
                                        if (peer->pokeexpire > -1) {
-                                               if (!ast_sched_del(sched, peer->pokeexpire)) {
+                                               if (!ast_sched_thread_del(sched, peer->pokeexpire)) {
                                                        peer_unref(peer);
                                                        peer->pokeexpire = -1;
                                                }
@@ -10268,7 +10261,7 @@ static int iax2_poke_peer(struct iax2_peer *peer, int heldcall)
        iaxs[peer->callno]->peerpoke = peer;
 
        if (peer->pokeexpire > -1) {
-               if (!ast_sched_del(sched, peer->pokeexpire)) {
+               if (!ast_sched_thread_del(sched, peer->pokeexpire)) {
                        peer->pokeexpire = -1;
                        peer_unref(peer);
                }
@@ -10383,34 +10376,6 @@ static struct ast_channel *iax2_request(const char *type, int format, void *data
        return c;
 }
 
-static void *sched_thread(void *ignore)
-{
-       int count;
-       int res;
-       struct timeval wait;
-       struct timespec ts;
-
-       for (;;) {
-               pthread_testcancel();
-               ast_mutex_lock(&sched_lock);
-               res = ast_sched_wait(sched);
-               if ((res > 1000) || (res < 0))
-                       res = 1000;
-               wait = ast_tvadd(ast_tvnow(), ast_samp2tv(res, 1000));
-               ts.tv_sec = wait.tv_sec;
-               ts.tv_nsec = wait.tv_usec * 1000;
-               ast_cond_timedwait(&sched_cond, &sched_lock, &ts);
-               ast_mutex_unlock(&sched_lock);
-               pthread_testcancel();
-
-               count = ast_sched_runq(sched);
-               if (count >= 20)
-                       ast_debug(1, "chan_iax2: ast_sched_runq ran %d scheduled tasks all at once\n", count);
-       }
-
-       return NULL;
-}
-
 static void *network_thread(void *ignore)
 {
        /* Our job is simple: Send queued messages, retrying if necessary.  Read frames 
@@ -10498,7 +10463,6 @@ static int start_network_thread(void)
                        AST_LIST_UNLOCK(&idle_list);
                }
        }
-       ast_pthread_create_background(&schedthreadid, NULL, sched_thread, NULL);
        ast_pthread_create_background(&netthreadid, NULL, network_thread, NULL);
        ast_verb(2, "%d helper threads started\n", threadcount);
        return 0;
@@ -10771,7 +10735,7 @@ static struct iax2_peer *build_peer(const char *name, struct ast_variable *v, st
                                        }
                                } else {
                                        /* Non-dynamic.  Make sure we become that way if we're not */
-                                       AST_SCHED_DEL(sched, peer->expire);
+                                       ast_sched_thread_del(sched, peer->expire);
                                        ast_clear_flag(peer, IAX_DYNAMIC);
                                        if (ast_dnsmgr_lookup(v->value, &peer->addr, &peer->dnsmgr, srvlookup ? "_iax._udp" : NULL))
                                                return peer_unref(peer);
@@ -11144,7 +11108,7 @@ static void delete_users(void)
 
        AST_LIST_LOCK(&registrations);
        while ((reg = AST_LIST_REMOVE_HEAD(&registrations, entry))) {
-               AST_SCHED_DEL(sched, reg->expire);
+               ast_sched_thread_del(sched, reg->expire);
                if (reg->callno) {
                        int callno = reg->callno;
                        ast_mutex_lock(&iaxsl[callno]);
@@ -12327,21 +12291,13 @@ static int __unload_module(void)
        /* Cancel the network thread, close the net socket */
        if (netthreadid != AST_PTHREADT_NULL) {
                AST_LIST_LOCK(&frame_queue);
-               ast_mutex_lock(&sched_lock);
                pthread_cancel(netthreadid);
-               ast_cond_signal(&sched_cond);
-               ast_mutex_unlock(&sched_lock);  /* Release the schedule lock resource */
                AST_LIST_UNLOCK(&frame_queue);
                pthread_join(netthreadid, NULL);
        }
-       if (schedthreadid != AST_PTHREADT_NULL) {
-               ast_mutex_lock(&sched_lock);
-               pthread_cancel(schedthreadid);
-               ast_cond_signal(&sched_cond);
-               ast_mutex_unlock(&sched_lock);
-               pthread_join(schedthreadid, NULL);
-       }
 
+       sched = ast_sched_thread_destroy(sched);
+       
        /* Call for all threads to halt */
        AST_LIST_LOCK(&idle_list);
        while ((thread = AST_LIST_REMOVE_HEAD(&idle_list, list)))
@@ -12379,7 +12335,6 @@ static int __unload_module(void)
        ast_channel_unregister(&iax2_tech);
        delete_users();
        iax_provision_unload();
-       sched_context_destroy(sched);
        reload_firmware(1);
 
        for (x = 0; x < ARRAY_LEN(iaxsl); x++) {
@@ -12494,23 +12449,21 @@ static int load_module(void)
                ast_mutex_init(&iaxsl[x]);
        }
 
-       ast_cond_init(&sched_cond, NULL);
-
-       if (!(sched = sched_context_create())) {
-               ast_log(LOG_ERROR, "Failed to create scheduler context\n");
+       if (!(sched = ast_sched_thread_create())) {
+               ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
                return AST_MODULE_LOAD_FAILURE;
        }
 
        if (!(io = io_context_create())) {
                ast_log(LOG_ERROR, "Failed to create I/O context\n");
-               sched_context_destroy(sched);
+               sched = ast_sched_thread_destroy(sched);
                return AST_MODULE_LOAD_FAILURE;
        }
 
        if (!(netsock = ast_netsock_list_alloc())) {
                ast_log(LOG_ERROR, "Failed to create netsock list\n");
                io_context_destroy(io);
-               sched_context_destroy(sched);
+               sched = ast_sched_thread_destroy(sched);
                return AST_MODULE_LOAD_FAILURE;
        }
        ast_netsock_init(netsock);
@@ -12519,7 +12472,7 @@ static int load_module(void)
        if (!outsock) {
                ast_log(LOG_ERROR, "Could not allocate outsock list.\n");
                io_context_destroy(io);
-               sched_context_destroy(sched);
+               sched = ast_sched_thread_destroy(sched);
                return AST_MODULE_LOAD_FAILURE;
        }
        ast_netsock_init(outsock);
index ab328af..f9e1ca5 100644 (file)
@@ -49,15 +49,17 @@ extern "C" {
  * and not a copy of the value of the id.
  */
 #define AST_SCHED_DEL(sched, id) \
-       do { \
+       ({ \
                int _count = 0; \
-               while (id > -1 && ast_sched_del(sched, id) && ++_count < 10) { \
+               int _sched_res = -1; \
+               while (id > -1 && (_sched_res = ast_sched_del(sched, id)) && ++_count < 10) \
                        usleep(1); \
+               if (_count == 10 && option_debug > 2) { \
+                       ast_log(LOG_DEBUG, "Unable to cancel schedule ID %d.\n", id); \
                } \
-               if (_count == 10) \
-                       ast_debug(3, "Unable to cancel schedule ID %d.\n", id); \
                id = -1; \
-       } while (0);
+               (_sched_res); \
+       })
 
 #define AST_SCHED_DEL_UNREF(sched, id, refcall)                        \
        do { \
@@ -282,6 +284,114 @@ long ast_sched_when(struct sched_context *con,int id);
        } \
 } while(0)
 
+/*!
+ * \brief An opaque type representing a scheduler thread
+ *
+ * The purpose of the ast_sched_thread API is to provide a common implementation
+ * of the case where a module wants to have a dedicated thread for handling the
+ * scheduler.
+ */
+struct ast_sched_thread;
+
+/*!
+ * \brief Create a scheduler with a dedicated thread
+ *
+ * This function should be used to allocate a scheduler context and a dedicated
+ * thread for processing scheduler entries.  The thread is started immediately.
+ *
+ * \retval NULL error
+ * \retval non-NULL a handle to the scheduler and its dedicated thread.
+ */
+struct ast_sched_thread *ast_sched_thread_create(void);
+
+/*!
+ * \brief Destroy a scheduler and its thread
+ *
+ * This function is used to destroy a scheduler context and the dedicated thread
+ * that was created for handling scheduler entries.  Any entries in the scheduler
+ * that have not yet been processed will be thrown away.  Once this function is
+ * called, the handle must not be used again.
+ *
+ * \param st the handle to the scheduler and thread
+ *
+ * \return NULL for convenience
+ */
+struct ast_sched_thread *ast_sched_thread_destroy(struct ast_sched_thread *st);
+
+/*!
+ * \brief Add a scheduler entry
+ *
+ * \param st the handle to the scheduler and thread
+ * \param when the number of ms in the future to run the task.  A value <= 0
+ *        is treated as "run now".
+ * \param cb the function to call when the scheduled time arrives
+ * \param data the parameter to pass to the scheduler callback
+ *
+ * \retval 0 success
+ * \retval non-zero failure
+ */
+int ast_sched_thread_add(struct ast_sched_thread *st, int when, ast_sched_cb cb,
+               const void *data);
+
+/*!
+ * \brief Add a variable reschedule time scheduler entry
+ *
+ * \param st the handle to the scheduler and thread
+ * \param when the number of ms in the future to run the task.  A value <= 0
+ *        is treated as "run now".
+ * \param cb the function to call when the scheduled time arrives
+ * \param data the parameter to pass to the scheduler callback
+ * \param variable If this value is non-zero, then the scheduler will use the return
+ *        value of the scheduler as the amount of time in the future to run the
+ *        task again.  Normally, a return value of 0 means do not re-schedule, and
+ *        non-zero means re-schedule using the time provided when the scheduler
+ *        entry was first created.
+ *
+ * \retval 0 success
+ * \retval non-zero failure
+ */
+int ast_sched_thread_add_variable(struct ast_sched_thread *st, int when, ast_sched_cb cb,
+               const void *data, int variable);
+
+/*!
+ * \brief Get the scheduler context for a given ast_sched_thread
+ *
+ * This function should be used only when direct access to the scheduler context
+ * is required.  Its use is discouraged unless necessary.  The cases where 
+ * this is currently required is when you want to take advantage of one of the 
+ * AST_SCHED macros.
+ *
+ * \param st the handle to the scheduler and thread
+ *
+ * \return the sched_context associated with an ast_sched_thread
+ */
+struct sched_context *ast_sched_thread_get_context(struct ast_sched_thread *st);
+
+/*!
+ * \brief Delete a scheduler entry
+ *
+ * This uses the AST_SCHED_DEL macro internally.
+ *
+ * \param st the handle to the scheduler and thread
+ * \param id scheduler entry id to delete
+ *
+ * \retval 0 success
+ * \retval non-zero failure
+ */
+#define ast_sched_thread_del(st, id) ({ \
+       struct sched_context *__tmp_context = ast_sched_thread_get_context(st); \
+       AST_SCHED_DEL(__tmp_context, id); \
+})
+
+/*!
+ * \brief Force re-processing of the scheduler context
+ *
+ * \param st the handle to the scheduler and thread
+ *
+ * \return nothing
+ */
+void ast_sched_thread_poke(struct ast_sched_thread *st);
+
 #if defined(__cplusplus) || defined(c_plusplus)
 }
 #endif
index d62ca11..8b69814 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * Asterisk -- An open source telephony toolkit.
  *
- * Copyright (C) 1999 - 2005, Digium, Inc.
+ * Copyright (C) 1999 - 2008, Digium, Inc.
  *
  * Mark Spencer <markster@digium.com>
  *
@@ -70,6 +70,148 @@ struct sched_context {
 #endif
 };
 
+struct ast_sched_thread {
+       pthread_t thread;
+       ast_mutex_t lock;
+       ast_cond_t cond;
+       struct sched_context *context;
+       unsigned int stop:1;
+};
+
+static void *sched_run(void *data)
+{
+       struct ast_sched_thread *st = data;
+
+       while (!st->stop) {
+               int ms;
+               struct timespec ts = {
+                       .tv_sec = 0,    
+               };
+
+               ast_mutex_lock(&st->lock);
+
+               if (st->stop) {
+                       ast_mutex_unlock(&st->lock);
+                       return NULL;
+               }
+
+               ms = ast_sched_wait(st->context);
+
+               if (ms == -1) {
+                       ast_cond_wait(&st->cond, &st->lock);
+               } else {        
+                       struct timeval tv;
+                       tv = ast_tvadd(ast_tvnow(), ast_samp2tv(ms, 1000));
+                       ts.tv_sec = tv.tv_sec;
+                       ts.tv_nsec = tv.tv_usec * 1000;
+                       ast_cond_timedwait(&st->cond, &st->lock, &ts);
+               }
+
+               ast_mutex_unlock(&st->lock);
+
+               if (st->stop) {
+                       return NULL;
+               }
+
+               ast_sched_runq(st->context);
+       }
+
+       return NULL;
+}
+
+void ast_sched_thread_poke(struct ast_sched_thread *st)
+{
+       ast_mutex_lock(&st->lock);
+       ast_cond_signal(&st->cond);
+       ast_mutex_unlock(&st->lock);
+}
+
+struct sched_context *ast_sched_thread_get_context(struct ast_sched_thread *st)
+{
+       return st->context;
+}
+
+struct ast_sched_thread *ast_sched_thread_destroy(struct ast_sched_thread *st)
+{
+       if (st->thread != AST_PTHREADT_NULL) {
+               ast_mutex_lock(&st->lock);
+               st->stop = 1;
+               ast_cond_signal(&st->cond);
+               ast_mutex_unlock(&st->lock);
+               pthread_join(st->thread, NULL);
+               st->thread = AST_PTHREADT_NULL;
+       }
+
+       ast_mutex_destroy(&st->lock);
+       ast_cond_destroy(&st->cond);
+
+       if (st->context) {
+               sched_context_destroy(st->context);
+               st->context = NULL;
+       }
+
+       ast_free(st);
+
+       return NULL;
+}
+
+struct ast_sched_thread *ast_sched_thread_create(void)
+{
+       struct ast_sched_thread *st;
+
+       if (!(st = ast_calloc(1, sizeof(*st)))) {
+               return NULL;
+       }
+
+       ast_mutex_init(&st->lock);
+       ast_cond_init(&st->cond, NULL);
+
+       st->thread = AST_PTHREADT_NULL;
+
+       if (!(st->context = sched_context_create())) {
+               ast_log(LOG_ERROR, "Failed to create scheduler\n");
+               ast_sched_thread_destroy(st);
+               return NULL;
+       }
+       
+       if (ast_pthread_create_background(&st->thread, NULL, sched_run, st)) {
+               ast_log(LOG_ERROR, "Failed to create scheduler thread\n");
+               ast_sched_thread_destroy(st);
+               return NULL;
+       }
+
+       return st;
+}
+
+int ast_sched_thread_add_variable(struct ast_sched_thread *st, int when, ast_sched_cb cb,
+               const void *data, int variable)
+{
+       int res;
+
+       ast_mutex_lock(&st->lock);
+       res = ast_sched_add_variable(st->context, when, cb, data, variable);
+       if (res != -1) {
+               ast_cond_signal(&st->cond);
+       }
+       ast_mutex_unlock(&st->lock);
+
+       return res;
+}
+
+int ast_sched_thread_add(struct ast_sched_thread *st, int when, ast_sched_cb cb,
+               const void *data)
+{
+       int res;
+
+       ast_mutex_lock(&st->lock);
+       res = ast_sched_add(st->context, when, cb, data);
+       if (res != -1) {
+               ast_cond_signal(&st->cond);
+       }
+       ast_mutex_unlock(&st->lock);
+
+       return res;
+}
 
 /* hash routines for sched */