taskprocessor: Enable subsystems and overload by subsystem
authorGeorge Joseph <gjoseph@digium.com>
Fri, 15 Feb 2019 18:53:50 +0000 (11:53 -0700)
committerGeorge Joseph <gjoseph@digium.com>
Wed, 20 Feb 2019 17:51:08 +0000 (11:51 -0600)
To prevent one subsystem's taskprocessors from causing others
to stall, new capabilities have been added to taskprocessors.

* Any taskprocessor name that has a '/' will have the part
  before the '/' saved as its "subsystem".
  Examples:
  "sorcery/acl-0000006a" and "sorcery/aor-00000019"
  will be grouped to subsystem "sorcery".
  "pjsip/distributor-00000025" and "pjsip/distributor-00000026"
  will bn grouped to subsystem "pjsip".
  Taskprocessors with no '/' have an empty subsystem.

* When a taskprocessor enters high-water alert status and it
  has a non-empty subsystem, the subsystem alert count will
  be incremented.

* When a taskprocessor leaves high-water alert status and it
  has a non-empty subsystem, the subsystem alert count will be
  decremented.

* A new api ast_taskprocessor_get_subsystem_alert() has been
  added that returns the number of taskprocessors in alert for
  the subsystem.

* A new CLI command "core show taskprocessor alerted subsystems"
  has been added.

* A new unit test was addded.

REMINDER: The taskprocessor code itself doesn't take any action
based on high-water alerts or overloading.  It's up to taskprocessor
users to check and take action themselves.  Currently only the pjsip
distributor does this.

* A new pjsip/global option "taskprocessor_overload_trigger"
  has been added that allows the user to select the trigger
  mechanism the distributor uses to pause accepting new requests.
  "none": Don't pause on any overload condition.
  "global": Pause on ANY taskprocessor overload (the default and
  current behavior)
  "pjsip_only": Pause only on pjsip taskprocessor overloads.

* The core pjsip pool was renamed from "SIP" to "pjsip" so it can
  be properly grouped into the "pjsip" subsystem.

* stasis taskprocessor names were changed to "stasis" as the
  subsystem.

* Sorcery core taskprocessor names were changed to "sorcery" to
  match the object taskprocessors.

Change-Id: I8c19068bb2fc26610a9f0b8624bdf577a04fcd56

13 files changed:
CHANGES
configs/samples/pjsip.conf.sample
contrib/ast-db-manage/config/versions/f3c0b8695b66_taskprocessor_overload_trigger.py [new file with mode: 0644]
include/asterisk/taskprocessor.h
main/sorcery.c
main/stasis.c
main/taskprocessor.c
main/threadpool.c
res/res_pjsip.c
res/res_pjsip/config_global.c
res/res_pjsip/include/res_pjsip_private.h
res/res_pjsip/pjsip_distributor.c
tests/test_taskprocessor.c

diff --git a/CHANGES b/CHANGES
index 78e019e..7ae6000 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -83,6 +83,15 @@ ARI
    types defined in the "disallowed" list are not sent to the application. Note
    that if a type is specified in both lists "disallowed" takes precedence.
 
+res_pjsip
+------------------
+ * A new configuration parameter "taskprocessor_overload_trigger" has been
+   added to the pjsip.conf "globals" section.  The distributor currently stops
+   accepting new requests when any taskprocessor overload is triggered.  The
+   new option allows you to completely disable overload detection (NOT
+   RECOMMENDED), keep the current behavior, or trigger only on pjsip
+   taskprocessor overloads.
+
 ------------------------------------------------------------------------------
 --- Functionality changes from Asterisk 16.1.0 to Asterisk 16.2.0 ------------
 ------------------------------------------------------------------------------
index 0ed01f0..29f53a5 100644 (file)
                     ; event when a device refreshes its registration
                     ; (default: "no")
 
