Merge mjohnston's pause/unpause (bug #3252)
authorMark Spencer <markster@digium.com>
Fri, 21 Jan 2005 04:14:26 +0000 (04:14 +0000)
committerMark Spencer <markster@digium.com>
Fri, 21 Jan 2005 04:14:26 +0000 (04:14 +0000)
git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@4861 65c4cc65-6c06-0410-ace0-fbb531ad65f3

apps/app_queue.c

index 27a8149..8ab4351 100755 (executable)
@@ -148,6 +148,30 @@ static char *app_rqm_descrip =
 "Example: RemoveQueueMember(techsupport|SIP/3000)\n"
 "";
 
+static char *app_pqm = "PauseQueueMember" ;
+static char *app_pqm_synopsis = "Pauses a queue member" ;
+static char *app_pqm_descrip =
+"   PauseQueueMember([queuename]|interface):\n"
+"Pauses (blocks calls for) a queue member.\n"
+"The given interface will be paused in the given queue.  This prevents\n"
+"any calls from being sent from the queue to the interface until it is\n"
+"unpaused with UnpauseQueueMember or the manager interface.  If no\n"
+"queuename is given, the interface is paused in every queue it is a\n"
+"member of.  If the interface is not in the named queue, or if no queue\n"
+"is given and the interface is not in any queue, it will jump to\n"
+" priority n+101, if it exists.  Returns -1 if the interface is not\n"
+"found and no extension to jump to exists, 0 otherwise.\n"
+"Example: PauseQueueMember(|SIP/3000)\n";
+
+static char *app_upqm = "UnpauseQueueMember" ;
+static char *app_upqm_synopsis = "Unpauses a queue member" ;
+static char *app_upqm_descrip =
+"   UnpauseQueueMember([queuename]|interface):\n"
+"Unpauses (resumes calls to) a queue member.\n"
+"This is the counterpart to PauseQueueMember and operates exactly the\n"
+"same way, except it unpauses instead of pausing the given interface.\n"
+"Example: UnpauseQueueMember(|SIP/3000)\n";
+
 /* Persistent Members astdb family */
 static const char *pm_family = "/Queue/PersistentMembers";
 /* The maximum lengh of each persistent member queue database entry */
@@ -214,6 +238,7 @@ struct member {
        int calls;                      /* Number of calls serviced by this member */
        int dynamic;                    /* Are we dynamically added? */
        int status;                     /* Status of queue member */
+       int paused;                     /* Are we paused (not accepting calls)? */
        time_t lastcall;                /* When last successful call was hungup */
        struct member *next;            /* Next member */
 };
@@ -367,9 +392,10 @@ static void *changethread(void *data)
                                                "Penalty: %d\r\n"
                                                "CallsTaken: %d\r\n"
                                                "LastCall: %ld\r\n"
-                                               "Status: %d\r\n",
+                                               "Status: %d\r\n"
+                                               "Paused: %d\r\n",
                                        q->name, cur->interface, cur->dynamic ? "dynamic" : "static",
-                                       cur->penalty, cur->calls, cur->lastcall, cur->status);
+                                       cur->penalty, cur->calls, cur->lastcall, cur->status, cur->paused);
                                }
                        }
                        cur = cur->next;
@@ -693,9 +719,10 @@ static int update_status(struct ast_call_queue *q, struct member *member, int st
                                "Penalty: %d\r\n"
                                "CallsTaken: %d\r\n"
                                "LastCall: %ld\r\n"
-                               "Status: %d\r\n",
+                               "Status: %d\r\n"
+                               "Paused: %d\r\n",
                                        q->name, cur->interface, cur->dynamic ? "dynamic" : "static",
-                                       cur->penalty, cur->calls, cur->lastcall, cur->status);
+                                       cur->penalty, cur->calls, cur->lastcall, cur->status, cur->paused);
                        break;
                }
                cur = cur->next;
@@ -789,6 +816,15 @@ static int ring_entry(struct queue_ent *qe, struct localuser *tmp, int *busies)
                return 0;
        }
        
+       if (tmp->member->paused) {
+               if (option_debug)
+                       ast_log(LOG_DEBUG, "%s paused, can't receive call\n", tmp->interface);
+               if (qe->chan->cdr)
+                       ast_cdr_busy(qe->chan->cdr);
+               tmp->stillgoing = 0;
+               return 0;
+       }
+
        strncpy(tech, tmp->interface, sizeof(tech) - 1);
        if ((location = strchr(tech, '/')))
                *location++ = '\0';
