res_pjsip_mwi: fix unsolicited mwi blocks PJSIP stack
authorAlexei Gradinari <alex2grad@gmail.com>
Mon, 8 Aug 2016 17:53:32 +0000 (13:53 -0400)
committerAlexei Gradinari <alex2grad@gmail.com>
Mon, 8 Aug 2016 18:57:58 +0000 (13:57 -0500)
The PJSIP taskprocessors could be overflowed on startup
if there are many (thousands) realtime endpoints
configured with unsolicited mwi.
The PJSIP stack could be totally unresponsive for a few minutes
after boot completed.

This patch creates a separate PJSIP serializers pool for mwi
and makes unsolicited mwi use serializers from this pool.
This patch also adds 2 new global options to tune taskprocessor
alert levels: 'mwi_tps_queue_high' and 'mwi_tps_queue_low'.

This patch also adds new global option 'mwi_disable_initial_unsolicited'
to disable sending unsolicited mwi to all endpoints on startup.
If disabled then unsolicited mwi will start processing
on next endpoint's contact update.

ASTERISK-26230 #close

Change-Id: I4c8ecb82c249eb887930980a800c9f87f28f861a

CHANGES
configs/samples/pjsip.conf.sample
contrib/ast-db-manage/config/versions/c7a44a5a0851_pjsip_add_global_mwi_options.py [new file with mode: 0644]
include/asterisk/res_pjsip.h
res/res_pjsip.c
res/res_pjsip/config_global.c
res/res_pjsip_mwi.c

diff --git a/CHANGES b/CHANGES
index 3eadadf..b353e96 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -389,6 +389,15 @@ app_voicemail
    Additional information can be found in the sample configuration file at
    config/samples/voicemail.conf.sample.
 
+res_pjsip_mwi
+------------------
+ * Added "mwi_tps_queue_high" and "mwi_tps_queue_low" global configuration
+   options to tune taskprocessor alert levels.
+ * Added "mwi_disable_initial_unsolicited" global configuration option
+   to disable sending unsolicited MWI to all endpoints on startup.
+   Additional information can be found in the sample configuration file at
+   config/samples/pjsip.conf.sample.
+
 ------------------------------------------------------------------------------
 --- Functionality changes from Asterisk 13.10.0 to Asterisk 13.11.0 ----------
 ------------------------------------------------------------------------------
index 99bdfb9..eac0549 100644 (file)
                                 ; set to this value if there is no better option (such as
                                 ; auth/realm) to be used
 
+                    ; Asterisk Task Processor Queue Size
+                    ; On heavy loaded system with DB storage you may need to increase
+                    ; taskprocessor queue.
+                    ; If the taskprocessor queue size reached high water level,
+                    ; the alert is triggered.
+                    ; If the alert is set the pjsip distibutor stops processing incoming
+                    ; requests until the alert is cleared.
+                    ; The alert is cleared when taskprocessor queue size drops to the
+                    ; low water clear level.
+                    ; The next options set taskprocessor queue levels for MWI.
+;mwi_tps_queue_high=500 ; Taskprocessor high water alert trigger level.
+;mwi_tps_queue_low=450  ; Taskprocessor low water clear alert level.
+                    ; The default is -1 for 90% of high water level.
+
+                    ; Unsolicited MWI
+                    ; If there are endpoints configured with unsolicited MWI
+                    ; then res_pjsip_mwi module tries to send MWI to all endpoints on startup.
+;mwi_disable_initial_unsolicited=no ; Disable sending unsolicited mwi to all endpoints on startup.
+                    ; If disabled then unsolicited mwi will start processing
+                    ; on the endpoint's next contact update.
+
 ; MODULE PROVIDING BELOW SECTION(S): res_pjsip_acl
 ;==========================ACL SECTION OPTIONS=========================
 ;[acl]