+;taskprocessor_overload_trigger=global
+                ; Set the trigger the distributor will use to detect
+                ; taskprocessor overloads.  When triggered, the distributor
+                ; will not accept any new requests until the overload has
+                ; cleared.
+                : "global": (default) Any taskprocessor overload will trigger.
+                ; "pjsip_only": Only pjsip taskprocessor overloads will trigger.
+                ; "none":  No overload detection will be performed.
+                ; WARNING: The "none" and "pjsip_only" options should be used
+                ; with extreme caution and only to mitigate specific issues.
+                ; Under certain conditions they could make things worse.
 
 ; MODULE PROVIDING BELOW SECTION(S): res_pjsip_acl
 ;==========================ACL SECTION OPTIONS=========================
diff --git a/contrib/ast-db-manage/config/versions/f3c0b8695b66_taskprocessor_overload_trigger.py b/contrib/ast-db-manage/config/versions/f3c0b8695b66_taskprocessor_overload_trigger.py
new file mode 100644 (file)
index 0000000..6a5b9b2
--- /dev/null
@@ -0,0 +1,42 @@
+"""taskprocessor_overload_trigger
+
+Revision ID: f3c0b8695b66
+Revises: 0838f8db6a61
+Create Date: 2019-02-15 15:03:50.106790
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'f3c0b8695b66'
+down_revision = '0838f8db6a61'
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects.postgresql import ENUM
+
+PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME = 'pjsip_taskprocessor_overload_trigger_values'
+PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES = ['none', 'global', 'pjsip_only']
+
+def upgrade():
+    context = op.get_context()
+
+    if context.bind.dialect.name == 'postgresql':
+        enum = ENUM(*PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES,
+                    name=PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME)
+        enum.create(op.get_bind(), checkfirst=False)
+
+    op.add_column('ps_globals',
+        sa.Column('taskprocessor_overload_trigger',
+            sa.Enum(*PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES,
+            name=PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME,
+            create_type=False)))
+
+def downgrade():
+    if op.get_context().bind.dialect.name == 'mssql':
+        op.drop_constraint('ck_ps_globals_taskprocessor_overload_trigger_pjsip_taskprocessor_overload_trigger_values', 'ps_globals')
+    op.drop_column('ps_globals', 'taskprocessor_overload_trigger')
+
+    if context.bind.dialect.name == 'postgresql':
+        enum = ENUM(*PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES,
+                    name=PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME)
+        enum.drop(op.get_bind(), checkfirst=False)
index f74989a..5278595 100644 (file)
@@ -341,6 +341,19 @@ long ast_taskprocessor_size(struct ast_taskprocessor *tps);
  */
 unsigned int ast_taskprocessor_alert_get(void);
 
+
+/*!
+ * \brief Get the current taskprocessor high water alert count by sybsystem.
+ * \since 13.26.0
+ * \since 16.3.0
+ *
+ * \param subsystem The subsystem name
+ *
+ * \retval 0 if no taskprocessors are in high water alert.
+ * \retval non-zero if some task processors are in high water alert.
+ */
+unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem);
+
 /*!
  * \brief Set the high and low alert water marks of the given taskprocessor queue.
  * \since 13.10.0
index beaad21..8e14881 100644 (file)
@@ -380,7 +380,7 @@ int ast_sorcery_init(void)
        };
        ast_assert(wizards == NULL);
 
-       threadpool = ast_threadpool_create("Sorcery", NULL, &options);
+       threadpool = ast_threadpool_create("sorcery", NULL, &options);
        if (!threadpool) {
                return -1;
        }
index f05f5ff..204e7c8 100644 (file)
@@ -677,7 +677,7 @@ struct stasis_subscription *internal_stasis_subscribe(
                char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
 
                /* Create name with seq number appended. */
-               ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",
+               ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
                        use_thread_pool ? 'p' : 'm',
                        stasis_topic_name(topic));
 
@@ -2593,7 +2593,7 @@ int stasis_init(void)
        threadpool_opts.auto_increment = 1;
        threadpool_opts.max_size = cfg->threadpool_options->max_size;
        threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
-       pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
+       pool = ast_threadpool_create("stasis", NULL, &threadpool_opts);
        ao2_ref(cfg, -1);
        if (!pool) {
                ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
index 30aeddb..9ebbf39 100644 (file)
@@ -89,7 +89,11 @@ struct ast_taskprocessor {
        unsigned int high_water_alert:1;
        /*! Indicates if the taskprocessor is currently suspended */
        unsigned int suspended:1;
-       /*! \brief Friendly name of the taskprocessor */
+       /*! \brief Anything before the first '/' in the name (if there is one) */
+       char *subsystem;
+       /*! \brief Friendly name of the taskprocessor.
+        * Subsystem is appended after the name's NULL terminator.
+        */
        char name[0];
 };
 
@@ -112,6 +116,16 @@ struct ast_taskprocessor_listener {
        void *user_data;
 };
 
+/*!
+ * Keep track of which subsystems are in alert
+ * and how many of their taskprocessors are overloaded.
+ */
+struct subsystem_alert {
+       unsigned int alert_count;
+       char subsystem[0];
+};
+static AST_VECTOR_RW(subsystem_alert_vector, struct subsystem_alert *) overloaded_subsystems;
+
 #ifdef LOW_MEMORY
 #define TPS_MAX_BUCKETS 61
 #else
@@ -138,10 +152,12 @@ static int tps_ping_handler(void *datap);
 
 static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
 static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
+static char *cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
 
 static struct ast_cli_entry taskprocessor_clis[] = {
        AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
        AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
+       AST_CLI_DEFINE(cli_subsystem_alert_report, "List task processor subsystems in alert"),
 };
 
 struct default_taskprocessor_listener_pvt {
@@ -271,6 +287,8 @@ static const struct ast_taskprocessor_listener_callbacks default_listener_callba
 static void tps_shutdown(void)
 {
        ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
+       AST_VECTOR_CALLBACK_VOID(&overloaded_subsystems, ast_free);
+       AST_VECTOR_RW_FREE(&overloaded_subsystems);
        ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
        tps_singletons = NULL;
 }
@@ -285,6 +303,12 @@ int ast_tps_init(void)
                return -1;
        }
 
+       if (AST_VECTOR_RW_INIT(&overloaded_subsystems, 10)) {
+               ao2_ref(tps_singletons, -1);
+               ast_log(LOG_ERROR, "taskprocessor subsystems vector failed to initialize!\n");
+               return -1;
+       }
+
        ast_cond_init(&cli_ping_cond, NULL);
 
        ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
@@ -548,6 +572,157 @@ static int tps_cmp_cb(void *obj, void *arg, int flags)
        return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
 }
 
+static int subsystem_match(struct subsystem_alert *alert, const char *subsystem)
+{
+       return !strcmp(alert->subsystem, subsystem);
+}
+
+static int subsystem_cmp(struct subsystem_alert *a, struct subsystem_alert *b)
+{
+       return strcmp(a->subsystem, b->subsystem);
+}
+
+unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem)
+{
+       struct subsystem_alert *alert;
+       unsigned int count = 0;
+       int idx;
+
+       AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
+       idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
+       if (idx >= 0) {
+               alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
+               count = alert->alert_count;
+       }
+       AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+
+       return count;
+}
+
+static void subsystem_alert_increment(const char *subsystem)
+{
+       struct subsystem_alert *alert;
+       int idx;
+
+       if (ast_strlen_zero(subsystem)) {
+               return;
+       }
+
+       AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
+       idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
+       if (idx >= 0) {
+               alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
+               alert->alert_count++;
+               AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+               return;
+       }
+
+       alert = ast_malloc(sizeof(*alert) + strlen(subsystem) + 1);
+       if (!alert) {
+               AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+               return;
+       }
+       alert->alert_count = 1;
+       strcpy(alert->subsystem, subsystem); /* Safe */
+
+       if (AST_VECTOR_APPEND(&overloaded_subsystems, alert)) {
+               ast_free(alert);
+       }
+       AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+}
+
+static void subsystem_alert_decrement(const char *subsystem)
+{
+       struct subsystem_alert *alert;
+       int idx;
+
+       if (ast_strlen_zero(subsystem)) {
+               return;
+       }
+
+       AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
+       idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
+       if (idx < 0) {
+               ast_log(LOG_ERROR,
+                       "Can't decrement alert count for subsystem '%s' as it wasn't in alert\n", subsystem);
+               AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+               return;
+       }
+       alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
+
+       alert->alert_count--;
+       if (alert->alert_count <= 0) {
+               AST_VECTOR_REMOVE(&overloaded_subsystems, idx, 0);
+               ast_free(alert);
+       }
+
+       AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+}
+
+static void subsystem_copy(struct subsystem_alert *alert,
+       struct subsystem_alert_vector *vector)
+{
+       struct subsystem_alert *alert_copy;
+       alert_copy = ast_malloc(sizeof(*alert_copy) + strlen(alert->subsystem) + 1);
+       if (!alert_copy) {
+               return;
+       }
+       alert_copy->alert_count = alert->alert_count;
+       strcpy(alert_copy->subsystem, alert->subsystem); /* Safe */
+       if (AST_VECTOR_ADD_SORTED(vector, alert_copy, subsystem_cmp)) {
+               ast_free(alert_copy);
+       }
+}
+
+static char *cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+       struct subsystem_alert_vector sorted_subsystems;
+       int i;
+
+#define FMT_HEADERS_SUBSYSTEM          "%-32s %12s\n"
+#define FMT_FIELDS_SUBSYSTEM           "%-32s %12u\n"
+
+       switch (cmd) {
+       case CLI_INIT:
+               e->command = "core show taskprocessor alerted subsystems";
+               e->usage =
+                       "Usage: core show taskprocessor alerted subsystems\n"
+                       "       Shows a list of task processor subsystems that are currently alerted\n";
+               return NULL;
+       case CLI_GENERATE:
+               return NULL;
+       }
+
+       if (a->argc != e->args) {
+               return CLI_SHOWUSAGE;
+       }
+
+       if (AST_VECTOR_INIT(&sorted_subsystems, AST_VECTOR_SIZE(&overloaded_subsystems))) {
+               return CLI_FAILURE;
+       }
+
+       AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
+       for (i = 0; i < AST_VECTOR_SIZE(&overloaded_subsystems); i++) {
+               subsystem_copy(AST_VECTOR_GET(&overloaded_subsystems, i), &sorted_subsystems);
+       }
+       AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
+
+       ast_cli(a->fd, "\n" FMT_HEADERS_SUBSYSTEM, "Subsystem", "Alert Count");
+
+       for (i = 0; i < AST_VECTOR_SIZE(&sorted_subsystems); i++) {
+               struct subsystem_alert *alert = AST_VECTOR_GET(&sorted_subsystems, i);
+               ast_cli(a->fd, FMT_FIELDS_SUBSYSTEM, alert->subsystem, alert->alert_count);
+       }
+
+       ast_cli(a->fd, "\n%lu subsystems\n\n", AST_VECTOR_SIZE(&sorted_subsystems));
+
+       AST_VECTOR_CALLBACK_VOID(&sorted_subsystems, ast_free);
+       AST_VECTOR_FREE(&sorted_subsystems);
+
+       return CLI_SUCCESS;
+}
+
+
 /*! Count of the number of taskprocessors in high water alert. */
 static unsigned int tps_alert_count;
 
@@ -577,6 +752,15 @@ static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
                ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
                        tps->name, tps_alert_count ? "triggered" : "cleared");
        }
+
+       if (tps->subsystem[0] != '\0') {
+               if (delta > 0) {
+                       subsystem_alert_increment(tps->subsystem);
+               } else {
+                       subsystem_alert_decrement(tps->subsystem);
+               }
+       }
+
        ast_rwlock_unlock(&tps_alert_lock);
 }
 
@@ -747,8 +931,17 @@ static void *default_listener_pvt_alloc(void)
 static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
 {
        struct ast_taskprocessor *p;
+       char *subsystem_separator;
+       size_t subsystem_length = 0;
+       size_t name_length;
+
+       name_length = strlen(name);
+       subsystem_separator = strchr(name, '/');
+       if (subsystem_separator) {
+               subsystem_length = subsystem_separator - name;
+       }
 
-       p = ao2_alloc(sizeof(*p) + strlen(name) + 1, tps_taskprocessor_dtor);
+       p = ao2_alloc(sizeof(*p) + name_length + subsystem_length + 2, tps_taskprocessor_dtor);
        if (!p) {
                ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
                return NULL;
@@ -758,7 +951,9 @@ static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, stru
        p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10;
        p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
 
-       strcpy(p->name, name); /*SAFE*/
+       strcpy(p->name, name); /* Safe */
+       p->subsystem = p->name + name_length + 1;
+       ast_copy_string(p->subsystem, name, subsystem_length + 1);
 
        ao2_ref(listener, +1);
        p->listener = listener;
index 2ab0936..56fbb2c 100644 (file)
@@ -413,7 +413,7 @@ static struct ast_threadpool *threadpool_alloc(const char *name, const struct as
                return NULL;
        }
 
-       ast_str_set(&control_tps_name, 0, "%s-control", name);
+       ast_str_set(&control_tps_name, 0, "%s/pool-control", name);
 
        pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
        ast_free(control_tps_name);
@@ -919,6 +919,7 @@ struct ast_threadpool *ast_threadpool_create(const char *name,
        struct ast_taskprocessor *tps;
        RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
        RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
+       char *fullname;
 
        pool = threadpool_alloc(name, options);
        if (!pool) {
@@ -935,7 +936,9 @@ struct ast_threadpool *ast_threadpool_create(const char *name,
                return NULL;
        }
 
-       tps = ast_taskprocessor_create_with_listener(name, tps_listener);
+       fullname = ast_alloca(strlen(name) + strlen("/pool") + 1);
+       sprintf(fullname, "%s/pool", name); /* Safe */
+       tps = ast_taskprocessor_create_with_listener(fullname, tps_listener);
        if (!tps) {
                return NULL;
        }
index 4f18010..24796fc 100644 (file)
                                <configOption name="send_contact_status_on_update_registration" default="no">
                                        <synopsis>Enable sending AMI ContactStatus event when a device refreshes its registration.</synopsis>
                                </configOption>
+                               <configOption name="taskprocessor_overload_trigger">
+                                       <synopsis>Trigger scope for taskprocessor overloads</synopsis>
+                                       <description><para>
+                                               This option specifies the trigger the distributor will use for
+                                               detecting taskprocessor overloads.  When it detects an overload condition,
+                                               the distrubutor will stop accepting new requests until the overload is
+                                               cleared.
+                                               </para>
+                                               <enumlist>
+                                                       <enum name="global"><para>(default) Any taskprocessor overload will trigger.</para></enum>
+                                                       <enum name="pjsip_only"><para>Only pjsip taskprocessor overloads will trigger.</para></enum>
+                                                       <enum name="none"><para>No overload detection will be performed.</para></enum>
+                                               </enumlist>
+                                               <warning><para>
+                                               The "none" and "pjsip_only" options should be used
+                                               with extreme caution and only to mitigate specific issues.
+                                               Under certain conditions they could make things worse.
+                                               </para></warning>
+                                       </description>
+                               </configOption>
                        </configObject>
                </configFile>
        </configInfo>
@@ -5298,7 +5318,7 @@ static int load_module(void)
        /* The serializer needs threadpool and threadpool needs pjproject to be initialized so it's next */
        sip_get_threadpool_options(&options);
        options.thread_start = sip_thread_start;
-       sip_threadpool = ast_threadpool_create("SIP", NULL, &options);
+       sip_threadpool = ast_threadpool_create("pjsip", NULL, &options);
        if (!sip_threadpool) {
                goto error;
        }
index 38383c5..8f21e50 100644 (file)
@@ -51,6 +51,7 @@
 #define DEFAULT_IGNORE_URI_USER_OPTIONS 0
 #define DEFAULT_USE_CALLERID_CONTACT 0
 #define DEFAULT_SEND_CONTACT_STATUS_ON_UPDATE_REGISTRATION 0
+#define DEFAULT_TASKPROCESSOR_OVERLOAD_TRIGGER TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL
 
 /*!
  * \brief Cached global config object
@@ -110,6 +111,8 @@ struct global_config {
        unsigned int use_callerid_contact;
        /*! Nonzero if need to send AMI ContactStatus event when a contact is updated */
        unsigned int send_contact_status_on_update_registration;
+       /*! Trigger the distributor should use to pause accepting new dialogs */
+       enum ast_sip_taskprocessor_overload_trigger overload_trigger;
 };
 
 static void global_destructor(void *obj)
@@ -483,6 +486,58 @@ unsigned int ast_sip_get_send_contact_status_on_update_registration(void)
        return send_contact_status_on_update_registration;
 }
 
+enum ast_sip_taskprocessor_overload_trigger ast_sip_get_taskprocessor_overload_trigger(void)
+{
+       enum ast_sip_taskprocessor_overload_trigger trigger;
+       struct global_config *cfg;
+
+       cfg = get_global_cfg();
+       if (!cfg) {
+               return DEFAULT_TASKPROCESSOR_OVERLOAD_TRIGGER;
+       }
+
+       trigger = cfg->overload_trigger;
+       ao2_ref(cfg, -1);
+       return trigger;
+}
+
+static int overload_trigger_handler(const struct aco_option *opt,
+       struct ast_variable *var, void *obj)
+{
+       struct global_config *cfg = obj;
+       if (!strcasecmp(var->value, "none")) {
+               cfg->overload_trigger = TASKPROCESSOR_OVERLOAD_TRIGGER_NONE;
+       } else if (!strcasecmp(var->value, "global")) {
+               cfg->overload_trigger = TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL;
+       } else if (!strcasecmp(var->value, "pjsip_only")) {
+               cfg->overload_trigger = TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY;
+       } else {
+               ast_log(LOG_WARNING, "Unknown overload trigger '%s' specified for %s\n",
+                               var->value, var->name);
+               return -1;
+       }
+       return 0;
+}
+
+static const char *overload_trigger_map[] = {
+       [TASKPROCESSOR_OVERLOAD_TRIGGER_NONE] = "none",
+       [TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL] = "global",
+       [TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY] = "pjsip_only"
+};
+
+const char *ast_sip_overload_trigger_to_str(enum ast_sip_taskprocessor_overload_trigger trigger)
+{
+       return ARRAY_IN_BOUNDS(trigger, overload_trigger_map) ?
+               overload_trigger_map[trigger] : "";
+}
+
+static int overload_trigger_to_str(const void *obj, const intptr_t *args, char **buf)
+{
+       const struct global_config *cfg = obj;
+       *buf = ast_strdup(ast_sip_overload_trigger_to_str(cfg->overload_trigger));
+       return 0;
+}
+
 /*!
  * \internal
  * \brief Observer to set default global object if none exist.
@@ -646,6 +701,9 @@ int ast_sip_initialize_sorcery_global(void)
        ast_sorcery_object_field_register(sorcery, "global", "send_contact_status_on_update_registration",
                DEFAULT_SEND_CONTACT_STATUS_ON_UPDATE_REGISTRATION ? "yes" : "no",
                OPT_YESNO_T, 1, FLDSET(struct global_config, send_contact_status_on_update_registration));
+       ast_sorcery_object_field_register_custom(sorcery, "global", "taskprocessor_overload_trigger",
+               overload_trigger_map[DEFAULT_TASKPROCESSOR_OVERLOAD_TRIGGER],
+               overload_trigger_handler, overload_trigger_to_str, NULL, 0, 0);
 
        if (ast_sorcery_instance_observer_add(sorcery, &observer_callbacks_global)) {
                return -1;
index 7af5b27..f6333bf 100644 (file)
@@ -408,4 +408,14 @@ void ast_sip_destroy_transport_management(void);
  */
 int ast_sip_persistent_endpoint_add_to_regcontext(const char *regcontext);
 
+enum ast_sip_taskprocessor_overload_trigger {
+       TASKPROCESSOR_OVERLOAD_TRIGGER_NONE = 0,
+       TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL,
+       TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY
+};
+
+enum ast_sip_taskprocessor_overload_trigger ast_sip_get_taskprocessor_overload_trigger(void);
+
+const char *ast_sip_overload_trigger_to_str(enum ast_sip_taskprocessor_overload_trigger trigger);
+
 #endif /* RES_PJSIP_PRIVATE_H_ */
index d356e37..72ed35b 100644 (file)
@@ -51,6 +51,7 @@ static unsigned int unidentified_count;
 static unsigned int unidentified_period;
 static unsigned int unidentified_prune_interval;
 static int using_auth_username;
+static enum ast_sip_taskprocessor_overload_trigger overload_trigger;
 
 struct unidentified_request{
        struct timeval first_seen;
@@ -534,7 +535,10 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
                ao2_cleanup(dist);
                return PJ_TRUE;
        } else {
-               if (ast_taskprocessor_alert_get()) {
+               if ((overload_trigger == TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL &&
+                       ast_taskprocessor_alert_get())
+                       || (overload_trigger == TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY &&
+                       ast_taskprocessor_get_subsystem_alert("pjsip"))) {
                        /*
                         * When taskprocessors get backed up, there is a good chance that
                         * we are being overloaded and need to defer adding new work to
@@ -1196,6 +1200,8 @@ static void global_loaded(const char *object_type)
 
        ast_sip_get_unidentified_request_thresholds(&unidentified_count, &unidentified_period, &unidentified_prune_interval);
 
+       overload_trigger = ast_sip_get_taskprocessor_overload_trigger();
+
        /* Clean out the old task, if any */
        ast_sched_clean_by_callback(prune_context, prune_task, clean_task);
        /* Have to do something with the return value to shut up the stupid compiler. */
index 6428746..70cb556 100644 (file)
@@ -46,6 +46,8 @@ struct task_data {
        ast_mutex_t lock;
        /*! Boolean indicating that the task was run */
        int task_complete;
+       /*! Milliseconds to wait before returning */
+       unsigned long wait_time;
 };
 
 static void task_data_dtor(void *obj)
@@ -69,6 +71,7 @@ static struct task_data *task_data_create(void)
        ast_cond_init(&task_data->cond, NULL);
        ast_mutex_init(&task_data->lock);
        task_data->task_complete = 0;
+       task_data->wait_time = 0;
 
        return task_data;
 }
@@ -83,7 +86,11 @@ static struct task_data *task_data_create(void)
 static int task(void *data)
 {
        struct task_data *task_data = data;
+
        SCOPED_MUTEX(lock, &task_data->lock);
+       if (task_data->wait_time > 0) {
+               usleep(task_data->wait_time * 1000);
+       }
        task_data->task_complete = 1;
        ast_cond_signal(&task_data->cond);
        return 0;
@@ -165,6 +172,143 @@ AST_TEST_DEFINE(default_taskprocessor)
        return AST_TEST_PASS;
 }
 
+/*!
+ * \brief Baseline test for subsystem alert
+ */
+AST_TEST_DEFINE(subsystem_alert)
+{
+       RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
+#define TEST_DATA_ARRAY_SIZE 10
+#define LOW_WATER_MARK 3
+#define HIGH_WATER_MARK 6
+       struct task_data *task_data[(TEST_DATA_ARRAY_SIZE + 1)] = { 0 };
+       int res;
+       int i;
+       long queue_count;
+       unsigned int alert_level;
+       unsigned int subsystem_alert_level;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = "subsystem_alert";
+               info->category = "/main/taskprocessor/";
+               info->summary = "Test of subsystem alerts";
+               info->description =
+                       "Ensures alerts are generated properly.";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       tps = ast_taskprocessor_get("test_subsystem/test", TPS_REF_DEFAULT);
+
+       if (!tps) {
+               ast_test_status_update(test, "Unable to create test taskprocessor\n");
+               return AST_TEST_FAIL;
+       }
+
+       ast_taskprocessor_alert_set_levels(tps, LOW_WATER_MARK, HIGH_WATER_MARK);
+       ast_taskprocessor_suspend(tps);
+
+       for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
+               task_data[i] = task_data_create();
+               if (!task_data[i]) {
+                       ast_test_status_update(test, "Unable to create task_data\n");
+                       res = -1;
+                       goto data_cleanup;
+               }
+               task_data[i]->wait_time = 500;
+
+               ast_test_status_update(test, "Pushing task %d\n", i);
+               if (ast_taskprocessor_push(tps, task, task_data[i])) {
+                       ast_test_status_update(test, "Failed to queue task\n");
+                       res = -1;
+                       goto data_cleanup;
+               }
+
+               queue_count = ast_taskprocessor_size(tps);
+               alert_level = ast_taskprocessor_alert_get();
+               subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
+
+               if (queue_count == HIGH_WATER_MARK) {
+                       if (subsystem_alert_level) {
+                               ast_test_status_update(test, "Subsystem alert triggered correctly at %ld\n", queue_count);
+                       }
+                       if (alert_level) {
+                               ast_test_status_update(test, "Global alert triggered correctly at %ld\n", queue_count);
+                       }
+               } else if (queue_count < HIGH_WATER_MARK) {
+                       if (subsystem_alert_level > 0) {
+                               ast_test_status_update(test, "Subsystem alert triggered unexpectedly at %ld\n", queue_count);
+                               res = -1;
+                       }
+                       if (alert_level > 0) {
+                               ast_test_status_update(test, "Global alert triggered unexpectedly at %ld\n", queue_count);
+                               res = -1;
+                       }
+               } else {
+                       if (subsystem_alert_level == 0) {
+                               ast_test_status_update(test, "Subsystem alert failed to trigger at %ld\n", queue_count);
+                               res = -1;
+                       }
+                       if (alert_level == 0) {
+                               ast_test_status_update(test, "Global alert failed to trigger at %ld\n", queue_count);
+                               res = -1;
+                       }
+               }
+       }
+
+       ast_taskprocessor_unsuspend(tps);
+
+       for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
+               ast_test_status_update(test, "Waiting on task %d\n", i);
+               if (task_wait(task_data[i])) {
+                       ast_test_status_update(test, "Queued task '%d' did not execute!\n", i);
+                       res = -1;
+                       goto data_cleanup;
+               }
+
+               queue_count = ast_taskprocessor_size(tps);
+               alert_level = ast_taskprocessor_alert_get();
+               subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
+
+               if (queue_count == LOW_WATER_MARK) {
+                       if (!subsystem_alert_level) {
+                               ast_test_status_update(test, "Subsystem alert cleared correctly at %ld\n", queue_count);
+                       }
+                       if (!alert_level) {
+                               ast_test_status_update(test, "Global alert cleared correctly at %ld\n", queue_count);
+                       }
+               } else if (queue_count > LOW_WATER_MARK) {
+                       if (subsystem_alert_level == 0) {
+                               ast_test_status_update(test, "Subsystem alert cleared unexpectedly at %ld\n", queue_count);
+                               res = -1;
+                       }
+                       if (alert_level == 0) {
+                               ast_test_status_update(test, "Global alert cleared unexpectedly at %ld\n", queue_count);
+                               res = -1;
+                       }
+               } else {
+                       if (subsystem_alert_level > 0) {
+                               ast_test_status_update(test, "Subsystem alert failed to clear at %ld\n", queue_count);
+                               res = -1;
+                       }
+                       if (alert_level > 0) {
+                               ast_test_status_update(test, "Global alert failed to clear at %ld\n", queue_count);
+                               res = -1;
+                       }
+               }
+
+       }
+
+data_cleanup:
+       for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
+               ao2_cleanup(task_data[i]);
+       }
+
+       return res ? AST_TEST_FAIL : AST_TEST_PASS;
+}
+
 #define NUM_TASKS 20000
 
 /*!
@@ -749,6 +893,7 @@ static int unload_module(void)
 {
        ast_test_unregister(default_taskprocessor);
        ast_test_unregister(default_taskprocessor_load);
+       ast_test_unregister(subsystem_alert);
        ast_test_unregister(taskprocessor_listener);
        ast_test_unregister(taskprocessor_shutdown);
        ast_test_unregister(taskprocessor_push_local);
@@ -759,6 +904,7 @@ static int load_module(void)
 {
        ast_test_register(default_taskprocessor);
        ast_test_register(default_taskprocessor_load);
+       ast_test_register(subsystem_alert);
        ast_test_register(taskprocessor_listener);
        ast_test_register(taskprocessor_shutdown);
        ast_test_register(taskprocessor_push_local);