Create concept of a "pending agent", so you can have agents access pending calls
authorMark Spencer <markster@digium.com>
Tue, 1 Jul 2003 16:16:28 +0000 (16:16 +0000)
committerMark Spencer <markster@digium.com>
Tue, 1 Jul 2003 16:16:28 +0000 (16:16 +0000)
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@1145 65c4cc65-6c06-0410-ace0-fbb531ad65f3

channels/chan_agent.c
configs/queues.conf.sample
file.c

index d216544..0e90ac3 100755 (executable)
@@ -78,6 +78,7 @@ static pthread_mutex_t agentlock = AST_MUTEX_INITIALIZER;
 static struct agent_pvt {
        pthread_mutex_t lock;                           /* Channel private lock */
        int dead;                                                       /* Poised for destruction? */
+       int pending;                                            /* Not a real agent -- just pending a match */
        unsigned int group;                                     /* Group memberships */
        char moh[80];                                           /* Which music on hold */
        char agent[AST_MAX_AGENT];                      /* Agent ID */
@@ -100,13 +101,31 @@ static struct agent_pvt {
 } while(0)
 
 
-static int add_agent(struct ast_variable *var)
+static void agent_unlink(struct agent_pvt *agent)
+{
+       struct agent_pvt *p, *prev;
+       prev = NULL;
+       p = agents;
+       while(p) {
+               if (p == agent) {
+                       if (prev)
+                               prev->next = agent->next;
+                       else
+                               agents = agent->next;
+                       break;
+               }
+               prev = p;
+               p = p->next;
+       }
+}
+
+static struct agent_pvt *add_agent(char *agent, int pending)
 {
        char tmp[256];
        char *password=NULL, *name=NULL;
-       struct agent_pvt *p;
+       struct agent_pvt *p, *prev;
        
-       strncpy(tmp, var->value, sizeof(tmp));
+       strncpy(tmp, agent, sizeof(tmp));
        if ((password = strchr(tmp, ','))) {
                *password = '\0';
                password++;
@@ -117,10 +136,12 @@ static int add_agent(struct ast_variable *var)
                name++;
                while (*name < 33) name++; 
        }
+       prev=NULL;
        p = agents;
        while(p) {
-               if (!strcmp(p->agent, tmp))
+               if (!pending && !strcmp(p->agent, tmp))
                        break;
+               prev = p;
                p = p->next;
        }
        if (!p) {
@@ -133,20 +154,29 @@ static int add_agent(struct ast_variable *var)
                        p->owning_app = -1;
                        p->app_sleep_cond = 1;
                        p->group = group;
-                       p->next = agents;
-                       agents = p;
+                       p->pending = pending;
+                       p->next = NULL;
+                       if (prev)
+                               prev->next = p;
+                       else
+                               agents = p;
                        
                }
        }
        if (!p)
-               return -1;
+               return NULL;
        strncpy(p->password, password ? password : "", sizeof(p->password) - 1);
        strncpy(p->name, name ? name : "", sizeof(p->name) - 1);
        strncpy(p->moh, moh, sizeof(p->moh) - 1);
-       p->dead = 0;
-       return 0;
+       if (pending)
+               p->dead = 1;
+       else
+               p->dead = 0;
+       return p;
 }
 
