Big agent / queue fixes
authorMark Spencer <markster@digium.com>
Sat, 13 Nov 2004 22:44:33 +0000 (22:44 +0000)
committerMark Spencer <markster@digium.com>
Sat, 13 Nov 2004 22:44:33 +0000 (22:44 +0000)
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@4231 65c4cc65-6c06-0410-ace0-fbb531ad65f3

apps/app_queue.c
channels/chan_agent.c
include/asterisk/pbx.h
loader.c
pbx.c

index 4e669c7..0b08673 100755 (executable)
@@ -303,6 +303,79 @@ static int has_no_members(struct ast_call_queue *q)
        return empty;
 }
 
+struct statechange {
+       int state;
+       char dev[0];
+};
+
+static void *changethread(void *data)
+{
+       struct ast_call_queue *q;
+       struct statechange *sc = data;
+       struct member *cur;
+       char *loc;
+       loc = strchr(sc->dev, '/');
+       if (loc) {
+               *loc = '\0';
+               loc++;
+       } else {
+               ast_log(LOG_WARNING, "Can't change device with no technology!\n");
+               free(sc);
+               return NULL;
+       }
+       if (option_debug)
+               ast_log(LOG_DEBUG, "Device '%s/%s' changed to state '%d'\n", sc->dev, loc, sc->state);
+       ast_mutex_lock(&qlock);
+       for (q = queues; q; q = q->next) {
+               ast_mutex_lock(&q->lock);
+               cur = q->members;
+               while(cur) {
+                       if (!strcasecmp(sc->dev, cur->tech) && !strcmp(loc, cur->loc)) {
+                               if (cur->status != sc->state) {
+                                       cur->status = sc->state;
+                                       manager_event(EVENT_FLAG_AGENT, "QueueMemberStatus",
+                                               "Queue: %s\r\n"
+                                               "Location: %s/%s\r\n"
+                                               "Membership: %s\r\n"
+                                               "Penalty: %d\r\n"
+                                               "CallsTaken: %d\r\n"
+                                               "LastCall: %ld\r\n"
+                                               "Status: %d\r\n",
+                                       q->name, cur->tech, cur->loc, cur->dynamic ? "dynamic" : "static",
+                                       cur->penalty, cur->calls, cur->lastcall, cur->status);
+                               }
+                       }
+                       cur = cur->next;
+               }
+               ast_mutex_unlock(&q->lock);
+       }
+       ast_mutex_unlock(&qlock);
+       ast_log(LOG_DEBUG, "Device '%s/%s' changed to state '%d'\n", sc->dev, loc, sc->state);
+       free(sc);
+       return NULL;
+}
+
+static int statechange_queue(const char *dev, int state, void *ign)
+{
+       /* Avoid potential for deadlocks by spawning a new thread to handle
+          the event */
+       struct statechange *sc;
+       pthread_t t;
+       pthread_attr_t attr;
+       sc = malloc(sizeof(struct statechange) + strlen(dev) + 1);
+       if (sc) {
+               sc->state = state;
+               strcpy(sc->dev, dev);
+               pthread_attr_init(&attr);
+               pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
+               if (ast_pthread_create(&t, &attr, changethread, sc)) {
+                       ast_log(LOG_WARNING, "Failed to create update thread!\n");
+                       free(sc);
+               }
+       }
+       return 0;
+}
+
 static int join_queue(char *queuename, struct queue_ent *qe)
 {
        struct ast_call_queue *q;
@@ -316,7 +389,7 @@ static int join_queue(char *queuename, struct queue_ent *qe)
                if (!strcasecmp(q->name, queuename)) {
                        /* This is our one */
                        ast_mutex_lock(&q->lock);
-                       if ((!has_no_members(q) || q->joinempty || !q->head) && (!q->maxlen || (q->count < q->maxlen))) {
+                       if ((!has_no_members(q) || q->joinempty) && (!q->maxlen || (q->count < q->maxlen))) {
                                /* There's space for us, put us at the right position inside
                                 * the queue. 
                                 * Take into account the priority of the calling user */
@@ -591,8 +664,7 @@ static int update_status(struct ast_call_queue *q, struct member *member, int st
                                "Penalty: %d\r\n"
                                "CallsTaken: %d\r\n"
                                "LastCall: %ld\r\n"
-                               "Status: %d\r\n"
-                               "\r\n",
+                               "Status: %d\r\n",
                                        q->name, cur->tech, cur->loc, cur->dynamic ? "dynamic" : "static",
                                        cur->penalty, cur->calls, cur->lastcall, cur->status);
                        break;
@@ -604,6 +676,19 @@ static int update_status(struct ast_call_queue *q, struct member *member, int st
        return 0;
 }
 
+static int update_dial_status(struct ast_call_queue *q, struct member *member, int status)
+{
+       if (status == AST_CAUSE_BUSY)
+               status = AST_DEVICE_BUSY;
+       else if (status == AST_CAUSE_UNREGISTERED)
+               status = AST_DEVICE_UNAVAILABLE;
+       else if (status == AST_CAUSE_NOSUCHDRIVER)
+               status = AST_DEVICE_INVALID;
+       else
+               status = AST_DEVICE_UNKNOWN;
+       return update_status(q, member, status);
+}
+
 static int ring_entry(struct queue_ent *qe, struct localuser *tmp)
 {
        int res;
@@ -624,10 +709,10 @@ static int ring_entry(struct queue_ent *qe, struct localuser *tmp)
                if (qe->chan->cdr)
                        ast_cdr_busy(qe->chan->cdr);
                tmp->stillgoing = 0;
-               update_status(qe->parent, tmp->member, status);
+               update_dial_status(qe->parent, tmp->member, status);
                return 0;
        } else if (status != tmp->oldstatus) 
-               update_status(qe->parent, tmp->member, status);
+               update_dial_status(qe->parent, tmp->member, status);
        
        tmp->chan->appl = "AppQueue";
        tmp->chan->data = "(Outgoing Line)";
@@ -855,7 +940,7 @@ static struct localuser *wait_for_answer(struct queue_ent *qe, struct localuser
                                        /* Setup parameters */
                                        o->chan = ast_request(tech, in->nativeformats, stuff, &status);
                                        if (status != o->oldstatus) 
-                                               update_status(qe->parent, o->member, status);                                           
+                                               update_dial_status(qe->parent, o->member, status);                                              
                                        if (!o->chan) {
                                                ast_log(LOG_NOTICE, "Unable to create local channel for call forward to '%s/%s'\n", tech, stuff);
                                                o->stillgoing = 0;
@@ -2095,8 +2180,16 @@ static void reload_queues(void)
                                free(q);
                        } else
                                ast_log(LOG_WARNING, "XXX Leaking a little memory :( XXX\n");
-               } else
+               } else {
+                       char tmp[256];
+                       cur = q->members;
+                       while(cur) {
+                               snprintf(tmp, sizeof(tmp), "%s/%s", cur->tech, cur->loc);
+                               cur->status = ast_device_state(tmp);
+                               cur = cur->next;
+                       }
                        ql = q;
+               }
                q = qn;
        }
        ast_mutex_unlock(&qlock);
@@ -2105,20 +2198,23 @@ static void reload_queues(void)
 static char *status2str(int status, char *buf, int buflen)
 {
        switch(status) {
-       case AST_CAUSE_BUSY:
-               strncpy(buf, "busy", buflen - 1);
+       case AST_DEVICE_UNKNOWN:
+               strncpy(buf, "unknown", buflen - 1);
                break;
-       case AST_CAUSE_CONGESTION:
-               strncpy(buf, "congestion", buflen - 1);
+       case AST_DEVICE_NOT_INUSE:
+               strncpy(buf, "notinuse", buflen - 1);
                break;
-       case AST_CAUSE_FAILURE:
-               strncpy(buf, "failure", buflen - 1);
+       case AST_DEVICE_INUSE:
+               strncpy(buf, "inuse", buflen - 1);
+               break;
+       case AST_DEVICE_BUSY:
+               strncpy(buf, "busy", buflen - 1);
                break;
-       case AST_CAUSE_UNREGISTERED:
-               strncpy(buf, "unregistered", buflen - 1);
+       case AST_DEVICE_INVALID:
+               strncpy(buf, "invalid", buflen - 1);
                break;
-       case AST_CAUSE_NOSUCHDRIVER:
-               strncpy(buf, "nosuchdriver", buflen - 1);
+       case AST_DEVICE_UNAVAILABLE:
+               strncpy(buf, "unavailable", buflen - 1);
                break;
        default:
                snprintf(buf, buflen, "unknown status %d", status);
@@ -2593,6 +2689,7 @@ int unload_module(void)
        ast_manager_unregister("QueueStatus");
        ast_manager_unregister("QueueAdd");
        ast_manager_unregister("QueueRemove");
+       ast_devstate_del(statechange_queue, NULL);
        ast_unregister_application(app_aqm);
        ast_unregister_application(app_rqm);
        return ast_unregister_application(app);
@@ -2607,6 +2704,7 @@ int load_module(void)
                ast_cli_register(&cli_show_queues);
                ast_cli_register(&cli_add_queue_member);
                ast_cli_register(&cli_remove_queue_member);
+               ast_devstate_add(statechange_queue, NULL);
                ast_manager_register( "Queues", 0, manager_queues_show, "Queues" );
                ast_manager_register( "QueueStatus", 0, manager_queues_status, "Queue Status" );
                ast_manager_register( "QueueAdd", EVENT_FLAG_AGENT, manager_add_queue_member, "Add interface to queue." );
index a000caa..e13bbca 100755 (executable)
@@ -591,6 +591,7 @@ static int agent_hangup(struct ast_channel *ast)
                                snprintf(agent, sizeof(agent), "Agent/%s", p->agent);
                                ast_queue_log("NONE", ast->uniqueid, agent, "AGENTCALLBACKLOGOFF", "%s|%ld|%s", p->loginchan, logintime, "Autologoff");
                                p->loginchan[0] = '\0';
+                           ast_device_state_changed("Agent/%s", p->agent);
                        }
                } else if (p->dead) {
                        ast_mutex_lock(&p->chan->lock);
@@ -1479,6 +1480,7 @@ static int __login_exec(struct ast_channel *chan, void *data, int callbackmode)
                                                                ast_queue_log("NONE", chan->uniqueid, agent, "AGENTCALLBACKLOGIN", "%s", p->loginchan);
                                                                if (option_verbose > 2)
                                                                        ast_verbose(VERBOSE_PREFIX_3 "Callback Agent '%s' logged in on %s\n", p->agent, p->loginchan);
+                                                           ast_device_state_changed("Agent/%s", p->agent);
                                                        } else {
                                                                logintime = time(NULL) - p->loginstart;
                                                                p->loginstart = 0;
@@ -1491,6 +1493,7 @@ static int __login_exec(struct ast_channel *chan, void *data, int callbackmode)
                                                                ast_queue_log("NONE", chan->uniqueid, agent, "AGENTCALLBACKLOGOFF", "%s|%ld|", last_loginchan, logintime);
                                                                if (option_verbose > 2)
                                                                        ast_verbose(VERBOSE_PREFIX_3 "Callback Agent '%s' logged out\n", p->agent);
+                                                           ast_device_state_changed("Agent/%s", p->agent);
                                                        }
                                                        ast_mutex_unlock(&agentlock);
                                                        if (!res)
@@ -1525,6 +1528,7 @@ static int __login_exec(struct ast_channel *chan, void *data, int callbackmode)
                                                                check_availability(p, 0);
                                                        ast_mutex_unlock(&p->lock);
                                                        ast_mutex_unlock(&agentlock);
+                                                   ast_device_state_changed("Agent/%s", p->agent);
                                                        while (res >= 0) {
                                                                ast_mutex_lock(&p->lock);
                                                                if (p->chan != chan)
@@ -1591,6 +1595,7 @@ static int __login_exec(struct ast_channel *chan, void *data, int callbackmode)
                                                        if (option_verbose > 2)
                                                                ast_verbose(VERBOSE_PREFIX_3 "Agent '%s' logged out\n", p->agent);
                                                        /* If there is no owner, go ahead and kill it now */
+                                                   ast_device_state_changed("Agent/%s", p->agent);
                                                        if (p->dead && !p->owner) {
                                                                ast_mutex_destroy(&p->lock);
                                                                ast_mutex_destroy(&p->app_lock);
@@ -1745,7 +1750,6 @@ static int agent_devicestate(void *data)
        while(p) {
                ast_mutex_lock(&p->lock);
                if (!p->pending && ((groupmatch && (p->group & groupmatch)) || !strcmp(data, p->agent))) {
-                               res = AST_DEVICE_UNKNOWN;
                        if (p->owner) {
                                if (res != AST_DEVICE_INUSE)
                                        res = AST_DEVICE_BUSY;
index a83df1a..327a4ee 100755 (executable)
@@ -51,6 +51,8 @@ struct ast_sw;
 
 typedef int (*ast_state_cb_type)(char *context, char* id, int state, void *data);
 
+typedef int (*ast_devstate_cb_type)(const char *dev, int state, void *data);
+
 //! Data structure associated with an asterisk switch
 struct ast_switch {
        /*! NULL */
@@ -262,6 +264,15 @@ int ast_device_state_changed(const char *fmt, ...)
 int ast_extension_state_add(const char *context, const char *exten, 
                            ast_state_cb_type callback, void *data);
 
+//! Registers a device state change callback
+/*!
+ * \param data to pass to callback
+ * The callback is called if the state for extension is changed
+ * Return -1 on failure, ID on success
+ */ 
+int ast_devstate_add(ast_devstate_cb_type callback, void *data);
+void ast_devstate_del(ast_devstate_cb_type callback, void *data);
+
 //! Deletes a registered state change callback by ID
 /*!
  * \param id of the callback to delete
index 0f3876a..5280d5a 100755 (executable)
--- a/loader.c
+++ b/loader.c
@@ -408,6 +408,14 @@ static int ast_resource_exists(char *resource)
                return 0;
 }
 
+static const char *loadorder[] =
+{
+       "res_",
+       "chan_",
+       "pbx_",
+       NULL,
+};
+
 int load_modules()
 {
        struct ast_config *cfg;
@@ -442,13 +450,14 @@ int load_modules()
                DIR *mods;
                struct dirent *d;
                int x;
-               /* Make two passes.  First, load any resource modules, then load the others. */
-               for (x=0;x<2;x++) {
+               /* Loop through each order */
+               for (x=0;x<sizeof(loadorder) / sizeof(loadorder[0]);x++) {
                        mods = opendir((char *)ast_config_AST_MODULE_DIR);
                        if (mods) {
                                while((d = readdir(mods))) {
                                        /* Must end in .so to load it.  */
-                                       if ((strlen(d->d_name) > 3) && (x || !strncasecmp(d->d_name, "res_", 4)) && 
+                                       if ((strlen(d->d_name) > 3) && 
+                                           (!loadorder[x] || !strncasecmp(d->d_name, loadorder[x], strlen(loadorder[x]))) && 
                                            !strcasecmp(d->d_name + strlen(d->d_name) - 3, ".so") &&
                                                !ast_resource_exists(d->d_name)) {
                                                /* It's a shared library -- Just be sure we're allowed to load it -- kinda
diff --git a/pbx.c b/pbx.c
index 9f71877..426b12a 100755 (executable)
--- a/pbx.c
+++ b/pbx.c
@@ -135,6 +135,15 @@ struct ast_state_cb {
     struct ast_state_cb *next;
 };
            
+/* ast_state_cb: An extension state notify */
+struct ast_devstate_cb {
+    void *data;
+    ast_devstate_cb_type callback;
+    struct ast_devstate_cb *next;
+};
+
+static struct ast_devstate_cb *devcbs;
+
 struct ast_hint {
     struct ast_exten *exten;
     int laststate; 
@@ -1445,6 +1454,7 @@ int ast_device_state_changed(const char *fmt, ...)
 {
        struct ast_hint *list;
        struct ast_state_cb *cblist;
+       struct ast_devstate_cb *devcb;
        char hint[AST_MAX_EXTENSION] = "";
        char device[AST_MAX_EXTENSION];
        char *cur, *rest;
@@ -1461,8 +1471,16 @@ int ast_device_state_changed(const char *fmt, ...)
                *rest = 0;
        }
 
+       state = ast_device_state(device);
+
        ast_mutex_lock(&hintlock);
 
+       devcb = devcbs;
+       while(devcb) {
+               if (devcb->callback)
+                       devcb->callback(device, state, devcb->data);
+               devcb = devcb->next;
+       }
        list = hints;
 
        while (list) {
@@ -1506,6 +1524,42 @@ int ast_device_state_changed(const char *fmt, ...)
        return 1;
 }
                        
+int ast_devstate_add(ast_devstate_cb_type callback, void *data)
+{
+       struct ast_devstate_cb *devcb;
+       devcb = malloc(sizeof(struct ast_devstate_cb));
+       if (devcb) {
+               memset(devcb, 0, sizeof(struct ast_devstate_cb));
+               ast_mutex_lock(&hintlock);
+               devcb->data = data;
+               devcb->callback = callback;
+               devcb->next = devcbs;
+               devcbs = devcb;
+               ast_mutex_unlock(&hintlock);
+       }
+       return 0;
+}
+
+void ast_devstate_del(ast_devstate_cb_type callback, void *data)
+{
+       struct ast_devstate_cb *devcb, *prev = NULL, *next;
+       ast_mutex_lock(&hintlock);
+       devcb = devcbs;
+       while(devcb) {
+               next = devcb->next;
+               if ((devcb->data == data) && (devcb->callback == callback)) {
+                       if (prev)
+                               prev->next = next;
+                       else
+                               devcbs = next;
+                       free(devcb);
+               } else
+                       prev = devcb;
+               devcb = next;
+       }
+       ast_mutex_unlock(&hintlock);
+}
+
 int ast_extension_state_add(const char *context, const char *exten, 
                            ast_state_cb_type callback, void *data)
 {