Transition MWI to Stasis-core
authorKinsey Moore <kmoore@digium.com>
Sat, 16 Mar 2013 15:45:58 +0000 (15:45 +0000)
committerKinsey Moore <kmoore@digium.com>
Sat, 16 Mar 2013 15:45:58 +0000 (15:45 +0000)
Remove MWI's dependency on the event system by moving it to
Stasis-core. This also introduces forwarding topic pools in Stasis-core
which aggregate many dynamically allocated topics into a single primary
topic.

Review: https://reviewboard.asterisk.org/r/2368/
(closes issue ASTERISK-21097)
Patch-by: Kinsey Moore

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@383284 65c4cc65-6c06-0410-ace0-fbb531ad65f3

20 files changed:
apps/app_minivm.c
apps/app_voicemail.c
channels/chan_dahdi.c
channels/chan_iax2.c
channels/chan_mgcp.c
channels/chan_sip.c
channels/chan_skinny.c
channels/chan_unistim.c
channels/sig_pri.c
channels/sig_pri.h
channels/sip/include/sip.h
include/asterisk/app.h
include/asterisk/stasis.h
include/asterisk/xmpp.h
main/app.c
main/asterisk.c
main/channel.c
main/stasis.c
res/res_jabber.c
res/res_xmpp.c

index 498c6ea..53c5f09 100644 (file)
@@ -2013,7 +2013,6 @@ static int leave_voicemail(struct ast_channel *chan, char *username, struct leav
  * \brief Queue a message waiting event */
 static void queue_mwi_event(const char *mbx, const char *ctx, int urgent, int new, int old)
 {
-       struct ast_event *event;
        char *mailbox, *context;
 
        mailbox = ast_strdupa(mbx);
@@ -2022,16 +2021,7 @@ static void queue_mwi_event(const char *mbx, const char *ctx, int urgent, int ne
                context = "default";
        }
 
-       if (!(event = ast_event_new(AST_EVENT_MWI,
-                       AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox,
-                       AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context,
-                       AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_UINT, (new+urgent),
-                       AST_EVENT_IE_OLDMSGS, AST_EVENT_IE_PLTYPE_UINT, old,
-                       AST_EVENT_IE_END))) {
-               return;
-       }
-
-       ast_event_queue_and_cache(event);
+       stasis_publish_mwi_state(mailbox, context, new + urgent, old);
 }
 
 /*!\internal
index d0a8a78..f177869 100644 (file)
@@ -974,10 +974,8 @@ static ast_cond_t poll_cond = PTHREAD_COND_INITIALIZER;
 static pthread_t poll_thread = AST_PTHREADT_NULL;
 static unsigned char poll_thread_run;
 
-/*! Subscription to ... MWI event subscriptions */
-static struct ast_event_sub *mwi_sub_sub;
-/*! Subscription to ... MWI event un-subscriptions */
-static struct ast_event_sub *mwi_unsub_sub;
+/*! Subscription to MWI event subscription changes */
+static struct stasis_subscription *mwi_sub_sub;
 
 /*!
  * \brief An MWI subscription
@@ -991,16 +989,24 @@ struct mwi_sub {
        int old_urgent;
        int old_new;
        int old_old;
-       uint32_t uniqueid;
+       char *uniqueid;
        char mailbox[1];
 };
 
 struct mwi_sub_task {
        const char *mailbox;
        const char *context;
-       uint32_t uniqueid;
+       const char *uniqueid;
 };
 
+static void mwi_sub_task_dtor(struct mwi_sub_task *mwist)
+{
+       ast_free((void *) mwist->mailbox);
+       ast_free((void *) mwist->context);
+       ast_free((void *) mwist->uniqueid);
+       ast_free(mwist);
+}
+
 static struct ast_taskprocessor *mwi_subscription_tps;
 
 static AST_RWLIST_HEAD_STATIC(mwi_subs, mwi_sub);
@@ -7721,25 +7727,16 @@ static int vm_forwardoptions(struct ast_channel *chan, struct ast_vm_user *vmu,
 
 static void queue_mwi_event(const char *box, int urgent, int new, int old)
 {
-       struct ast_event *event;
        char *mailbox, *context;
 
        /* Strip off @default */
        context = mailbox = ast_strdupa(box);
        strsep(&context, "@");
-       if (ast_strlen_zero(context))
+       if (ast_strlen_zero(context)) {
                context = "default";
-
-       if (!(event = ast_event_new(AST_EVENT_MWI,
-                       AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox,
-                       AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context,
-                       AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_UINT, (new+urgent),
-                       AST_EVENT_IE_OLDMSGS, AST_EVENT_IE_PLTYPE_UINT, old,
-                       AST_EVENT_IE_END))) {
-               return;
        }
 
-       ast_event_queue_and_cache(event);
+       stasis_publish_mwi_state(mailbox, context, new + urgent, old);
 }
 
 /*!
@@ -12533,28 +12530,28 @@ static void *mb_poll_thread(void *data)
 
 static void mwi_sub_destroy(struct mwi_sub *mwi_sub)
 {
+       ast_free(mwi_sub->uniqueid);
        ast_free(mwi_sub);
 }
 
 static int handle_unsubscribe(void *datap)
 {
        struct mwi_sub *mwi_sub;
-       uint32_t *uniqueid = datap;
-       
+       char *uniqueid = datap;
+
        AST_RWLIST_WRLOCK(&mwi_subs);
        AST_RWLIST_TRAVERSE_SAFE_BEGIN(&mwi_subs, mwi_sub, entry) {
-               if (mwi_sub->uniqueid == *uniqueid) {
+               if (!strcmp(mwi_sub->uniqueid, uniqueid)) {
                        AST_LIST_REMOVE_CURRENT(entry);
-                       break;
+                       /* Don't break here since a duplicate uniqueid
+                        * may have been added as a result of a cache dump. */
+                       mwi_sub_destroy(mwi_sub);
                }
        }
        AST_RWLIST_TRAVERSE_SAFE_END
        AST_RWLIST_UNLOCK(&mwi_subs);
 
-       if (mwi_sub)
-               mwi_sub_destroy(mwi_sub);
-
-       ast_free(uniqueid);     
+       ast_free(uniqueid);
        return 0;
 }
 
@@ -12574,7 +12571,7 @@ static int handle_subscribe(void *datap)
        if (!(mwi_sub = ast_calloc(1, len)))
                return -1;
 
-       mwi_sub->uniqueid = p->uniqueid;
+       mwi_sub->uniqueid = ast_strdup(p->uniqueid);
        if (!ast_strlen_zero(p->mailbox))
                strcpy(mwi_sub->mailbox, p->mailbox);
 
@@ -12586,75 +12583,85 @@ static int handle_subscribe(void *datap)
        AST_RWLIST_WRLOCK(&mwi_subs);
        AST_RWLIST_INSERT_TAIL(&mwi_subs, mwi_sub, entry);
        AST_RWLIST_UNLOCK(&mwi_subs);
-       ast_free((void *) p->mailbox);
-       ast_free((void *) p->context);
-       ast_free(p);
+       mwi_sub_task_dtor(p);
        poll_subscribed_mailbox(mwi_sub);
        return 0;
 }
 