@@ -1616,7 +1652,7 @@ static struct member * interface_exists(struct ast_call_queue *q, char *interfac
 }
 
 
-static struct member *create_queue_node(char *interface, int penalty)
+static struct member *create_queue_node(char *interface, int penalty, int paused)
 {
        struct member *cur;
        
@@ -1627,6 +1663,7 @@ static struct member *create_queue_node(char *interface, int penalty)
        if (cur) {
                memset(cur, 0, sizeof(struct member));
                cur->penalty = penalty;
+               cur->paused = paused;
                strncpy(cur->interface, interface, sizeof(cur->interface) - 1);
                if (!strchr(cur->interface, '/'))
                        ast_log(LOG_WARNING, "No location at interface '%s'\n", interface);
@@ -1638,7 +1675,7 @@ static struct member *create_queue_node(char *interface, int penalty)
 
 /* Dump all members in a specific queue to the databse
  *
- * <pm_family>/<queuename> = <interface>;<penalty>;...
+ * <pm_family>/<queuename> = <interface>;<penalty>;<paused>;...
  *
  */
 static void dump_queue_members(struct ast_call_queue *pm_queue)
@@ -1655,7 +1692,7 @@ static void dump_queue_members(struct ast_call_queue *pm_queue)
                while (cur_member) {
                        if (cur_member->dynamic) {
                                value_len = strlen(value);
-                               res = snprintf(value+value_len, sizeof(value)-value_len, "%s;%d;", cur_member->interface, cur_member->penalty);
+                               res = snprintf(value+value_len, sizeof(value)-value_len, "%s;%d;%d;", cur_member->interface, cur_member->penalty, cur_member->paused);
                                if (res != strlen(value + value_len)) {
                                        ast_log(LOG_WARNING, "Could not create persistent member string, out of space\n");
                                        break;
@@ -1721,7 +1758,7 @@ static int remove_from_queue(char *queuename, char *interface)
        return res;
 }
 
-static int add_to_queue(char *queuename, char *interface, int penalty)
+static int add_to_queue(char *queuename, char *interface, int penalty, int paused)
 {
        struct ast_call_queue *q;
        struct member *new_member;
@@ -1732,7 +1769,7 @@ static int add_to_queue(char *queuename, char *interface, int penalty)
                ast_mutex_lock(&q->lock);
                if (!strcmp(q->name, queuename)) {
                        if (interface_exists(q, interface) == NULL) {
-                               new_member = create_queue_node(interface, penalty);
+                               new_member = create_queue_node(interface, penalty, paused);
 
                                if (new_member != NULL) {
                                        new_member->dynamic = 1;
@@ -1745,9 +1782,10 @@ static int add_to_queue(char *queuename, char *interface, int penalty)
                                                "Penalty: %d\r\n"
                                                "CallsTaken: %d\r\n"
                                                "LastCall: %ld\r\n"
-                                               "Status: %d\r\n",
+                                               "Status: %d\r\n"
+                                               "Paused: %d\r\n",
                                        q->name, new_member->interface, new_member->dynamic ? "dynamic" : "static",
-                                       new_member->penalty, new_member->calls, new_member->lastcall, new_member->status);
+                                       new_member->penalty, new_member->calls, new_member->lastcall, new_member->status, new_member->paused);
                                        
                                        if (queue_persistent_members)
                                                dump_queue_members(q);
@@ -1768,6 +1806,49 @@ static int add_to_queue(char *queuename, char *interface, int penalty)
        return res;
 }
 
+static int set_member_paused(char *queuename, char *interface, int paused)
+{
+       int found = 0;
+       struct ast_call_queue *q;
+       struct member *mem;
+
+       /* Special event for when all queues are paused - individual events still generated */
+
+       if (ast_strlen_zero(queuename))
+               ast_queue_log("NONE", "NONE", interface, (paused ? "PAUSEALL" : "UNPAUSEALL"), "%s", "");
+
+       ast_mutex_lock(&qlock);
+       for (q = queues ; q ; q = q->next) {
+               ast_mutex_lock(&q->lock);
+               if (ast_strlen_zero(queuename) || !strcmp(q->name, queuename)) {
+                       if ((mem = interface_exists(q, interface))) {
+                               found++;
+                               if (mem->paused == paused)
+                                       ast_log(LOG_DEBUG, "%spausing already-%spaused queue member %s:%s\n", (paused ? "" : "un"), (paused ? "" : "un"), q->name, interface);
+                               mem->paused = paused;
+
+                               if (queue_persistent_members)
+                                   dump_queue_members(q);
+
+                               ast_queue_log(q->name, "NONE", interface, (paused ? "PAUSE" : "UNPAUSE"), "%s", "");
+
+                               manager_event(EVENT_FLAG_AGENT, "QueueMemberPaused",
+                                       "Queue: %s\r\n"
+                                       "Location: %s\r\n"
+                                       "Paused: %d\r\n",
+                                               q->name, mem->interface, paused);
+                       }
+               }
+               ast_mutex_unlock(&q->lock);
+       }
+       ast_mutex_unlock(&qlock);
+
+       if (found)
+               return RESULT_SUCCESS;
+       else
+               return RESULT_FAILURE;
+}
+
 /* Add members saved in the queue members DB file saves
  * created by dump_queue_members(), back into the queues */
 static void reload_queue_members(void)
@@ -1777,6 +1858,8 @@ static void reload_queue_members(void)
        char *pm_interface;
        char *pm_penalty_tok;
        int pm_penalty = 0;
+       char *pm_paused_tok;
+       int pm_paused = 0;
        struct ast_db_entry *pm_db_tree = NULL;
        int pm_family_len = 0;
        struct ast_call_queue *cur_queue = NULL;
@@ -1813,12 +1896,13 @@ static void reload_queue_members(void)
                        ast_mutex_unlock(&cur_queue->lock);
 
                if (!ast_db_get(pm_family, pm_queue_name, queue_data, PM_MAX_LEN)) {
-                       /* Parse each <interface>;<penalty>; from the value of the
-                        * queuename key and add it to the respective queue */
                        cur_pm_ptr = queue_data;
                        while ((pm_interface = strsep(&cur_pm_ptr, ";"))) {
+                               /* On the last iteration, pm_interface is a pointer to an empty string. Don't report a spurious error. */
+                               if (pm_interface[0] == 0)
+                                       break;
                                if (!(pm_penalty_tok = strsep(&cur_pm_ptr, ";"))) {
-                                       ast_log(LOG_WARNING, "Error parsing corrupted Queue DB string for '%s'\n", pm_queue_name);
+                                       ast_log(LOG_WARNING, "Error parsing corrupted Queue DB string for '%s' (penalty)\n", pm_queue_name);
                                        break;
                                }
                                pm_penalty = strtol(pm_penalty_tok, NULL, 10);
@@ -1826,11 +1910,26 @@ static void reload_queue_members(void)
                                        ast_log(LOG_WARNING, "Error converting penalty: %s: Out of range.\n", pm_penalty_tok);
                                        break;
                                }
+
+                               /* If ptr[1] is ';', the string is 1 char long and can't be an interface */
+
+                               if (cur_pm_ptr[1] == ';') {
+                                       if (!(pm_paused_tok = strsep(&cur_pm_ptr, ";"))) {
+                                               ast_log(LOG_WARNING, "Error parsing corrupted Queue DB string for '%s' (paused)\n", pm_queue_name);
+                                               break;
+                                       }
+                                       pm_paused = strtol(pm_paused_tok, NULL, 10);
+                                       if ((errno == ERANGE) || (pm_paused < 0 || pm_paused > 1)) {
+                                               ast_log(LOG_WARNING, "Error converting paused: %s: Expected 0 or 1.\n", pm_paused_tok);
+                                               break;
+                                       }
+                               } else if (option_debug)
+                                       ast_verbose(VERBOSE_PREFIX_3 "Found old-format queue member %s:%s\n", pm_queue_name, pm_interface);
        
                                if (option_debug)
-                                       ast_log(LOG_DEBUG, "Reload Members: Queue: %s  Member: %s  Penalty: %d\n", pm_queue_name, pm_interface, pm_penalty);
+                                       ast_log(LOG_DEBUG, "Reload Members: Queue: %s  Member: %s  Penalty: %d  Paused: %d\n", pm_queue_name, pm_interface, pm_penalty, pm_paused);
        
-                               if (add_to_queue(pm_queue_name, pm_interface, pm_penalty) == RES_OUTOFMEMORY) {
+                               if (add_to_queue(pm_queue_name, pm_interface, pm_penalty, pm_paused) == RES_OUTOFMEMORY) {
                                        ast_log(LOG_ERROR, "Out of Memory when loading queue member from astdb\n");
                                        break;
                                }
@@ -1848,6 +1947,90 @@ static void reload_queue_members(void)
        }
 }
 
+static int pqm_exec(struct ast_channel *chan, void *data)
+{
+       struct localuser *u;
+       char *queuename, *interface;
+
+       if (!data) {
+               ast_log(LOG_WARNING, "PauseQueueMember requires an argument ([queuename]|interface])\n");
+               return -1;
+       }
+
+       queuename = ast_strdupa((char *)data);
+       if (!queuename) {
+               ast_log(LOG_ERROR, "Out of memory\n");
+               return -1;
+       }
+
+       interface = strchr(queuename, '|');
+       if (!interface) {
+               ast_log(LOG_WARNING, "Missing interface argument to PauseQueueMember ([queuename]|interface])\n");
+               return -1;
+       }
+
+       LOCAL_USER_ADD(u);
+
+       *interface = '\0';
+       interface++;
+
+       if (set_member_paused(queuename, interface, 1)) {
+               ast_log(LOG_WARNING, "Attempt to pause interface %s, not found\n", interface);
+               if (ast_exists_extension(chan, chan->context, chan->exten, chan->priority + 101, chan->cid.cid_num)) {
+                       chan->priority += 100;
+                       LOCAL_USER_REMOVE(u);
+                       return 0;
+               }
+               return -1;
+       }
+
+       LOCAL_USER_REMOVE(u);
+
+       return 0;
+}
+
+static int upqm_exec(struct ast_channel *chan, void *data)
+{
+       struct localuser *u;
+       char *queuename, *interface;
+
+       if (!data) {
+               ast_log(LOG_WARNING, "UnpauseQueueMember requires an argument ([queuename]|interface])\n");
+               return -1;
+       }
+
+       queuename = ast_strdupa((char *)data);
+       if (!queuename) {
+               ast_log(LOG_ERROR, "Out of memory\n");
+               return -1;
+       }
+
+       interface = strchr(queuename, '|');
+       if (!interface) {
+               ast_log(LOG_WARNING, "Missing interface argument to PauseQueueMember ([queuename]|interface])\n");
+               return -1;
+       }
+
+       LOCAL_USER_ADD(u);
+
+       *interface = '\0';
+       interface++;
+
+       if (set_member_paused(queuename, interface, 0)) {
+               ast_log(LOG_WARNING, "Attempt to unpause interface %s, not found\n", interface);
+               if (ast_exists_extension(chan, chan->context, chan->exten, chan->priority + 101, chan->cid.cid_num)) {
+                       chan->priority += 100;
+                       LOCAL_USER_REMOVE(u);
+                       return 0;
+               }
+               return -1;
+       }
+
+       LOCAL_USER_REMOVE(u);
+
+       return 0;
+}
+
 static int rqm_exec(struct ast_channel *chan, void *data)
 {
        int res=-1;
@@ -1962,7 +2145,7 @@ static int aqm_exec(struct ast_channel *chan, void *data)
                }
        }
 
-       switch (add_to_queue(queuename, interface, penalty)) {
+       switch (add_to_queue(queuename, interface, penalty, 0)) {
        case RES_OKAY:
                ast_log(LOG_NOTICE, "Added interface '%s' to queue '%s'\n", interface, queuename);
                res = 0;
@@ -2502,6 +2685,8 @@ static int __queues_show(int fd, int argc, char **argv, int queue_show)
                                        max[0] = '\0';
                                if (mem->dynamic)
                                        strncat(max, " (dynamic)", sizeof(max) - strlen(max) - 1);
+                               if (mem->paused)
+                                       strncat(max, " (paused)", sizeof(max) - strlen(max) - 1);
                                if (mem->status)
                                        snprintf(max + strlen(max), sizeof(max) - strlen(max), " (%s)", status2str(mem->status, tmpbuf, sizeof(tmpbuf)));
                                if (mem->calls) {
@@ -2615,10 +2800,11 @@ static int manager_queues_status( struct mansession *s, struct message *m )
                                "CallsTaken: %d\r\n"
                                "LastCall: %ld\r\n"
                                "Status: %d\r\n"
+                               "Paused: %d\r\n"
                                "%s"
                                "\r\n",
                                        q->name, mem->interface, mem->dynamic ? "dynamic" : "static",
-                                       mem->penalty, mem->calls, mem->lastcall, mem->status, idText);
+                                       mem->penalty, mem->calls, mem->lastcall, mem->status, mem->paused, idText);
 
                /* List Queue Entries */
 
@@ -2646,12 +2832,13 @@ static int manager_queues_status( struct mansession *s, struct message *m )
 
 static int manager_add_queue_member(struct mansession *s, struct message *m)
 {
-       char *queuename, *interface, *penalty_s;
-       int penalty = 0;
+       char *queuename, *interface, *penalty_s, *paused_s;
+       int paused, penalty = 0;
 
        queuename = astman_get_header(m, "Queue");
        interface = astman_get_header(m, "Interface");
        penalty_s = astman_get_header(m, "Penalty");
+       paused_s = astman_get_header(m, "Paused");
 
        if (ast_strlen_zero(queuename)) {
                astman_send_error(s, m, "'Queue' not specified.");
@@ -2669,7 +2856,12 @@ static int manager_add_queue_member(struct mansession *s, struct message *m)
                penalty = 0;
        }
 
-       switch (add_to_queue(queuename, interface, penalty)) {
+       if (ast_strlen_zero(paused_s))
+               paused = 0;
+       else
+               paused = abs(ast_true(paused_s));
+
+       switch (add_to_queue(queuename, interface, penalty, paused)) {
        case RES_OKAY:
                astman_send_ack(s, m, "Added interface to queue");
                break;
@@ -2715,6 +2907,33 @@ static int manager_remove_queue_member(struct mansession *s, struct message *m)
        return 0;
 }
 
+static int manager_pause_queue_member(struct mansession *s, struct message *m)
+{
+       char *queuename, *interface, *paused_s;
+       int paused;
+
+       interface = astman_get_header(m, "Interface");
+       paused_s = astman_get_header(m, "Paused");
+       queuename = astman_get_header(m, "Queue");      /* Optional - if not supplied, pause the given Interface in all queues */
+
+       if (ast_strlen_zero(interface) || ast_strlen_zero(paused_s)) {
+               astman_send_error(s, m, "Need 'Interface' and 'Paused' parameters.");
+               return 0;
+       }
+
+       paused = abs(ast_true(paused_s));
+
+       if (set_member_paused(queuename, interface, paused))
+               astman_send_error(s, m, "Interface not found");
+       else
+               if (paused)
+                       astman_send_ack(s, m, "Interface paused successfully");
+               else
+                       astman_send_ack(s, m, "Interface unpaused successfully");
+
+       return 0;
+}
+
 static int handle_add_queue_member(int fd, int argc, char *argv[])
 {
        char *queuename, *interface;
@@ -2744,7 +2963,7 @@ static int handle_add_queue_member(int fd, int argc, char *argv[])
                penalty = 0;
        }
 
-       switch (add_to_queue(queuename, interface, penalty)) {
+       switch (add_to_queue(queuename, interface, penalty, 0)) {
        case RES_OKAY:
                ast_cli(fd, "Added interface '%s' to queue '%s'\n", interface, queuename);
                return RESULT_SUCCESS;
@@ -2909,9 +3128,12 @@ int unload_module(void)
        ast_manager_unregister("QueueStatus");
        ast_manager_unregister("QueueAdd");
        ast_manager_unregister("QueueRemove");
+       ast_manager_unregister("QueuePause");
        ast_devstate_del(statechange_queue, NULL);
        ast_unregister_application(app_aqm);
        ast_unregister_application(app_rqm);
+       ast_unregister_application(app_pqm);
+       ast_unregister_application(app_upqm);
        return ast_unregister_application(app);
 }
 
@@ -2929,8 +3151,11 @@ int load_module(void)
                ast_manager_register( "QueueStatus", 0, manager_queues_status, "Queue Status" );
                ast_manager_register( "QueueAdd", EVENT_FLAG_AGENT, manager_add_queue_member, "Add interface to queue." );
                ast_manager_register( "QueueRemove", EVENT_FLAG_AGENT, manager_remove_queue_member, "Remove interface from queue." );
+               ast_manager_register( "QueuePause", EVENT_FLAG_AGENT, manager_pause_queue_member, "Makes a queue member temporarily unavailable" );
                ast_register_application(app_aqm, aqm_exec, app_aqm_synopsis, app_aqm_descrip) ;
                ast_register_application(app_rqm, rqm_exec, app_rqm_synopsis, app_rqm_descrip) ;
+               ast_register_application(app_pqm, pqm_exec, app_pqm_synopsis, app_pqm_descrip) ;
+               ast_register_application(app_upqm, upqm_exec, app_upqm_synopsis, app_upqm_descrip) ;
        }
        reload_queues();