app_queue: Make ordering of rrmemory/rrordered persist over add/remove members
[asterisk/asterisk.git] / apps / app_queue.c
index 742636a..f7ea7f3 100644 (file)
@@ -1161,6 +1161,7 @@ struct member {
        int realtime;                        /*!< Is this member realtime? */
        int status;                          /*!< Status of queue member */
        int paused;                          /*!< Are we paused (not accepting calls)? */
+       int queuepos;                        /*!< In what order (pertains to certain strategies) should this member be called? */
        time_t lastcall;                     /*!< When last successful call was hungup */
        struct call_queue *lastqueue;        /*!< Last queue we received a call */
        unsigned int dead:1;                 /*!< Used to detect members deleted in realtime */
@@ -1397,6 +1398,60 @@ static int queue_cmp_cb(void *obj, void *arg, int flags)
        return !strcasecmp(q->name, q2->name) ? CMP_MATCH | CMP_STOP : 0;
 }
 
+/*! \internal
+ * \brief ao2_callback, Decreases queuepos of all followers with a queuepos greater than arg.
+ * \param obj the member being acted on
+ * \param arg pointer to an integer containing the position value that was removed and requires reduction for anything above
+ */
+static int queue_member_decrement_followers(void *obj, void *arg, int flag)
+{
+       struct member *mem = obj;
+       int *decrement_followers_after = arg;
+
+       if (mem->queuepos > *decrement_followers_after) {
+               mem->queuepos--;
+       }
+
+       return 0;
+}
+
+/*! \internal
+ * \brief ao2_callback, finds members in a queue marked for deletion and in a cascading fashion runs queue_member_decrement_followers
+ *        on them. This callback should always be ran before performing mass unlinking of delmarked members from queues.
+ * \param obj member being acted on
+ * \param arg pointer to the queue members are being removed from
+ */
+static int queue_delme_members_decrement_followers(void *obj, void *arg, int flag)
+{
+       struct member *mem = obj;
+       struct call_queue *queue = arg;
+       int rrpos = mem->queuepos;
+
+       if (mem->delme) {
+               ao2_callback(queue->members, OBJ_NODATA | OBJ_MULTIPLE, queue_member_decrement_followers, &rrpos);
+       }
+
+       return 0;
+}
+
+/*! \internal
+ * \brief Use this to decrement followers during removal of a member
+ * \param queue which queue the member is being removed from
+ * \param mem which member is being removed from the queue
+ */
+static void queue_member_follower_removal(struct call_queue *queue, struct member *mem)
+{
+       int pos = mem->queuepos;
+
+       /* If the position being removed is less than the current place in the queue, reduce the queue position by one so that we don't skip the member
+        * who would have been next otherwise. */
+       if (pos < queue->rrpos) {
+               queue->rrpos--;
+       }
+
+       ao2_callback(queue->members, OBJ_NODATA | OBJ_MULTIPLE, queue_member_decrement_followers, &pos);
+}
+
 #ifdef REF_DEBUG_ONLY_QUEUES
 #define queue_ref(q)                           _queue_ref(q, "", __FILE__, __LINE__, __PRETTY_FUNCTION__)
 #define queue_unref(q)                         _queue_unref(q, "", __FILE__, __LINE__, __PRETTY_FUNCTION__)
@@ -2396,6 +2451,34 @@ static void queue_set_param(struct call_queue *q, const char *param, const char
        }
 }
 
