stasis: Allow filtering by formatter
authorGeorge Joseph <gjoseph@digium.com>
Thu, 29 Nov 2018 15:53:51 +0000 (08:53 -0700)
committerGeorge Joseph <gjoseph@digium.com>
Fri, 7 Dec 2018 13:59:00 +0000 (08:59 -0500)
A subscriber can now indicate that it only wants messages
that have formatters of a specific type.  For instance,
manager can indicate that it only wants messages that have a
"to_ami" formatter.  You can combine this with the existing
filter for message type to get only messages with specific
formatters or messages of specific types.

ASTERISK-28186

Change-Id: Ifdb7a222a73b6b56c6bb9e4ee93dc8a394a5494c

include/asterisk/stasis.h
include/asterisk/stasis_message_router.h
main/stasis.c
main/stasis_message.c
main/stasis_message_router.c
tests/test_stasis.c

index 85e78dc..6d423d9 100644 (file)
@@ -301,6 +301,21 @@ enum stasis_subscription_message_filter {
 };
 
 /*!
+ * \brief Stasis subscription formatter filters
+ *
+ * There should be an entry here for each member of \ref stasis_message_vtable
+ *
+ * \since 13.25.0
+ * \since 16.2.0
+ */
+enum stasis_subscription_message_formatters {
+       STASIS_SUBSCRIPTION_FORMATTER_NONE =  0,
+       STASIS_SUBSCRIPTION_FORMATTER_JSON =  1 << 0,  /*!< Allow messages with a to_json formatter */
+       STASIS_SUBSCRIPTION_FORMATTER_AMI =   1 << 1,  /*!< Allow messages with a to_ami formatter */
+       STASIS_SUBSCRIPTION_FORMATTER_EVENT = 1 << 2,  /*!< Allow messages with a to_event formatter */
+};
+
+/*!
  * \brief Create a new message type.
  *
  * \ref stasis_message_type is an AO2 object, so ao2_cleanup() when you're done
@@ -676,6 +691,30 @@ int stasis_subscription_set_filter(struct stasis_subscription *subscription,
        enum stasis_subscription_message_filter filter);
 
 /*!
+ * \brief Indicate to a subscription that we are interested in messages with one or more formatters.
+ *
+ * \param subscription Subscription to alter.
+ * \param formatters A bitmap of \ref stasis_subscription_message_formatters we wish to receive.
+ *
+ * \since 13.25.0
+ * \since 16.2.0
+ */
+void stasis_subscription_accept_formatters(struct stasis_subscription *subscription,
+       enum stasis_subscription_message_formatters formatters);
+
+/*!
+ * \brief Get a bitmap of available formatters for a message type
+ *
+ * \param message_type Message type
+ * \return A bitmap of \ref stasis_subscription_message_formatters
+ *
+ * \since 13.25.0
+ * \since 16.2.0
+ */
+enum stasis_subscription_message_formatters stasis_message_type_available_formatters(
+       const struct stasis_message_type *message_type);
+
+/*!
  * \brief Cancel a subscription.
  *
  * Note that in an asynchronous system, there may still be messages queued or
index 8dcdfcc..9897d62 100644 (file)
@@ -242,4 +242,23 @@ int stasis_message_router_set_default(struct stasis_message_router *router,
                                      stasis_subscription_cb callback,
                                      void *data);
 
+/*!
+ * \brief Indicate to a message router that we are interested in messages with one or more formatters.
+ *
+ * The formatters are passed on to the underlying subscription.
+ *
+ * \warning With direct subscriptions, adding a formatter filter is an OR operation
+ * with any message type filters.  In the current implementation of message router however,
+ * it's an AND operation.  Even when setting a default route, the callback will only get
+ * messages that have the formatters provides in this call.
+ *
+ * \param router Router to set the formatters of.
+ * \param formatters A bitmap of \ref stasis_subscription_message_formatters we wish to receive.
+ *
+ * \since 13.25.0
+ * \since 16.2.0
+ */
+void stasis_message_router_accept_formatters(struct stasis_message_router *router,
+       enum stasis_subscription_message_formatters formatters);
+
 #endif /* _ASTERISK_STASIS_MESSAGE_ROUTER_H */
index 0c60b13..69ec1a5 100644 (file)
@@ -399,6 +399,8 @@ struct stasis_subscription {
 
        /*! The message types this subscription is accepting */
        AST_VECTOR(, char) accepted_message_types;
+       /*! The message formatters this subscription is accepting */
+       enum stasis_subscription_message_formatters accepted_formatters;
        /*! The message filter currently in use */
        enum stasis_subscription_message_filter filter;
 };
