struct ast_taskprocessor {
/*! \brief Friendly name of the taskprocessor */
const char *name;
- /*! \brief Thread poll condition */
- ast_cond_t poll_cond;
- /*! \brief Taskprocessor thread */
- pthread_t poll_thread;
- /*! \brief Taskprocessor lock */
- ast_mutex_t taskprocessor_lock;
- /*! \brief Taskprocesor thread run flag */
- unsigned char poll_thread_run;
/*! \brief Taskprocessor statistics */
struct tps_taskprocessor_stats *stats;
/*! \brief Taskprocessor current queue size */
return;
}
ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
- /* kill it */
- ast_mutex_destroy(&t->taskprocessor_lock);
/* free it */
if (t->stats) {
ast_free(t->stats);
ast_log(LOG_ERROR, "missing taskprocessor\n");
return NULL;
}
- ast_mutex_lock(&tps->taskprocessor_lock);
+ ao2_lock(tps);
if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
tps->tps_queue_size--;
}
- ast_mutex_unlock(&tps->taskprocessor_lock);
+ ao2_unlock(tps);
return task;
}
ast_log(LOG_ERROR, "failed to allocate task! Can't push to '%s'\n", tps->name);
return -1;
}
- ast_mutex_lock(&tps->taskprocessor_lock);
+ ao2_lock(tps);
AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
previous_size = tps->tps_queue_size++;
- ast_mutex_unlock(&tps->taskprocessor_lock);
+ ao2_unlock(tps);
tps->listener->callbacks->task_pushed(tps->listener, previous_size ? 0 : 1);
return 0;
}
tps_task_free(t);
- ast_mutex_lock(&tps->taskprocessor_lock);
+ ao2_lock(tps);
size = tps_taskprocessor_depth(tps);
if (tps->stats) {
tps->stats->_tasks_processed_count++;
tps->stats->max_qsize = size;
}
}
- ast_mutex_unlock(&tps->taskprocessor_lock);
+ ao2_unlock(tps);
if (size == 0) {
tps->listener->callbacks->emptied(tps->listener);