-static void mwi_unsub_event_cb(const struct ast_event *event, void *userdata)
+static void mwi_unsub_event_cb(struct stasis_subscription_change *change)
 {
-       uint32_t u, *uniqueid = ast_calloc(1, sizeof(*uniqueid));
+       char *uniqueid = ast_strdup(change->uniqueid);
 
        if (!uniqueid) {
                ast_log(LOG_ERROR, "Unable to allocate memory for uniqueid\n");
                return;
        }
 
-       if (ast_event_get_type(event) != AST_EVENT_UNSUB) {
+       if (ast_taskprocessor_push(mwi_subscription_tps, handle_unsubscribe, uniqueid) < 0) {
                ast_free(uniqueid);
-               return;
        }
+}
 
-       if (ast_event_get_ie_uint(event, AST_EVENT_IE_EVENTTYPE) != AST_EVENT_MWI) {
-               ast_free(uniqueid);
+static void mwi_sub_event_cb(struct stasis_subscription_change *change)
+{
+       struct mwi_sub_task *mwist;
+       char *context = ast_strdupa(stasis_topic_name(change->topic));
+       char *mailbox;
+
+       if ((mwist = ast_calloc(1, (sizeof(*mwist)))) == NULL) {
                return;
        }
 
-       u = ast_event_get_ie_uint(event, AST_EVENT_IE_UNIQUEID);
-       *uniqueid = u;
-       if (ast_taskprocessor_push(mwi_subscription_tps, handle_unsubscribe, uniqueid) < 0) {
-               ast_free(uniqueid);
+       mailbox = strsep(&context, "@");
+
+       mwist->mailbox = ast_strdup(mailbox);
+       mwist->context = ast_strdup(context);
+       mwist->uniqueid = ast_strdup(change->uniqueid);
+
+       if (ast_taskprocessor_push(mwi_subscription_tps, handle_subscribe, mwist) < 0) {
+               mwi_sub_task_dtor(mwist);
        }
 }
 
-static void mwi_sub_event_cb(const struct ast_event *event, void *userdata)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
 {
-       struct mwi_sub_task *mwist;
-       
-       if (ast_event_get_type(event) != AST_EVENT_SUB)
-               return;
-
-       if (ast_event_get_ie_uint(event, AST_EVENT_IE_EVENTTYPE) != AST_EVENT_MWI)
+       struct stasis_subscription_change *change;
+       /* Only looking for subscription change notices here */
+       if (stasis_message_type(msg) != stasis_subscription_change()) {
                return;
+       }
 
-       if ((mwist = ast_calloc(1, (sizeof(*mwist)))) == NULL) {
-               ast_log(LOG_ERROR, "could not allocate a mwi_sub_task\n");
+       change = stasis_message_data(msg);
+       if (change->topic == stasis_mwi_topic_all()) {
                return;
        }
-       mwist->mailbox = ast_strdup(ast_event_get_ie_str(event, AST_EVENT_IE_MAILBOX));
-       mwist->context = ast_strdup(ast_event_get_ie_str(event, AST_EVENT_IE_CONTEXT));
-       mwist->uniqueid = ast_event_get_ie_uint(event, AST_EVENT_IE_UNIQUEID);
-       
-       if (ast_taskprocessor_push(mwi_subscription_tps, handle_subscribe, mwist) < 0) {
-               ast_free(mwist);
+
+       if (!strcmp(change->description, "Subscribe")) {
+               mwi_sub_event_cb(change);
+       } else if (!strcmp(change->description, "Unsubscribe")) {
+               mwi_unsub_event_cb(change);
        }
 }
 
+static int dump_cache(void *obj, void *arg, int flags)
+{
+       struct stasis_message *msg = obj;
+       mwi_event_cb(NULL, NULL, NULL, msg);
+       return 0;
+}
+
 static void start_poll_thread(void)
 {
        int errcode;
-       mwi_sub_sub = ast_event_subscribe(AST_EVENT_SUB, mwi_sub_event_cb, "Voicemail MWI subscription", NULL,
-               AST_EVENT_IE_EVENTTYPE, AST_EVENT_IE_PLTYPE_UINT, AST_EVENT_MWI,
-               AST_EVENT_IE_END);
-
-       mwi_unsub_sub = ast_event_subscribe(AST_EVENT_UNSUB, mwi_unsub_event_cb, "Voicemail MWI subscription", NULL,
-               AST_EVENT_IE_EVENTTYPE, AST_EVENT_IE_PLTYPE_UINT, AST_EVENT_MWI,
-               AST_EVENT_IE_END);
+       mwi_sub_sub = stasis_subscribe(stasis_mwi_topic_all(), mwi_event_cb, NULL);
 
-       if (mwi_sub_sub)
-               ast_event_report_subs(mwi_sub_sub);
+       if (mwi_sub_sub) {
+               struct ao2_container *cached = stasis_cache_dump(stasis_mwi_topic_cached(), stasis_subscription_change());
+               if (cached) {
+                       ao2_callback(cached, OBJ_MULTIPLE | OBJ_NODATA, dump_cache, NULL);
+               }
+               ao2_cleanup(cached);
+       }
 
        poll_thread_run = 1;
 
@@ -12668,13 +12675,7 @@ static void stop_poll_thread(void)
        poll_thread_run = 0;
 
        if (mwi_sub_sub) {
-               ast_event_unsubscribe(mwi_sub_sub);
-               mwi_sub_sub = NULL;
-       }
-
-       if (mwi_unsub_sub) {
-               ast_event_unsubscribe(mwi_unsub_sub);
-               mwi_unsub_sub = NULL;
+               mwi_sub_sub = stasis_unsubscribe(mwi_sub_sub);
        }
 
        ast_mutex_lock(&poll_lock);
index e077e84..15075eb 100644 (file)
@@ -502,7 +502,7 @@ static enum ast_bridge_result dahdi_bridge(struct ast_channel *c0, struct ast_ch
 
 static int dahdi_sendtext(struct ast_channel *c, const char *text);
 
-static void mwi_event_cb(const struct ast_event *event, void *userdata)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
 {
        /* This module does not handle MWI in an event-based manner.  However, it
         * subscribes to MWI for each mailbox that is configured so that the core
@@ -1215,7 +1215,7 @@ struct dahdi_pvt {
         */
        char mailbox[AST_MAX_EXTENSION];
        /*! \brief Opaque event subscription parameters for message waiting indication support. */
-       struct ast_event_sub *mwi_event_sub;
+       struct stasis_subscription *mwi_event_sub;
        /*! \brief Delayed dialing for E911.  Overlap digits for ISDN. */
        char dialdest[256];
 #ifdef HAVE_DAHDI_LINEREVERSE_VMWI
@@ -3753,7 +3753,6 @@ struct sig_ss7_callback sig_ss7_callbacks =
 static void notify_message(char *mailbox_full, int thereornot)
 {
        char s[sizeof(mwimonitornotify) + 80];
-       struct ast_event *event;
        char *mailbox, *context;
 
        /* Strip off @default */
@@ -3762,16 +3761,7 @@ static void notify_message(char *mailbox_full, int thereornot)
        if (ast_strlen_zero(context))
                context = "default";
 
-       if (!(event = ast_event_new(AST_EVENT_MWI,
-                       AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox,
-                       AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context,
-                       AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_UINT, thereornot,
-                       AST_EVENT_IE_OLDMSGS, AST_EVENT_IE_PLTYPE_UINT, thereornot,
-                       AST_EVENT_IE_END))) {
-               return;
-       }
-
-       ast_event_queue_and_cache(event);
+       stasis_publish_mwi_state(mailbox, context, thereornot, thereornot);
 
        if (!ast_strlen_zero(mailbox) && !ast_strlen_zero(mwimonitornotify)) {
                snprintf(s, sizeof(s), "%s %s %d", mwimonitornotify, mailbox, thereornot);
@@ -5413,24 +5403,25 @@ static int send_cwcidspill(struct dahdi_pvt *p)
 static int has_voicemail(struct dahdi_pvt *p)
 {
        int new_msgs;
-       struct ast_event *event;
        char *mailbox, *context;
+       RAII_VAR(struct stasis_message *, mwi_message, NULL, ao2_cleanup);
+       struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID);
 
        mailbox = context = ast_strdupa(p->mailbox);
        strsep(&context, "@");
-       if (ast_strlen_zero(context))
+       if (ast_strlen_zero(context)) {
                context = "default";
+       }
 
-       event = ast_event_get_cached(AST_EVENT_MWI,
-               AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox,
-               AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context,
-               AST_EVENT_IE_END);
+       ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context);
+       mwi_message = stasis_cache_get(stasis_mwi_topic_cached(), stasis_mwi_state_message(), ast_str_buffer(uniqueid));
 
-       if (event) {
-               new_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS);
-               ast_event_destroy(event);
-       } else
+       if (mwi_message) {
+               struct stasis_mwi_state *mwi_state = stasis_message_data(mwi_message);
+               new_msgs = mwi_state->new_msgs;
+       } else {
                new_msgs = ast_app_has_voicemail(p->mailbox, NULL);
+       }
 
        return new_msgs;
 }
@@ -5965,10 +5956,12 @@ static void destroy_dahdi_pvt(struct dahdi_pvt *pvt)
                }
        }
        ast_free(p->cidspill);
-       if (p->use_smdi)
+       if (p->use_smdi) {
                ast_smdi_interface_unref(p->smdi_iface);
-       if (p->mwi_event_sub)
-               ast_event_unsubscribe(p->mwi_event_sub);
+       }
+       if (p->mwi_event_sub) {
+               p->mwi_event_sub = stasis_unsubscribe(p->mwi_event_sub);
+       }
        if (p->vars) {
                ast_variables_destroy(p->vars);
        }
@@ -5981,8 +5974,9 @@ static void destroy_dahdi_pvt(struct dahdi_pvt *pvt)
 
        ast_mutex_destroy(&p->lock);
        dahdi_close_sub(p, SUB_REAL);
-       if (p->owner)
+       if (p->owner) {
                ast_channel_tech_pvt_set(p->owner, NULL);
+       }
        ast_free(p);
 }
 
@@ -13226,15 +13220,20 @@ static struct dahdi_pvt *mkintf(int channel, const struct dahdi_chan_conf *conf,
                ast_copy_string(tmp->mailbox, conf->chan.mailbox, sizeof(tmp->mailbox));
                if (channel != CHAN_PSEUDO && !ast_strlen_zero(tmp->mailbox)) {
                        char *mailbox, *context;
+                       struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID);
+                       struct stasis_topic *mailbox_specific_topic;
+
                        mailbox = context = ast_strdupa(tmp->mailbox);
                        strsep(&context, "@");
                        if (ast_strlen_zero(context))
                                context = "default";
-                       tmp->mwi_event_sub = ast_event_subscribe(AST_EVENT_MWI, mwi_event_cb, "Dahdi MWI subscription", NULL,
-                               AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox,
-                               AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context,
-                               AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_EXISTS,
-                               AST_EVENT_IE_END);
+
+                       ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context);
+
+                       mailbox_specific_topic = stasis_mwi_topic(ast_str_buffer(uniqueid));
+                       if (mailbox_specific_topic) {
+                               tmp->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL);
+                       }
                }
 #ifdef HAVE_DAHDI_LINEREVERSE_VMWI
                tmp->mwisend_setting = conf->chan.mwisend_setting;
index 5a307bf..7280c48 100644 (file)
@@ -533,7 +533,7 @@ struct iax2_peer {
 
        int expire;                                     /*!< Schedule entry for expiry */
        int expiry;                                     /*!< How soon to expire */
-       iax2_format capability;        /*!< Capability */
+       iax2_format capability;                         /*!< Capability */
 
        /* Qualification */
        int callno;                                     /*!< Call number of POKE request */
@@ -545,12 +545,12 @@ struct iax2_peer {
        int pokefreqnotok;                              /*!< How often to check when the host has been determined to be down */
        int historicms;                                 /*!< How long recent average responses took */
        int smoothing;                                  /*!< Sample over how many units to determine historic ms */
-       uint16_t maxcallno;                                     /*!< Max call number limit for this peer.  Set on registration */
+       uint16_t maxcallno;                             /*!< Max call number limit for this peer.  Set on registration */
 
-       struct ast_event_sub *mwi_event_sub;
+       struct stasis_subscription *mwi_event_sub;      /*!< This subscription lets pollmailboxes know which mailboxes need to be polled */
 
        struct ast_acl_list *acl;
-       enum calltoken_peer_enum calltoken_required;        /*!< Is calltoken validation required or not, can be YES, NO, or AUTO */
+       enum calltoken_peer_enum calltoken_required;    /*!< Is calltoken validation required or not, can be YES, NO, or AUTO */
 };
 
 #define IAX2_TRUNK_PREFACE (sizeof(struct iax_frame) + sizeof(struct ast_iax2_meta_hdr) + sizeof(struct ast_iax2_meta_trunk_hdr))
@@ -1316,7 +1316,7 @@ static void iax2_lock_owner(int callno)
        }
 }
 
-static void mwi_event_cb(const struct ast_event *event, void *userdata)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
 {
        /* The MWI subscriptions exist just so the core knows we care about those
         * mailboxes.  However, we just grab the events out of the cache when it
@@ -8743,23 +8743,24 @@ static int update_registry(struct sockaddr_in *sin, int callno, char *devtype, i
                iax_ie_append_short(&ied, IAX_IE_REFRESH, p->expiry);
                iax_ie_append_addr(&ied, IAX_IE_APPARENT_ADDR, &peer_addr);
                if (!ast_strlen_zero(p->mailbox)) {
-                       struct ast_event *event;
                        int new, old;
                        char *mailbox, *context;
+                       RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+                       struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID);
 
                        context = mailbox = ast_strdupa(p->mailbox);
                        strsep(&context, "@");
-                       if (ast_strlen_zero(context))
+                       if (ast_strlen_zero(context)) {
                                context = "default";
+                       }
+
+                       ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context);
+                       msg = stasis_cache_get(stasis_mwi_topic_cached(), stasis_mwi_state_message(), ast_str_buffer(uniqueid));
 
-                       event = ast_event_get_cached(AST_EVENT_MWI,
-                               AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox,
-                               AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context,
-                               AST_EVENT_IE_END);
-                       if (event) {
-                               new = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS);
-                               old = ast_event_get_ie_uint(event, AST_EVENT_IE_OLDMSGS);
-                               ast_event_destroy(event);
+                       if (msg) {
+                               struct stasis_mwi_state *mwi_state = stasis_message_data(msg);
+                               new = mwi_state->new_msgs;
+                               old = mwi_state->old_msgs;
                        } else { /* Fall back on checking the mailbox directly */
                                ast_app_inboxcount(p->mailbox, &new, &old);
                        }
@@ -12392,8 +12393,9 @@ static void peer_destructor(void *obj)
        if (peer->dnsmgr)
                ast_dnsmgr_release(peer->dnsmgr);
 
-       if (peer->mwi_event_sub)
-               ast_event_unsubscribe(peer->mwi_event_sub);
+       if (peer->mwi_event_sub) {
+               peer->mwi_event_sub = stasis_unsubscribe(peer->mwi_event_sub);
+       }
 
        ast_string_field_free_memory(peer);
 }