diff --git a/contrib/ast-db-manage/config/versions/c7a44a5a0851_pjsip_add_global_mwi_options.py b/contrib/ast-db-manage/config/versions/c7a44a5a0851_pjsip_add_global_mwi_options.py
new file mode 100644 (file)
index 0000000..d3efa22
--- /dev/null
@@ -0,0 +1,35 @@
+"""pjsip: add global MWI options
+
+Revision ID: c7a44a5a0851
+Revises: 4a6c67fa9b7a
+Create Date: 2016-08-03 15:08:22.524727
+
+"""
+
+# revision identifiers, used by Alembic.
+revision = 'c7a44a5a0851'
+down_revision = '4a6c67fa9b7a'
+
+from alembic import op
+import sqlalchemy as sa
+from sqlalchemy.dialects.postgresql import ENUM
+
+YESNO_NAME = 'yesno_values'
+YESNO_VALUES = ['yes', 'no']
+
+
+def upgrade():
+    ############################# Enums ##############################
+
+    # yesno_values have already been created, so use postgres enum object
+    # type to get around "already created" issue - works okay with mysql
+    yesno_values = ENUM(*YESNO_VALUES, name=YESNO_NAME, create_type=False)
+
+    op.add_column('ps_globals', sa.Column('mwi_tps_queue_high', sa.Integer))
+    op.add_column('ps_globals', sa.Column('mwi_tps_queue_low', sa.Integer))
+    op.add_column('ps_globals', sa.Column('mwi_disable_initial_unsolicited', yesno_values))
+
+def downgrade():
+    op.drop_column('ps_globals', 'mwi_tps_queue_high')
+    op.drop_column('ps_globals', 'mwi_tps_queue_low')
+    op.drop_column('ps_globals', 'mwi_disable_initial_unsolicited')
index 9bb2a82..cd6b33d 100644 (file)
@@ -2417,6 +2417,32 @@ int ast_sip_register_supplement(struct ast_sip_supplement *supplement);
 void ast_sip_unregister_supplement(struct ast_sip_supplement *supplement);
 
 /*!
+ * \brief Retrieve the global MWI taskprocessor high water alert trigger level.
+ *
+ * \since 13.12.0
+ *
+ * \retval the system MWI taskprocessor high water alert trigger level
+ */
+unsigned int ast_sip_get_mwi_tps_queue_high(void);
+
+/*!
+ * \brief Retrieve the global MWI taskprocessor low water clear alert level.
+ *
+ * \since 13.12.0
+ *
+ * \retval the system MWI taskprocessor low water clear alert level
+ */
+int ast_sip_get_mwi_tps_queue_low(void);
+
+/*!
+ * \brief Retrieve the global setting 'disable sending unsolicited mwi on startup'.
+ * \since 13.12.0
+ *
+ * \retval non zero if disable.
+ */
+unsigned int ast_sip_get_mwi_disable_initial_unsolicited(void);
+
+/*!
  * \brief Retrieve the system debug setting (yes|no|host).
  *
  * \note returned string needs to be de-allocated by caller.
index ffbf880..8a9a19d 100644 (file)
                                                set to this value if there is no better option (such as auth/realm) to be
                                                used.</synopsis>
                                </configOption>
+                               <configOption name="mwi_tps_queue_high" default="500">
+                                       <synopsis>MWI taskprocessor high water alert trigger level.</synopsis>
+                                       <description>
+                                               <para>On a heavily loaded system you may need to adjust the
+                                               taskprocessor queue limits.  If any taskprocessor queue size
+                                               reaches its high water level then pjsip will stop processing
+                                               new requests until the alert is cleared.  The alert clears
+                                               when all alerting taskprocessor queues have dropped to their
+                                               low water clear level.
+                                               </para>
+                                       </description>
+                               </configOption>
+                               <configOption name="mwi_tps_queue_low" default="-1">
+                                       <synopsis>MWI taskprocessor low water clear alert level.</synopsis>
+                                       <description>
+                                               <para>On a heavily loaded system you may need to adjust the
+                                               taskprocessor queue limits.  If any taskprocessor queue size
+                                               reaches its high water level then pjsip will stop processing
+                                               new requests until the alert is cleared.  The alert clears
+                                               when all alerting taskprocessor queues have dropped to their
+                                               low water clear level.
+                                               </para>
+                                               <note><para>Set to -1 for the low water level to be 90% of
+                                               the high water level.</para></note>
+                                       </description>
+                               </configOption>
+                               <configOption name="mwi_disable_initial_unsolicited" default="no">
+                                       <synopsis>Enable/Disable sending unsolicited MWI to all endpoints on startup.</synopsis>
+                                       <description>
+                                               <para>When the initial unsolicited MWI notification are
+                                               enabled on startup then the initial notifications
+                                               get sent at startup.  If you have a lot of endpoints
+                                               (thousands) that use unsolicited MWI then you may
+                                               want to consider disabling the initial startup
+                                               notifications.
+                                               </para>
+                                               <para>When the initial unsolicited MWI notifications are
+                                               disabled on startup then the notifications will start
+                                               on the endpoint's next contact update.
+                                               </para>
+                                       </description>
+                               </configOption>
                        </configObject>
                </configFile>
        </configInfo>
index 6bb6888..8a1b0d4 100644 (file)
@@ -24,6 +24,7 @@
 #include "asterisk/res_pjsip.h"
 #include "include/res_pjsip_private.h"
 #include "asterisk/sorcery.h"
+#include "asterisk/taskprocessor.h"
 #include "asterisk/ast_version.h"
 #include "asterisk/res_pjsip_cli.h"
 
@@ -43,6 +44,9 @@
 #define DEFAULT_UNIDENTIFIED_REQUEST_COUNT 5
 #define DEFAULT_UNIDENTIFIED_REQUEST_PERIOD 5
 #define DEFAULT_UNIDENTIFIED_REQUEST_PRUNE_INTERVAL 30
+#define DEFAULT_MWI_TPS_QUEUE_HIGH AST_TASKPROCESSOR_HIGH_WATER_LEVEL
+#define DEFAULT_MWI_TPS_QUEUE_LOW -1
+#define DEFAULT_MWI_DISABLE_INITIAL_UNSOLICITED 0
 
 static char default_useragent[256];
 
@@ -79,6 +83,14 @@ struct global_config {
        unsigned int unidentified_request_period;
        /* Interval at which expired unidentifed requests will be pruned */
        unsigned int unidentified_request_prune_interval;
