stasis: Add internal filtering of messages.
[asterisk/asterisk.git] / main / stasis.c
index ed83873..93112d9 100644 (file)
@@ -370,6 +370,11 @@ const char *stasis_topic_name(const struct stasis_topic *topic)
        return topic->name;
 }
 
+size_t stasis_topic_subscribers(const struct stasis_topic *topic)
+{
+       return AST_VECTOR_SIZE(&topic->subscribers);
+}
+
 /*! \internal */
 struct stasis_subscription {
        /*! Unique ID for this subscription */
@@ -391,6 +396,11 @@ struct stasis_subscription {
        /*! Flag set when final message for sub has been processed.
         *  Be sure join_lock is held before reading/setting. */
        int final_message_processed;
+
+       /*! The message types this subscription is accepting */
+       AST_VECTOR(, char) accepted_message_types;
+       /*! The message filter currently in use */
+       enum stasis_subscription_message_filter filter;
 };
 
 static void subscription_dtor(void *obj)
@@ -409,6 +419,8 @@ static void subscription_dtor(void *obj)
        ast_taskprocessor_unreference(sub->mailbox);
        sub->mailbox = NULL;
        ast_cond_destroy(&sub->join_cond);
+
+       AST_VECTOR_FREE(&sub->accepted_message_types);
 }
 
 /*!
@@ -420,19 +432,25 @@ static void subscription_dtor(void *obj)
 static void subscription_invoke(struct stasis_subscription *sub,
                                  struct stasis_message *message)
 {
+       unsigned int final = stasis_subscription_final_message(sub, message);
+       int message_type_id = stasis_message_type_id(stasis_subscription_change_type());
+
        /* Notify that the final message has been received */
-       if (stasis_subscription_final_message(sub, message)) {
+       if (final) {
                ao2_lock(sub);
                sub->final_message_rxed = 1;
                ast_cond_signal(&sub->join_cond);
                ao2_unlock(sub);
        }
 
-       /* Since sub is mostly immutable, no need to lock sub */
-       sub->callback(sub->data, sub, message);
+       if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
+               (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
+               /* Since sub is mostly immutable, no need to lock sub */
+               sub->callback(sub->data, sub, message);
+       }
 
        /* Notify that the final message has been processed */
-       if (stasis_subscription_final_message(sub, message)) {
+       if (final) {
                ao2_lock(sub);
                sub->final_message_processed = 1;
                ast_cond_signal(&sub->join_cond);
@@ -500,6 +518,8 @@ struct stasis_subscription *internal_stasis_subscribe(
        sub->callback = callback;
        sub->data = data;
        ast_cond_init(&sub->join_cond, NULL);
+       sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;
+       AST_VECTOR_INIT(&sub->accepted_message_types, 0);
 
        if (topic_add_subscription(topic, sub) != 0) {
                ao2_ref(sub, -1);
@@ -586,6 +606,76 @@ int stasis_subscription_set_congestion_limits(struct stasis_subscription *subscr
        return res;
 }
 
+int stasis_subscription_accept_message_type(struct stasis_subscription *subscription,
+       const struct stasis_message_type *type)
+{
+       if (!subscription) {
+               return -1;
+       }
+
+       ast_assert(type != NULL);
+       ast_assert(stasis_message_type_name(type) != NULL);
+
+       if (!type || !stasis_message_type_name(type)) {
+               /* Filtering is unreliable as this message type is not yet initialized
+                * so force all messages through.
+                */
+               subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
+               return 0;
+       }
+
+       ao2_lock(subscription->topic);
+       if (AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 1)) {
+               /* We do this for the same reason as above. The subscription can still operate, so allow
+                * it to do so by forcing all messages through.
+                */
+               subscription->filter = STASIS_SUBSCRIPTION_FILTER_FORCED_NONE;
+       }
+       ao2_unlock(subscription->topic);
+
+       return 0;
+}
+
+int stasis_subscription_decline_message_type(struct stasis_subscription *subscription,
+       const struct stasis_message_type *type)
+{
+       if (!subscription) {
+               return -1;
+       }
+
+       ast_assert(type != NULL);
+       ast_assert(stasis_message_type_name(type) != NULL);
+
+       if (!type || !stasis_message_type_name(type)) {
+               return 0;
+       }
+
+       ao2_lock(subscription->topic);
+       if (stasis_message_type_id(type) < AST_VECTOR_SIZE(&subscription->accepted_message_types)) {
+               /* The memory is already allocated so this can't fail */
+               AST_VECTOR_REPLACE(&subscription->accepted_message_types, stasis_message_type_id(type), 0);
+       }
+       ao2_unlock(subscription->topic);
+
+       return 0;
+}
+
+int stasis_subscription_set_filter(struct stasis_subscription *subscription,
+       enum stasis_subscription_message_filter filter)
+{
+       if (!subscription) {
+               return -1;
+       }
+
+       ao2_lock(subscription->topic);
+       if (subscription->filter != STASIS_SUBSCRIPTION_FILTER_FORCED_NONE) {
+               subscription->filter = filter;
+       }
+       ao2_unlock(subscription->topic);
+
+       return 0;
+}
+
 void stasis_subscription_join(struct stasis_subscription *subscription)
 {
        if (subscription) {
@@ -781,6 +871,18 @@ static void dispatch_message(struct stasis_subscription *sub,
        struct stasis_message *message,
        int synchronous)
 {
+       /* Determine if this subscription is interested in this message. Note that final
+        * messages are special and are always invoked on the subscription.
+        */
+       if (sub->filter == STASIS_SUBSCRIPTION_FILTER_SELECTIVE) {
+               int message_type_id = stasis_message_type_id(stasis_message_type(message));
+               if ((message_type_id >= AST_VECTOR_SIZE(&sub->accepted_message_types) ||
+                       !AST_VECTOR_GET(&sub->accepted_message_types, message_type_id)) &&
+                       !stasis_subscription_final_message(sub, message)) {
+                       return;
+               }
+       }
+
        if (!sub->mailbox) {
                /* Dispatch directly */
                subscription_invoke(sub, message);
@@ -840,6 +942,11 @@ static void publish_msg(struct stasis_topic *topic,
        ast_assert(topic != NULL);
        ast_assert(message != NULL);
 
+       /* If there are no subscribers don't bother */
+       if (!stasis_topic_subscribers(topic)) {
+               return;
+       }
+
        /*
         * The topic may be unref'ed by the subscription invocation.
         * Make sure we hold onto a reference while dispatching.