+/*! \internal
+ * \brief If adding a single new member to a queue, use this function instead of ao2_linking.
+ *        This adds round robin queue position data for a fresh member as well as links it.
+ * \param queue Which queue the member is being added to
+ * \param mem Which member is being added to the queue
+ */
+static void member_add_to_queue(struct call_queue *queue, struct member *mem)
+{
+       ao2_lock(queue->members);
+       mem->queuepos = ao2_container_count(queue->members);
+       ao2_link(queue->members, mem);
+       ao2_unlock(queue->members);
+}
+
+/*! \internal
+ * \brief If removing a single member from a queue, use this function instead of ao2_unlinking.
+ *        This will perform round robin queue position reordering for the remaining members.
+ * \param queue Which queue the member is being removed from
+ * \param member Which member is being removed from the queue
+ */
+static void member_remove_from_queue(struct call_queue *queue, struct member *mem)
+{
+       ao2_lock(queue->members);
+       queue_member_follower_removal(queue, mem);
+       ao2_unlink(queue->members, mem);
+       ao2_unlock(queue->members);
+}
+
 /*!
  * \brief Find rt member record to update otherwise create one.
  *
@@ -2482,7 +2565,7 @@ static void rt_handle_member_record(struct call_queue *q, char *interface, struc
                        } else {
                                ast_queue_log(q->name, "REALTIME", m->membername, "ADDMEMBER", "%s", paused ? "PAUSED" : "");
                        }
-                       ao2_link(q->members, m);
+                       member_add_to_queue(q, m);
                        ao2_ref(m, -1);
                        m = NULL;
                }
@@ -2498,7 +2581,7 @@ static void free_members(struct call_queue *q, int all)
 
        while ((cur = ao2_iterator_next(&mem_iter))) {
                if (all || !cur->dynamic) {
-                       ao2_unlink(q->members, cur);
+                       member_remove_from_queue(q, cur);
                }
                ao2_ref(cur, -1);
        }
@@ -2667,7 +2750,7 @@ static struct call_queue *find_queue_by_name_rt(const char *queuename, struct as
                        } else {
                                ast_queue_log(q->name, "REALTIME", m->membername, "REMOVEMEMBER", "%s", "");
                        }
-                       ao2_unlink(q->members, m);
+                       member_remove_from_queue(q, m);
                }
                ao2_ref(m, -1);
        }
@@ -2775,7 +2858,7 @@ static void update_realtime_members(struct call_queue *q)
                mem_iter = ao2_iterator_init(q->members, 0);
                while ((m = ao2_iterator_next(&mem_iter))) {
                        if (m->realtime) {
-                               ao2_unlink(q->members, m);
+                               member_remove_from_queue(q, m);
                        }
                        ao2_ref(m, -1);
                }
@@ -2809,7 +2892,7 @@ static void update_realtime_members(struct call_queue *q)
                        } else {
                                ast_queue_log(q->name, "REALTIME", m->membername, "REMOVEMEMBER", "%s", "");
                        }
-                       ao2_unlink(q->members, m);
+                       member_remove_from_queue(q, m);
                }
                ao2_ref(m, -1);
        }
@@ -4724,6 +4807,7 @@ static int calc_metric(struct call_queue *q, struct member *mem, int pos, struct
                break;
        case QUEUE_STRATEGY_RRORDERED:
        case QUEUE_STRATEGY_RRMEMORY:
+               pos = mem->queuepos;
                if (pos < q->rrpos) {
                        tmp->metric = 1000 + pos;
                } else {
@@ -5937,7 +6021,7 @@ static int remove_from_queue(const char *queuename, const char *interface)
                                "Location: %s\r\n"
                                "MemberName: %s\r\n",
                                q->name, mem->interface, mem->membername);
-                       ao2_unlink(q->members, mem);
+                       member_remove_from_queue(q, mem);
                        ao2_ref(mem, -1);
 
                        if (queue_persistent_members) {
@@ -5983,7 +6067,7 @@ static int add_to_queue(const char *queuename, const char *interface, const char
                if ((new_member = create_queue_member(interface, membername, penalty, paused, state_interface, q->ringinuse))) {
                        new_member->ringinuse = q->ringinuse;
                        new_member->dynamic = 1;
-                       ao2_link(q->members, new_member);
+                       member_add_to_queue(q, new_member);
                        /*** DOCUMENTATION
                        <managerEventInstance>
                                <synopsis>Raised when a member is added to the queue.</synopsis>
@@ -7732,9 +7816,20 @@ static void reload_single_member(const char *memberdata, struct call_queue *q)
 
        /* Find the old position in the list */
        ast_copy_string(tmpmem.interface, interface, sizeof(tmpmem.interface));
-       cur = ao2_find(q->members, &tmpmem, OBJ_POINTER | OBJ_UNLINK);
+       cur = ao2_find(q->members, &tmpmem, OBJ_POINTER);
+
        if ((newm = create_queue_member(interface, membername, penalty, cur ? cur->paused : 0, state_interface, ringinuse))) {
-               ao2_link(q->members, newm);
+               if (cur) {
+                       /* Round Robin Queue Position must be copied if this is replacing an existing member */
+                       ao2_lock(q->members);
+                       newm->queuepos = cur->queuepos;
+                       ao2_link(q->members, newm);
+                       ao2_unlink(q->members, cur);
+                       ao2_unlock(q->members);
+               } else {
+                       /* Otherwise we need to add using the function that will apply a round robin queue position manually. */
+                       member_add_to_queue(q, newm);
+               }
                ao2_ref(newm, -1);
        }
        newm = NULL;
@@ -7868,7 +7963,10 @@ static void reload_single_queue(struct ast_config *cfg, struct ast_flags *mask,
 
        /* Free remaining members marked as delme */
        if (member_reload) {
+               ao2_lock(q->members);
+               ao2_callback(q->members, OBJ_NODATA | OBJ_MULTIPLE, queue_delme_members_decrement_followers, q);
                ao2_callback(q->members, OBJ_NODATA | OBJ_MULTIPLE | OBJ_UNLINK, kill_dead_members, q);
+               ao2_unlock(q->members);
        }
 
        if (new) {