+       struct {
+               /*! Taskprocessor high water alert trigger level */
+               unsigned int tps_queue_high;
+               /*! Taskprocessor low water clear alert level. */
+               int tps_queue_low;
+               /*! Nonzero to disable sending unsolicited mwi to all endpoints on startup */
+               unsigned int disable_initial_unsolicited;
+       } mwi;
 };
 
 static void global_destructor(void *obj)
@@ -314,6 +326,53 @@ void ast_sip_get_default_from_user(char *from_user, size_t size)
        }
 }
 
+
+unsigned int ast_sip_get_mwi_tps_queue_high(void)
+{
+       unsigned int tps_queue_high;
+       struct global_config *cfg;
+
+       cfg = get_global_cfg();
+       if (!cfg) {
+               return DEFAULT_MWI_TPS_QUEUE_HIGH;
+       }
+
+       tps_queue_high = cfg->mwi.tps_queue_high;
+       ao2_ref(cfg, -1);
+       return tps_queue_high;
+}
+
+int ast_sip_get_mwi_tps_queue_low(void)
+{
+       int tps_queue_low;
+       struct global_config *cfg;
+
+       cfg = get_global_cfg();
+       if (!cfg) {
+               return DEFAULT_MWI_TPS_QUEUE_LOW;
+       }
+
+       tps_queue_low = cfg->mwi.tps_queue_low;
+       ao2_ref(cfg, -1);
+       return tps_queue_low;
+}
+
+unsigned int ast_sip_get_mwi_disable_initial_unsolicited(void)
+{
+       unsigned int disable_initial_unsolicited;
+       struct global_config *cfg;
+
+       cfg = get_global_cfg();
+       if (!cfg) {
+               return DEFAULT_MWI_DISABLE_INITIAL_UNSOLICITED;
+       }
+
+       disable_initial_unsolicited = cfg->mwi.disable_initial_unsolicited;
+       ao2_ref(cfg, -1);
+       return disable_initial_unsolicited;
+}
+
+
 /*!
  * \internal
  * \brief Observer to set default global object if none exist.
@@ -450,6 +509,15 @@ int ast_sip_initialize_sorcery_global(void)
                OPT_UINT_T, 0, FLDSET(struct global_config, unidentified_request_prune_interval));
        ast_sorcery_object_field_register(sorcery, "global", "default_realm", DEFAULT_REALM,
                OPT_STRINGFIELD_T, 0, STRFLDSET(struct global_config, default_realm));
+       ast_sorcery_object_field_register(sorcery, "global", "mwi_tps_queue_high",
+               __stringify(DEFAULT_MWI_TPS_QUEUE_HIGH),
+               OPT_UINT_T, 0, FLDSET(struct global_config, mwi.tps_queue_high));
+       ast_sorcery_object_field_register(sorcery, "global", "mwi_tps_queue_low",
+               __stringify(DEFAULT_MWI_TPS_QUEUE_LOW),
+               OPT_INT_T, 0, FLDSET(struct global_config, mwi.tps_queue_low));
+       ast_sorcery_object_field_register(sorcery, "global", "mwi_disable_initial_unsolicited",
+               DEFAULT_MWI_DISABLE_INITIAL_UNSOLICITED ? "yes" : "no",
+               OPT_BOOL_T, 1, FLDSET(struct global_config, mwi.disable_initial_unsolicited));
 
        if (ast_sorcery_instance_observer_add(sorcery, &observer_callbacks_global)) {
                return -1;
index d86c96c..bf7f042 100644 (file)
@@ -35,6 +35,7 @@
 #include "asterisk/module.h"
 #include "asterisk/logger.h"
 #include "asterisk/astobj2.h"
+#include "asterisk/taskprocessor.h"
 #include "asterisk/sorcery.h"
 #include "asterisk/stasis.h"
 #include "asterisk/app.h"
@@ -52,6 +53,12 @@ static char *default_voicemail_extension;
 
 #define MWI_DATASTORE "MWI datastore"
 
+/*! Number of serializers in pool if one not supplied. */
+#define MWI_SERIALIZER_POOL_SIZE 8
+
+/*! Pool of serializers to use if not supplied. */
+static struct ast_taskprocessor *mwi_serializer_pool[MWI_SERIALIZER_POOL_SIZE];
+
 static void mwi_subscription_shutdown(struct ast_sip_subscription *sub);
 static void mwi_to_ami(struct ast_sip_subscription *sub, struct ast_str **buf);
 static int mwi_new_subscribe(struct ast_sip_endpoint *endpoint,
@@ -119,6 +126,117 @@ struct mwi_subscription {
        char id[1];
 };
 
+/*!
+ * \internal
+ * \brief Shutdown the serializers in the mwi pool.
+ * \since 13.12.0
+ *
+ * \return Nothing
+ */
+static void mwi_serializer_pool_shutdown(void)
+{
+       int idx;
+
+       for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
+               ast_taskprocessor_unreference(mwi_serializer_pool[idx]);
+               mwi_serializer_pool[idx] = NULL;
+       }
+}
+
+/*!
+ * \internal
+ * \brief Setup the serializers in the mwi pool.
+ * \since 13.12.0
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.
+ */
+static int mwi_serializer_pool_setup(void)
+{
+       char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
+       int idx;
+
+       for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
+               /* Create name with seq number appended. */
+               ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/mwi");
+
+               mwi_serializer_pool[idx] = ast_sip_create_serializer(tps_name);
+               if (!mwi_serializer_pool[idx]) {
+                       mwi_serializer_pool_shutdown();
+                       return -1;
+               }
+       }
+       return 0;
+}
+
+/*!
+ * \internal
+ * \brief Pick a mwi serializer from the pool.
+ * \since 13.12.0
+ *
+ * \retval least queue size task processor.
+ */
+static struct ast_taskprocessor *get_mwi_serializer(void)
+{
+       int idx;
+       int pos;
+
+       if (!mwi_serializer_pool[0]) {
+               return NULL;
+       }
+
+       for (pos = idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
+               if (ast_taskprocessor_size(mwi_serializer_pool[idx]) < ast_taskprocessor_size(mwi_serializer_pool[pos])) {
+                       pos = idx;
+               }
+       }
+
+       return mwi_serializer_pool[pos];
+}
+
+/*!
+ * \internal
+ * \brief Set taskprocessor alert levels for the serializers in the mwi pool.
+ * \since 13.12.0
+ *
+ * \retval 0 on success.
+ * \retval -1 on error.
+ */
+static int mwi_serializer_set_alert_levels(void)
+{
+       int idx;
+       long tps_queue_high;
+       long tps_queue_low;
+
+       if (!mwi_serializer_pool[0]) {
+               return -1;
+       }
+
+       tps_queue_high = ast_sip_get_mwi_tps_queue_high();
+       if (tps_queue_high <= 0) {
+               ast_log(AST_LOG_WARNING, "Invalid taskprocessor high water alert trigger level '%ld'\n",
+                       tps_queue_high);
+               tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
+       }
+
+       tps_queue_low = ast_sip_get_mwi_tps_queue_low();
+       if (tps_queue_low < -1 || tps_queue_high < tps_queue_low) {
+               ast_log(AST_LOG_WARNING, "Invalid taskprocessor low water clear alert level '%ld'\n",
+                       tps_queue_low);
+               tps_queue_low = -1;
+       }
+
+       for (idx = 0; idx < MWI_SERIALIZER_POOL_SIZE; ++idx) {
+               if (ast_taskprocessor_alert_set_levels(mwi_serializer_pool[idx], tps_queue_low, tps_queue_high)) {
+                       ast_log(AST_LOG_WARNING, "Failed to set alert levels for MWI serializer pool #%d.\n",
+                               idx);
+               }
+       }
+
+       return 0;
+}
+
+
 static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub,
                struct stasis_message *msg);
 