@@ -12667,14 +12669,21 @@ static struct iax2_peer *build_peer(const char *name, struct ast_variable *v, st
 
        if (!ast_strlen_zero(peer->mailbox)) {
                char *mailbox, *context;
+               struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID);
+               struct stasis_topic *mailbox_specific_topic;
+
                context = mailbox = ast_strdupa(peer->mailbox);
                strsep(&context, "@");
-               if (ast_strlen_zero(context))
+               if (ast_strlen_zero(context)) {
                        context = "default";
-               peer->mwi_event_sub = ast_event_subscribe(AST_EVENT_MWI, mwi_event_cb, "IAX MWI subscription", NULL,
-                       AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox,
-                       AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context,
-                       AST_EVENT_IE_END);
+               }
+
+               ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context);
+
+               mailbox_specific_topic = stasis_mwi_topic(ast_str_buffer(uniqueid));
+               if (mailbox_specific_topic) {
+                       peer->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL);
+               }
        }
 
        if (subscribe_acl_change) {
index 0c3dac6..65a5390 100644 (file)
@@ -82,6 +82,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/event.h"
 #include "asterisk/chanvars.h"
 #include "asterisk/pktccops.h"
+#include "asterisk/stasis.h"
 
 /*
  * Define to work around buggy dlink MGCP phone firmware which
@@ -342,7 +343,7 @@ struct mgcp_endpoint {
        char curtone[80];                       /*!< Current tone */
        char mailbox[AST_MAX_EXTENSION];
        char parkinglot[AST_MAX_CONTEXT];   /*!< Parkinglot */
-       struct ast_event_sub *mwi_event_sub;
+       struct stasis_subscription *mwi_event_sub;
        ast_group_t callgroup;
        ast_group_t pickupgroup;
        int callwaiting;
@@ -483,7 +484,7 @@ static struct ast_channel_tech mgcp_tech = {
        .func_channel_read = acf_channel_read,
 };
 
-static void mwi_event_cb(const struct ast_event *event, void *userdata)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
 {
        /* This module does not handle MWI in an event-based manner.  However, it
         * subscribes to MWI for each mailbox that is configured so that the core
@@ -494,24 +495,26 @@ static void mwi_event_cb(const struct ast_event *event, void *userdata)
 static int has_voicemail(struct mgcp_endpoint *p)
 {
        int new_msgs;
-       struct ast_event *event;
+       RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+       struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID);
        char *mbox, *cntx;
 
        cntx = mbox = ast_strdupa(p->mailbox);
        strsep(&cntx, "@");
-       if (ast_strlen_zero(cntx))
+       if (ast_strlen_zero(cntx)) {
                cntx = "default";
+       }
 
-       event = ast_event_get_cached(AST_EVENT_MWI,
-               AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mbox,
-               AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, cntx,
-               AST_EVENT_IE_END);
+       ast_str_set(&uniqueid, 0, "%s@%s", mbox, cntx);
 
-       if (event) {
-               new_msgs = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS);
-               ast_event_destroy(event);
-       } else
+       msg = stasis_cache_get(stasis_mwi_topic_cached(), stasis_mwi_state(), ast_str_buffer(uniqueid));
+
+       if (msg) {
+               struct stasis_mwi_state *mwi_state = stasis_message_data(msg);
+               new_msgs = mwi_state->new_msgs;
+       } else {
                new_msgs = ast_app_has_voicemail(p->mailbox, NULL);
+       }
 
        return new_msgs;
 }
@@ -3972,6 +3975,7 @@ static struct mgcp_gateway *build_gateway(char *cat, struct ast_variable *v)
        struct mgcp_endpoint *e;
        struct mgcp_subchannel *sub;
        struct ast_variable *chanvars = NULL;
+       struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID);
 
        /*char txident[80];*/
        int i=0, y=0;
@@ -4168,16 +4172,20 @@ static struct mgcp_gateway *build_gateway(char *cat, struct ast_variable *v)
                                ast_copy_string(e->parkinglot, parkinglot, sizeof(e->parkinglot));
                                if (!ast_strlen_zero(e->mailbox)) {
                                        char *mbox, *cntx;
+                                       struct stasis_topic *mailbox_specific_topic;
+
                                        cntx = mbox = ast_strdupa(e->mailbox);
                                        strsep(&cntx, "@");
                                        if (ast_strlen_zero(cntx)) {
                                                cntx = "default";
                                        }
-                                       e->mwi_event_sub = ast_event_subscribe(AST_EVENT_MWI, mwi_event_cb, "MGCP MWI subscription", NULL,
-                                               AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mbox,
-                                               AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, cntx,
-                                               AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_EXISTS,
-                                               AST_EVENT_IE_END);
+                                       ast_str_reset(uniqueid);
+                                       ast_str_set(&uniqueid, 0, "%s@%s", mbox, cntx);
+
+                                       maibox_specific_topic = stasis_mwi_topic(ast_str_buffer(uniqueid));
+                                       if (mailbox_specific_topic) {
+                                               e->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, NULL);
+                                       }
                                }
                                snprintf(e->rqnt_ident, sizeof(e->rqnt_ident), "%08lx", ast_random());
                                e->msgstate = -1;
@@ -4516,8 +4524,9 @@ static void destroy_endpoint(struct mgcp_endpoint *e)
                ast_free(s);
        }
 