+static int check_availability(struct agent_pvt *newlyavailable, int needlock);
+
 static int agent_answer(struct ast_channel *ast)
 {
        ast_log(LOG_WARNING, "Huh?  Agent is being asked to answer?\n");
@@ -157,9 +187,12 @@ static struct ast_frame  *agent_read(struct ast_channel *ast)
 {
        struct agent_pvt *p = ast->pvt->pvt;
        struct ast_frame *f = NULL;
+       static struct ast_frame null_frame = { AST_FRAME_NULL, };
        ast_pthread_mutex_lock(&p->lock);
        if (p->chan)
                f = ast_read(p->chan);
+       else
+               f = &null_frame;
        if (!f) {
                /* If there's a channel, make it NULL */
                if (p->chan)
@@ -182,6 +215,8 @@ static int agent_write(struct ast_channel *ast, struct ast_frame *f)
        ast_pthread_mutex_lock(&p->lock);
        if (p->chan)
                res = ast_write(p->chan, f);
+       else
+               res = 0;
        CLEANUP(ast, p);
        ast_pthread_mutex_unlock(&p->lock);
        return res;
@@ -208,6 +243,8 @@ static int agent_indicate(struct ast_channel *ast, int condition)
        ast_pthread_mutex_lock(&p->lock);
        if (p->chan)
                res = ast_indicate(p->chan, condition);
+       else
+               res = 0;
        ast_pthread_mutex_unlock(&p->lock);
        return res;
 }
@@ -219,6 +256,8 @@ static int agent_digit(struct ast_channel *ast, char digit)
        ast_pthread_mutex_lock(&p->lock);
        if (p->chan)
                res = p->chan->pvt->send_digit(p->chan, digit);
+       else
+               res = 0;
        ast_pthread_mutex_unlock(&p->lock);
        return res;
 }
@@ -228,6 +267,18 @@ static int agent_call(struct ast_channel *ast, char *dest, int timeout)
        struct agent_pvt *p = ast->pvt->pvt;
        int res = -1;
        ast_pthread_mutex_lock(&p->lock);
+       if (!p->chan) {
+               if (p->pending) {
+                       ast_log(LOG_DEBUG, "Pretending to dial on pending agent\n");
+                       ast_setstate(ast, AST_STATE_DIALING);
+                       res = 0;
+               } else {
+                       ast_log(LOG_NOTICE, "Whoa, they hung up between alloc and call...  what are the odds of that?\n");
+                       res = -1;
+               }
+               ast_pthread_mutex_unlock(&p->lock);
+               return res;
+       }
        ast_verbose( VERBOSE_PREFIX_3 "agent_call, call to agent '%s' call on '%s'\n", p->agent, p->chan->name);
        ast_log( LOG_DEBUG, "Playing beep, lang '%s'\n", p->chan->language);
        res = ast_streamfile(p->chan, "beep", p->chan->language);
@@ -241,8 +292,7 @@ static int agent_call(struct ast_channel *ast, char *dest, int timeout)
                ast_log( LOG_DEBUG, "Set read format, result '%d'\n", res);
                if (res)
                        ast_log(LOG_WARNING, "Unable to set read format to %d\n", ast_best_codec(p->chan->nativeformats));
-       }
-       else {
+       } else {
                // Agent hung-up
                p->chan = NULL;
        }
@@ -277,6 +327,8 @@ static int agent_hangup(struct ast_channel *ast)
                        ast_softhangup(p->chan, AST_SOFTHANGUP_EXPLICIT);
                ast_moh_start(p->chan, p->moh);
                ast_pthread_mutex_unlock(&p->chan->lock);
+       }
+#if 0
                ast_pthread_mutex_unlock(&p->lock);
                /* Release ownership of the agent to other threads (presumably running the login app). */
                ast_pthread_mutex_unlock(&p->app_lock);
@@ -285,13 +337,27 @@ static int agent_hangup(struct ast_channel *ast)
                ast_pthread_mutex_unlock(&p->lock);
                /* Release ownership of the agent to other threads (presumably running the login app). */
                ast_pthread_mutex_unlock(&p->app_lock);
-               free(p);
        } else {
                ast_pthread_mutex_unlock(&p->lock);
                /* Release ownership of the agent to other threads (presumably running the login app). */
                ast_pthread_mutex_unlock(&p->app_lock);
        }
+#endif 
+       ast_pthread_mutex_unlock(&p->lock);
+       /* Release ownership of the agent to other threads (presumably running the login app). */
+       ast_pthread_mutex_unlock(&p->app_lock);
 
+       if (p->pending) {
+               agent_unlink(p);
+       }
+       if (p->dead)
+               free(p);
+       else {
+               /* Not dead -- check availability now */
+               ast_pthread_mutex_lock(&p->lock);
+               check_availability(p, 1);
+               ast_pthread_mutex_unlock(&p->lock);
+       }
        return 0;
 }
 
@@ -305,8 +371,10 @@ static int agent_cont_sleep( void *data )
        ast_pthread_mutex_lock(&p->lock);
        res = p->app_sleep_cond;
        ast_pthread_mutex_unlock(&p->lock);
+#if 0
        if( !res )
                ast_log( LOG_DEBUG, "agent_cont_sleep() returning %d\n", res );
+#endif         
        return res;
 }
 