@@ -945,7 +1063,7 @@ static int send_notify(void *obj, void *arg, int flags)
        struct mwi_subscription *mwi_sub = obj;
        struct ast_taskprocessor *serializer = mwi_sub->is_solicited
                ? ast_sip_subscription_get_serializer(mwi_sub->sip_sub)
-               : NULL;
+               : get_mwi_serializer();
 
        if (ast_sip_push_task(serializer, serialized_notify, ao2_bump(mwi_sub))) {
                ao2_ref(mwi_sub, -1);
@@ -1063,7 +1181,7 @@ static int send_contact_notify(void *obj, void *arg, int flags)
                return 0;
        }
 
-       if (ast_sip_push_task(NULL, serialized_notify, ao2_bump(mwi_sub))) {
+       if (ast_sip_push_task(get_mwi_serializer(), serialized_notify, ao2_bump(mwi_sub))) {
                ao2_ref(mwi_sub, -1);
        }
 
@@ -1149,6 +1267,7 @@ static void global_loaded(const char *object_type)
 {
        ast_free(default_voicemail_extension);
        default_voicemail_extension = ast_sip_get_default_voicemail_extension();
+       mwi_serializer_set_alert_levels();
 }
 
 static struct ast_sorcery_observer global_observer = {
@@ -1175,15 +1294,21 @@ static int load_module(void)
                return AST_MODULE_LOAD_DECLINE;
        }
 
+       if (mwi_serializer_pool_setup()) {
+               ast_log(AST_LOG_WARNING, "Failed to create MWI serializer pool. The default SIP pool will be used for MWI\n");
+       }
+
        create_mwi_subscriptions();
        ast_sorcery_observer_add(ast_sip_get_sorcery(), "contact", &mwi_contact_observer);
        ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &global_observer);
        ast_sorcery_reload_object(ast_sip_get_sorcery(), "global");
 
-       if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
-               ast_sip_push_task(NULL, send_initial_notify_all, NULL);
-       } else {
-               stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);
+       if (!ast_sip_get_mwi_disable_initial_unsolicited()) {
+               if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
+                       ast_sip_push_task(NULL, send_initial_notify_all, NULL);
+               } else {
+                       stasis_subscribe_pool(ast_manager_get_topic(), mwi_startup_event_cb, NULL);
+               }
        }
 
        return AST_MODULE_LOAD_SUCCESS;
@@ -1193,6 +1318,7 @@ static int unload_module(void)
 {
        ao2_callback(unsolicited_mwi, OBJ_UNLINK | OBJ_NODATA | OBJ_MULTIPLE, unsubscribe, NULL);
        ao2_ref(unsolicited_mwi, -1);
+       mwi_serializer_pool_shutdown();
        ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer);
        ast_sorcery_observer_remove(ast_sip_get_sorcery(), "contact", &mwi_contact_observer);
        ast_sip_unregister_subscription_handler(&mwi_handler);