-       if (e->mwi_event_sub)
-               ast_event_unsubscribe(e->mwi_event_sub);
+       if (e->mwi_event_sub) {
+               e->mwi_event_sub = stasis_unsubscribe(e->mwi_event_sub);
+       }
 
        if (e->chanvars) {
                ast_variables_destroy(e->chanvars);
index 631c1db..28e8d4d 100644 (file)
@@ -294,6 +294,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "sip/include/dialplan_functions.h"
 #include "sip/include/security_events.h"
 #include "asterisk/sip_api.h"
+#include "asterisk/app.h"
 
 /*** DOCUMENTATION
        <application name="SIPDtmfMode" language="en_US">
@@ -1275,7 +1276,7 @@ static int sip_poke_noanswer(const void *data);
 static int sip_poke_peer(struct sip_peer *peer, int force);
 static void sip_poke_all_peers(void);
 static void sip_peer_hold(struct sip_pvt *p, int hold);
-static void mwi_event_cb(const struct ast_event *, void *);
+static void mwi_event_cb(void *, struct stasis_subscription *, struct stasis_topic *, struct stasis_message *);
 static void network_change_event_cb(const struct ast_event *, void *);
 static void acl_change_event_cb(const struct ast_event *event, void *userdata);
 static void sip_keepalive_all_peers(void);
@@ -5225,8 +5226,9 @@ static void register_peer_exten(struct sip_peer *peer, int onoff)
 /*! Destroy mailbox subscriptions */
 static void destroy_mailbox(struct sip_mailbox *mailbox)
 {
-       if (mailbox->event_sub)
-               ast_event_unsubscribe(mailbox->event_sub);
+       if (mailbox->event_sub) {
+               mailbox->event_sub = stasis_unsubscribe(mailbox->event_sub);
+       }
        ast_free(mailbox);
 }
 
@@ -16644,11 +16646,16 @@ static void sip_peer_hold(struct sip_pvt *p, int hold)
 }
 
 /*! \brief Receive MWI events that we have subscribed to */
-static void mwi_event_cb(const struct ast_event *event, void *userdata)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
 {
        struct sip_peer *peer = userdata;
-
-       sip_send_mwi_to_peer(peer, 0);
+       if (stasis_subscription_final_message(sub, msg)) {
+               ao2_cleanup(peer);
+               return;
+       }
+       if (stasis_mwi_state_message() == stasis_message_type(msg)) {
+               sip_send_mwi_to_peer(peer, 0);
+       }
 }
 
 static void network_change_event_subscribe(void)
@@ -24787,16 +24794,9 @@ static int handle_request_notify(struct sip_pvt *p, struct sip_request *req, str
                if (!ast_strlen_zero(mailbox) && !ast_strlen_zero(c)) {
                        char *old = strsep(&c, " ");
                        char *new = strsep(&old, "/");
-                       struct ast_event *event;
 
-                       if ((event = ast_event_new(AST_EVENT_MWI,
-                                                  AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox,
-                                                  AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, "SIP_Remote",
-                                                  AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_UINT, atoi(new),
-                                                  AST_EVENT_IE_OLDMSGS, AST_EVENT_IE_PLTYPE_UINT, atoi(old),
-                                                  AST_EVENT_IE_END))) {
-                               ast_event_queue_and_cache(event);
-                       }
+                       stasis_publish_mwi_state(mailbox, "SIP_Remote", atoi(new), atoi(old));
+
                        transmit_response(p, "200 OK", req);
                } else {
                        transmit_response(p, "489 Bad event", req);
@@ -27617,16 +27617,20 @@ static int handle_request_publish(struct sip_pvt *p, struct sip_request *req, st
 static void add_peer_mwi_subs(struct sip_peer *peer)
 {
        struct sip_mailbox *mailbox;
+       struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID);
 
        AST_LIST_TRAVERSE(&peer->mailboxes, mailbox, entry) {
-               if (mailbox->event_sub) {
-                       ast_event_unsubscribe(mailbox->event_sub);
-               }
+               struct stasis_topic *mailbox_specific_topic;
+               mailbox->event_sub = stasis_unsubscribe(mailbox->event_sub);
+
+               ast_str_reset(uniqueid);
+               ast_str_set(&uniqueid, 0, "%s@%s", mailbox->mailbox, S_OR(mailbox->context, "default"));
 
-               mailbox->event_sub = ast_event_subscribe(AST_EVENT_MWI, mwi_event_cb, "SIP mbox event", peer,
-                       AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox->mailbox,
-                       AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, S_OR(mailbox->context, "default"),
-                       AST_EVENT_IE_END);
+               mailbox_specific_topic = stasis_mwi_topic(ast_str_buffer(uniqueid));
+               if (mailbox_specific_topic) {
+                       ao2_ref(peer, +1);
+                       mailbox->event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, peer);
+               }
        }
 }
 
@@ -28832,19 +28836,24 @@ static int get_cached_mwi(struct sip_peer *peer, int *new, int *old)
 {
        struct sip_mailbox *mailbox;
        int in_cache;
+       struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID);
 
        in_cache = 0;
        AST_LIST_TRAVERSE(&peer->mailboxes, mailbox, entry) {
-               struct ast_event *event;
-               event = ast_event_get_cached(AST_EVENT_MWI,
-                       AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox->mailbox,
-                       AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, S_OR(mailbox->context, "default"),
-                       AST_EVENT_IE_END);
-               if (!event)
+               RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+               struct stasis_mwi_state *mwi_state;
+
+               ast_str_reset(uniqueid);
+               ast_str_set(&uniqueid, 0, "%s@%s", mailbox->mailbox, S_OR(mailbox->context, "default"));
+
+               msg = stasis_cache_get(stasis_mwi_topic_cached(), stasis_mwi_state_message(), ast_str_buffer(uniqueid));
+               if (!msg) {
                        continue;
-               *new += ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS);
-               *old += ast_event_get_ie_uint(event, AST_EVENT_IE_OLDMSGS);
-               ast_event_destroy(event);
+               }
+
+               mwi_state = stasis_message_data(msg);
+               *new += mwi_state->new_msgs;
+               *old += mwi_state->old_msgs;
                in_cache = 1;
        }
 