@@ -443,6 +445,10 @@ static void subscription_invoke(struct stasis_subscription *sub,
                ao2_unlock(sub);
        }
 
+       /*
+        * If filtering is turned on and this is a 'final' message, we only invoke the callback
+        * if the subscriber accepts subscription_change message types.
+        */
        if (!final || sub->filter != STASIS_SUBSCRIPTION_FILTER_SELECTIVE ||
                (message_type_id < AST_VECTOR_SIZE(&sub->accepted_message_types) && AST_VECTOR_GET(&sub->accepted_message_types, message_type_id))) {
                /* Since sub is mostly immutable, no need to lock sub */
@@ -520,6 +526,7 @@ struct stasis_subscription *internal_stasis_subscribe(
        ast_cond_init(&sub->join_cond, NULL);
        sub->filter = STASIS_SUBSCRIPTION_FILTER_NONE;
        AST_VECTOR_INIT(&sub->accepted_message_types, 0);
+       sub->accepted_formatters = STASIS_SUBSCRIPTION_FORMATTER_NONE;
 
        if (topic_add_subscription(topic, sub) != 0) {
                ao2_ref(sub, -1);
@@ -676,6 +683,18 @@ int stasis_subscription_set_filter(struct stasis_subscription *subscription,
        return 0;
 }
 
+void stasis_subscription_accept_formatters(struct stasis_subscription *subscription,
+       enum stasis_subscription_message_formatters formatters)
+{
+       ast_assert(subscription != NULL);
+
+       ao2_lock(subscription->topic);
+       subscription->accepted_formatters = formatters;
+       ao2_unlock(subscription->topic);
+
+       return;
+}
+
 void stasis_subscription_join(struct stasis_subscription *subscription)
 {
        if (subscription) {
@@ -871,17 +890,57 @@ static void dispatch_message(struct stasis_subscription *sub,
        struct stasis_message *message,
        int synchronous)
 {
-       /* Determine if this subscription is interested in this message. Note that final
-        * messages are special and are always invoked on the subscription.
+       int is_final = stasis_subscription_final_message(sub, message);
+
+       /*
+        * The 'do while' gives us an easy way to skip remaining logic once
+        * we determine the message should be accepted.
+        * The code looks more verbose than it needs to be but it optimizes
+        * down very nicely.  It's just easier to understand and debug this way.
         */
-       if (sub->filter == STASIS_SUBSCRIPTION_FILTER_SELECTIVE) {
-               int message_type_id = stasis_message_type_id(stasis_message_type(message));
-               if ((message_type_id >= AST_VECTOR_SIZE(&sub->accepted_message_types) ||
-                       !AST_VECTOR_GET(&sub->accepted_message_types, message_type_id)) &&
-                       !stasis_subscription_final_message(sub, message)) {
-                       return;
+       do {
+               struct stasis_message_type *message_type = stasis_message_type(message);
+               int type_id = stasis_message_type_id(message_type);
+               int type_filter_specified = 0;
+               int formatter_filter_specified = 0;
+               int type_filter_passed = 0;
+               int formatter_filter_passed = 0;
+
+               /* We always accept final messages so only run the filter logic if not final */
+               if (is_final) {
+                       break;
                }
-       }
+
+               type_filter_specified = sub->filter & STASIS_SUBSCRIPTION_FILTER_SELECTIVE;
+               formatter_filter_specified = sub->accepted_formatters != STASIS_SUBSCRIPTION_FORMATTER_NONE;
+
+               /* Accept if no filters of either type were specified */
+               if (!type_filter_specified && !formatter_filter_specified) {
+                       break;
+               }
+
+               type_filter_passed = type_filter_specified
+                       && type_id < AST_VECTOR_SIZE(&sub->accepted_message_types)
+                       && AST_VECTOR_GET(&sub->accepted_message_types, type_id);
+
+               /*
+                * Since the type and formatter filters are OR'd, we can skip
+                * the formatter check if the type check passes.
+                */
+               if (type_filter_passed) {
+                       break;
+               }
+
+               formatter_filter_passed = formatter_filter_specified
+                       && (sub->accepted_formatters & stasis_message_type_available_formatters(message_type));
+
+               if (formatter_filter_passed) {
+                       break;
+               }
+
+               return;
+
+       } while (0);
 
        if (!sub->mailbox) {
                /* Dispatch directly */
index 1fdbe85..d3f304c 100644 (file)
@@ -40,6 +40,7 @@ struct stasis_message_type {
        char *name;
        unsigned int hash;
        int id;
+       enum stasis_subscription_message_formatters available_formatters;
 };
 
 static struct stasis_message_vtable null_vtable = {};
@@ -80,6 +81,15 @@ int stasis_message_type_create(const char *name,
        }
        type->hash = ast_hashtab_hash_string(name);
        type->vtable = vtable;
+       if (vtable->to_json) {
+               type->available_formatters |= STASIS_SUBSCRIPTION_FORMATTER_JSON;
+       }
+       if (vtable->to_ami) {
+               type->available_formatters |= STASIS_SUBSCRIPTION_FORMATTER_AMI;
+       }
+       if (vtable->to_event) {
+               type->available_formatters |= STASIS_SUBSCRIPTION_FORMATTER_EVENT;
+       }
        type->id = ast_atomic_fetchadd_int(&message_type_id, +1);
        *result = type;
 
@@ -101,6 +111,12 @@ int stasis_message_type_id(const struct stasis_message_type *type)
        return type->id;
 }
 
+enum stasis_subscription_message_formatters stasis_message_type_available_formatters(
+       const struct stasis_message_type *type)
+{
+       return type->available_formatters;
+}
+
 /*! \internal */
 struct stasis_message {
        /*! Time the message was created */
index 41ebc7e..197f7f9 100644 (file)
@@ -399,3 +399,13 @@ int stasis_message_router_set_default(struct stasis_message_router *router,
        /* While this implementation can never fail, it used to be able to */
        return 0;
 }
+
+void stasis_message_router_accept_formatters(struct stasis_message_router *router,
+       enum stasis_subscription_message_formatters formatters)
+{
+       ast_assert(router != NULL);
+
+       stasis_subscription_accept_formatters(router->subscription, formatters);
+
+       return;
+}
index 5bc38c5..e620039 100644 (file)
 #include "asterisk/stasis_message_router.h"
 #include "asterisk/test.h"
 
-static const char *test_category = "/stasis/core/";
+#define test_category "/stasis/core/"
+
+static struct ast_event *fake_event(struct stasis_message *message)
+{
+       return ast_event_new(AST_EVENT_CUSTOM,
+               AST_EVENT_IE_DESCRIPTION, AST_EVENT_IE_PLTYPE_STR, "Dummy", AST_EVENT_IE_END);
+}
 
 static struct ast_json *fake_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
 {
@@ -2044,6 +2050,389 @@ AST_TEST_DEFINE(caching_dtor_order)
        return AST_TEST_PASS;
 }
 
+struct test_message_types {
+       struct stasis_message_type *none;
+       struct stasis_message_type *ami;
+       struct stasis_message_type *json;
+       struct stasis_message_type *event;
+       struct stasis_message_type *amievent;
+       struct stasis_message_type *type1;
+       struct stasis_message_type *type2;
+       struct stasis_message_type *type3;
+       struct stasis_message_type *change;
+};
+
+static void destroy_message_types(void *obj)
+{
+       struct test_message_types *types = obj;
+
+       ao2_cleanup(types->none);
+       ao2_cleanup(types->ami);
+       ao2_cleanup(types->json);
+       ao2_cleanup(types->event);
+       ao2_cleanup(types->amievent);
+       ao2_cleanup(types->type1);
+       ao2_cleanup(types->type2);
+       ao2_cleanup(types->type3);
+       /* N.B.  Don't cleanup types->change! */
+}
+
+static struct test_message_types *create_message_types(struct ast_test *test)
+{
+       struct stasis_message_vtable vtable = { 0 };
+       struct test_message_types *types;
+       enum ast_test_result_state  __attribute__ ((unused)) rc;
+
+       types = ao2_alloc(sizeof(*types), destroy_message_types);
+       if (!types) {
+               return NULL;
+       }
+
+       ast_test_validate_cleanup(test,
+               stasis_message_type_create("TestMessageNONE", &vtable, &types->none) == STASIS_MESSAGE_TYPE_SUCCESS,
+               rc, cleanup);
+
+       vtable.to_ami = fake_ami;
+       ast_test_validate_cleanup(test,
+               stasis_message_type_create("TestMessageAMI", &vtable, &types->ami) == STASIS_MESSAGE_TYPE_SUCCESS,
+               rc, cleanup);
+
+       vtable.to_ami = NULL;
+       vtable.to_json = fake_json;
+       ast_test_validate_cleanup(test,
+               stasis_message_type_create("TestMessageJSON", &vtable, &types->json) == STASIS_MESSAGE_TYPE_SUCCESS,
+               rc, cleanup);
+
+       vtable.to_ami = NULL;
+       vtable.to_json = NULL;
+       vtable.to_event = fake_event;
+       ast_test_validate_cleanup(test,
+               stasis_message_type_create("TestMessageEVENT", &vtable, &types->event) == STASIS_MESSAGE_TYPE_SUCCESS,
+               rc, cleanup);
+
+       vtable.to_ami = fake_ami;
+       ast_test_validate_cleanup(test,
+               stasis_message_type_create("TestMessageAMIEVENT", &vtable, &types->amievent) == STASIS_MESSAGE_TYPE_SUCCESS,
+               rc, cleanup);
+
+       ast_test_validate_cleanup(test,
+               stasis_message_type_create("TestMessageType1", NULL, &types->type1) == STASIS_MESSAGE_TYPE_SUCCESS,
+               rc, cleanup);
+
+       ast_test_validate_cleanup(test,
+               stasis_message_type_create("TestMessageType2", NULL, &types->type2) == STASIS_MESSAGE_TYPE_SUCCESS,
+               rc, cleanup);
+
+       ast_test_validate_cleanup(test,
+               stasis_message_type_create("TestMessageType3", NULL, &types->type3) == STASIS_MESSAGE_TYPE_SUCCESS,
+               rc, cleanup);
+
+       types->change = stasis_subscription_change_type();
+
+       return types;
+
+cleanup:
+       ao2_cleanup(types);
+       return NULL;
+}
+
+struct cts {
+       struct consumer *consumer;
+       struct stasis_topic *topic;
+       struct stasis_subscription *sub;
+};
+
+static void destroy_cts(void *obj)
+{
+       struct cts *c = obj;
+
+       stasis_unsubscribe(c->sub);
+       ao2_cleanup(c->topic);
+       ao2_cleanup(c->consumer);
+}
+
+static struct cts *create_cts(struct ast_test *test)
+{
+       struct cts *cts = ao2_alloc(sizeof(*cts), destroy_cts);
+       enum ast_test_result_state  __attribute__ ((unused)) rc;
+
+       ast_test_validate_cleanup(test, cts, rc, cleanup);
+
+       cts->topic = stasis_topic_create("TestTopic");
+       ast_test_validate_cleanup(test, NULL != cts->topic, rc, cleanup);
+
+       cts->consumer = consumer_create(0);
+       ast_test_validate_cleanup(test, NULL != cts->consumer, rc, cleanup);
+
+       ao2_ref(cts->consumer, +1);
+       cts->sub = stasis_subscribe(cts->topic, consumer_exec, cts->consumer);
+       ast_test_validate_cleanup(test, NULL != cts->sub, rc, cleanup);
+
+       return cts;
+
+cleanup:
+       ao2_cleanup(cts);
+       return NULL;
+}
+
+static int is_msg(struct stasis_message *msg, struct stasis_message_type *mtype, const char *data)
+{
+       struct stasis_subscription_change *msg_data = stasis_message_data(msg);
+
+       if (stasis_message_type(msg) != mtype) {
+               return 0;
+       }
+
+       if (data) {
+               return (strcmp(data, msg_data->description) == 0);
+       }
+
+       return 1;
+}
+
+static void dump_consumer(struct ast_test *test, struct cts *cts)
+{
+       int i;
+       struct stasis_subscription_change *data;
+
+       ast_test_status_update(test, "Messages received: %ld  Final? %s\n", cts->consumer->messages_rxed_len,
+               cts->consumer->complete ? "yes" : "no");
+       for (i = 0; i < cts->consumer->messages_rxed_len; i++) {
+               data = stasis_message_data(cts->consumer->messages_rxed[i]);
+               ast_test_status_update(test, "Message type received: %s %s\n",
+                       stasis_message_type_name(stasis_message_type(cts->consumer->messages_rxed[i])),
+                       data && data->description ? data->description : "no data");
+       }
+}
+
+static int send_msg(struct ast_test *test, struct cts *cts, struct stasis_message_type *msg_type,
+       const char *data)
+{
+       struct stasis_message *msg;
+       struct stasis_subscription_change *test_data =
+               ao2_alloc(sizeof(*test_data) + (data ? strlen(data) : strlen("no data")) + 1, NULL);
+
+       if (!test_data) {
+               return 0;
+       }
+       strcpy(test_data->description, S_OR(data, "no data")); /* Safe */
+
+       msg = stasis_message_create(msg_type, test_data);
+       ao2_ref(test_data, -1);
+       if (!msg) {
+               ast_test_status_update(test, "Unable to create %s message\n",
+                       stasis_message_type_name(msg_type));
+               return 0;
+       }
+
+       stasis_publish(cts->topic, msg);
+       ao2_ref(msg, -1);
+
+       return 1;
+}
+
+AST_TEST_DEFINE(type_filters)
+{
+       RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
+       RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup);
+       int ix = 0;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = test_category "filtering/";
+               info->summary = "Test message filtering by type";
+               info->description = "Test message filtering by type";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       types = create_message_types(test);
+       ast_test_validate(test, NULL != types);
+
+       cts = create_cts(test);
+       ast_test_validate(test, NULL != cts);
+
+       ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type1) == 0);
+       ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type2) == 0);
+       ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->change) == 0);
+       ast_test_validate(test, stasis_subscription_set_filter(cts->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE) == 0);
+
+       /* We should get these */
+       ast_test_validate(test, send_msg(test, cts, types->type1, "Pass"));
+       ast_test_validate(test, send_msg(test, cts, types->type2, "Pass"));
+       /* ... but not this one */
+       ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
+
+       /* Wait for change(subscribe) and "Pass" messages */
+       consumer_wait_for(cts->consumer, 3);
+
+       /* Remove type 1 */
+       ast_test_validate(test, stasis_subscription_decline_message_type(cts->sub, types->type1) == 0);
+
+       /* We should now NOT get this one */
+       ast_test_validate(test, send_msg(test, cts, types->type1, "FAIL"));
+       /* We should get this one (again) */
+       ast_test_validate(test, send_msg(test, cts, types->type2, "Pass2"));
+       /* We still should NOT get this one */
+       ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
+
+       /* We should now have a second type2 */
+       consumer_wait_for(cts->consumer, 4);
+
+       stasis_unsubscribe(cts->sub);
+       cts->sub = NULL;
+       consumer_wait_for_completion(cts->consumer);
+
+       dump_consumer(test, cts);
+
+       ast_test_validate(test, 1 == cts->consumer->complete);
+       ast_test_validate(test, 5 == cts->consumer->messages_rxed_len);
+       ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
+       ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type1, "Pass"));
+       ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass"));
+       ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass2"));
+       ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
+
+       return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(formatter_filters)
+{
+       RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
+       RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup) ;
+       int ix = 0;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = test_category "filtering/";
+               info->summary = "Test message filtering by formatter";
+               info->description = "Test message filtering by formatter";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       types = create_message_types(test);
+       ast_test_validate(test, NULL != types);
+
+       cts = create_cts(test);
+       ast_test_validate(test, NULL != cts);
+
+       stasis_subscription_accept_formatters(cts->sub,
+               STASIS_SUBSCRIPTION_FORMATTER_AMI | STASIS_SUBSCRIPTION_FORMATTER_JSON);
+
+       /* We should get these */
+       ast_test_validate(test, send_msg(test, cts, types->ami, "Pass"));
+       ast_test_validate(test, send_msg(test, cts, types->json, "Pass"));
+       ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass"));
+
+       /* ... but not these */
+       ast_test_validate(test, send_msg(test, cts, types->none, "FAIL"));
+       ast_test_validate(test, send_msg(test, cts, types->event, "FAIL"));
+       ast_test_validate(test, send_msg(test, cts, types->type1, "FAIL"));
+
+       /* Wait for change(subscribe) and the "Pass" messages */
+       consumer_wait_for(cts->consumer, 4);
+
+       /* Change the subscription to accept only event formatters */
+       stasis_subscription_accept_formatters(cts->sub, STASIS_SUBSCRIPTION_FORMATTER_EVENT);
+
+       /* We should NOT get these now */
+       ast_test_validate(test, send_msg(test, cts, types->ami, "FAIL"));
+       ast_test_validate(test, send_msg(test, cts, types->json, "FAIL"));
+       /* ... but we should still get this one */
+       ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass2"));
+       /* ... and this one should be new */
+       ast_test_validate(test, send_msg(test, cts, types->event, "Pass"));
+
+       /* We should now have a second amievent */
+       consumer_wait_for(cts->consumer, 6);
+
+       stasis_unsubscribe(cts->sub);
+       cts->sub = NULL;
+       consumer_wait_for_completion(cts->consumer);
+
+       dump_consumer(test, cts);
+
+       ast_test_validate(test, 1 == cts->consumer->complete);
+       ast_test_validate(test, 7 == cts->consumer->messages_rxed_len);
+       ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
+       ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->ami, "Pass"));
+       ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->json, "Pass"));
+       ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass"));
+       ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass2"));
+       ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->event, "Pass"));
+       ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
+
+       return AST_TEST_PASS;
+}
+
+AST_TEST_DEFINE(combo_filters)
+{
+       RAII_VAR(struct cts *, cts, NULL, ao2_cleanup);
+       RAII_VAR(struct test_message_types *, types, NULL, ao2_cleanup);
+       int ix = 0;
+
+       switch (cmd) {
+       case TEST_INIT:
+               info->name = __func__;
+               info->category = test_category "filtering/";
+               info->summary = "Test message filtering by type and formatter";
+               info->description = "Test message filtering by type and formatter";
+               return AST_TEST_NOT_RUN;
+       case TEST_EXECUTE:
+               break;
+       }
+
+       types = create_message_types(test);
+       ast_test_validate(test, NULL != types);
+
+       cts = create_cts(test);
+       ast_test_validate(test, NULL != cts);
+
+       ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type1) == 0);
+       ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->type2) == 0);
+       ast_test_validate(test, stasis_subscription_accept_message_type(cts->sub, types->change) == 0);
+       ast_test_validate(test, stasis_subscription_set_filter(cts->sub, STASIS_SUBSCRIPTION_FILTER_SELECTIVE) == 0);
+       stasis_subscription_accept_formatters(cts->sub,
+               STASIS_SUBSCRIPTION_FORMATTER_AMI | STASIS_SUBSCRIPTION_FORMATTER_JSON);
+
+       /* We should get these */
+       ast_test_validate(test, send_msg(test, cts, types->type1, "Pass"));
+       ast_test_validate(test, send_msg(test, cts, types->type2, "Pass"));
+       ast_test_validate(test, send_msg(test, cts, types->ami, "Pass"));
+       ast_test_validate(test, send_msg(test, cts, types->amievent, "Pass"));
+       ast_test_validate(test, send_msg(test, cts, types->json, "Pass"));
+
+       /* ... but not these */
+       ast_test_validate(test, send_msg(test, cts, types->type3, "FAIL"));
+       ast_test_validate(test, send_msg(test, cts, types->event, "FAIL"));
+
+       /* Wait for change(subscribe) and the "Pass" messages */
+       consumer_wait_for(cts->consumer, 6);
+
+       stasis_unsubscribe(cts->sub);
+       cts->sub = NULL;
+       consumer_wait_for_completion(cts->consumer);
+
+       dump_consumer(test, cts);
+
+       ast_test_validate(test, 1 == cts->consumer->complete);
+       ast_test_validate(test, 7 == cts->consumer->messages_rxed_len);
+       ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Subscribe"));
+       ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type1, "Pass"));
+       ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->type2, "Pass"));
+       ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->ami, "Pass"));
+       ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->amievent, "Pass"));
+       ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->json, "Pass"));
+       ast_test_validate(test, is_msg(cts->consumer->messages_rxed[ix++], types->change, "Unsubscribe"));
+
+       return AST_TEST_PASS;
+}
+
 static int unload_module(void)
 {
        AST_TEST_UNREGISTER(message_type);
@@ -2070,6 +2459,9 @@ static int unload_module(void)
        AST_TEST_UNREGISTER(to_ami);
        AST_TEST_UNREGISTER(dtor_order);
        AST_TEST_UNREGISTER(caching_dtor_order);
+       AST_TEST_UNREGISTER(type_filters);
+       AST_TEST_UNREGISTER(formatter_filters);
+       AST_TEST_UNREGISTER(combo_filters);
        return 0;
 }
 
@@ -2099,6 +2491,9 @@ static int load_module(void)
        AST_TEST_REGISTER(to_ami);
        AST_TEST_REGISTER(dtor_order);
        AST_TEST_REGISTER(caching_dtor_order);
+       AST_TEST_REGISTER(type_filters);
+       AST_TEST_REGISTER(formatter_filters);
+       AST_TEST_REGISTER(combo_filters);
        return AST_MODULE_LOAD_SUCCESS;
 }