@@ -314,20 +382,36 @@ static struct ast_channel *agent_new(struct agent_pvt *p, int state)
 {
        struct ast_channel *tmp;
        struct ast_frame null_frame = { AST_FRAME_NULL };
+#if 0
        if (!p->chan) {
                ast_log(LOG_WARNING, "No channel? :(\n");
                return NULL;
        }
+#endif 
        tmp = ast_channel_alloc(0);
        if (tmp) {
-               tmp->nativeformats = p->chan->nativeformats;
-               snprintf(tmp->name, sizeof(tmp->name), "Agent/%s", p->agent);
+               if (p->chan) {
+                       tmp->nativeformats = p->chan->nativeformats;
+                       tmp->writeformat = p->chan->writeformat;
+                       tmp->pvt->rawwriteformat = p->chan->writeformat;
+                       tmp->readformat = p->chan->readformat;
+                       tmp->pvt->rawreadformat = p->chan->readformat;
+                       strncpy(tmp->language, p->chan->language, sizeof(tmp->language)-1);
+                       strncpy(tmp->context, p->chan->context, sizeof(tmp->context)-1);
+                       strncpy(tmp->exten, p->chan->exten, sizeof(tmp->exten)-1);
+               } else {
+                       tmp->nativeformats = AST_FORMAT_SLINEAR;
+                       tmp->writeformat = AST_FORMAT_SLINEAR;
+                       tmp->pvt->rawwriteformat = AST_FORMAT_SLINEAR;
+                       tmp->readformat = AST_FORMAT_SLINEAR;
+                       tmp->pvt->rawreadformat = AST_FORMAT_SLINEAR;
+               }
+               if (p->pending)
+                       snprintf(tmp->name, sizeof(tmp->name), "Agent/P%s-%d", p->agent, rand() & 0xffff);
+               else
+                       snprintf(tmp->name, sizeof(tmp->name), "Agent/%s", p->agent);
                tmp->type = type;
                ast_setstate(tmp, state);
-               tmp->writeformat = p->chan->writeformat;
-               tmp->pvt->rawwriteformat = p->chan->writeformat;
-               tmp->readformat = p->chan->readformat;
-               tmp->pvt->rawreadformat = p->chan->readformat;
                tmp->pvt->pvt = p;
                tmp->pvt->send_digit = agent_digit;
                tmp->pvt->call = agent_call;
@@ -338,14 +422,11 @@ static struct ast_channel *agent_new(struct agent_pvt *p, int state)
                tmp->pvt->exception = agent_read;
                tmp->pvt->indicate = agent_indicate;
                tmp->pvt->fixup = agent_fixup;
-               strncpy(tmp->language, p->chan->language, sizeof(tmp->language)-1);
                p->owner = tmp;
                ast_pthread_mutex_lock(&usecnt_lock);
                usecnt++;
                ast_pthread_mutex_unlock(&usecnt_lock);
                ast_update_use_count();
-               strncpy(tmp->context, p->chan->context, sizeof(tmp->context)-1);
-               strncpy(tmp->exten, p->chan->exten, sizeof(tmp->exten)-1);
                tmp->priority = 1;
                /* Wake up and wait for other applications (by definition the login app)
                 * to release this channel). Takes ownership of the agent channel
@@ -373,11 +454,13 @@ static struct ast_channel *agent_new(struct agent_pvt *p, int state)
                }
                p->owning_app = pthread_self();
                /* After the above step, there should not be any blockers. */
-               if (p->chan->blocking) {
-                       ast_log( LOG_ERROR, "A blocker exists after agent channel ownership acquired\n" );
-                       CRASH;
+               if (p->chan) {
+                       if (p->chan->blocking) {
+                               ast_log( LOG_ERROR, "A blocker exists after agent channel ownership acquired\n" );
+                               CRASH;
+                       }
+                       ast_moh_stop(p->chan);
                }
-               ast_moh_stop(p->chan);
        } else
                ast_log(LOG_WARNING, "Unable to allocate channel structure\n");
        return tmp;
@@ -406,7 +489,7 @@ static int read_agent_config(void)
        while(v) {
                /* Create the interface list */
                if (!strcasecmp(v->name, "agent")) {
-                       add_agent(v);
+                       add_agent(v->value, 0);
                } else if (!strcasecmp(v->name, "group")) {
                        group = ast_get_group(v->value);
                } else if (!strcasecmp(v->name, "musiconhold")) {
@@ -442,23 +525,76 @@ static int read_agent_config(void)
        return 0;
 }
 
+static int check_availability(struct agent_pvt *newlyavailable, int needlock)
+{
+       struct ast_channel *chan=NULL, *parent=NULL;
+       struct agent_pvt *p;
+       int res;
+       ast_log(LOG_DEBUG, "Checking availability of '%s'\n", newlyavailable->agent);
+       if (needlock)
+               ast_pthread_mutex_lock(&agentlock);
+       p = agents;
+       while(p) {
+               if (p == newlyavailable) {
+                       p = p->next;
+                       continue;
+               }
+               ast_pthread_mutex_lock(&p->lock);
+               if (p->pending && ((p->group && (newlyavailable->group & p->group)) || !strcmp(p->agent, newlyavailable->agent))) {
+                       ast_log(LOG_DEBUG, "Call '%s' looks like a winner for agent '%s'\n", p->owner->name, newlyavailable->agent);
+                       /* We found a pending call, time to merge */
+                       chan = agent_new(newlyavailable, AST_STATE_DOWN);
+                       parent = p->owner;
+                       ast_pthread_mutex_unlock(&p->lock);
+                       break;
+               }
+               ast_pthread_mutex_unlock(&p->lock);
+               p = p->next;
+       }
+       if (needlock)
+               ast_pthread_mutex_unlock(&agentlock);
+       if (parent && chan)  {
+               ast_log( LOG_DEBUG, "Playing beep, lang '%s'\n", newlyavailable->chan->language);
+               res = ast_streamfile(newlyavailable->chan, "beep", newlyavailable->chan->language);
+               ast_log( LOG_DEBUG, "Played beep, result '%d'\n", res);
+               if (!res) {
+                       res = ast_waitstream(newlyavailable->chan, "");
+                       ast_log( LOG_DEBUG, "Waited for stream, result '%d'\n", res);
+               }
+               if (!res) {
+                       ast_setstate(parent, AST_STATE_UP);
+                       ast_setstate(chan, AST_STATE_UP);
+                       ast_channel_masquerade(parent, chan);
+                       ast_hangup(chan);
+               } else {
+                       ast_log(LOG_NOTICE, "Not sure this will always work..\n");
+                       ast_hangup(chan);
+               }
+       }
+       return 0;
+}
+
 static struct ast_channel *agent_request(char *type, int format, void *data)
 {
        struct agent_pvt *p;
        struct ast_channel *chan = NULL;
        char *s;
        unsigned int groupmatch;
+       int waitforagent=0;
        s = data;
        if ((s[0] == '@') && (sscanf(s + 1, "%d", &groupmatch) == 1)) {
                groupmatch = (1 << groupmatch);
+       } else if ((s[0] == ':') && (sscanf(s + 1, "%d", &groupmatch) == 1)) {
+               groupmatch = (1 << groupmatch);
+               waitforagent = 1;
        } else {
                groupmatch = 0;
        }
        ast_pthread_mutex_lock(&agentlock);
        p = agents;
        while(p) {
-               if ((groupmatch && (p->group & groupmatch)) || !strcmp(data, p->agent)) {
-                       ast_pthread_mutex_lock(&p->lock);
+               ast_pthread_mutex_lock(&p->lock);
+               if (!p->pending && ((groupmatch && (p->group & groupmatch)) || !strcmp(data, p->agent))) {
                        /* Agent must be registered, but not have any active call */
                        if (!p->owner && p->chan) {
                                chan = agent_new(p, AST_STATE_DOWN);
@@ -466,12 +602,33 @@ static struct ast_channel *agent_request(char *type, int format, void *data)
                        ast_pthread_mutex_unlock(&p->lock);
                        break;
                }
+               ast_pthread_mutex_unlock(&p->lock);
                p = p->next;
        }
+       if (!chan && waitforagent) {
+               /* No agent available -- but we're requesting to wait for one.
+                  Allocate a place holder */
+               ast_log(LOG_DEBUG, "Creating place holder for '%s'\n", s);
+               p = add_agent(data, 1);
+               p->group = groupmatch;
+               chan = agent_new(p, AST_STATE_DOWN);
+               if (!chan) {
+                       ast_log(LOG_WARNING, "Weird...  Fix this to drop the unused pending agent\n");
+               }
+       }
        ast_pthread_mutex_unlock(&agentlock);
        return chan;
 }
 
+static int powerof(unsigned int v)
+{
+       int x;
+       for (x=0;x<32;x++) {
+               if (v & (1 << x)) return x;
+       }
+       return 0;
+}
+
 static int agents_show(int fd, int argc, char **argv)
 {
        struct agent_pvt *p;
@@ -485,24 +642,31 @@ static int agents_show(int fd, int argc, char **argv)
        p = agents;
        while(p) {
                ast_pthread_mutex_lock(&p->lock);
-               if (strlen(p->name))
-                       snprintf(username, sizeof(username), "(%s) ", p->name);
-               else
-                       strcpy(username, "");
-               if (p->chan) {
-                       snprintf(location, sizeof(location), "logged in on %s", p->chan->name);
-                       if (p->owner && p->owner->bridge) {
-                               snprintf(talkingto, sizeof(talkingto), " talking to %s", p->owner->bridge->name);
+               if (p->pending) {
+                       if (p->group)
+                               ast_cli(fd, "-- Pending call to group %d\n", powerof(p->group));
+                       else
+                               ast_cli(fd, "-- Pending call to agent %s\n", p->agent);
+               } else {
+                       if (strlen(p->name))
+                               snprintf(username, sizeof(username), "(%s) ", p->name);
+                       else
+                               strcpy(username, "");
+                       if (p->chan) {
+                               snprintf(location, sizeof(location), "logged in on %s", p->chan->name);
+                               if (p->owner && p->owner->bridge) {
+                                       snprintf(talkingto, sizeof(talkingto), " talking to %s", p->owner->bridge->name);
+                               } else {
+                                       strcpy(talkingto, " is idle");
+                               }
                        } else {
-                               strcpy(talkingto, " is idle");
+                               strcpy(location, "not logged in");
+                               strcpy(talkingto, "");
                        }
-               } else {
-                       strcpy(location, "not logged in");
-                       strcpy(talkingto, "");
+                       ast_cli(fd, "%-12.12s %s%s%s\n", p->agent, 
+                                       username, location, talkingto);
                }
                ast_pthread_mutex_unlock(&p->lock);
-               ast_cli(fd, "%-12.12s %s%s%s\n", p->agent, 
-                               username, location, talkingto);
                p = p->next;
        }
        ast_pthread_mutex_unlock(&agentlock);
@@ -561,7 +725,7 @@ static int login_exec(struct ast_channel *chan, void *data)
                ast_pthread_mutex_lock(&agentlock);
                p = agents;
                while(p) {
-                       if (!strcmp(p->agent, user))
+                       if (!strcmp(p->agent, user) && !p->pending)
                                strncpy(xpass, p->password, sizeof(xpass) - 1);
                        p = p->next;
                }
@@ -584,7 +748,7 @@ static int login_exec(struct ast_channel *chan, void *data)
                while(p) {
                        ast_pthread_mutex_lock(&p->lock);
                        if (!strcmp(p->agent, user) &&
-                               !strcmp(p->password, pass)) {
+                               !strcmp(p->password, pass) && !p->pending) {
                                        if (!p->chan) {
                                                play_announcement = 1;
                                                if( options )
@@ -622,6 +786,7 @@ static int login_exec(struct ast_channel *chan, void *data)
                                                        /* Login this channel and wait for it to
                                                           go away */
                                                        p->chan = chan;
+                                                       check_availability(p, 0);
                                                        ast_pthread_mutex_unlock(&p->lock);
                                                        ast_pthread_mutex_unlock(&agentlock);
                                                        while (res >= 0) {
index 7b67830..e0da44c 100755 (executable)
@@ -50,3 +50,6 @@
 ;member => Zap/2
 ;member => Agent/1001
 ;member => Agent/1002
+
+;member => Agent/@1            ; Any agent in group 1
+;member => Agent/:1            ; Any agent in group 1, wait for first.
diff --git a/file.c b/file.c
index 6b5f5af..b3e92e3 100755 (executable)
--- a/file.c
+++ b/file.c
@@ -778,10 +778,7 @@ char ast_waitstream(struct ast_channel *c, char *breakon)
        while(c->stream) {
                res = ast_sched_wait(c->sched);
                if ((res < 0) && !c->timingfunc) {
-                       if (c->stream)
-                               ast_closestream(c->stream);
-                       if (c->vstream)
-                               ast_closestream(c->vstream);
+                       ast_stopstream(c);
                        break;
                }
                if (res < 0)
@@ -835,10 +832,7 @@ char ast_waitstream_fr(struct ast_channel *c, char *breakon, char *forward, char
        while(c->stream) {
                res = ast_sched_wait(c->sched);
                if ((res < 0) && !c->timingfunc) {
-                       if (c->stream)
-                               ast_closestream(c->stream);
-                       if (c->vstream)
-                               ast_closestream(c->vstream);
+                       ast_stopstream(c);
                        break;
                }
                if (res < 0)
@@ -903,10 +897,7 @@ char ast_waitstream_full(struct ast_channel *c, char *breakon, int audiofd, int
        while(c->stream) {
                ms = ast_sched_wait(c->sched);
                if ((ms < 0) && !c->timingfunc) {
-                       if (c->stream)
-                               ast_closestream(c->stream);
-                       if (c->vstream)
-                               ast_closestream(c->vstream);
+                       ast_stopstream(c);
                        break;
                }
                if (ms < 0)