index c974cae..6045a09 100644 (file)
@@ -1442,7 +1442,7 @@ struct skinny_line {
        SKINNY_LINE_OPTIONS
        ast_mutex_t lock;
        struct skinny_container *container;
-       struct ast_event_sub *mwi_event_sub; /* Event based MWI */
+       struct stasis_subscription *mwi_event_sub; /* Event based MWI */
        struct skinny_subchannel *activesub;
        AST_LIST_HEAD(, skinny_subchannel) sub;
        AST_LIST_HEAD(, skinny_subline) sublines;
@@ -1611,7 +1611,7 @@ static int skinny_indicate(struct ast_channel *ast, int ind, const void *data, s
 static int skinny_fixup(struct ast_channel *oldchan, struct ast_channel *newchan);
 static int skinny_senddigit_begin(struct ast_channel *ast, char digit);
 static int skinny_senddigit_end(struct ast_channel *ast, char digit, unsigned int duration);
-static void mwi_event_cb(const struct ast_event *event, void *userdata);
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg);
 static int skinny_dialer_cb(const void *data);
 static int skinny_reload(void);
 
@@ -2261,7 +2261,7 @@ static int skinny_register(struct skinny_req *req, struct skinnysession *s)
                                manager_event(EVENT_FLAG_SYSTEM, "PeerStatus", "ChannelType: Skinny\r\nPeer: Skinny/%s@%s\r\nPeerStatus: Registered\r\n", l->name, d->name);
                                register_exten(l);
                                /* initialize MWI on line and device */
-                               mwi_event_cb(0, l);
+                               mwi_event_cb(l, NULL, NULL, NULL);
                                AST_LIST_TRAVERSE(&l->sublines, subline, list) {
                                        ast_extension_state_add(subline->context, subline->exten, skinny_extensionstate_cb, subline->container);
                                }
@@ -3507,7 +3507,7 @@ static void update_connectedline(struct skinny_subchannel *sub, const void *data
        send_callinfo(sub);
 }
 
-static void mwi_event_cb(const struct ast_event *event, void *userdata)
+static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
 {
        struct skinny_line *l = userdata;
        struct skinny_device *d = l->device;
@@ -3518,8 +3518,9 @@ static void mwi_event_cb(const struct ast_event *event, void *userdata)
                return;
        }
 
-       if (event) {
-               l->newmsgs = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS);
+       if (msg && stasis_mwi_state_message() == stasis_message_type(msg)) {
+               struct stasis_mwi_state *mwi_state = stasis_message_data(msg);
+               l->newmsgs = mwi_state->new_msgs;
        }
 
        if (l->newmsgs) {
@@ -8250,16 +8251,22 @@ static struct skinny_line *config_line(const char *lname, struct ast_variable *v
 
        if (!ast_strlen_zero(l->mailbox)) {
                char *cfg_mailbox, *cfg_context;
+               struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID);
+               struct stasis_topic *mailbox_specific_topic;
+
                cfg_context = cfg_mailbox = ast_strdupa(l->mailbox);
                ast_verb(3, "Setting mailbox '%s' on line %s\n", cfg_mailbox, l->name);
                strsep(&cfg_context, "@");
-               if (ast_strlen_zero(cfg_context))
-                        cfg_context = "default";
-               l->mwi_event_sub = ast_event_subscribe(AST_EVENT_MWI, mwi_event_cb, "skinny MWI subsciption", l,
-                       AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, cfg_mailbox,
-                       AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, cfg_context,
-                       AST_EVENT_IE_NEWMSGS, AST_EVENT_IE_PLTYPE_EXISTS,
-                       AST_EVENT_IE_END);
+               if (ast_strlen_zero(cfg_context)) {
+                       cfg_context = "default";
+               }
+
+               ast_str_set(&uniqueid, 0, "%s@%s", cfg_mailbox, cfg_context);
+
+               mailbox_specific_topic = stasis_mwi_topic(ast_str_buffer(uniqueid));
+               if (mailbox_specific_topic) {
+                       l->mwi_event_sub = stasis_subscribe(mailbox_specific_topic, mwi_event_cb, l);
+               }
        }
 
        if (!ast_strlen_zero(vmexten) && ast_strlen_zero(l->vmexten)) {
@@ -8694,8 +8701,9 @@ static int unload_module(void)
                                }
                                ast_mutex_unlock(&sub->lock);
                        }
-                       if (l->mwi_event_sub)
-                               ast_event_unsubscribe(l->mwi_event_sub);
+                       if (l->mwi_event_sub) {
+                               l->mwi_event_sub = stasis_unsubscribe(l->mwi_event_sub);
+                       }
                        ast_mutex_unlock(&l->lock);
                        manager_event(EVENT_FLAG_SYSTEM, "PeerStatus", "ChannelType: Skinny\r\nPeer: Skinny/%s@%s\r\nPeerStatus: Unregistered\r\n", l->name, d->name);
                        unregister_exten(l);
index b4ec9e4..130549c 100644 (file)
@@ -5500,23 +5500,24 @@ static int unistim_sendtext(struct ast_channel *ast, const char *text)
 /*--- unistim_send_mwi_to_peer: Send message waiting indication ---*/
 static int unistim_send_mwi_to_peer(struct unistim_line *peer, unsigned int tick)
 {
-       struct ast_event *event;
        int new;
        char *mailbox, *context;
+       RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
+       struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID);
 
        context = mailbox = ast_strdupa(peer->mailbox);
        strsep(&context, "@");
        if (ast_strlen_zero(context)) {
                context = "default";
        }
-       event = ast_event_get_cached(AST_EVENT_MWI,
-               AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mailbox,
-               AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, context,
-               AST_EVENT_IE_END);
 
-       if (event) {
-               new = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS);
-               ast_event_destroy(event);
+       ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context);
+
+       msg = stasis_cache_get(stasis_mwi_topic_cached(), stasis_mwi_state_message(), ast_str_buffer(uniqueid));
+
+       if (msg) {
+               struct stasis_mwi_state *mwi_state = stasis_message_data(msg);
+               new = mwi_state->new_msgs;
        } else { /* Fall back on checking the mailbox directly */
                new = ast_app_has_voicemail(peer->mailbox, "INBOX");
        }
index e01cead..e3b9b3f 100644 (file)
@@ -8752,23 +8752,30 @@ static void sig_pri_send_mwi_indication(struct sig_pri_span *pri, const char *vm
  *
  * \return Nothing
  */
-static void sig_pri_mwi_event_cb(const struct ast_event *event, void *userdata)
+static void sig_pri_mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
 {
        struct sig_pri_span *pri = userdata;
        const char *mbox_context;
        const char *mbox_number;
        int num_messages;
        int idx;
+       struct stasis_mwi_state *mwi_state;
 
-       mbox_number = ast_event_get_ie_str(event, AST_EVENT_IE_MAILBOX);
+       if (stasis_mwi_state_message() != stasis_message_type(msg)) {
+               return;
+       }
+
+       mwi_state = stasis_message_data(msg);
+
+       mbox_number = mwi_state->mailbox;
        if (ast_strlen_zero(mbox_number)) {
                return;
        }
-       mbox_context = ast_event_get_ie_str(event, AST_EVENT_IE_CONTEXT);
+       mbox_context = mwi_state->context;
        if (ast_strlen_zero(mbox_context)) {
                return;
        }
-       num_messages = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS);
+       num_messages = mwi_state->new_msgs;
 
        for (idx = 0; idx < ARRAY_LEN(pri->mbox); ++idx) {
                if (!pri->mbox[idx].sub) {
@@ -8799,27 +8806,28 @@ static void sig_pri_mwi_event_cb(const struct ast_event *event, void *userdata)
 static void sig_pri_mwi_cache_update(struct sig_pri_span *pri)
 {
        int idx;
-       int num_messages;
-       struct ast_event *event;
+       struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID);
+       struct stasis_mwi_state *mwi_state;
 
        for (idx = 0; idx < ARRAY_LEN(pri->mbox); ++idx) {
+               RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
                if (!pri->mbox[idx].sub) {
                        /* Mailbox slot is empty */
                        continue;
                }
 
-               event = ast_event_get_cached(AST_EVENT_MWI,
-                       AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, pri->mbox[idx].number,
-                       AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, pri->mbox[idx].context,
-                       AST_EVENT_IE_END);
-               if (!event) {
+               ast_str_reset(uniqueid);
+               ast_str_set(&uniqueid, 0, "%s@%s", pri->mbox[idx].number, pri->mbox[idx].context);
+
+               msg = stasis_cache_get(stasis_mwi_topic_cached(), stasis_mwi_state_message(), ast_str_buffer(uniqueid));
+               if (!msg) {
                        /* No cached event for this mailbox. */
                        continue;
                }
-               num_messages = ast_event_get_ie_uint(event, AST_EVENT_IE_NEWMSGS);
+
+               mwi_state = stasis_message_data(msg);
                sig_pri_send_mwi_indication(pri, pri->mbox[idx].vm_number, pri->mbox[idx].number,
-                       pri->mbox[idx].context, num_messages);
-               ast_event_destroy(event);
+                       pri->mbox[idx].context, mwi_state->new_msgs);
        }
 }
 #endif /* defined(HAVE_PRI_MWI) */
@@ -8841,7 +8849,7 @@ void sig_pri_stop_pri(struct sig_pri_span *pri)
 #if defined(HAVE_PRI_MWI)
        for (idx = 0; idx < ARRAY_LEN(pri->mbox); ++idx) {
                if (pri->mbox[idx].sub) {
-                       pri->mbox[idx].sub = ast_event_unsubscribe(pri->mbox[idx].sub);
+                       pri->mbox[idx].sub = stasis_unsubscribe(pri->mbox[idx].sub);
                }
        }
 #endif /* defined(HAVE_PRI_MWI) */
@@ -8905,13 +8913,14 @@ int sig_pri_start_pri(struct sig_pri_span *pri)
        char *saveptr;
        char *prev_vm_number;
        struct ast_str *mwi_description = ast_str_alloca(64);
+       struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID);
 #endif /* defined(HAVE_PRI_MWI) */
 
 #if defined(HAVE_PRI_MWI)
        /* Prepare the mbox[] for use. */
        for (i = 0; i < ARRAY_LEN(pri->mbox); ++i) {
                if (pri->mbox[i].sub) {
-                       pri->mbox[i].sub = ast_event_unsubscribe(pri->mbox[i].sub);
+                       pri->mbox[i].sub = stasis_unsubscribe(pri->mbox[i].sub);
                }
        }
 #endif /* defined(HAVE_PRI_MWI) */
@@ -8951,6 +8960,7 @@ int sig_pri_start_pri(struct sig_pri_span *pri)
        for (i = 0; i < ARRAY_LEN(pri->mbox); ++i) {
                char *mbox_number;
                char *mbox_context;
+               struct stasis_topic *mailbox_specific_topic;
 
                mbox_number = strsep(&saveptr, ",");
                if (!mbox_number) {
@@ -8976,13 +8986,17 @@ int sig_pri_start_pri(struct sig_pri_span *pri)
                /* Fill the mbox[] element. */
                pri->mbox[i].number = mbox_number;
                pri->mbox[i].context = mbox_context;
+
+               ast_str_reset(uniqueid);
+               ast_str_set(&uniqueid, 0, "%s@%s", mbox_number, mbox_context);
+
                ast_str_set(&mwi_description, -1, "%s span %d[%d] MWI mailbox %s@%s",
                        sig_pri_cc_type_name, pri->span, i, mbox_number, mbox_context);
-               pri->mbox[i].sub = ast_event_subscribe(AST_EVENT_MWI, sig_pri_mwi_event_cb,
-                       ast_str_buffer(mwi_description), pri,
-                       AST_EVENT_IE_MAILBOX, AST_EVENT_IE_PLTYPE_STR, mbox_number,
-                       AST_EVENT_IE_CONTEXT, AST_EVENT_IE_PLTYPE_STR, mbox_context,
-                       AST_EVENT_IE_END);
+
+               mailbox_specific_topic = stasis_mwi_topic(ast_str_buffer(uniqueid));
+               if (mailbox_specific_topic) {
+                       pri->mbox[i].sub = stasis_subscribe(mailbox_specific_topic, sig_pri_mwi_event_cb, pri);
+               }
                if (!pri->mbox[i].sub) {
                        ast_log(LOG_ERROR, "%s span %d could not subscribe to MWI events for %s@%s.",
                                sig_pri_cc_type_name, pri->span, mbox_number, mbox_context);
index 4de9077..db05286 100644 (file)
@@ -405,7 +405,7 @@ struct sig_pri_mbox {
         * \brief MWI mailbox event subscription.
         * \note NULL if mailbox not configured.
         */
-       struct ast_event_sub *sub;
+       struct stasis_subscription *sub;
        /*! \brief Mailbox number */
        const char *number;
        /*! \brief Mailbox context. */
index e5177bd..6eb2f29 100644 (file)
@@ -1262,7 +1262,7 @@ struct sip_pkt {
  */
 struct sip_mailbox {
        /*! Associated MWI subscription */
-       struct ast_event_sub *event_sub;
+       struct stasis_subscription *event_sub;
        AST_LIST_ENTRY(sip_mailbox) entry;
        unsigned int delme:1;
        char *context;
index cdc40e7..0505786 100644 (file)
@@ -28,6 +28,7 @@
 #include "asterisk/threadstorage.h"
 #include "asterisk/file.h"
 #include "asterisk/linkedlists.h"
+#include "asterisk/utils.h"
 
 struct ast_flags64;
 
@@ -1086,6 +1087,96 @@ void ast_safe_fork_cleanup(void);
  */
 int ast_app_parse_timelen(const char *timestr, int *result, enum ast_timelen defunit);
 
+/*!
+ * \brief Publish a MWI state update via stasis
+ * \param[in] uniqueid A unique identifier for this mailbox (usually mailbox@context)
+ * \param[in] mailbox The number identifying this mailbox
+ * \param[in] context The context this mailbox resides in
+ * \param[in] new_msgs The number of new messages in this mailbox
+ * \param[in] old_msgs The number of old messages in this mailbox
+ * \retval 0 Success
+ * \retval -1 Failure
+ * \since 12
+ */
+#define stasis_publish_mwi_state(mailbox, context, new_msgs, old_msgs) \
+       stasis_publish_mwi_state_full(mailbox, context, new_msgs, old_msgs, NULL)
+
+/*!
+ * \brief Publish a MWI state update via stasis with EID
+ * \param[in] uniqueid A unique identifier for this mailbox (usually mailbox@context)
+ * \param[in] mailbox The number identifying this mailbox
+ * \param[in] context The context this mailbox resides in
+ * \param[in] new_msgs The number of new messages in this mailbox
+ * \param[in] old_msgs The number of old messages in this mailbox
+ * \param[in] eid The EID of the server that originally published the message
+ * \retval 0 Success
+ * \retval -1 Failure
+ * \since 12
+ */
+int stasis_publish_mwi_state_full(
+                       const char *mailbox,
+                       const char *context,
+                       int new_msgs,
+                       int old_msgs,
+                       struct ast_eid *eid);
+
+/*!
+ * \brief The structure that contains MWI state
+ * \since 12
+ */
+struct stasis_mwi_state {
+       AST_DECLARE_STRING_FIELDS(
+               AST_STRING_FIELD(uniqueid);     /*!< Unique identifier for this mailbox/context */
+               AST_STRING_FIELD(mailbox);      /*!< Mailbox for this event */
+               AST_STRING_FIELD(context);      /*!< Context that this mailbox belongs to */
+       );
+       int new_msgs;                           /*!< The current number of new messages for this mailbox */
+       int old_msgs;                           /*!< The current number of old messages for this mailbox */
+       struct ast_eid eid;                     /*!< The EID of the server where this message originated */
+};
+
+/*!
+ * \brief Get the Stasis topic for MWI messages
+ * \retval The topic structure for MWI messages
+ * \retval NULL if it has not been allocated
+ * \since 12
+ */
+struct stasis_topic *stasis_mwi_topic_all(void);
+
+/*!
+ * \brief Get the Stasis topic for MWI messages on a unique ID
+ * \param uniqueid The unique id for which to get the topic
+ * \retval The topic structure for MWI messages for a given uniqueid
+ * \retval NULL if it failed to be found or allocated
+ * \since 12
+ */
+struct stasis_topic *stasis_mwi_topic(const char *uniqueid);
+
+/*!
+ * \brief Get the Stasis caching topic for MWI messages
+ * \retval The caching topic structure for MWI messages
+ * \retval NULL if it has not been allocated
+ * \since 12
+ */
+struct stasis_caching_topic *stasis_mwi_topic_cached(void);
+
+/*!
+ * \brief Get the Stasis message type for MWI messages
+ * \retval The message type structure for MWI messages
+ * \retval NULL if it has not been allocated
+ * \since 12
+ */
+struct stasis_message_type *stasis_mwi_state_message(void);
+
+/*!
+ * \brief Initialize the application core
+ * \retval 0 Success
+ * \retval -1 Failure
+ * \since 12
+ */
+int app_init(void);
+
+#define AST_MAX_MAILBOX_UNIQUEID (AST_MAX_EXTENSION + AST_MAX_CONTEXT + 2)
 #if defined(__cplusplus) || defined(c_plusplus)
 }
 #endif
index 9a5f753..f0d73fd 100644 (file)
@@ -377,6 +377,27 @@ struct stasis_subscription_change {
  */
 struct stasis_message_type *stasis_subscription_change(void);
 
+/*!
+ * \brief Pool for topic aggregation
+ */
+struct stasis_topic_pool;
+
+/*!
+ * \brief Create a topic pool that routes messages from dynamically generated topics to the given topic
+ * \param pooled_topic Topic to which messages will be routed
+ * \retval the new stasis_topic_pool or NULL on failure
+ */
+struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic);
+
+/*!
+ * \brief Find or create a topic in the pool
+ * \param pool Pool for which to get the topic
+ * \param topic_name Name of the topic to get
+ * \retval The already stored or newly allocated topic
+ * \retval NULL if the topic was not found and could not be allocated
+ */
+struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name);
+
 /*! @} */
 
 /*! @{ */
index 07abb6e..6833b6c 100644 (file)
@@ -134,7 +134,7 @@ struct ast_xmpp_client {
         pthread_t thread;
        int timeout;
        unsigned int reconnect:1; /*!< Reconnect this client */
-       struct ast_event_sub *mwi_sub; /*!< If distributing event information the MWI subscription */
+       struct stasis_subscription *mwi_sub; /*!< If distributing event information the MWI subscription */
        struct ast_event_sub *device_state_sub; /*!< If distributing event information the device state subscription */
 };
 
index 6db65f3..dca7484 100644 (file)
@@ -66,6 +66,10 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/threadstorage.h"
 #include "asterisk/test.h"
 #include "asterisk/module.h"
+#include "asterisk/astobj2.h"
+#include "asterisk/stasis.h"
+
+#define MWI_TOPIC_BUCKETS 57
 
 AST_THREADSTORAGE_PUBLIC(ast_str_thread_global_buf);
 
@@ -78,6 +82,11 @@ struct zombie {
 
 static AST_LIST_HEAD_STATIC(zombies, zombie);
 
+static struct stasis_topic *mwi_topic_all;
+static struct stasis_caching_topic *mwi_topic_cached;
+static struct stasis_message_type *mwi_message_type;
+static struct stasis_topic_pool *mwi_topic_pool;
+
 static void *shaun_of_the_dead(void *data)
 {
        struct zombie *cur;
@@ -2632,3 +2641,123 @@ int ast_app_parse_timelen(const char *timestr, int *result, enum ast_timelen uni
        return 0;
 }
 
+
+
+static void mwi_state_dtor(void *obj)
+{
+       struct stasis_mwi_state *mwi_state = obj;
+       ast_string_field_free_memory(mwi_state);
+}
+
+struct stasis_topic *stasis_mwi_topic_all(void)
+{
+       return mwi_topic_all;
+}
+
+struct stasis_caching_topic *stasis_mwi_topic_cached(void)
+{
+       return mwi_topic_cached;
+}
+
+struct stasis_message_type *stasis_mwi_state_message(void)
+{
+       return mwi_message_type;
+}
+
+struct stasis_topic *stasis_mwi_topic(const char *uniqueid)
+{
+       return stasis_topic_pool_get_topic(mwi_topic_pool, uniqueid);
+}
+
+int stasis_publish_mwi_state_full(
+                       const char *mailbox,
+                       const char *context,
+                       int new_msgs,
+                       int old_msgs,
+                       struct ast_eid *eid)
+{
+       RAII_VAR(struct stasis_mwi_state *, mwi_state, NULL, ao2_cleanup);
+       RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
+       struct ast_str *uniqueid = ast_str_alloca(AST_MAX_MAILBOX_UNIQUEID);
+       struct stasis_topic *mailbox_specific_topic;
+
+       ast_assert(!ast_strlen_zero(mailbox));
+       ast_assert(!ast_strlen_zero(context));
+
+       ast_str_set(&uniqueid, 0, "%s@%s", mailbox, context);
+
+       mwi_state = ao2_alloc(sizeof(*mwi_state), mwi_state_dtor);
+       if (ast_string_field_init(mwi_state, 256)) {
+               return -1;
+       }
+
+       ast_string_field_set(mwi_state, uniqueid, ast_str_buffer(uniqueid));
+       ast_string_field_set(mwi_state, mailbox, mailbox);
+       ast_string_field_set(mwi_state, context, context);
+       mwi_state->new_msgs = new_msgs;
+       mwi_state->old_msgs = old_msgs;
+       if (eid) {
+               mwi_state->eid = *eid;
+       } else {
+               ast_set_default_eid(&mwi_state->eid);
+       }
+
+       message = stasis_message_create(stasis_mwi_state_message(), mwi_state);
+
+       mailbox_specific_topic = stasis_mwi_topic(ast_str_buffer(uniqueid));
+       if (!mailbox_specific_topic) {
+               return -1;
+       }
+
+       stasis_publish(mailbox_specific_topic, message);
+
+       return 0;
+}
+
+static const char *mwi_state_get_id(struct stasis_message *message)
+{
+       if (stasis_mwi_state_message() == stasis_message_type(message)) {
+               struct stasis_mwi_state *mwi_state = stasis_message_data(message);
+               return mwi_state->uniqueid;
+       } else if (stasis_subscription_change() == stasis_message_type(message)) {
+               struct stasis_subscription_change *change = stasis_message_data(message);
+               return change->uniqueid;
+       }
+
+       return NULL;
+}
+
+static void app_exit(void)
+{
+       ao2_cleanup(mwi_topic_all);
+       mwi_topic_all = NULL;
+       mwi_topic_cached = stasis_caching_unsubscribe(mwi_topic_cached);
+       ao2_cleanup(mwi_message_type);
+       mwi_message_type = NULL;
+       ao2_cleanup(mwi_topic_pool);
+       mwi_topic_pool = NULL;
+}
+
+int app_init(void)
+{
+       mwi_topic_all = stasis_topic_create("stasis_mwi_topic");
+       if (!mwi_topic_all) {
+               return -1;
+       }
+       mwi_topic_cached = stasis_caching_topic_create(mwi_topic_all, mwi_state_get_id);
+       if (!mwi_topic_cached) {
+               return -1;
+       }
+       mwi_message_type = stasis_message_type_create("stasis_mwi_state");
+       if (!mwi_message_type) {
+               return -1;
+       }
+       mwi_topic_pool = stasis_topic_pool_create(mwi_topic_all);
+       if (!mwi_topic_pool) {
+               return -1;
+       }
+
+       ast_register_atexit(app_exit);
+       return 0;
+}
+
index 4e5e58c..3a0e87c 100644 (file)
@@ -4178,6 +4178,11 @@ int main(int argc, char *argv[])
 
        aco_init();
 
+       if (app_init()) {
+               printf("App core initialization failed.\n%s", term_quit());
+               exit(1);
+       }
+
        if (astdb_init()) {
                printf("%s", term_quit());
                exit(1);
index 3289eda..3f8319b 100644 (file)
@@ -8637,8 +8637,7 @@ static void channels_shutdown(void)
        __channel_varset = NULL;
        ao2_cleanup(__channel_topic_all);
        __channel_topic_all = NULL;
-       stasis_caching_unsubscribe(__channel_topic_all_cached);
-       __channel_topic_all_cached = NULL;
+       __channel_topic_all_cached = stasis_caching_unsubscribe(__channel_topic_all_cached);
        ast_data_unregister(NULL);
        ast_cli_unregister_multiple(cli_channel, ARRAY_LEN(cli_channel));
        if (channels) {
index a4d44b8..2ad0caf 100644 (file)
@@ -41,6 +41,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 /*! Initial size of the subscribers list. */
 #define INITIAL_SUBSCRIBERS_MAX 4
 
+/*! The number of buckets to use for topic pools */
+#define TOPIC_POOL_BUCKETS 57
+
 /*! Threadpool for dispatching notifications to subscribers */
 static struct ast_threadpool *pool;
 
@@ -470,6 +473,96 @@ static void send_subscription_change_message(struct stasis_topic *topic, char *u
        stasis_publish(topic, msg);
 }
 
+struct topic_pool_entry {
+       struct stasis_subscription *forward;
+       struct stasis_topic *topic;
+};
+
+static void topic_pool_entry_dtor(void *obj)
+{
+       struct topic_pool_entry *entry = obj;
+       entry->forward = stasis_unsubscribe(entry->forward);
+       ao2_cleanup(entry->topic);
+       entry->topic = NULL;
+}
+
+static struct topic_pool_entry *topic_pool_entry_alloc(void)
+{
+       return ao2_alloc(sizeof(struct topic_pool_entry), topic_pool_entry_dtor);
+}
+
+struct stasis_topic_pool {
+       struct ao2_container *pool_container;
+       struct stasis_topic *pool_topic;
+};
+
+static void topic_pool_dtor(void *obj)
+{
+       struct stasis_topic_pool *pool = obj;
+       ao2_cleanup(pool->pool_container);
+       pool->pool_container = NULL;
+       ao2_cleanup(pool->pool_topic);
+       pool->pool_topic = NULL;
+}
+
+static int topic_pool_entry_hash(const void *obj, const int flags)
+{
+       const char *topic_name= (flags & OBJ_KEY) ? obj : stasis_topic_name(((struct topic_pool_entry*) obj)->topic);
+       return ast_str_case_hash(topic_name);
+}
+
+static int topic_pool_entry_cmp(void *obj, void *arg, int flags)
+{
+       struct topic_pool_entry *opt1 = obj, *opt2 = arg;
+       const char *topic_name = (flags & OBJ_KEY) ? arg : stasis_topic_name(opt2->topic);
+       return strcasecmp(stasis_topic_name(opt1->topic), topic_name) ? 0 : CMP_MATCH | CMP_STOP;
+}
+
+struct stasis_topic_pool *stasis_topic_pool_create(struct stasis_topic *pooled_topic)
+{
+       RAII_VAR(struct stasis_topic_pool *, pool, ao2_alloc(sizeof(*pool), topic_pool_dtor), ao2_cleanup);
+       if (!pool) {
+               return NULL;
+       }
+       pool->pool_container = ao2_container_alloc(TOPIC_POOL_BUCKETS, topic_pool_entry_hash, topic_pool_entry_cmp);
+       ao2_ref(pooled_topic, +1);
+       pool->pool_topic = pooled_topic;
+
+       ao2_ref(pool, +1);
+       return pool;
+}
+
+struct stasis_topic *stasis_topic_pool_get_topic(struct stasis_topic_pool *pool, const char *topic_name)
+{
+       RAII_VAR(struct topic_pool_entry *, topic_pool_entry, NULL, ao2_cleanup);
+       SCOPED_AO2LOCK(topic_container_lock, pool->pool_container);
+       topic_pool_entry = ao2_find(pool->pool_container, topic_name, OBJ_KEY | OBJ_NOLOCK);
+
+       if (topic_pool_entry) {
+               return topic_pool_entry->topic;
+       }
+
+       topic_pool_entry = topic_pool_entry_alloc();
+
+       if (!topic_pool_entry) {
+               return NULL;
+       }
+
+       topic_pool_entry->topic = stasis_topic_create(topic_name);
+       if (!topic_pool_entry->topic) {
+               return NULL;
+       }
+
+       topic_pool_entry->forward = stasis_forward_all(topic_pool_entry->topic, pool->pool_topic);
+       if (!topic_pool_entry->forward) {
+               return NULL;
+       }
+
+       ao2_link_flags(pool->pool_container, topic_pool_entry, OBJ_NOLOCK);
+
+       return topic_pool_entry->topic;
+}
+
 /*! \brief Cleanup function */
 static void stasis_exit(void)
 {
index e8e7905..1f9ddba 100644 (file)
@@ -373,7 +373,7 @@ static void aji_pubsub_purge_nodes(struct aji_client *client,
 static void aji_publish_mwi(struct aji_client *client, const char *mailbox,
        const char *context, const char *oldmsgs, const char *newmsgs);
 static void aji_devstate_cb(const struct ast_event *ast_event, void *data);
-static void aji_mwi_cb(const struct ast_event *ast_event, void *data);
+static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg);
 static iks* aji_build_publish_skeleton(struct aji_client *client, const char *node,
                                       const char *event_type, unsigned int cachable);
 /* No transports in this version */
@@ -410,7 +410,7 @@ static char *app_ajileave = "JabberLeave";
 
 static struct aji_client_container clients;
 static struct aji_capabilities *capabilities = NULL;
-static struct ast_event_sub *mwi_sub = NULL;
+static struct stasis_subscription *mwi_sub = NULL;
 static struct ast_event_sub *device_state_sub = NULL;
 static ast_cond_t message_received_condition;
 static ast_mutex_t messagelock;
@@ -3240,30 +3240,33 @@ int ast_aji_disconnect(struct aji_client *client)
  * \param data void pointer to ast_client structure
  * \return void
  */
-static void aji_mwi_cb(const struct ast_event *ast_event, void *data)
+static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
 {
        const char *mailbox;
        const char *context;
        char oldmsgs[10];
        char newmsgs[10];
-       struct aji_client *client;
-       if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID)))
-       {
+       struct aji_client *client = data;
+       struct stasis_mwi_state *mwi_state;
+
+       if (!stasis_subscription_is_subscribed(sub) || stasis_mwi_state() != stasis_message_type(msg)) {
+               return;
+       }
+
+       mwi_state = stasis_message_data(msg);
+
+       if (ast_eid_cmp(&ast_eid_default, &mwi_state->eid)) {
                /* If the event didn't originate from this server, don't send it back out. */
-               ast_debug(1, "Returning here\n");
                return;
        }
 
-       client = ASTOBJ_REF((struct aji_client *) data);
-       mailbox = ast_event_get_ie_str(ast_event, AST_EVENT_IE_MAILBOX);
-       context = ast_event_get_ie_str(ast_event, AST_EVENT_IE_CONTEXT);
+       mailbox = mwi_state->mailbox;
+       context = mwi_state->context;
        snprintf(oldmsgs, sizeof(oldmsgs), "%d",
-               ast_event_get_ie_uint(ast_event, AST_EVENT_IE_OLDMSGS));
+               mwi_state->old_msgs);
        snprintf(newmsgs, sizeof(newmsgs), "%d",
-               ast_event_get_ie_uint(ast_event, AST_EVENT_IE_NEWMSGS));
+               mwi_state->new_msgs);
        aji_publish_mwi(client, mailbox, context, oldmsgs, newmsgs);
-       ASTOBJ_UNREF(client, ast_aji_client_destroy);
-
 }
 /*!
  * \brief Callback function for device state events
@@ -3300,8 +3303,7 @@ static void aji_devstate_cb(const struct ast_event *ast_event, void *data)
 static void aji_init_event_distribution(struct aji_client *client)
 {
        if (!mwi_sub) {
-               mwi_sub = ast_event_subscribe(AST_EVENT_MWI, aji_mwi_cb, "aji_mwi_subscription",
-                       client, AST_EVENT_IE_END);
+               mwi_sub = stasis_subscribe(stasis_mwi_topic_all(), aji_mwi_cb, client);
        }
        if (!device_state_sub) {
                if (ast_enable_distributed_devstate()) {
@@ -3364,14 +3366,10 @@ static int aji_handle_pubsub_event(void *data, ikspak *pak)
                context = strsep(&item_id, "@");
                sscanf(iks_find_cdata(item_content, "OLDMSGS"), "%10d", &oldmsgs);
                sscanf(iks_find_cdata(item_content, "NEWMSGS"), "%10d", &newmsgs);
-               if (!(event = ast_event_new(AST_EVENT_MWI, AST_EVENT_IE_MAILBOX,
-                       AST_EVENT_IE_PLTYPE_STR, item_id, AST_EVENT_IE_CONTEXT,
-                       AST_EVENT_IE_PLTYPE_STR, context, AST_EVENT_IE_OLDMSGS,
-                       AST_EVENT_IE_PLTYPE_UINT, oldmsgs, AST_EVENT_IE_NEWMSGS,
-                       AST_EVENT_IE_PLTYPE_UINT, newmsgs, AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW,
-                       &pubsub_eid, sizeof(pubsub_eid), AST_EVENT_IE_END))) {
-                       return IKS_FILTER_EAT;
-               }
+
+               stasis_publish_mwi_state_full(item_id, context, newmsgs, oldmsgs, &pubsub_eid);
+
+               return IKS_FILTER_EAT;
        } else {
                ast_debug(1, "Don't know how to handle PubSub event of type %s\n",
                        iks_name(item_content));
@@ -4771,7 +4769,7 @@ static int unload_module(void)
        ast_manager_unregister("JabberSend");
        ast_custom_function_unregister(&jabberstatus_function);
        if (mwi_sub) {
-               ast_event_unsubscribe(mwi_sub);
+               mwi_sub = stasis_unsubscribe(mwi_sub);
        }
        if (device_state_sub) {
                ast_event_unsubscribe(device_state_sub);
index f2f200c..1901aa2 100644 (file)
@@ -1319,24 +1319,30 @@ static void xmpp_pubsub_publish_device_state(struct ast_xmpp_client *client, con
  * \param data void pointer to ast_client structure
  * \return void
  */
-static void xmpp_pubsub_mwi_cb(const struct ast_event *ast_event, void *data)
+static void xmpp_pubsub_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
 {
        struct ast_xmpp_client *client = data;
        const char *mailbox, *context;
        char oldmsgs[10], newmsgs[10];
+       struct stasis_mwi_state *mwi_state;
 
-       if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID))) {
+       if (!stasis_subscription_is_subscribed(sub) || stasis_mwi_state_message() != stasis_message_type(msg)) {
+               return;
+       }
+
+       mwi_state = stasis_message_data(msg);
+
+       if (ast_eid_cmp(&ast_eid_default, &mwi_state->eid)) {
                /* If the event didn't originate from this server, don't send it back out. */
-               ast_debug(1, "Returning here\n");
                return;
        }
 
-       mailbox = ast_event_get_ie_str(ast_event, AST_EVENT_IE_MAILBOX);
-       context = ast_event_get_ie_str(ast_event, AST_EVENT_IE_CONTEXT);
+       mailbox = mwi_state->mailbox;
+       context = mwi_state->context;
        snprintf(oldmsgs, sizeof(oldmsgs), "%d",
-                ast_event_get_ie_uint(ast_event, AST_EVENT_IE_OLDMSGS));
+                mwi_state->old_msgs);
        snprintf(newmsgs, sizeof(newmsgs), "%d",
-                ast_event_get_ie_uint(ast_event, AST_EVENT_IE_NEWMSGS));
+                mwi_state->new_msgs);
        xmpp_pubsub_publish_mwi(client, mailbox, context, oldmsgs, newmsgs);
 }
 
@@ -1479,14 +1485,10 @@ static int xmpp_pubsub_handle_event(void *data, ikspak *pak)
                context = strsep(&item_id, "@");
                sscanf(iks_find_cdata(item_content, "OLDMSGS"), "%10d", &oldmsgs);
                sscanf(iks_find_cdata(item_content, "NEWMSGS"), "%10d", &newmsgs);
-               if (!(event = ast_event_new(AST_EVENT_MWI, AST_EVENT_IE_MAILBOX,
-                                           AST_EVENT_IE_PLTYPE_STR, item_id, AST_EVENT_IE_CONTEXT,
-                                           AST_EVENT_IE_PLTYPE_STR, context, AST_EVENT_IE_OLDMSGS,
-                                           AST_EVENT_IE_PLTYPE_UINT, oldmsgs, AST_EVENT_IE_NEWMSGS,
-                                           AST_EVENT_IE_PLTYPE_UINT, newmsgs, AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW,
-                                           &pubsub_eid, sizeof(pubsub_eid), AST_EVENT_IE_END))) {
-                       return IKS_FILTER_EAT;
-               }
+
+               stasis_publish_mwi_state_full(item_id, context, newmsgs, oldmsgs, &pubsub_eid);
+
+               return IKS_FILTER_EAT;
        } else {
                ast_debug(1, "Don't know how to handle PubSub event of type %s\n",
                          iks_name(item_content));
@@ -1587,20 +1589,17 @@ static void xmpp_init_event_distribution(struct ast_xmpp_client *client)
        xmpp_pubsub_unsubscribe(client, "device_state");
        xmpp_pubsub_unsubscribe(client, "message_waiting");
 
-       if (!(client->mwi_sub = ast_event_subscribe(AST_EVENT_MWI, xmpp_pubsub_mwi_cb, "xmpp_pubsub_mwi_subscription",
-                                                   client, AST_EVENT_IE_END))) {
+       if (!(client->mwi_sub = stasis_subscribe(stasis_mwi_topic_all(), xmpp_pubsub_mwi_cb, client))) {
                return;
        }
 
        if (ast_enable_distributed_devstate()) {
                return;
        }
-       
 
        if (!(client->device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE_CHANGE,
                                                             xmpp_pubsub_devstate_cb, "xmpp_pubsub_devstate_subscription", client, AST_EVENT_IE_END))) {
-               ast_event_unsubscribe(client->mwi_sub);
-               client->mwi_sub = NULL;
+               client->mwi_sub = stasis_unsubscribe(client->mwi_sub);
                return;
        }
 
@@ -3524,8 +3523,7 @@ int ast_xmpp_client_disconnect(struct ast_xmpp_client *client)
        }
 
        if (client->mwi_sub) {
-               ast_event_unsubscribe(client->mwi_sub);
-               client->mwi_sub = NULL;
+               client->mwi_sub = stasis_unsubscribe(client->mwi_sub);
                xmpp_pubsub_unsubscribe(client, "message_waiting");
        }