Add distributed devicestate via the XMPP protocol.
authorTilghman Lesher <tilghman@meg.abyt.es>
Tue, 15 Jun 2010 17:06:23 +0000 (17:06 +0000)
committerTilghman Lesher <tilghman@meg.abyt.es>
Tue, 15 Jun 2010 17:06:23 +0000 (17:06 +0000)
(closes issue #15757)
 Reported by: Marquis
 Patches:
       distributed_devstate-XMPP.txt uploaded by lmadsen (license 10)
 Tested by: Marquis, lmadsen, marcelloceschia

Review: https://reviewboard.asterisk.org/r/351/

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@270519 65c4cc65-6c06-0410-ace0-fbb531ad65f3

CHANGES
configs/jabber.conf.sample
doc/distributed_devstate-XMPP.txt [new file with mode: 0644]
include/asterisk/jabber.h
res/res_jabber.c

diff --git a/CHANGES b/CHANGES
index 66e023a..5e69680 100644 (file)
--- a/CHANGES
+++ b/CHANGES
@@ -496,6 +496,8 @@ Miscellaneous
    initialization order.
  * The Realtime dialplan switch now caches entries for 1 second.  This provides a
    significant increase in performance (about 3X) for installations using this switchtype.
+ * Distributed devicestate now supports the use of the XMPP protocol, in addition to
+   AIS.  For more information, please see doc/distributed_devstate-XMPP.txt
 
 CLI Changes
 -----------
index 5cbd0c0..d02a7b3 100644 (file)
@@ -1,29 +1,41 @@
 [general]
-;debug=yes                             ;;Turn on debugging by default.
-;autoprune=yes                         ;;Auto remove users from buddy list. Depending on your
-                                       ;;setup (ie, using your personal Gtalk account for a test)
-                                       ;;you might lose your contacts list. Default is 'no'.
-;autoregister=yes                      ;;Auto register users from buddy list.
-;auth_policy=accept                    ;;Auto accept users' subscription requests (default).
-                                                       ;;Set to deny for auto denial.
+;debug=yes                              ;;Turn on debugging by default.
+;autoprune=yes                          ;;Auto remove users from buddy list. Depending on your
+                                        ;;setup (ie, using your personal Gtalk account for a test)
+                                        ;;you might lose your contacts list. Default is 'no'.
+;autoregister=yes                       ;;Auto register users from buddy list.
+;collection_nodes=yes                   ;;Enable support for XEP-0248 for use with
+                                        ;;distributed device state.  Default is 'no'.
+;pubsub_autocreate=yes                  ;;Whether or not the PubSub server supports/is using
+                                        ;;auto-create for nodes.  If it is, we have to
+                                        ;;explicitly pre-create nodes before publishing them.
+                                        ;;Default is 'no'.
+;auth_policy=accept                     ;;Auto accept users' subscription requests (default).
+                                        ;;Set to deny for auto denial.
 
 
-;[asterisk]                            ;;label
-;type=client                           ;;Client or Component connection
-;serverhost=astjab.org                 ;;Route to server for example,
-                                       ;;      talk.google.com
-;username=asterisk@astjab.org/asterisk ;;Username with optional resource.
-;secret=blah                           ;;Password
-;priority=1                            ;;Resource priority
-;port=5222                             ;;Port to use defaults to 5222
-;usetls=yes                            ;;Use tls or not
-;usesasl=yes                           ;;Use sasl or not
-;buddy=mogorman@astjab.org             ;;Manual addition of buddy to list.
-;status=available                      ;;One of: chat, available, away,
-                                       ;;      xaway, or dnd
-;statusmessage="I am available"                ;;Have custom status message for
-                                       ;;Asterisk.
-;timeout=5                             ;;Timeout (in seconds) on the message stack, defaults to 5.
-                                       ;;Messages stored longer than this value will be deleted by Asterisk.
-                                       ;;This option applies to incoming messages only, which are intended to
-                                       ;;be processed by the JABBER_RECEIVE dialplan function.
+;[asterisk]                             ;;label
+;type=client                            ;;Client or Component connection
+;serverhost=astjab.org                  ;;Route to server for example,
+                                        ;;     talk.google.com
+;pubsub_node=pubsub.astjab.org          ;;Node to use for publishing events via PubSub
+;username=asterisk@astjab.org/asterisk  ;;Username with optional resource.
+;secret=blah                            ;;Password
+;priority=1                             ;;Resource priority
+;port=5222                              ;;Port to use defaults to 5222
+;usetls=yes                             ;;Use tls or not
+;usesasl=yes                            ;;Use sasl or not
+;buddy=mogorman@astjab.org              ;;Manual addition of buddy to list.
+                                        ;;For distributed events, these buddies are
+                                        ;;automatically added in the whitelist as
+                                        ;;'owners' of the node(s).
+;distribute_events=yes                  ;;Whether or not to distribute events using
+                                        ;;this connection.  Default is 'no'.
+;status=available                       ;;One of: chat, available, away,
+                                        ;;     xaway, or dnd
+;statusmessage="I am available"         ;;Have custom status message for
+                                        ;;Asterisk.
+;timeout=5                              ;;Timeout (in seconds) on the message stack, defaults to 5.
+                                        ;;Messages stored longer than this value will be deleted by Asterisk.
+                                        ;;This option applies to incoming messages only, which are intended to
+                                        ;;be processed by the JABBER_RECEIVE dialplan function.
diff --git a/doc/distributed_devstate-XMPP.txt b/doc/distributed_devstate-XMPP.txt
new file mode 100644 (file)
index 0000000..1a8c143
--- /dev/null
@@ -0,0 +1,433 @@
+===============================================================================
+===
+=== XMPP PubSub Distributed Device State
+===
+=== Copyright (C) 2010, Digium, Inc.
+=== Leif Madsen <lmadsen@digium.com>
+===
+===============================================================================
+
+-------------------------------------------------------------------------------
+--- INTRODUCTION
+-------------------------------------------------------------------------------
+
+This document describes installing and utilizing XMPP PubSub events to 
+distribute device state and message waiting indication (MWI) events between
+servers. The difference between this method and OpenAIS (see 
+distributed_devstate.txt) is that OpenAIS can only be used in low latency
+networks; meaning only on the LAN, and not across the internet.
+
+If you plan on distributing device state or MWI across the internet, then you
+will require the use of XMPP PubSub events.
+
+-------------------------------------------------------------------------------
+
+-------------------------------------------------------------------------------
+--- Tigase Installation
+-------------------------------------------------------------------------------
+
+-- Description --
+
+Currently the only server supported for XMPP PubSub events is the Tigase open 
+source XMPP/Jabber environment. This is the server that the various Asterisk
+servers will connect to in order to distribute the events. The Tigase server can
+even be clustered in order to provide high availability for your device state;
+however, that is beyond the scope of this document.
+
+For more information about Tigase, visit their web site:
+
+    http://www.tigase.org
+
+-- Download --
+
+To download the Tigase environment, get the latest version at:
+
+    http://www.tigase.org/en/filebrowser/tigase-server
+
+-- Install --
+
+The Tigase server requires a working Java environment, including both a JRE
+(Java Runtime Environment) and a JDK (Java Development Kit), currently at least
+version 1.6.
+
+For more information about how to install Tigase, see the web site:
+
+    http://www.tigase.org/en/content/quick-start
+
+-------------------------------------------------------------------------------
+
+-------------------------------------------------------------------------------
+--- Tigase Configuration
+-------------------------------------------------------------------------------
+
+While installing Tigase, be sure you enable the PubSub module. Without it, the
+PubSub events won't be accepted by the server, and your device state will not be
+distributed.
+
+There are a couple of things you need to configure in Tigase before you start it
+in order for Asterisk to connect. The first thing we need to do is generate the
+self-signed certificate. To do this we use the keytool application. More
+information can be found here:
+
+    http://www.tigase.org/en/content/server-certificate
+
+-- Generating the keystore file --
+
+Generally, we need to run the following commands to generate a new keystore
+file.
+
+# cd /opt/Tigase-4.3.1-b1858/certs
+
+Be sure to change the 'yourdomain' to your domain.
+
+# keytool -genkey -alias yourdomain -keystore rsa-keystore \
+      -keyalg RSA -sigalg MD5withRSA
+
+The keytool application will then ask you for a password. Use the password
+'keystore' as this is the default password that Tigase will use to load the
+keystore file.
+
+You then need to specify your domain as the first value to be entered in the
+security certificate.
+
+What is your first and last name?
+  [Unknown]: asterisk.mydomain.tld
+What is the name of your organizational unit?
+  [Unknown]:
+What is the name of your organization?
+  [Unknown]:
+What is the name of your City or Locality?
+  [Unknown]:
+What is the name of your State or Province?
+  [Unknown]:
+What is the two-letter country code for this unit?
+  [Unknown]:
+Is CN=asterisk.mydomain.tld, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown correct?
+  [no]: yes
+
+You will then be asked for another password, in which case you must just press
+enter for the same password as Tigase will not work without them being the same.
+
+Enter key password for <mykey>
+             (RETURN if same as keystore password):
+
+
+-- Configuring init.properties --
+
+The next step is to configure the init.properties file which is used by Tigase
+to generate the tigase.xml file. Whenever you change the init.properties file
+because sure to remove the current tigase.xml file so that it will be
+regenerated at start up.
+
+# cd /opt/Tigase-4.3.1-b1858/etc
+
+Then edit the init.properties file and add the following:
+
+config-type=--gen-config-def
+--admins=admin@asterisk.mydomain.tld
+--virt-hosts=asterisk.mydomain.tld
+--debug=server
+--user-db=derby
+--user-db-uri=jdbc:derby:/opt/Tigase-4.3.1-b1858
+--comp-name-1=pubsub
+--comp-class-1=tigase.pubsub.PubSubComponent
+
+Be sure to change the domain in the --admin and --virt-hosts options. The most
+important lines are --comp-name-1 and --comp-class-1 which tell Tigase to load
+the PubSub module.
+
+-------------------------------------------------------------------------------
+
+-------------------------------------------------------------------------------
+--- Running Tigase
+-------------------------------------------------------------------------------
+
+You can then start the Tigase server with the tigase.sh script.
+
+# cd /opt/Tigase-4.3.1-b1858
+# ./scripts/tigase.sh start etc/tigase.conf
+
+-------------------------------------------------------------------------------
+
+-------------------------------------------------------------------------------
+--- Adding Buddies to Tigase
+-------------------------------------------------------------------------------
+
+At this time, Asterisk is not able to automatically register your peers for you,
+so you'll need to use an external application to do the initial registration.
+
+Pidgin is an excellent multi-protocol instant messenger application which
+supports XMPP. It runs on Linux, Windows, and OSX, and is open source. You can
+get Pidgin from http://www.pidgin.im
+
+Then add the two buddies we'll use in Asterisk with Pidgin by connecting to
+the Tigase server. For more information about how to register new buddies, see
+the Pidgin documentation.
+
+Once the initial registration is done and loaded into Tigase, you no longer need
+to worry about using Pidgin. Asterisk will then be able to load the peers into
+memory at start up.
+
+The example peers we've used in the following documentation for our two nodes
+are:
+
+server1@asterisk.mydomain.tld/astvoip1
+server2@asterisk.mydomain.tld/astvoip2
+
+-------------------------------------------------------------------------------
+
+-------------------------------------------------------------------------------
+--- Installing Asterisk
+-------------------------------------------------------------------------------
+
+Install Asterisk as usual. However, you'll need to make sure you have the
+res_jabber module compiled, which requires the iksemel development library.
+Additionally, be sure you have the OpenSSL development library installed so you
+can connect securly to the Tigase server.
+
+Make sure you check menuselect that res_jabber is selected so that it will
+compile.
+
+# cd asterisk-source
+# ./configure
+
+# make menuselect
+  ---> Resource Modules
+
+If you don't have jabber.conf in your existing configuration, because sure to
+copy the sample configuration file there.
+
+# cd configs
+# cp jabber.conf.sample /etc/asterisk/jabber.conf
+
+-------------------------------------------------------------------------------
+
+-------------------------------------------------------------------------------
+--- Configuring Asterisk
+-------------------------------------------------------------------------------
+
+We then need to configure our servers to communicate with the Tigase server. We
+need to modify the jabber.conf file on the servers. The configurations below are
+for a 2 server setup, but could be expanded for additional servers easily.
+
+The key note here is to note that the pubsub_node option needs to start with
+pubsub, so for example, pubsub.asterisk.mydomain.tld. Without the 'pubsub' your
+Asterisk system will not be able to distribute events.
+
+Additionally, you will need to specify each of the servers you need to connec to
+using the 'buddy' option.
+
+
+-- Asterisk Server 1 --
+
+[general]
+debug=no                                ;;Turn on debugging by default.
+;autoprune=yes                          ;;Auto remove users from buddy list. Depending on your
+                                        ;;setup (ie, using your personal Gtalk account for a test)
+                                        ;;you might lose your contacts list. Default is 'no'.
+autoregister=yes                        ;;Auto register users from buddy list.
+;collection_nodes=yes                   ;;Enable support for XEP-0248 for use with
+                                        ;;distributed device state.  Default is 'no'.
+;pubsub_autocreate=yes                  ;;Whether or not the PubSub server supports/is using
+                                        ;;auto-create for nodes.  If it is, we have to
+                                        ;;explicitly pre-create nodes before publishing them.
+                                        ;;Default is 'no'.
+
+[asterisk]
+type=client
+serverhost=asterisk.mydomain.tld
+pubsub_node=pubsub.asterisk.mydomain.tld
+username=server1@asterisk.mydomain.tld/astvoip1
+secret=welcome
+distribute_events=yes
+status=available
+usetls=no
+usesasl=yes
+buddy=server2@asterisk.mydomain.tld/astvoip2
+
+
+-- Asterisk Server 2 --
+
+[general]
+debug=yes                              ;;Turn on debugging by default.
+;autoprune=yes                         ;;Auto remove users from buddy list. Depending on your
+                                       ;;setup (ie, using your personal Gtalk account for a test)
+                                       ;;you might lose your contacts list. Default is 'no'.
+autoregister=yes                       ;;Auto register users from buddy list.
+;collection_nodes=yes                  ;;Enable support for XEP-0248 for use with
+                                       ;;distributed device state.  Default is 'no'.
+;pubsub_autocreate=yes                 ;;Whether or not the PubSub server supports/is using
+                                       ;;auto-create for nodes.  If it is, we have to
+                                       ;;explicitly pre-create nodes before publishing them.
+                                       ;;Default is 'no'.
+
+[asterisk]
+type=client
+serverhost=asterisk.mydomain.tld
+pubsub_node=pubsub.asterisk.mydomain.tld
+username=server2@asterisk.mydomain.tld/astvoip2
+secret=welcome
+distribute_events=yes
+status=available
+usetls=no
+usesasl=yes
+buddy=server1@asterisk.mydomain.tld/astvoip1
+
+-------------------------------------------------------------------------------
+
+-------------------------------------------------------------------------------
+--- Basic Testing of Asterisk with XMPP PubSub
+-------------------------------------------------------------------------------
+
+Once you have Asterisk installed with XMPP PubSub, it is time to test it out.
+
+We need to start up our first server and make sure we get connected to the XMPP
+server. We can verify this with an Asterisk console command to determine if
+we're connected.
+
+On Asterisk 1 we can run 'jabber show connected' to verify we're connected to
+the XMPP server.
+
+*CLI> jabber show connected 
+Jabber Users and their status:
+       User: server1@asterisk.mydomain.tld/astvoip1     - Connected
+----
+   Number of users: 1
+
+The command above has given us output which verifies we've connected our first
+server.
+
+We can then check the state of our buddies with the 'jabber show buddies' CLI
+command.
+
+*CLI> jabber show buddies
+Jabber buddy lists
+Client: server1@asterisk.mydomain.tld/astvoip1
+       Buddy:  server2@asterisk.mydomain.tld
+               Resource: None
+       Buddy:  server2@asterisk.mydomain.tld/astvoip2
+               Resource: None
+
+The output above tells us we're not connected to any buddies, and thus we're not
+distributing state to anyone (or getting it from anyone). That makes sense since
+we haven't yet started our other server.
+
+Now, let's start the other server and verify the servers are able to establish
+a connection between each other.
+
+On Asterisk 2, again we run the 'jabber show connected' command to make sure
+we've connected successfully to the XMPP server.
+
+*CLI> jabber show connected 
+Jabber Users and their status:
+       User: server2@asterisk.mydomain.tld/astvoip2     - Connected
+----
+   Number of users: 1
+
+And now we can check the status of our buddies.
+
+*CLI> jabber show buddies
+Jabber buddy lists
+Client: server2@scooter/astvoip2
+       Buddy:  server1@asterisk.mydomain.tld
+               Resource: astvoip1
+                       node: http://www.asterisk.org/xmpp/client/caps
+                       version: asterisk-xmpp
+                       Jingle capable: yes
+               Status: 1
+               Priority: 0
+       Buddy:  server1@asterisk.mydomain.tld/astvoip1
+               Resource: None
+
+Excellent! So we're connected to the buddy on Asterisk 1, and we could run the
+same command on Asterisk 1 to verify the buddy on Asterisk 2 is seen.
+
+-------------------------------------------------------------------------------
+
+-------------------------------------------------------------------------------
+--- Testing Distributed Device State
+-------------------------------------------------------------------------------
+
+The easiest way to test distributed device state is to use the DEVICE_STATE()
+diaplan function.  For example, you could have the following piece of dialplan
+on every server:
+
+[devstate_test]
+
+exten => 1234,hint,Custom:mystate
+
+exten => set_inuse,1,Set(DEVICE_STATE(Custom:mystate)=INUSE)
+exten => set_not_inuse,1,Set(DEVICE_STATE(Custom:mystate)=NOT_INUSE)
+
+exten => check,1,NoOp(Custom:mystate is ${DEVICE_STATE(Custom:mystate)})
+
+
+Now, you can test that the cluster-wide state of "Custom:mystate" is what
+you would expect after going to the CLI of each server and adjusting the state.
+
+server1*CLI> console dial set_inuse@devstate_test
+   ...
+
+server2*CLI> console dial check@devstate_test
+    -- Executing [check@devstate_test:1] NoOp("OSS/dsp", "Custom:mystate is INUSE") in new stack
+
+Various combinations of setting and checking the state on different servers can
+be used to verify that it works as expected.  Also, you can see the status of
+the hint on each server, as well, to see how extension state would reflect the
+state change with distributed device state:
+
+server2*CLI> core show hints
+    -= Registered Asterisk Dial Plan Hints =-
+                   1234@devstate_test       : Custom:mystate        State:InUse           Watchers  0
+
+
+One other helpful thing here during testing and debugging is to enable debug
+logging.  To do so, enable debug on the console in /etc/asterisk/logger.conf.
+Also, enable debug at the Asterisk CLI.
+
+*CLI> core set debug 1
+
+When you have this debug enabled, you will see output during the processing of
+every device state change.  The important thing to look for is where the known
+state of the device for each server is added together to determine the overall
+state.
+
+-------------------------------------------------------------------------------
+
+-------------------------------------------------------------------------------
+--- Notes On Large Installations
+-------------------------------------------------------------------------------
+
+On larger installations where you want a fully meshed network of buddies (i.e.
+all servers have all the buddies of the remote servers), you may want some
+method of distributing those buddies automatically so you don't need to modify
+all servers (N+1) every time you add a new server to the cluster.
+
+The problem there is that you're confined by what's allowed in XEP-0060, and 
+unfortunately that means modifying affiliations by individual JID (as opposed to
+the various subscription access models, which are more flexible).
+
+See here for details:
+http://xmpp.org/extensions/xep-0060.html#owner-affiliations
+
+One method for making this slightly easier is to utilize the #exec functionality
+in configuration files, and dynamically generate the buddies via script that
+pulls the information from a database, or to #include a file which is 
+automatically generated on all the servers when you add a new node to the
+cluster.
+
+Unfortunately this still requires a reload of res_jabber.so on all the servers,
+but this could also be solved through the use of the Asterisk Manager Interface
+(AMI).
+
+So while this is not the ideal situation, it is programmatically solvable with
+existing technologies and features found in Asterisk today.
+
+-------------------------------------------------------------------------------
+
+-------------------------------------------------------------------------------
+--- Questions, Comments, and Bug Reports
+-------------------------------------------------------------------------------
+
+Please utilize the Asterisk issue tracker for all bug reports at
+https://issues.asterisk.org
index a3ee0b2..5f5dd14 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * Asterisk -- An open source telephony toolkit.
  *
- * Copyright (C) 1999 - 2005, Digium, Inc.
+ * Copyright (C) 1999 - 2010, Digium, Inc.
  *
  * Matt O'Gorman <mogorman@digium.com>
  *
@@ -85,13 +85,19 @@ enum aji_state {
 enum {
        AJI_AUTOPRUNE = (1 << 0),
        AJI_AUTOREGISTER = (1 << 1),
-       AJI_AUTOACCEPT = (1 << 2)
+       AJI_AUTOACCEPT = (1 << 2),
+};
+
+enum {
+       AJI_XEP0248 = (1 << 0),
+       AJI_PUBSUB = (1 << 1),
+       AJI_PUBSUB_AUTOCREATE = (1 << 2),
 };
 
 enum aji_btype {
-       AJI_USER=0,
-       AJI_TRANS=1,
-       AJI_UTRANS=2
+       AJI_USER = 0,
+       AJI_TRANS = 1,
+       AJI_UTRANS = 2,
 };
 
 struct aji_version {
@@ -145,6 +151,7 @@ struct aji_client {
        char password[160];
        char user[AJI_MAX_JIDLEN];
        char serverhost[AJI_MAX_RESJIDLEN];
+       char pubsub_node[AJI_MAX_RESJIDLEN];
        char statusmessage[256];
        char name_space[256];
        char sid[10]; /* Session ID */
@@ -170,6 +177,7 @@ struct aji_client {
        int timeout;
        int message_timeout;
        int authorized;
+       int distribute_events;
        struct ast_flags flags;
        int component; /* 0 client,  1 component */
        struct aji_buddy_container buddies;
index 6110288..68c7c8f 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * Asterisk -- An open source telephony toolkit.
  *
- * Copyright (C) 1999 - 2006, Digium, Inc.
+ * Copyright (C) 1999 - 2010, Digium, Inc.
  *
  * Matt O'Gorman <mogorman@digium.com>
  *
@@ -58,6 +58,8 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #include "asterisk/astobj.h"
 #include "asterisk/astdb.h"
 #include "asterisk/manager.h"
+#include "asterisk/event.h"
+#include "asterisk/devicestate.h"
 
 /*** DOCUMENTATION
        <application name="JabberSend" language="en_US">
@@ -277,8 +279,8 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
        </manager>
  ***/
 
-/*! \todo This should really be renamed to xmpp.conf. For backwards compatibility, we
-       need to read both files */
+/*!\todo This should really be renamed to xmpp.conf. For backwards compatibility, we
+ * need to read both files */
 #define JABBER_CONFIG "jabber.conf"
 
 /*-- Forward declarations */
@@ -325,7 +327,45 @@ static int aji_ditems_handler(void *data, ikspak *pak);
 static int aji_register_query_handler(void *data, ikspak *pak);
 static int aji_register_approve_handler(void *data, ikspak *pak);
 static int aji_reconnect(struct aji_client *client);
+static char *aji_cli_create_collection(struct ast_cli_entry *e, int cmd,
+       struct ast_cli_args *a);
+static char *aji_cli_list_pubsub_nodes(struct ast_cli_entry *e, int cmd,
+       struct ast_cli_args *a);
+static char *aji_cli_delete_pubsub_node(struct ast_cli_entry *e, int cmd, struct
+       ast_cli_args *a);
+static char *aji_cli_purge_pubsub_nodes(struct ast_cli_entry *e, int cmd, struct
+               ast_cli_args *a);
 static iks *jabber_make_auth(iksid * id, const char *pass, const char *sid);
+static int aji_receive_node_list(void *data, ikspak* pak);
+static void aji_init_event_distribution(struct aji_client *client);
+static iks* aji_create_pubsub_node(struct aji_client *client, const char *node_type,
+       const char *name, const char *collection_name);
+static iks* aji_build_node_config(iks *pubsub, const char *node_type,
+       const char *collection_name);
+static void aji_create_pubsub_collection(struct aji_client *client,
+       const char *collection_name);
+static void aji_create_pubsub_leaf(struct aji_client *client, const char *collection_name,
+   const char *leaf_name);
+static char *aji_cli_create_leafnode(struct ast_cli_entry *e, int cmd,
+       struct ast_cli_args *a);
+static void aji_create_affiliations(struct aji_client *client, const char *node);
+static iks* aji_pubsub_iq_create(struct aji_client *client, const char *type);
+static void aji_publish_device_state(struct aji_client *client, const char * device,
+       const char *device_state);
+static int aji_handle_pubsub_error(void *data, ikspak *pak);
+static int aji_handle_pubsub_event(void *data, ikspak *pak);
+static void aji_pubsub_subscribe(struct aji_client *client, const char *node);
+static void aji_delete_pubsub_node(struct aji_client *client, const char *node_name);
+static iks* aji_build_node_request(struct aji_client *client, const char *collection);
+static int aji_delete_node_list(void *data, ikspak* pak);
+static void aji_pubsub_purge_nodes(struct aji_client *client,
+       const char* collection_name);
+static void aji_publish_mwi(struct aji_client *client, const char *mailbox,
+       const char *context, const char *oldmsgs, const char *newmsgs);
+static void aji_devstate_cb(const struct ast_event *ast_event, void *data);
+static void aji_mwi_cb(const struct ast_event *ast_event, void *data);
+static iks* aji_build_publish_skeleton(struct aji_client *client, const char *node,
+       const char *event_type);
 /* No transports in this version */
 /*
 static int aji_create_transport(char *label, struct aji_client *client);
@@ -339,6 +379,11 @@ static struct ast_cli_entry aji_cli[] = {
        AST_CLI_DEFINE(aji_show_clients, "Show state of clients and components"),
        AST_CLI_DEFINE(aji_show_buddies, "Show buddy lists of our clients"),
        AST_CLI_DEFINE(aji_test, "Shows roster, but is generally used for mog's debugging."),
+       AST_CLI_DEFINE(aji_cli_create_collection, "Creates a PubSub node collection."),
+       AST_CLI_DEFINE(aji_cli_list_pubsub_nodes, "Lists PubSub nodes"),
+       AST_CLI_DEFINE(aji_cli_create_leafnode, "Creates a PubSub leaf node"),
+       AST_CLI_DEFINE(aji_cli_delete_pubsub_node, "Deletes a PubSub node"),
+       AST_CLI_DEFINE(aji_cli_purge_pubsub_nodes, "Purges PubSub nodes"),
 };
 
 static char *app_ajisend = "JabberSend";
@@ -349,12 +394,16 @@ static char *app_ajileave = "JabberLeave";
 
 static struct aji_client_container clients;
 static struct aji_capabilities *capabilities = NULL;
+static struct ast_event_sub *mwi_sub = NULL;
+static struct ast_event_sub *device_state_sub = NULL;
 static ast_cond_t message_received_condition;
 static ast_mutex_t messagelock;
 
 /*! \brief Global flags, initialized to default values */
 static struct ast_flags globalflags = { AJI_AUTOREGISTER | AJI_AUTOACCEPT };
 
+/*! \brief PubSub flags, initialized to default values */
+static struct ast_flags pubsubflags = { AJI_AUTOREGISTER };
 /*!
  * \internal
  * \brief Deletes the aji_client data structure.
@@ -430,17 +479,20 @@ static struct aji_version *aji_find_version(char *node, char *version, ikspak *p
 
        list = capabilities;
 
-       if (!node)
+       if (!node) {
                node = pak->from->full;
-       if (!version)
+       }
+       if (!version) {
                version = "none supplied.";
-       while(list) {
+       }
+       while (list) {
                if (!strcasecmp(list->node, node)) {
                        res = list->versions;
                        while(res) {
-                                if (!strcasecmp(res->version, version))
-                                        return res;
-                                res = res->next;
+                               if (!strcasecmp(res->version, version)) {
+                                       return res;
+                               }
+                               res = res->next;
                        }
                        /* Specified version not found. Let's add it to
                           this node in our capabilities list */
@@ -489,14 +541,15 @@ static struct aji_version *aji_find_version(char *node, char *version, ikspak *p
  * \internal
  * \brief Find the aji_resource we want
  * \param buddy aji_buddy A buddy
- * \param name 
+ * \param name
  * \return aji_resource object
 */
 static struct aji_resource *aji_find_resource(struct aji_buddy *buddy, char *name)
 {
        struct aji_resource *res = NULL;
-       if (!buddy || !name)
+       if (!buddy || !name) {
                return res;
+       }
        res = buddy->resources;
        while (res) {
                if (!strcasecmp(res->resource, name)) {
@@ -515,8 +568,9 @@ static struct aji_resource *aji_find_resource(struct aji_buddy *buddy, char *nam
 */
 static int gtalk_yuck(iks *node)
 {
-       if (iks_find_with_attrib(node, "c", "node", "http://www.google.com/xmpp/client/caps"))
+       if (iks_find_with_attrib(node, "c", "node", "http://www.google.com/xmpp/client/caps")) {
                return 1;
+       }
        return 0;
 }
 
@@ -577,8 +631,9 @@ static int aji_status_exec(struct ast_channel *chan, const char *data)
                AST_APP_ARG(resource);
        );
 
-       if (deprecation_warning++ % 10 == 0)
+       if (deprecation_warning++ % 10 == 0) {
                ast_log(LOG_WARNING, "JabberStatus is deprecated.  Please use the JABBER_STATUS dialplan function in the future.\n");
+       }
 
        if (!data) {
                ast_log(LOG_ERROR, "Usage: JabberStatus(<sender>,<jid>[/<resource>],<varname>\n");
@@ -608,12 +663,14 @@ static int aji_status_exec(struct ast_channel *chan, const char *data)
                return -1;
        }
        r = aji_find_resource(buddy, jid.resource);
-       if (!r && buddy->resources) 
+       if (!r && buddy->resources) {
                r = buddy->resources;
-       if (!r)
+       }
+       if (!r) {
                ast_log(LOG_NOTICE, "Resource '%s' of buddy '%s' was not found\n", jid.resource, jid.screenname);
-       else
+       } else {
                stat = r->status;
+       }
        snprintf(status, sizeof(status), "%d", stat);
        pbx_builtin_setvar_helper(chan, args.variable, status);
        return 0;
@@ -670,12 +727,14 @@ static int acf_jabberstatus_read(struct ast_channel *chan, const char *name, cha
                return -1;
        }
        r = aji_find_resource(buddy, jid.resource);
-       if (!r && buddy->resources) 
+       if (!r && buddy->resources) {
                r = buddy->resources;
-       if (!r)
+       }
+       if (!r) {
                ast_log(LOG_NOTICE, "Resource %s of buddy %s was not found.\n", jid.resource, jid.screenname);
-       else
+       } else {
                stat = r->status;
+       }
        snprintf(buf, buflen, "%d", stat);
        return 0;
 }
@@ -723,7 +782,7 @@ static int acf_jabberreceive_read(struct ast_channel *chan, const char *name, ch
        AST_STANDARD_APP_ARGS(args, parse);
 
        if (args.argc < 2 || args.argc > 3) {
-                ast_log(LOG_WARNING, "%s requires arguments (account,jid[,timeout])\n", name);
+               ast_log(LOG_WARNING, "%s requires arguments (account,jid[,timeout])\n", name);
                return -1;
        }
 
@@ -771,7 +830,7 @@ static int acf_jabberreceive_read(struct ast_channel *chan, const char *name, ch
                struct timeval wait;
                int res;
 
-                wait = ast_tvadd(start, ast_tv(timeout, 0));
+               wait = ast_tvadd(start, ast_tv(timeout, 0));
                ts.tv_sec = wait.tv_sec;
                ts.tv_nsec = wait.tv_usec * 1000;
 
@@ -782,7 +841,7 @@ static int acf_jabberreceive_read(struct ast_channel *chan, const char *name, ch
                if (res == ETIMEDOUT) {
                        ast_debug(3, "No message received from %s in %d seconds\n", args.jid, timeout);
                        break;
-               };
+               }
 
                AST_LIST_LOCK(&client->messages);
                AST_LIST_TRAVERSE_SAFE_BEGIN(&client->messages, tmp, list) {
@@ -940,23 +999,23 @@ static int aji_join_exec(struct ast_channel *chan, const char *data)
                ast_log(LOG_ERROR, "%s requires arguments (sender,jid[,nickname])\n", app_ajijoin);
                return -1;
        }
-       
+
        if (!(client = ast_aji_get_client(args.sender))) {
                ast_log(LOG_ERROR, "Could not find sender connection: '%s'\n", args.sender);
                return -1;
        }
-       
+
        if (strchr(args.jid, '/')) {
                ast_log(LOG_ERROR, "Invalid room name : resource must not be appended\n");
                ASTOBJ_UNREF(client, aji_client_destroy);
                return -1;
-       }       
-       
+       }
+
        if (!ast_strlen_zero(args.nick)) {
                snprintf(nick, AJI_MAX_RESJIDLEN, "%s", args.nick);
        } else {
                if (client->component) {
-                       sprintf(nick, "asterisk");                      
+                       sprintf(nick, "asterisk");
                } else {
                        snprintf(nick, AJI_MAX_RESJIDLEN, "%s", client->jid->user);
                }
@@ -967,11 +1026,11 @@ static int aji_join_exec(struct ast_channel *chan, const char *data)
        } else {
                ast_log(LOG_ERROR, "Problem with specified jid of '%s'\n", args.jid);
        }
-       
+
        ASTOBJ_UNREF(client, aji_client_destroy);
        return 0;
 }
-       
+
 /*!
 * \brief Application to leave a chat room
 * \param chan ast_channel
@@ -989,29 +1048,29 @@ static int aji_leave_exec(struct ast_channel *chan, const char *data)
                AST_APP_ARG(jid);
                AST_APP_ARG(nick);
        );
-       
+
        if (!data) {
                ast_log(LOG_ERROR, "%s requires arguments (sender,jid[,nickname])\n", app_ajileave);
                return -1;
        }
        s = ast_strdupa(data);
-       
+
        AST_STANDARD_APP_ARGS(args, s);
        if (args.argc < 2 || args.argc > 3) {
                ast_log(LOG_ERROR, "%s requires arguments (sender,jid[,nickname])\n", app_ajileave);
                return -1;
        }
-       
+
        if (!(client = ast_aji_get_client(args.sender))) {
                ast_log(LOG_ERROR, "Could not find sender connection: '%s'\n", args.sender);
                return -1;
        }
-       
+
        if (strchr(args.jid, '/')) {
                ast_log(LOG_ERROR, "Invalid room name, resource must not be appended\n");
                ASTOBJ_UNREF(client, aji_client_destroy);
                return -1;
-       } 
+       }
        if (!ast_strlen_zero(args.nick)) {
                snprintf(nick, AJI_MAX_RESJIDLEN, "%s", args.nick);
        } else {
@@ -1021,10 +1080,10 @@ static int aji_leave_exec(struct ast_channel *chan, const char *data)
                        snprintf(nick, AJI_MAX_RESJIDLEN, "%s", client->jid->user);
                }
        }
-       
+
        if (!ast_strlen_zero(args.jid) && strchr(args.jid, '@')) {
                ast_aji_leave_chat(client, args.jid, nick);
-       } 
+       }
        ASTOBJ_UNREF(client, aji_client_destroy);
        return 0;
 }
@@ -1088,24 +1147,24 @@ static int aji_sendgroup_exec(struct ast_channel *chan, const char *data)
                AST_APP_ARG(message);
                AST_APP_ARG(nick);
        );
-       
+
        if (!data) {
                ast_log(LOG_ERROR, "%s requires arguments (sender,groupchatid,message[,nickname])\n", app_ajisendgroup);
                return -1;
        }
        s = ast_strdupa(data);
-       
+
        AST_STANDARD_APP_ARGS(args, s);
        if (args.argc < 3 || args.argc > 4) {
                ast_log(LOG_ERROR, "%s requires arguments (sender,groupchatid,message[,nickname])\n", app_ajisendgroup);
                return -1;
        }
-       
+
        if (!(client = ast_aji_get_client(args.sender))) {
                ast_log(LOG_ERROR, "Could not find sender connection: '%s'\n", args.sender);
                return -1;
        }
-       
+
        if (ast_strlen_zero(args.nick) || args.argc == 3) {
                if (client->component) {
                        sprintf(nick, "asterisk");
@@ -1115,11 +1174,11 @@ static int aji_sendgroup_exec(struct ast_channel *chan, const char *data)
        } else {
                snprintf(nick, AJI_MAX_RESJIDLEN, "%s", args.nick);
        }
-       
+
        if (strchr(args.groupchat, '@') && !ast_strlen_zero(args.message)) {
                res = ast_aji_send_groupchat(client, nick, args.groupchat, args.message);
        }
-       
+
        ASTOBJ_UNREF(client, aji_client_destroy);
        if (res != IKS_OK) {
                return -1;
@@ -1154,64 +1213,64 @@ static int aji_start_tls(struct aji_client *client)
        int ret;
 
        /* This is sent not encrypted */
-       ret = iks_send_raw(client->p, "<starttls xmlns='urn:ietf:params:xml:ns:xmpp-tls'/>");
-       if (ret)
+       if ((ret = iks_send_raw(client->p, "<starttls xmlns='urn:ietf:params:xml:ns:xmpp-tls'/>"))) {
                return ret;
+       }
 
        client->stream_flags |= TRY_SECURE;
        return IKS_OK;
 }
 
-/*! 
+/*!
  * \internal
  * \brief TLS handshake, OpenSSL initialization
  * \param client the configured XMPP client we use to connect to a XMPP server
- * \return IKS_OK on success, IKS_NET_TLSFAIL on failure 
+ * \return IKS_OK on success, IKS_NET_TLSFAIL on failure
  */
 static int aji_tls_handshake(struct aji_client *client)
 {
        int ret;
        int sock;
-       
-       ast_debug(1, "Starting TLS handshake\n"); 
+
+       ast_debug(1, "Starting TLS handshake\n");
 
        /* Choose an SSL/TLS protocol version, create SSL_CTX */
        client->ssl_method = SSLv3_method();
-       client->ssl_context = SSL_CTX_new(client->ssl_method);                
-       if (!client->ssl_context)
+       if (!(client->ssl_context = SSL_CTX_new(client->ssl_method))) {
                return IKS_NET_TLSFAIL;
+       }
 
        /* Create new SSL session */
-       client->ssl_session = SSL_new(client->ssl_context);
-       if (!client->ssl_session)
+       if (!(client->ssl_session = SSL_new(client->ssl_context))) {
                return IKS_NET_TLSFAIL;
+       }
 
        /* Enforce TLS on our XMPP connection */
        sock = iks_fd(client->p);
-       ret = SSL_set_fd(client->ssl_session, sock);
-       if (!ret)
+       if (!(ret = SSL_set_fd(client->ssl_session, sock))) {
                return IKS_NET_TLSFAIL;
+       }
 
        /* Perform SSL handshake */
-       ret = SSL_connect(client->ssl_session);
-       if (!ret)
+       if (!(ret = SSL_connect(client->ssl_session))) {
                return IKS_NET_TLSFAIL;
+       }
 
        client->stream_flags &= (~TRY_SECURE);
        client->stream_flags |= SECURE;
 
        /* Sent over the established TLS connection */
-       ret = aji_send_header(client, client->jid->server);
-       if (ret != IKS_OK)
+       if ((ret = aji_send_header(client, client->jid->server)) != IKS_OK) {
                return IKS_NET_TLSFAIL;
+       }
 
-       ast_debug(1, "TLS started with server\n"); 
+       ast_debug(1, "TLS started with server\n");
 
        return IKS_OK;
 }
 #endif /* HAVE_OPENSSL */
 
-/*! 
+/*!
  * \internal
  * \brief Secured or unsecured IO socket receiving function
  * \param client the configured XMPP client we use to connect to a XMPP server
@@ -1232,11 +1291,12 @@ static int aji_io_recv(struct aji_client *client, char *buffer, size_t buf_len,
 #ifdef HAVE_OPENSSL
        if (aji_is_secure(client)) {
                sock = SSL_get_fd(client->ssl_session);
-               if (sock < 0)
-                       return -1;              
+               if (sock < 0) {
+                       return -1;
+               }
        } else
 #endif /* HAVE_OPENSSL */
-               sock = iks_fd(client->p);       
+               sock = iks_fd(client->p);
 
        memset(&tv, 0, sizeof(struct timeval));
        FD_ZERO(&fds);
@@ -1265,7 +1325,7 @@ static int aji_io_recv(struct aji_client *client, char *buffer, size_t buf_len,
        return res;
 }
 
-/*! 
+/*!
  * \internal
  * \brief Tries to receive data from the Jabber server
  * \param client the configured XMPP client we use to connect to a XMPP server
@@ -1296,7 +1356,7 @@ static int aji_recv (struct aji_client *client, int timeout)
                buf[len] = '\0';
 
                /* our iksemel parser won't work as expected if we feed
-                  it with XML packets that contain multiple whitespace 
+                  it with XML packets that contain multiple whitespace
                   characters between tags */
                while (pos < len) {
                        c = buf[pos];
@@ -1314,11 +1374,11 @@ static int aji_recv (struct aji_client *client, int timeout)
                pos = 0;
                newbufpos = 0;
 
-               /* Log the message here, because iksemel's logHook is 
+               /* Log the message here, because iksemel's logHook is
                   unaccessible */
                aji_log_hook(client, buf, len, 1);
 
-               /* let iksemel deal with the string length, 
+               /* let iksemel deal with the string length,
                   and reset our buffer */
                ret = iks_parse(client->p, newbuf, 0, 0);
                memset(newbuf, 0, sizeof(newbuf));
@@ -1337,12 +1397,12 @@ static int aji_recv (struct aji_client *client, int timeout)
                if (ret != IKS_OK) {
                        return ret;
                }
-               ast_debug(3, "XML parsing successful\n");       
+               ast_debug(3, "XML parsing successful\n");
        }
        return IKS_OK;
 }
 
-/*! 
+/*!
  * \internal
  * \brief Sends XMPP header to the server
  * \param client the configured XMPP client we use to connect to a XMPP server
@@ -1369,7 +1429,7 @@ static int aji_send_header(struct aji_client *client, const char *to)
        return IKS_OK;
 }
 
-/*! 
+/*!
  * \brief Wraps raw sending
  * \param client the configured XMPP client we use to connect to a XMPP server
  * \param x the XMPP packet to send
@@ -1380,12 +1440,12 @@ int ast_aji_send(struct aji_client *client, iks *x)
        return aji_send_raw(client, iks_string(iks_stack(x), x));
 }
 
-/*! 
+/*!
  * \internal
  * \brief Sends an XML string over an XMPP connection
  * \param client the configured XMPP client we use to connect to a XMPP server
  * \param xmlstr the XML string to send
- * The XML data is sent whether the connection is secured or not. In the 
+ * The XML data is sent whether the connection is secured or not. In the
  * latter case, we just call iks_send_raw().
  * \return IKS_OK on success, any other value on failure
  */
@@ -1398,18 +1458,19 @@ static int aji_send_raw(struct aji_client *client, const char *xmlstr)
        if (aji_is_secure(client)) {
                ret = SSL_write(client->ssl_session, xmlstr, len);
                if (ret) {
-                       /* Log the message here, because iksemel's logHook is 
+                       /* Log the message here, because iksemel's logHook is
                           unaccessible */
                        aji_log_hook(client, xmlstr, len, 0);
                        return IKS_OK;
                }
        }
 #endif
-       /* If needed, data will be sent unencrypted, and logHook will 
+       /* If needed, data will be sent unencrypted, and logHook will
           be called inside iks_send_raw */
        ret = iks_send_raw(client->p, xmlstr);
-       if (ret != IKS_OK)
-               return ret;     
+       if (ret != IKS_OK) {
+               return ret;
+       }
 
        return IKS_OK;
 }
@@ -1426,19 +1487,21 @@ static void aji_log_hook(void *data, const char *xmpp, size_t size, int is_incom
 {
        struct aji_client *client = ASTOBJ_REF((struct aji_client *) data);
 
-       if (!ast_strlen_zero(xmpp))
+       if (!ast_strlen_zero(xmpp)) {
                manager_event(EVENT_FLAG_USER, "JabberEvent", "Account: %s\r\nPacket: %s\r\n", client->name, xmpp);
+       }
 
        if (client->debug) {
-               if (is_incoming)
+               if (is_incoming) {
                        ast_verbose("\nJABBER: %s INCOMING: %s\n", client->name, xmpp);
-               else {
+               } else {
                        if (strlen(xmpp) == 1) {
                                if (option_debug > 2  && xmpp[0] == ' ') {
                                        ast_verbose("\nJABBER: Keep alive packet\n");
                                }
-                       } else
+                       } else {
                                ast_verbose("\nJABBER: %s OUTGOING: %s\n", client->name, xmpp);
+                       }
                }
 
        }
@@ -1551,8 +1614,9 @@ static int aji_act_hook(void *data, int type, iks *node)
                                        ast_aji_increment_mid(client->mid);
                                        ast_aji_send(client, auth);
                                        iks_delete(auth);
-                               } else
+                               } else {
                                        ast_log(LOG_ERROR, "Out of memory.\n");
+                               }
                        }
                        break;
 
@@ -1567,8 +1631,9 @@ static int aji_act_hook(void *data, int type, iks *node)
                        if (!strcmp("stream:features", iks_name(node))) {
                                features = iks_stream_features(node);
                                if (client->usesasl) {
-                                       if (client->usetls && !aji_is_secure(client))
+                                       if (client->usetls && !aji_is_secure(client)) {
                                                break;
+                                       }
                                        if (client->authorized) {
                                                if (features & IKS_STREAM_BIND) {
                                                        iks_filter_add_rule(client->f, aji_client_connect, client, IKS_RULE_TYPE, IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_DONE);
@@ -1617,16 +1682,16 @@ static int aji_act_hook(void *data, int type, iks *node)
                                aji_send_header(client, client->jid->server);
                        }
                        break;
-               case IKS_NODE_ERROR: 
-                               ast_log(LOG_ERROR, "JABBER: Node Error\n");
-                               ASTOBJ_UNREF(client, aji_client_destroy);
-                               return IKS_HOOK;
-                               break;
-               case IKS_NODE_STOP: 
-                               ast_log(LOG_WARNING, "JABBER: Disconnected\n");
-                               ASTOBJ_UNREF(client, aji_client_destroy);
-                               return IKS_HOOK;
-                               break;
+               case IKS_NODE_ERROR:
+                       ast_log(LOG_ERROR, "JABBER: Node Error\n");
+                       ASTOBJ_UNREF(client, aji_client_destroy);
+                       return IKS_HOOK;
+                       break;
+               case IKS_NODE_STOP:
+                       ast_log(LOG_WARNING, "JABBER: Disconnected\n");
+                       ASTOBJ_UNREF(client, aji_client_destroy);
+                       return IKS_HOOK;
+                       break;
                }
        } else if (client->state != AJI_CONNECTED && client->component) {
                switch (type) {
@@ -1690,7 +1755,7 @@ static int aji_act_hook(void *data, int type, iks *node)
                ast_debug(1, "JABBER: I don't know anything about paktype '%d'\n", pak->type);
                break;
        }
-       
+
        iks_filter_packet(client->f, pak);
 
        if (node)
@@ -1729,17 +1794,16 @@ static int aji_register_approve_handler(void *data, ikspak *pak)
                        iks_insert_attrib(presence, "type", "subscribe");
                        iks_insert_attrib(x, "xmlns", "vcard-temp:x:update");
                        iks_insert_node(presence, x);
-                       ast_aji_send(client, presence); 
+                       ast_aji_send(client, presence);
                }
        } else {
                ast_log(LOG_ERROR, "Out of memory.\n");
        }
 
-
        iks_delete(iq);
        iks_delete(presence);
        iks_delete(x);
-       
+
        ASTOBJ_UNREF(client, aji_client_destroy);
        return IKS_FILTER_EAT;
 }
@@ -1753,7 +1817,7 @@ static int aji_register_approve_handler(void *data, ikspak *pak)
 static int aji_register_query_handler(void *data, ikspak *pak)
 {
        struct aji_client *client = ASTOBJ_REF((struct aji_client *) data);
-       struct aji_buddy *buddy = NULL; 
+       struct aji_buddy *buddy = NULL;
        char *node = NULL;
        iks *iq = NULL, *query = NULL;
 
@@ -1787,7 +1851,7 @@ static int aji_register_query_handler(void *data, ikspak *pak)
 
                iks_delete(error);
                iks_delete(notacceptable);
-       } else  if (!(node = iks_find_attrib(pak->query, "node"))) {
+       } else if (!(node = iks_find_attrib(pak->query, "node"))) {
                iks *instructions = NULL;
                char *explain = "Welcome to Asterisk - the Open Source PBX.\n";
                iq = iks_new("iq");
@@ -1819,7 +1883,7 @@ static int aji_register_query_handler(void *data, ikspak *pak)
  * \internal
  * \brief Handles stuff
  * \param data void
- * \param pak ikspak 
+ * \param pak ikspak
  * \return IKS_FILTER_EAT.
 */
 static int aji_ditems_handler(void *data, ikspak *pak)
@@ -1934,8 +1998,9 @@ static int aji_client_info_handler(void *data, ikspak *pak)
                }
                if (iks_find_with_attrib(pak->query, "feature", "var", "http://www.google.com/xmpp/protocol/voice/v1")) {
                        resource->cap->jingle = 1;
-               } else
+               } else {
                        resource->cap->jingle = 0;
+               }
        } else if (pak->subtype == IKS_TYPE_GET) {
                iks *iq, *disco, *ident, *google, *query;
                iq = iks_new("iq");
@@ -1959,8 +2024,9 @@ static int aji_client_info_handler(void *data, ikspak *pak)
                        iks_insert_node(query, google);
                        iks_insert_node(query, disco);
                        ast_aji_send(client, iq);
-               } else
+               } else {
                        ast_log(LOG_ERROR, "Out of Memory.\n");
+               }
 
                iks_delete(iq);
                iks_delete(query);
@@ -1995,14 +2061,15 @@ static int aji_dinfo_handler(void *data, ikspak *pak)
        }
        if (pak->subtype == IKS_TYPE_RESULT) {
                if (!resource) {
-                       ast_log(LOG_NOTICE,"JABBER: Received client info from %s when not requested.\n", pak->from->full);
+                       ast_log(LOG_NOTICE, "JABBER: Received client info from %s when not requested.\n", pak->from->full);
                        ASTOBJ_UNREF(client, aji_client_destroy);
                        return IKS_FILTER_EAT;
                }
                if (iks_find_with_attrib(pak->query, "feature", "var", "http://www.google.com/xmpp/protocol/voice/v1")) {
                        resource->cap->jingle = 1;
-               } else
+               } else {
                        resource->cap->jingle = 0;
+               }
        } else if (pak->subtype == IKS_TYPE_GET && !(node = iks_find_attrib(pak->query, "node"))) {
                iks *iq, *query, *identity, *disco, *reg, *commands, *gateway, *version, *vcard, *search;
 
@@ -2016,7 +2083,6 @@ static int aji_dinfo_handler(void *data, ikspak *pak)
                version = iks_new("feature");
                vcard = iks_new("feature");
                search = iks_new("feature");
-
                if (iq && query && identity && disco && reg && commands && gateway && version && vcard && search && client) {
                        iks_insert_attrib(iq, "from", client->user);
                        iks_insert_attrib(iq, "to", pak->from->full);
@@ -2058,7 +2124,6 @@ static int aji_dinfo_handler(void *data, ikspak *pak)
                iks_delete(version);
                iks_delete(vcard);
                iks_delete(search);
-
        } else if (pak->subtype == IKS_TYPE_GET && !strcasecmp(node, "http://jabber.org/protocol/commands")) {
                iks *iq, *query, *confirm;
                iq = iks_new("iq");
@@ -2212,27 +2277,31 @@ static void aji_handle_presence(struct aji_client *client, ikspak *pak)
        status = (pak->show) ? pak->show : 6;
        priority = atoi((iks_find_cdata(pak->x, "priority")) ? iks_find_cdata(pak->x, "priority") : "0");
        tmp = buddy->resources;
-       descrip = ast_strdup(iks_find_cdata(pak->x,"status"));
+       descrip = ast_strdup(iks_find_cdata(pak->x, "status"));
 
        while (tmp && pak->from->resource) {
                if (!strcasecmp(tmp->resource, pak->from->resource)) {
                        tmp->status = status;
-                       if (tmp->description) ast_free(tmp->description);
+                       if (tmp->description) {
+                               ast_free(tmp->description);
+                       }
                        tmp->description = descrip;
                        found = tmp;
                        if (status == 6) {      /* Sign off Destroy resource */
                                if (last && found->next) {
                                        last->next = found->next;
                                } else if (!last) {
-                                       if (found->next)
+                                       if (found->next) {
                                                buddy->resources = found->next;
-                                       else
+                                       } else {
                                                buddy->resources = NULL;
+                                       }
                                } else if (!found->next) {
-                                       if (last)
+                                       if (last) {
                                                last->next = NULL;
-                                       else
+                                       } else {
                                                buddy->resources = NULL;
+                                       }
                                }
                                ast_free(found);
                                found = NULL;
@@ -2241,33 +2310,38 @@ static void aji_handle_presence(struct aji_client *client, ikspak *pak)
                        /* resource list is sorted by descending priority */
                        if (tmp->priority != priority) {
                                found->priority = priority;
-                               if (!last && !found->next)
+                               if (!last && !found->next) {
                                        /* resource was found to be unique,
                                           leave loop */
                                        break;
+                               }
                                /* search for resource in our list
                                   and take it out for the moment */
-                               if (last)
+                               if (last) {
                                        last->next = found->next;
-                               else
+                               } else {
                                        buddy->resources = found->next;
+                               }
 
                                last = NULL;
                                tmp = buddy->resources;
-                               if (!buddy->resources)
+                               if (!buddy->resources) {
                                        buddy->resources = found;
+                               }
                                /* priority processing */
                                while (tmp) {
-                                       /* insert resource back according to 
+                                       /* insert resource back according to
                                           its priority value */
                                        if (found->priority > tmp->priority) {
-                                               if (last)
+                                               if (last) {
                                                        /* insert within list */
                                                        last->next = found;
+                                               }
                                                found->next = tmp;
-                                               if (!last)
+                                               if (!last) {
                                                        /* insert on top */
                                                        buddy->resources = found;
+                                               }
                                                break;
                                        }
                                        if (!tmp->next) {
@@ -2303,11 +2377,13 @@ static void aji_handle_presence(struct aji_client *client, ikspak *pak)
                tmp = buddy->resources;
                while (tmp) {
                        if (found->priority > tmp->priority) {
-                               if (last)
+                               if (last) {
                                        last->next = found;
+                               }
                                found->next = tmp;
-                               if (!last)
+                               if (!last) {
                                        buddy->resources = found;
+                               }
                                break;
                        }
                        if (!tmp->next) {
@@ -2317,10 +2393,11 @@ static void aji_handle_presence(struct aji_client *client, ikspak *pak)
                        last = tmp;
                        tmp = tmp->next;
                }
-               if (!tmp)
+               if (!tmp) {
                        buddy->resources = found;
+               }
        }
-       
+
        ASTOBJ_UNLOCK(buddy);
        ASTOBJ_UNREF(buddy, aji_buddy_destroy);
 
@@ -2336,28 +2413,27 @@ static void aji_handle_presence(struct aji_client *client, ikspak *pak)
        /* retrieve capabilites of the new resource */
        if (status != 6 && found && !found->cap) {
                found->cap = aji_find_version(node, ver, pak);
-               if (gtalk_yuck(pak->x)) /* gtalk should do discover */
+               if (gtalk_yuck(pak->x)) { /* gtalk should do discover */
                        found->cap->jingle = 1;
-               if (found->cap->jingle && option_debug > 4) {
-                       ast_debug(1,"Special case for google till they support discover.\n");
                }
-               else {
+               if (found->cap->jingle && option_debug > 4) {
+                       ast_debug(1, "Special case for google till they support discover.\n");
+               } else {
                        iks *iq, *query;
                        iq = iks_new("iq");
                        query = iks_new("query");
-                       if (query && iq)  {
+                       if (query && iq) {
                                iks_insert_attrib(iq, "type", "get");
                                iks_insert_attrib(iq, "to", pak->from->full);
-                               iks_insert_attrib(iq,"from", client->jid->full);
+                               iks_insert_attrib(iq, "from", client->jid->full);
                                iks_insert_attrib(iq, "id", client->mid);
                                ast_aji_increment_mid(client->mid);
                                iks_insert_attrib(query, "xmlns", "http://jabber.org/protocol/disco#info");
                                iks_insert_node(iq, query);
                                ast_aji_send(client, iq);
-                               
-                       } else
+                       } else {
                                ast_log(LOG_ERROR, "Out of memory.\n");
-                       
+                       }
                        iks_delete(query);
                        iks_delete(iq);
                }
@@ -2395,17 +2471,17 @@ static void aji_handle_presence(struct aji_client *client, ikspak *pak)
                ast_debug(3, "JABBER: Kinky! how did that happen %i\n", pak->show);
        }
 
-    if (found) {
-       manager_event(EVENT_FLAG_USER, "JabberStatus",
+       if (found) {
+               manager_event(EVENT_FLAG_USER, "JabberStatus",
                        "Account: %s\r\nJID: %s\r\nResource: %s\r\nStatus: %d\r\nPriority: %d"
                        "\r\nDescription: %s\r\n",
                        client->name, pak->from->partial, found->resource, found->status,
                        found->priority, found->description);
-    } else {
-       manager_event(EVENT_FLAG_USER, "JabberStatus",
+       } else {
+               manager_event(EVENT_FLAG_USER, "JabberStatus",
                        "Account: %s\r\nJID: %s\r\nStatus: %d\r\n",
-                       client->name, pak->from->partial, pak->show ? pak->show:IKS_SHOW_UNAVAILABLE);
-    }
+                       client->name, pak->from->partial, pak->show ? pak->show : IKS_SHOW_UNAVAILABLE);
+       }
 }
 
 /*!
@@ -2420,7 +2496,7 @@ static void aji_handle_subscribe(struct aji_client *client, ikspak *pak)
        iks *presence = NULL, *status = NULL;
        struct aji_buddy* buddy = NULL;
 
-       switch (pak->subtype) { 
+       switch (pak->subtype) {
        case IKS_TYPE_SUBSCRIBE:
                if (ast_test_flag(&client->flags, AJI_AUTOACCEPT)) {
                        presence = iks_new("presence");
@@ -2434,8 +2510,9 @@ static void aji_handle_subscribe(struct aji_client *client, ikspak *pak)
                                iks_insert_cdata(status, "Asterisk has approved subscription", 0);
                                iks_insert_node(presence, status);
                                ast_aji_send(client, presence);
-                       } else
+                       } else {
                                ast_log(LOG_ERROR, "Unable to allocate nodes\n");
+                       }
 
                        iks_delete(presence);
                        iks_delete(status);
@@ -2501,12 +2578,12 @@ static int aji_send_raw_chat(struct aji_client *client, int groupchat, const cha
        } else {
                snprintf(from, AJI_MAX_JIDLEN, "%s", client->jid->full);
        }
-       
+
        if (client->state != AJI_CONNECTED) {
                ast_log(LOG_WARNING, "JABBER: Not connected can't send\n");
                return -1;
-       }   
-       
+       }
+
        message_packet = iks_make_msg(groupchat ? IKS_TYPE_GROUPCHAT : IKS_TYPE_CHAT, address, message);
        if (!message_packet) {
                ast_log(LOG_ERROR, "Out of memory.\n");
@@ -2515,7 +2592,7 @@ static int aji_send_raw_chat(struct aji_client *client, int groupchat, const cha
        iks_insert_attrib(message_packet, "from", from);
        res = ast_aji_send(client, message_packet);
        iks_delete(message_packet);
-       
+
        return res;
 }
 
@@ -2539,8 +2616,9 @@ int ast_aji_create_chat(struct aji_client *client, char *room, char *server, cha
                iks_insert_attrib(iq, "id", client->mid);
                ast_aji_increment_mid(client->mid);
                ast_aji_send(client, iq);
-       } else 
+       } else {
                ast_log(LOG_ERROR, "Out of memory.\n");
+       }
 
        iks_delete(iq);
 
@@ -2573,7 +2651,7 @@ int ast_aji_leave_chat(struct aji_client *client, char *room, char *nick)
 /*!
  * \brief invite to a chatroom.
  * \param client the configured XMPP client we use to connect to a XMPP server
- * \param user 
+ * \param user
  * \param room
  * \param message
  * \return res.
@@ -2596,13 +2674,14 @@ int ast_aji_invite_chat(struct aji_client *client, char *user, char *room, char
                iks_insert_node(invite, body);
                iks_insert_node(invite, namespace);
                res = ast_aji_send(client, invite);
-       } else 
+       } else {
                ast_log(LOG_ERROR, "Out of memory.\n");
+       }
 
        iks_delete(body);
        iks_delete(namespace);
        iks_delete(invite);
-       
+
        return res;
 }
 
@@ -2617,7 +2696,7 @@ static void *aji_recv_loop(void *data)
        struct aji_client *client = ASTOBJ_REF((struct aji_client *) data);
        int res = IKS_HOOK;
 
-       while(res != IKS_OK) {
+       while (res != IKS_OK) {
                ast_debug(3, "JABBER: Connecting.\n");
                res = aji_reconnect(client);
                sleep(4);
@@ -2625,7 +2704,7 @@ static void *aji_recv_loop(void *data)
 
        do {
                if (res == IKS_NET_RWERR || client->timeout == 0) {
-                       while(res != IKS_OK) {
+                       while (res != IKS_OK) {
                                ast_debug(3, "JABBER: reconnecting.\n");
                                res = aji_reconnect(client);
                                sleep(4);
@@ -2633,7 +2712,7 @@ static void *aji_recv_loop(void *data)
                }
 
                res = aji_recv(client, 1);
-               
+
                if (client->state == AJI_DISCONNECTING) {
                        ast_debug(2, "Ending our Jabber client's thread due to a disconnect\n");
                        pthread_exit(NULL);
@@ -2803,14 +2882,14 @@ static void aji_pruneregister(struct aji_client *client)
                        res = ast_aji_send(client, iks_make_s10n(IKS_TYPE_UNSUBSCRIBED, iterator->name,
                                                                 "GoodBye.  You are no longer in the Asterisk config file so I am removing"
                                                                 " your access to my presence.\n"));
-                       iks_insert_attrib(removeiq, "from", client->jid->full); 
-                       iks_insert_attrib(removeiq, "type", "set"); 
+                       iks_insert_attrib(removeiq, "from", client->jid->full);
+                       iks_insert_attrib(removeiq, "type", "set");
                        iks_insert_attrib(removequery, "xmlns", "jabber:iq:roster");
                        iks_insert_attrib(removeitem, "jid", iterator->name);
                        iks_insert_attrib(removeitem, "subscription", "remove");
                        res = ast_aji_send(client, removeiq);
                } else if (ast_test_flag(&iterator->flags, AJI_AUTOREGISTER)) {
-                       res = ast_aji_send(client, iks_make_s10n(IKS_TYPE_SUBSCRIBE, iterator->name, 
+                       res = ast_aji_send(client, iks_make_s10n(IKS_TYPE_SUBSCRIBE, iterator->name,
                                                                 "Greetings! I am the Asterisk Open Source PBX and I want to subscribe to your presence\n"));
                        ast_clear_flag(&iterator->flags, AJI_AUTOREGISTER);
                }
@@ -2822,7 +2901,7 @@ static void aji_pruneregister(struct aji_client *client)
        iks_delete(removequery);
        iks_delete(removeitem);
        iks_delete(send);
-       
+
        ASTOBJ_CONTAINER_PRUNE_MARKED(&client->buddies, aji_buddy_destroy);
 }
 
@@ -2839,7 +2918,7 @@ static int aji_filter_roster(void *data, ikspak *pak)
        int flag = 0;
        iks *x = NULL;
        struct aji_buddy *buddy;
-       
+
        client->state = AJI_CONNECTED;
        ASTOBJ_CONTAINER_TRAVERSE(&client->buddies, 1, {
                ASTOBJ_RDLOCK(iterator);
@@ -2854,10 +2933,11 @@ static int aji_filter_roster(void *data, ikspak *pak)
                        }
                        x = iks_next(x);
                }
-               if (!flag)
+               if (!flag) {
                        ast_copy_flags(&iterator->flags, &client->flags, AJI_AUTOREGISTER);
+               }
                iks_delete(x);
-               
+
                ASTOBJ_UNLOCK(iterator);
        });
 
@@ -2877,7 +2957,7 @@ static int aji_filter_roster(void *data, ikspak *pak)
                                x = iks_next(x);
                                continue;
                        }
-                       
+
                        buddy = ast_calloc(1, sizeof(*buddy));
                        if (!buddy) {
                                ast_log(LOG_WARNING, "Out of memory\n");
@@ -2921,13 +3001,16 @@ static int aji_reconnect(struct aji_client *client)
 {
        int res = 0;
 
-       if (client->state)
+       if (client->state) {
                client->state = AJI_DISCONNECTED;
-       client->timeout=50;
-       if (client->p)
+       }
+       client->timeout = 50;
+       if (client->p) {
                iks_parser_reset(client->p);
-       if (client->authorized)
+       }
+       if (client->authorized) {
                client->authorized = 0;
+       }
 
        res = aji_initialize(client);
 
@@ -2952,7 +3035,7 @@ static int aji_get_roster(struct aji_client *client)
        }
 
        iks_delete(roster);
-       
+
        return 1;
 }
 
@@ -2974,11 +3057,16 @@ static int aji_client_connect(void *data, ikspak *pak)
                        client->state = AJI_CONNECTING;
                        client->jid = (iks_find_cdata(pak->query, "jid")) ? iks_id_new(client->stack, iks_find_cdata(pak->query, "jid")) : client->jid;
                        iks_filter_remove_hook(client->f, aji_client_connect);
-                       if (!client->component) /*client*/
+                       if (!client->component) { /*client*/
                                aji_get_roster(client);
+                               if (client->distribute_events) {
+                                       aji_init_event_distribution(client);
+                               }
+                       }
                }
-       } else
+       } else {
                ast_log(LOG_ERROR, "Out of memory.\n");
+       }
 
        ASTOBJ_UNREF(client, aji_client_destroy);
        return res;
@@ -2994,7 +3082,7 @@ static int aji_initialize(struct aji_client *client)
 {
        int connected = IKS_NET_NOCONN;
 
-#ifdef HAVE_OPENSSL    
+#ifdef HAVE_OPENSSL
        /* reset stream flags */
        client->stream_flags = 0;
 #endif
@@ -3004,8 +3092,9 @@ static int aji_initialize(struct aji_client *client)
        if (connected == IKS_NET_NOCONN) {
                ast_log(LOG_ERROR, "JABBER ERROR: No Connection\n");
                return IKS_HOOK;
-       } else  if (connected == IKS_NET_NODNS) {
-               ast_log(LOG_ERROR, "JABBER ERROR: No DNS %s for client to  %s\n", client->name, S_OR(client->serverhost, client->jid->server));
+       } else if (connected == IKS_NET_NODNS) {
+               ast_log(LOG_ERROR, "JABBER ERROR: No DNS %s for client to  %s\n", client->name,
+                       S_OR(client->serverhost, client->jid->server));
                return IKS_HOOK;
        }
 
@@ -3037,6 +3126,771 @@ int ast_aji_disconnect(struct aji_client *client)
 }
 
 /*!
+ * \brief Callback function for MWI events
+ * \param ast_event
+ * \param data void pointer to ast_client structure
+ * \return void
+ */
+static void aji_mwi_cb(const struct ast_event *ast_event, void *data)
+{
+       const char *mailbox;
+       const char *context;
+       char oldmsgs[10];
+       char newmsgs[10];
+       if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID)))
+       {
+               /* If the event didn't originate from this server, don't send it back out. */
+               ast_log(LOG_DEBUG, "Returning here\n");
+               return;
+       }
+
+       struct aji_client *client = ASTOBJ_REF((struct aji_client *) data);
+       mailbox = ast_event_get_ie_str(ast_event, AST_EVENT_IE_MAILBOX);
+       context = ast_event_get_ie_str(ast_event, AST_EVENT_IE_CONTEXT);
+       snprintf(oldmsgs, sizeof(oldmsgs), "%d",
+               ast_event_get_ie_uint(ast_event, AST_EVENT_IE_OLDMSGS));
+       snprintf(newmsgs, sizeof(newmsgs), "%d",
+               ast_event_get_ie_uint(ast_event, AST_EVENT_IE_NEWMSGS));
+       aji_publish_mwi(client, mailbox, context, oldmsgs, newmsgs);
+
+}
+/*!
+ * \brief Callback function for device state events
+ * \param ast_event
+ * \param data void pointer to ast_client structure
+ * \return void
+ */
+static void aji_devstate_cb(const struct ast_event *ast_event, void *data)
+{
+       const char *device;
+       const char *device_state;
+       if (ast_eid_cmp(&ast_eid_default, ast_event_get_ie_raw(ast_event, AST_EVENT_IE_EID)))
+       {
+               /* If the event didn't originate from this server, don't send it back out. */
+               ast_log(LOG_DEBUG, "Returning here\n");
+               return;
+       }
+
+       struct aji_client *client = ASTOBJ_REF((struct aji_client *) data);
+       device = ast_event_get_ie_str(ast_event, AST_EVENT_IE_DEVICE);
+       device_state = ast_devstate_str(ast_event_get_ie_uint(ast_event, AST_EVENT_IE_STATE));
+       aji_publish_device_state(client, device, device_state);
+}
+
+/*!
+ * \brief Initialize collections for event distribution
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \return void
+ */
+static void aji_init_event_distribution(struct aji_client *client)
+{
+       if (!mwi_sub) {
+               mwi_sub = ast_event_subscribe(AST_EVENT_MWI, aji_mwi_cb, "aji_mwi_subscription",
+                       client, AST_EVENT_IE_END);
+       }
+       if (!device_state_sub) {
+               if (ast_enable_distributed_devstate()) {
+                       return;
+               }
+               device_state_sub = ast_event_subscribe(AST_EVENT_DEVICE_STATE_CHANGE,
+                       aji_devstate_cb, "aji_devstate_subscription", client, AST_EVENT_IE_END);
+               ast_event_dump_cache(device_state_sub);
+       }
+
+       aji_pubsub_subscribe(client, "device_state");
+       aji_pubsub_subscribe(client, "message_waiting");
+       iks_filter_add_rule(client->f, aji_handle_pubsub_event, client, IKS_RULE_TYPE,
+               IKS_PAK_MESSAGE, IKS_RULE_FROM, client->pubsub_node, IKS_RULE_DONE);
+       iks_filter_add_rule(client->f, aji_handle_pubsub_error, client, IKS_RULE_TYPE,
+               IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_ERROR, IKS_RULE_DONE);
+
+}
+
+/*!
+ * \brief Callback for handling PubSub events
+ * \param data void pointer to aji_client structure
+ * \return IKS_FILTER_EAT
+ */
+static int aji_handle_pubsub_event(void *data, ikspak *pak)
+{
+       char *item_id, *device_state, *context;
+       int oldmsgs, newmsgs;
+       iks *item, *item_content;
+       struct ast_eid pubsub_eid;
+       struct ast_event *event;
+       item = iks_find(iks_find(iks_find(pak->x, "event"), "items"), "item");
+       if (!item) {
+               ast_log(LOG_ERROR, "Could not parse incoming PubSub event\n");
+               return IKS_FILTER_EAT;
+       }
+       item_id = iks_find_attrib(item, "id");
+       item_content = iks_child(item);
+       ast_str_to_eid(&pubsub_eid, iks_find_attrib(item_content, "eid"));
+       if (!ast_eid_cmp(&ast_eid_default, &pubsub_eid)) {
+               ast_log(LOG_DEBUG, "Returning here, eid of incoming event matches ours!\n");
+               return IKS_FILTER_EAT;
+       }
+       if (!strcasecmp(iks_name(item_content), "state")) {
+               device_state = iks_find_cdata(item, "state");
+               if (!(event = ast_event_new(AST_EVENT_DEVICE_STATE_CHANGE,
+                       AST_EVENT_IE_DEVICE, AST_EVENT_IE_PLTYPE_STR, item_id, AST_EVENT_IE_STATE,
+                       AST_EVENT_IE_PLTYPE_UINT, ast_devstate_val(device_state), AST_EVENT_IE_EID,
+                       AST_EVENT_IE_PLTYPE_RAW, &pubsub_eid, sizeof(pubsub_eid),
+                       AST_EVENT_IE_END))) {
+                       return IKS_FILTER_EAT;
+               }
+       } else if (!strcasecmp(iks_name(item_content), "mailbox")) {
+               context = strsep(&item_id, "@");
+               sscanf(iks_find_cdata(item_content, "OLDMSGS"), "%10d", &oldmsgs);
+               sscanf(iks_find_cdata(item_content, "NEWMSGS"), "%10d", &newmsgs);
+               if (!(event = ast_event_new(AST_EVENT_MWI, AST_EVENT_IE_MAILBOX,
+                       AST_EVENT_IE_PLTYPE_STR, item_id, AST_EVENT_IE_CONTEXT,
+                       AST_EVENT_IE_PLTYPE_STR, context, AST_EVENT_IE_OLDMSGS,
+                       AST_EVENT_IE_PLTYPE_UINT, oldmsgs, AST_EVENT_IE_NEWMSGS,
+                       AST_EVENT_IE_PLTYPE_UINT, newmsgs, AST_EVENT_IE_EID, AST_EVENT_IE_PLTYPE_RAW,
+                       &pubsub_eid, sizeof(pubsub_eid), AST_EVENT_IE_END))) {
+                       return IKS_FILTER_EAT;
+               }
+       } else {
+               ast_log(LOG_DEBUG, "Don't know how to handle PubSub event of type %s\n",
+                       iks_name(item_content));
+               return IKS_FILTER_EAT;
+       }
+       ast_event_queue_and_cache(event);
+       return IKS_FILTER_EAT;
+}
+
+/*!
+ * \brief Add Owner affiliations for pubsub node
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param node the name of the node to which to add affiliations
+ * \return void
+ */
+static void aji_create_affiliations(struct aji_client *client, const char *node)
+{
+       int res = 0;
+       iks *modify_affiliates = aji_pubsub_iq_create(client, "set");
+       iks *pubsub, *affiliations, *affiliate;
+       pubsub = iks_insert(modify_affiliates, "pubsub");
+       iks_insert_attrib(pubsub, "xmlns", "http://jabber.org/protocol/pubsub#owner");
+       affiliations = iks_insert(pubsub, "affiliations");
+       iks_insert_attrib(affiliations, "node", node);
+       ASTOBJ_CONTAINER_TRAVERSE(&client->buddies, 1, {
+               ASTOBJ_RDLOCK(iterator);
+               affiliate = iks_insert(affiliations, "affiliation");
+               iks_insert_attrib(affiliate, "jid", iterator->name);
+               iks_insert_attrib(affiliate, "affiliation", "owner");
+               ASTOBJ_UNLOCK(iterator);
+       });
+       res = ast_aji_send(client, modify_affiliates);
+       iks_delete(modify_affiliates);
+}
+
+/*!
+ * \brief Subscribe to a PubSub node
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param node the name of the node to which to subscribe
+ * \return void
+ */
+static void aji_pubsub_subscribe(struct aji_client *client, const char *node)
+{
+       iks *request = aji_pubsub_iq_create(client, "set");
+       iks *pubsub, *subscribe;
+
+       pubsub = iks_insert(request, "pubsub");
+       iks_insert_attrib(pubsub, "xmlns", "http://jabber.org/protocol/pubsub");
+       subscribe = iks_insert(pubsub, "subscribe");
+       iks_insert_attrib(subscribe, "jid", client->jid->partial);
+       iks_insert_attrib(subscribe, "node", node);
+       if (ast_test_flag(&globalflags, AJI_XEP0248)) {
+               iks *options, *x, *sub_options, *sub_type, *sub_depth;
+               options = iks_insert(pubsub, "options");
+               x = iks_insert(options, "x");
+               iks_insert_attrib(x, "xmlns", "jabber:x:data");
+               iks_insert_attrib(x, "type", "submit");
+               sub_options = iks_insert(x, "field");
+               iks_insert_attrib(sub_options, "var", "FORM_TYPE");
+               iks_insert_attrib(sub_options, "type", "hidden");
+               iks_insert_cdata(iks_insert(sub_options, "value"),
+                       "http://jabber.org/protocol/pubsub#subscribe_options", 51);
+               sub_type = iks_insert(x, "field");
+               iks_insert_attrib(sub_type, "var", "pubsub#subscription_type");
+               iks_insert_cdata(iks_insert(sub_type, "value"), "items", 5);
+               sub_depth = iks_insert(x, "field");
+               iks_insert_attrib(sub_type, "var", "pubsub#subscription_depth");
+               iks_insert_cdata(iks_insert(sub_depth, "value"), "all", 3);
+       }
+       ast_aji_send(client, request);
+       iks_delete(request);
+}
+
+/*!
+ * \brief Build the skeleton of a publish
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param node Name of the node that will be published to
+ * \param event_type
+ * \return iks *
+ */
+static iks* aji_build_publish_skeleton(struct aji_client *client, const char *node,
+       const char *event_type)
+{
+       iks *request = aji_pubsub_iq_create(client, "set");
+       iks *pubsub, *publish, *item;
+       pubsub = iks_insert(request, "pubsub");
+       iks_insert_attrib(pubsub, "xmlns", "http://jabber.org/protocol/pubsub");
+       publish = iks_insert(pubsub, "publish");
+       if (ast_test_flag(&globalflags, AJI_XEP0248)) {
+               iks_insert_attrib(publish, "node", node);
+       } else {
+               iks_insert_attrib(publish, "node", event_type);
+       }
+       item = iks_insert(publish, "item");
+       iks_insert_attrib(item, "id", node);
+       return item;
+
+}
+
+/*!
+ * \brief Publish device state to a PubSub node
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param device the name of the device whose state to publish
+ * \param device_state the state to publish
+ * \return void
+ */
+static void aji_publish_device_state(struct aji_client *client, const char *device,
+       const char *device_state)
+{
+       iks *request = aji_build_publish_skeleton(client, device, "device_state");
+       iks *state;
+       char eid_str[20];
+       if (ast_test_flag(&pubsubflags, AJI_PUBSUB_AUTOCREATE)) {
+               if (ast_test_flag(&pubsubflags, AJI_XEP0248)) {
+                       aji_create_pubsub_node(client, "leaf", device, "device_state");
+               } else {
+                       aji_create_pubsub_node(client, NULL, device, NULL);
+               }
+       }
+       ast_eid_to_str(eid_str, sizeof(eid_str), &ast_eid_default);
+       state = iks_insert(request, "state");
+       iks_insert_attrib(state, "xmlns", "http://asterisk.org");
+       iks_insert_attrib(state, "eid", eid_str);
+       iks_insert_cdata(state, device_state, strlen(device_state));
+       ast_aji_send(client, iks_root(request));
+       iks_delete(request);
+}
+
+/*!
+ * \brief Publish MWI to a PubSub node
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param device the name of the device whose state to publish
+ * \param device_state the state to publish
+ * \return void
+ */
+static void aji_publish_mwi(struct aji_client *client, const char *mailbox,
+       const char *context, const char *oldmsgs, const char *newmsgs)
+{
+       char full_mailbox[AST_MAX_EXTENSION+AST_MAX_CONTEXT];
+       char eid_str[20];
+       iks *mailbox_node;
+       snprintf(full_mailbox, sizeof(full_mailbox), "%s@%s", mailbox, context);
+       iks *request = aji_build_publish_skeleton(client, full_mailbox, "message_waiting");
+       ast_eid_to_str(eid_str, sizeof(eid_str), &ast_eid_default);
+       mailbox_node = iks_insert(request, "mailbox");
+       iks_insert_attrib(mailbox_node, "xmlns", "http://asterisk.org");
+       iks_insert_attrib(mailbox_node, "eid", eid_str);
+       iks_insert_cdata(iks_insert(mailbox_node, "NEWMSGS"), newmsgs, strlen(newmsgs));
+       iks_insert_cdata(iks_insert(mailbox_node, "OLDMSGS"), oldmsgs, strlen(oldmsgs));
+       ast_aji_send(client, iks_root(request));
+       iks_delete(request);
+}
+
+/*!
+ * \brief Create an IQ packet
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param type the type of IQ packet to create
+ * \return iks*
+ */
+static iks* aji_pubsub_iq_create(struct aji_client *client, const char *type)
+{
+       iks *request = iks_new("iq");
+
+       iks_insert_attrib(request, "to", client->pubsub_node);
+       iks_insert_attrib(request, "from", client->jid->full);
+       iks_insert_attrib(request, "type", type);
+       ast_aji_increment_mid(client->mid);
+       iks_insert_attrib(request, "id", client->mid);
+       return request;
+}
+
+static int aji_handle_pubsub_error(void *data, ikspak *pak)
+{
+       char *node_name;
+       char *error;
+       int error_num;
+       iks *orig_request;
+       iks *orig_pubsub = iks_find(pak->x, "pubsub");
+       struct aji_client *client = ASTOBJ_REF((struct aji_client *) data);
+       if (!orig_pubsub) {
+               ast_log(LOG_ERROR, "Error isn't a PubSub error, why are we here?\n");
+               return IKS_FILTER_EAT;
+       }
+       orig_request = iks_child(orig_pubsub);
+       error = iks_find_attrib(iks_find(pak->x, "error"), "code");
+       node_name = iks_find_attrib(orig_request, "node");
+       if (!sscanf(error, "%30d", &error_num)) {
+               return IKS_FILTER_EAT;
+       }
+       if (error_num > 399 && error_num < 500 && error_num != 404) {
+               ast_log(LOG_ERROR,
+                       "Error performing operation on PubSub node %s, %s.\n", node_name, error);
+               return IKS_FILTER_EAT;
+       } else if (error_num > 499 && error_num < 600) {
+               ast_log(LOG_ERROR, "PubSub Server error, %s\n", error);
+               return IKS_FILTER_EAT;
+       }
+
+       if (!strcasecmp(iks_name(orig_request), "publish")) {
+               if (ast_test_flag(&pubsubflags, AJI_XEP0248)) {
+                       if (iks_find(iks_find(orig_request, "item"), "state")) {
+                               aji_create_pubsub_leaf(client, "device_state", node_name);
+                       } else if (iks_find(iks_find(orig_request, "item"), "mailbox")) {
+                               aji_create_pubsub_leaf(client, "message_waiting", node_name);
+                       }
+               } else {
+                       aji_create_pubsub_node(client, NULL, node_name, NULL);
+               }
+               iks *request = aji_pubsub_iq_create(client, "set");
+               iks_insert_node(request, orig_pubsub);
+               ast_aji_send(client, request);
+               iks_delete(request);
+               return IKS_FILTER_EAT;
+       } else if (!strcasecmp(iks_name(orig_request), "subscribe")) {
+               if (ast_test_flag(&pubsubflags, AJI_XEP0248)) {
+                       aji_create_pubsub_collection(client, node_name);
+               } else {
+                       aji_create_pubsub_node(client, NULL, node_name, NULL);
+               }
+       }
+
+       return IKS_FILTER_EAT;
+}
+
+/*!
+ * \brief Request item list from pubsub
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param collection name of the collection for request
+ * \return void
+ */
+static void aji_request_pubsub_nodes(struct aji_client *client, const char *collection)
+{
+       int res = 0;
+       iks *request = aji_build_node_request(client, collection);
+
+       iks_filter_add_rule(client->f, aji_receive_node_list, client, IKS_RULE_TYPE,
+               IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_ID, client->mid,
+               IKS_RULE_DONE);
+       res = ast_aji_send(client, request);
+       iks_delete(request);
+
+}
+
+/*!
+ * \brief Build the a node request
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param collection name of the collection for request
+ * \return iks*
+ */
+static iks* aji_build_node_request(struct aji_client *client, const char *collection)
+{
+       iks *request = aji_pubsub_iq_create(client, "get");
+       iks *query;
+       query = iks_insert(request, "query");
+       iks_insert_attrib(query, "xmlns", "http://jabber.org/protocol/disco#items");
+       if (collection) {
+               iks_insert_attrib(query, "node", collection);
+       }
+       return request;
+}
+
+/*!
+ * \brief Receive pubsub item lists
+ * \param data pointer to aji_client structure
+ * \param pak response from pubsub diso#items query
+ * \return IKS_FILTER_EAT
+ */
+static int aji_receive_node_list(void *data, ikspak* pak)
+{
+
+       struct aji_client *client = ASTOBJ_REF((struct aji_client *) data);
+       iks *item;
+       if (iks_has_children(pak->query)) {
+               item = iks_first_tag(pak->query);
+               ast_verbose("Connection: %s\nNode name: %s\n", client->jid->partial,
+                       iks_find_attrib(item, "node"));
+               while ((item = iks_next_tag(item))) {
+                       ast_verbose("Node name: %s\n", iks_find_attrib(item, "node"));
+               }
+       }
+       iks_delete(item);
+       return IKS_FILTER_EAT;
+}
+
+
+/*!
+ * \brief Method to expose PubSub node list via CLI.
+ * \param e pointer to ast_cli_entry structure
+ * \param cmd
+ * \param a pointer to ast_cli_args structure
+ * \return char *
+ */
+static char *aji_cli_list_pubsub_nodes(struct ast_cli_entry *e, int cmd, struct
+ast_cli_args *a)
+{
+               struct aji_client *client;
+               const char *name = "asterisk";
+               const char *collection = NULL;
+
+               switch (cmd) {
+               case CLI_INIT:
+                               e->command = "jabber list nodes";
+                               e->usage =
+                                       "Usage: jabber list nodes [name]\n"
+                                       "       Lists nodes on PubSub server\n"
+                                       "       as configured in jabber.conf.\n";
+                       return NULL;
+               case CLI_GENERATE:
+                       return NULL;
+               }
+
+               if (a->argc > 5 || a->argc < 4) {
+                       return CLI_SHOWUSAGE;
+               } else if (a->argc == 4 || a->argc == 5) {
+                       name = a->argv[3];
+               }
+               if (a->argc == 5) {
+                       collection = a->argv[4];
+               }
+        if (!(client = ASTOBJ_CONTAINER_FIND(&clients, name))) {
+                       ast_cli(a->fd, "Unable to find client '%s'!\n", name);
+                       return CLI_FAILURE;
+               }
+
+               ast_cli(a->fd, "Listing pubsub nodes.\n");
+               aji_request_pubsub_nodes(client, collection);
+               return CLI_SUCCESS;
+}
+
+/*!
+ * \brief Method to purge PubSub nodes via CLI.
+ * \param e pointer to ast_cli_entry structure
+ * \param cmd
+ * \param a pointer to ast_cli_args structure
+ * \return char *
+ */
+static char *aji_cli_purge_pubsub_nodes(struct ast_cli_entry *e, int cmd, struct
+       ast_cli_args *a)
+{
+       struct aji_client *client;
+       const char *name = "asterisk";
+
+       switch (cmd) {
+               case CLI_INIT:
+                       e->command = "jabber purge nodes";
+                       e->usage =
+                                       "Usage: jabber purge nodes [name]\n"
+                                       "       Purges nodes on PubSub server\n"
+                                       "       as configured in jabber.conf.\n";
+                       return NULL;
+               case CLI_GENERATE:
+                       return NULL;
+       }
+
+       if (a->argc > 5) {
+               return CLI_SHOWUSAGE;
+       } else if (a->argc == 5) {
+               name = a->argv[3];
+       }
+       if (!(client = ASTOBJ_CONTAINER_FIND(&clients, name))) {
+               ast_cli(a->fd, "Unable to find client '%s'!\n", name);
+               return CLI_FAILURE;
+       }
+       if (ast_test_flag(&pubsubflags, AJI_XEP0248)) {
+               aji_pubsub_purge_nodes(client, a->argv[4]);
+       } else {
+               aji_delete_pubsub_node(client, a->argv[4]);
+       }
+       return CLI_SUCCESS;
+}
+
+static void aji_pubsub_purge_nodes(struct aji_client *client, const char* collection_name)
+{
+       int res = 0;
+       iks *request = aji_build_node_request(client, collection_name);
+       ast_aji_send(client, request);
+       iks_filter_add_rule(client->f, aji_delete_node_list, client, IKS_RULE_TYPE,
+               IKS_PAK_IQ, IKS_RULE_SUBTYPE, IKS_TYPE_RESULT, IKS_RULE_ID, client->mid,
+               IKS_RULE_DONE);
+       res = ast_aji_send(client, request);
+       iks_delete(request);
+}
+
+/*!
+ * \brief Delete pubsub item lists
+ * \param data pointer to aji_client structure
+ * \param pak response from pubsub diso#items query
+ * \return IKS_FILTER_EAT
+ */
+static int aji_delete_node_list(void *data, ikspak* pak)
+{
+
+       struct aji_client *client = ASTOBJ_REF((struct aji_client *) data);
+       iks *item;
+       if (iks_has_children(pak->query)) {
+               item = iks_first_tag(pak->query);
+               ast_log(LOG_WARNING, "Connection: %s  Node name: %s\n", client->jid->partial,
+                               iks_find_attrib(item, "node"));
+               while ((item = iks_next_tag(item))) {
+                       aji_delete_pubsub_node(client, iks_find_attrib(item, "node"));
+               }
+       }
+       iks_delete(item);
+       return IKS_FILTER_EAT;
+}
+
+
+/*!
+ * \brief Method to expose PubSub node deletion via CLI.
+ * \param e pointer to ast_cli_entry structure
+ * \param cmd
+ * \param a pointer to ast_cli_args structure
+ * \return char *
+ */
+static char *aji_cli_delete_pubsub_node(struct ast_cli_entry *e, int cmd, struct
+       ast_cli_args *a)
+{
+       struct aji_client *client;
+       const char *name = "asterisk";
+
+       switch (cmd) {
+               case CLI_INIT:
+                       e->command = "jabber delete node";
+                       e->usage =
+                                       "Usage: jabber delete node [name]\n"
+                                       "       Deletes a node on PubSub server\n"
+                                       "       as configured in jabber.conf.\n";
+                       return NULL;
+               case CLI_GENERATE:
+                       return NULL;
+       }
+
+       if (a->argc > 5) {
+               return CLI_SHOWUSAGE;
+       } else if (a->argc == 5) {
+               name = a->argv[3];
+       }
+       if (!(client = ASTOBJ_CONTAINER_FIND(&clients, name))) {
+               ast_cli(a->fd, "Unable to find client '%s'!\n", name);
+               return CLI_FAILURE;
+       }
+       aji_delete_pubsub_node(client, a->argv[4]);
+       return CLI_SUCCESS;
+}
+
+/*!
+ * \brief Delete a PubSub node
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param node_name the name of the node to delete
+ * return void
+ */
+static void aji_delete_pubsub_node(struct aji_client *client, const char *node_name)
+{
+       iks *request = aji_pubsub_iq_create(client, "set");
+       iks *pubsub, *delete;
+       pubsub = iks_insert(request, "pubsub");
+       iks_insert_attrib(pubsub, "xmlns", "http://jabber.org/protocol/pubsub#owner");
+       delete = iks_insert(pubsub, "delete");
+       iks_insert_attrib(delete, "node", node_name);
+       ast_aji_send(client, request);
+}
+
+/*!
+ * \brief Create a PubSub collection node.
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param collection_name The name to use for this collection
+ * \return void.
+ */
+static void aji_create_pubsub_collection(struct aji_client *client, const char
+*collection_name)
+{
+       aji_create_pubsub_node(client, "collection", collection_name, NULL);
+}
+
+
+/*!
+ * \brief Create a PubSub leaf node.
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param leaf_name The name to use for this collection
+ * \return void.
+ */
+static void aji_create_pubsub_leaf(struct aji_client *client, const char *collection_name,
+const char *leaf_name)
+{
+       aji_create_pubsub_node(client, "leaf", leaf_name, collection_name);
+}
+
+/*!
+ * \brief Create a pubsub node
+ * \param client the configured XMPP client we use to connect to a XMPP server
+ * \param node_type the type of node to create
+ * \param name the name of the node to create
+ * \return iks*
+ */
+static iks* aji_create_pubsub_node(struct aji_client *client, const char *node_type, const
+               char *name, const char *collection_name)
+{
+       int res = 0;
+       iks *node = aji_pubsub_iq_create(client, "set");
+       iks *pubsub, *create, *configure;
+       pubsub = iks_insert(node, "pubsub");
+       iks_insert_attrib(pubsub, "xmlns", "http://jabber.org/protocol/pubsub");
+       create = iks_insert(pubsub, "create");
+       iks_insert_attrib(create, "node", name);
+       configure = aji_build_node_config(pubsub, node_type, collection_name);
+       res = ast_aji_send(client, node);
+       aji_create_affiliations(client, name);
+       iks_delete(node);
+       return 0;
+}
+
+
+
+static iks* aji_build_node_config(iks *pubsub, const char *node_type, const char *collection_name)
+{
+       iks *configure, *x, *field_owner, *field_node_type, *field_node_config,
+               *field_deliver_payload, *field_persist_items, *field_access_model,
+               *field_pubsub_collection;
+       configure = iks_insert(pubsub, "configure");
+       x = iks_insert(configure, "x");
+       iks_insert_attrib(x, "xmlns", "jabber:x:data");
+       iks_insert_attrib(x, "type", "submit");
+       field_owner = iks_insert(x, "field");
+       iks_insert_attrib(field_owner, "var", "FORM_TYPE");
+       iks_insert_attrib(field_owner, "type", "hidden");
+       iks_insert_cdata(iks_insert(field_owner, "value"),
+               "http://jabber.org/protocol/pubsub#owner", 39);
+       if (node_type) {
+               field_node_type = iks_insert(x, "field");
+               iks_insert_attrib(field_node_type, "var", "pubsub#node_type");
+               iks_insert_cdata(iks_insert(field_node_type, "value"), node_type, strlen(node_type));
+       }
+       field_node_config = iks_insert(x, "field");
+       iks_insert_attrib(field_node_config, "var", "FORM_TYPE");
+       iks_insert_attrib(field_node_config, "type", "hidden");
+       iks_insert_cdata(iks_insert(field_node_config, "value"),
+               "http://jabber.org/protocol/pubsub#node_config", 45);
+       field_deliver_payload = iks_insert(x, "field");
+       iks_insert_attrib(field_deliver_payload, "var", "pubsub#deliver_payloads");
+       iks_insert_cdata(iks_insert(field_deliver_payload, "value"), "1", 1);
+       field_persist_items = iks_insert(x, "field");
+       iks_insert_attrib(field_persist_items, "var", "pubsub#persist_items");
+       iks_insert_cdata(iks_insert(field_persist_items, "value"), "1", 1);
+       field_access_model = iks_insert(x, "field");
+       iks_insert_attrib(field_access_model, "var", "pubsub#access_model");
+       iks_insert_cdata(iks_insert(field_access_model, "value"), "whitelist", 9);
+       if (node_type && !strcasecmp(node_type, "leaf")) {
+               field_pubsub_collection = iks_insert(x, "field");
+               iks_insert_attrib(field_pubsub_collection, "var", "pubsub#collection");
+               iks_insert_cdata(iks_insert(field_pubsub_collection, "value"), collection_name,
+                       strlen(collection_name));
+       }
+       return configure;
+}
+
+
+
+/*!
+ * \brief Method to expose PubSub collection node creation via CLI.
+ * \return char *.
+ */
+static char *aji_cli_create_collection(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+               struct aji_client *client;
+               const char *name = "asterisk";
+               const char *collection_name = "test_collection";
+
+               switch (cmd) {
+               case CLI_INIT:
+                               e->command = "jabber create collection";
+                               e->usage =
+                                       "Usage: jabber create collection [name] [node name]\n"
+                                       "       Creates a PubSub collection node using the account\n"
+                                       "       as configured in jabber.conf.\n";
+                       return NULL;
+               case CLI_GENERATE:
+                       return NULL;
+               }
+
+               if (a->argc > 5) {
+                       return CLI_SHOWUSAGE;
+               } else if (a->argc == 5) {
+                       name = a->argv[3];
+                       collection_name = a->argv[4];
+               }
+
+        if (!(client = ASTOBJ_CONTAINER_FIND(&clients, name))) {
+                       ast_cli(a->fd, "Unable to find client '%s'!\n", name);
+                       return CLI_FAILURE;
+               }
+
+               ast_cli(a->fd, "Creating test PubSub node collection.\n");
+               aji_create_pubsub_collection(client, collection_name);
+               return CLI_SUCCESS;
+}
+
+/*!
+ * \brief Method to expose PubSub leaf node creation via CLI.
+ * \return char *.
+ */
+static char *aji_cli_create_leafnode(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
+{
+       struct aji_client *client;
+       const char *name = "asterisk";
+       const char *collection_name = "test_collection";
+       const char *leaf_name = "test_leaf";
+
+       switch (cmd) {
+               case CLI_INIT:
+                       e->command = "jabber create leaf";
+                       e->usage =
+                                       "Usage: jabber create leaf [name] [collection_name] [node name]\n"
+                                       "       Creates a PubSub leaf node using the account\n"
+                                       "       as configured in jabber.conf.\n";
+                       return NULL;
+               case CLI_GENERATE:
+                       return NULL;
+       }
+
+       if (a->argc > 6) {
+               return CLI_SHOWUSAGE;
+       } else if (a->argc == 6) {
+               name = a->argv[3];
+               collection_name = a->argv[4];
+               leaf_name = a->argv[5];
+       }
+
+       if (!(client = ASTOBJ_CONTAINER_FIND(&clients, name))) {
+               ast_cli(a->fd, "Unable to find client '%s'!\n", name);
+               return CLI_FAILURE;
+       }
+
+       ast_cli(a->fd, "Creating test PubSub node collection.\n");
+       aji_create_pubsub_leaf(client, collection_name, leaf_name);
+       return CLI_SUCCESS;
+}
+
+
+
+/*!
  * \internal
  * \brief set presence of client.
  * \param client the configured XMPP client we use to connect to a XMPP server
@@ -3055,10 +3909,12 @@ static void aji_set_presence(struct aji_client *client, char *to, char *from, in
        char priorityS[10];
 
        if (presence && cnode && client && priority) {
-               if (to)
+               if (to) {
                        iks_insert_attrib(presence, "to", to);
-               if (from)
+               }
+               if (from) {
                        iks_insert_attrib(presence, "from", from);
+               }
                snprintf(priorityS, sizeof(priorityS), "%d", client->priority);
                iks_insert_cdata(priority, priorityS, strlen(priorityS));
                iks_insert_node(presence, priority);
@@ -3068,8 +3924,9 @@ static void aji_set_presence(struct aji_client *client, char *to, char *from, in
                iks_insert_attrib(cnode, "xmlns", "http://jabber.org/protocol/caps");
                iks_insert_node(presence, cnode);
                res = ast_aji_send(client, presence);
-       } else
+       } else {
                ast_log(LOG_ERROR, "Out of memory.\n");
+       }
 
        iks_delete(cnode);
        iks_delete(presence);
@@ -3092,10 +3949,10 @@ static int aji_set_group_presence(struct aji_client *client, char *room, int lev
        iks *presence = NULL, *x = NULL;
        char from[AJI_MAX_JIDLEN];
        char roomid[AJI_MAX_JIDLEN];
-       
+
        presence = iks_make_pres(level, NULL);
        x = iks_new("x");
-       
+
        if (client->component) {
                snprintf(from, AJI_MAX_JIDLEN, "%s@%s/%s", nick, client->jid->full, nick);
                snprintf(roomid, AJI_MAX_JIDLEN, "%s/%s", room, nick);
@@ -3103,19 +3960,19 @@ static int aji_set_group_presence(struct aji_client *client, char *room, int lev
                snprintf(from, AJI_MAX_JIDLEN, "%s", client->jid->full);
                snprintf(roomid, AJI_MAX_JIDLEN, "%s/%s", room, nick ? nick : client->jid->user);
        }
-       
+
        if (!presence || !x || !client) {
                ast_log(LOG_ERROR, "Out of memory.\n");
                res = -1;
                goto safeout;
-       } else { 
+       } else {
                iks_insert_attrib(presence, "to", roomid);
                iks_insert_attrib(presence, "from", from);
                iks_insert_attrib(x, "xmlns", MUC_NS);
                iks_insert_node(presence, x);
                res = ast_aji_send(client, presence);
        }
-       
+
 safeout:
        iks_delete(presence);
        iks_delete(x);
@@ -3140,12 +3997,13 @@ static char *aji_do_set_debug(struct ast_cli_entry *e, int cmd, struct ast_cli_a
                return NULL;
        }
 
-       if (a->argc != e->args)
+       if (a->argc != e->args) {
                return CLI_SHOWUSAGE;
+       }
 
        if (!strncasecmp(a->argv[e->args - 1], "on", 2)) {
                ASTOBJ_CONTAINER_TRAVERSE(&clients, 1, {
-                       ASTOBJ_RDLOCK(iterator); 
+                       ASTOBJ_RDLOCK(iterator);
                        iterator->debug = 1;
                        ASTOBJ_UNLOCK(iterator);
                });
@@ -3153,7 +4011,7 @@ static char *aji_do_set_debug(struct ast_cli_entry *e, int cmd, struct ast_cli_a
                return CLI_SUCCESS;
        } else if (!strncasecmp(a->argv[e->args - 1], "off", 3)) {
                ASTOBJ_CONTAINER_TRAVERSE(&clients, 1, {
-                       ASTOBJ_RDLOCK(iterator); 
+                       ASTOBJ_RDLOCK(iterator);
                        iterator->debug = 0;
                        ASTOBJ_UNLOCK(iterator);
                });
@@ -3195,7 +4053,7 @@ static char *aji_show_clients(struct ast_cli_entry *e, int cmd, struct ast_cli_a
 {
        char *status;
        int count = 0;
-       
+
        switch (cmd) {
        case CLI_INIT:
                e->command = "jabber show connected";
@@ -3255,22 +4113,22 @@ static char *aji_show_buddies(struct ast_cli_entry *e, int cmd, struct ast_cli_a
 
        ast_cli(a->fd, "Jabber buddy lists\n");
        ASTOBJ_CONTAINER_TRAVERSE(&clients, 1, {
-               ast_cli(a->fd,"Client: %s\n", iterator->user);
+               ast_cli(a->fd, "Client: %s\n", iterator->user);
                client = iterator;
                ASTOBJ_CONTAINER_TRAVERSE(&client->buddies, 1, {
                        ASTOBJ_RDLOCK(iterator);
-                       ast_cli(a->fd,"\tBuddy:\t%s\n", iterator->name);
+                       ast_cli(a->fd, "\tBuddy:\t%s\n", iterator->name);
                        if (!iterator->resources)
-                               ast_cli(a->fd,"\t\tResource: None\n");
+                               ast_cli(a->fd, "\t\tResource: None\n");
                        for (resource = iterator->resources; resource; resource = resource->next) {
-                               ast_cli(a->fd,"\t\tResource: %s\n", resource->resource);
+                               ast_cli(a->fd, "\t\tResource: %s\n", resource->resource);
                                if (resource->cap) {
-                                       ast_cli(a->fd,"\t\t\tnode: %s\n", resource->cap->parent->node);
-                                       ast_cli(a->fd,"\t\t\tversion: %s\n", resource->cap->version);
-                                       ast_cli(a->fd,"\t\t\tJingle capable: %s\n", resource->cap->jingle ? "yes" : "no");
+                                       ast_cli(a->fd, "\t\t\tnode: %s\n", resource->cap->parent->node);
+                                       ast_cli(a->fd, "\t\t\tversion: %s\n", resource->cap->version);
+                                       ast_cli(a->fd, "\t\t\tJingle capable: %s\n", resource->cap->jingle ? "yes" : "no");
                                }
-                               ast_cli(a->fd,"\t\tStatus: %d\n", resource->status);
-                               ast_cli(a->fd,"\t\tPriority: %d\n", resource->priority);
+                               ast_cli(a->fd, "\t\tStatus: %d\n", resource->status);
+                               ast_cli(a->fd, "\t\tPriority: %d\n", resource->priority);
                        }
                        ASTOBJ_UNLOCK(iterator);
                });
@@ -3303,10 +4161,11 @@ static char *aji_test(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
                return NULL;
        }
 
-       if (a->argc > 3)
+       if (a->argc > 3) {
                return CLI_SHOWUSAGE;
-       else if (a->argc == 3)
+       } else if (a->argc == 3) {
                name = a->argv[2];
+       }
 
        if (!(client = ASTOBJ_CONTAINER_FIND(&clients, name))) {
                ast_cli(a->fd, "Unable to find client '%s'!\n", name);
@@ -3327,7 +4186,7 @@ static char *aji_test(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
                        }
                        ast_verbose("   Priority: %d\n", resource->priority);
                        ast_verbose("   Status: %d\n", resource->status);
-                       ast_verbose("   Message: %s\n", S_OR(resource->description,""));
+                       ast_verbose("   Message: %s\n", S_OR(resource->description, ""));
                }
                ASTOBJ_UNLOCK(iterator);
        });
@@ -3347,7 +4206,7 @@ static char *aji_test(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
  * \brief creates aji_client structure.
  * \param label
  * \param var ast_variable
- * \param debug 
+ * \param debug
  * \return 0.
  */
 static int aji_create_client(char *label, struct ast_variable *var, int debug)
@@ -3356,7 +4215,7 @@ static int aji_create_client(char *label, struct ast_variable *var, int debug)
        struct aji_client *client = NULL;
        int flag = 0;
 
-       client = ASTOBJ_CONTAINER_FIND(&clients,label);
+       client = ASTOBJ_CONTAINER_FIND(&clients, label);
        if (!client) {
                flag = 1;
                client = ast_calloc(1, sizeof(*client));
@@ -3385,6 +4244,7 @@ static int aji_create_client(char *label, struct ast_variable *var, int debug)
        client->keepalive = 1;
        client->timeout = 50;
        client->message_timeout = 5;
+       client->distribute_events = 0;
        AST_LIST_HEAD_INIT(&client->messages);
        client->component = 0;
        ast_copy_string(client->statusmessage, "Online and Available", sizeof(client->statusmessage));
@@ -3396,79 +4256,91 @@ static int aji_create_client(char *label, struct ast_variable *var, int debug)
                client->state = AJI_DISCONNECTED;
        }
        while (var) {
-               if (!strcasecmp(var->name, "username"))
+               if (!strcasecmp(var->name, "username")) {
                        ast_copy_string(client->user, var->value, sizeof(client->user));
-               else if (!strcasecmp(var->name, "serverhost"))
+               } else if (!strcasecmp(var->name, "serverhost")) {
                        ast_copy_string(client->serverhost, var->value, sizeof(client->serverhost));
-               else if (!strcasecmp(var->name, "secret"))
+               } else if (!strcasecmp(var->name, "secret")) {
                        ast_copy_string(client->password, var->value, sizeof(client->password));
-               else if (!strcasecmp(var->name, "statusmessage"))
+               } else if (!strcasecmp(var->name, "statusmessage")) {
                        ast_copy_string(client->statusmessage, var->value, sizeof(client->statusmessage));
-               else if (!strcasecmp(var->name, "port"))
+               } else if (!strcasecmp(var->name, "port")) {
                        client->port = atoi(var->value);
-               else if (!strcasecmp(var->name, "timeout"))
+               } else if (!strcasecmp(var->name, "timeout")) {
                        client->message_timeout = atoi(var->value);
-               else if (!strcasecmp(var->name, "debug"))
+               } else if (!strcasecmp(var->name, "debug")) {
                        client->debug = (ast_false(var->value)) ? 0 : 1;
-               else if (!strcasecmp(var->name, "type")) {
-                       if (!strcasecmp(var->value, "component"))
+               } else if (!strcasecmp(var->name, "type")) {
+                       if (!strcasecmp(var->value, "component")) {
                                client->component = 1;
+                               if (client->distribute_events) {
+                                       ast_log(LOG_ERROR, "Client cannot be configured to be both a component and to distribute events!  Event distribution will be disabled.\n");
+                                       client->distribute_events = 0;
+                               }
+                       }
+               } else if (!strcasecmp(var->name, "distribute_events")) {
+                       if (ast_true(var->value)) {
+                               if (client->component) {
+                                       ast_log(LOG_ERROR, "Client cannot be configured to be both a component and to distribute events!  Event distribution will be disabled.\n");
+                               } else {
+                                       if (ast_test_flag(&pubsubflags, AJI_PUBSUB)) {
+                                               ast_log(LOG_ERROR, "Only one connection can be configured for distributed events.\n");
+                                       } else {
+                                               ast_set_flag(&pubsubflags, AJI_PUBSUB);
+                                               client->distribute_events = 1;
+                                       }
+                               }
+                       }
+               } else if (!strcasecmp(var->name, "pubsub_node")) {
+                       ast_copy_string(client->pubsub_node, var->value, sizeof(client->pubsub_node));
                } else if (!strcasecmp(var->name, "usetls")) {
                        client->usetls = (ast_false(var->value)) ? 0 : 1;
                } else if (!strcasecmp(var->name, "usesasl")) {
                        client->usesasl = (ast_false(var->value)) ? 0 : 1;
-               } else if (!strcasecmp(var->name, "forceoldssl"))
+               } else if (!strcasecmp(var->name, "forceoldssl")) {
                        client->forcessl = (ast_false(var->value)) ? 0 : 1;
-               else if (!strcasecmp(var->name, "keepalive"))
+               } else if (!strcasecmp(var->name, "keepalive")) {
                        client->keepalive = (ast_false(var->value)) ? 0 : 1;
-               else if (!strcasecmp(var->name, "autoprune"))
+               } else if (!strcasecmp(var->name, "autoprune")) {
                        ast_set2_flag(&client->flags, ast_true(var->value), AJI_AUTOPRUNE);
-               else if (!strcasecmp(var->name, "autoregister"))
+               } else if (!strcasecmp(var->name, "autoregister")) {
                        ast_set2_flag(&client->flags, ast_true(var->value), AJI_AUTOREGISTER);
-               else if (!strcasecmp(var->name, "auth_policy")) {
+               } else if (!strcasecmp(var->name, "auth_policy")) {
                        if (!strcasecmp(var->value, "accept")) {
                                ast_set_flag(&client->flags, AJI_AUTOACCEPT);
                        } else {
                                ast_clear_flag(&client->flags, AJI_AUTOACCEPT);
                        }
-               }
-               else if (!strcasecmp(var->name, "buddy"))
+               } else if (!strcasecmp(var->name, "buddy")) {
                        aji_create_buddy((char *)var->value, client);
-               else if (!strcasecmp(var->name, "priority"))
+               } else if (!strcasecmp(var->name, "priority")) {
                        client->priority = atoi(var->value);
-               else if (!strcasecmp(var->name, "status")) {
-                       if (!strcasecmp(var->value, "unavailable"))
+               } else if (!strcasecmp(var->name, "status")) {
+                       if (!strcasecmp(var->value, "unavailable")) {
                                client->status = IKS_SHOW_UNAVAILABLE;
-                       else
-                       if (!strcasecmp(var->value, "available")
-                        || !strcasecmp(var->value, "online"))
+                       } else if (!strcasecmp(var->value, "available")
+                        || !strcasecmp(var->value, "online")) {
                                client->status = IKS_SHOW_AVAILABLE;
-                       else
-                       if (!strcasecmp(var->value, "chat")
-                        || !strcasecmp(var->value, "chatty"))
+                       } else if (!strcasecmp(var->value, "chat")
+                        || !strcasecmp(var->value, "chatty")) {
                                client->status = IKS_SHOW_CHAT;
-                       else
-                       if (!strcasecmp(var->value, "away"))
+                       } else if (!strcasecmp(var->value, "away")) {
                                client->status = IKS_SHOW_AWAY;
-                       else
-                       if (!strcasecmp(var->value, "xa")
-                        || !strcasecmp(var->value, "xaway"))
+                       } else if (!strcasecmp(var->value, "xa")
+                        || !strcasecmp(var->value, "xaway")) {
                                client->status = IKS_SHOW_XA;
-                       else
-                       if (!strcasecmp(var->value, "dnd"))
+                       } else if (!strcasecmp(var->value, "dnd")) {
                                client->status = IKS_SHOW_DND;
-                       else
-                       if (!strcasecmp(var->value, "invisible"))
+                       } else if (!strcasecmp(var->value, "invisible")) {
                        #ifdef IKS_SHOW_INVISIBLE
                                client->status = IKS_SHOW_INVISIBLE;
                        #else
-                       {
                                ast_log(LOG_WARNING, "Your iksemel doesn't support invisible status: falling back to DND\n");
                                client->status = IKS_SHOW_DND;
-                       }
                        #endif
-                       else
+                       } else {
                                ast_log(LOG_WARNING, "Unknown presence status: %s\n", var->value);
+                       }
                }
        /* no transport support in this version */
        /*      else if (!strcasecmp(var->name, "transport"))
@@ -3504,8 +4376,9 @@ static int aji_create_client(char *label, struct ast_variable *var, int debug)
                        client->jid = iks_id_new(client->stack, resource);
                        ast_free(resource);
                }
-       } else
+       } else {
                client->jid = iks_id_new(client->stack, client->user);
+       }
        if (client->component) {
                iks_filter_add_rule(client->f, aji_dinfo_handler, client, IKS_RULE_NS, "http://jabber.org/protocol/disco#info", IKS_RULE_DONE);
                iks_filter_add_rule(client->f, aji_ditems_handler, client, IKS_RULE_NS, "http://jabber.org/protocol/disco#items", IKS_RULE_DONE);
@@ -3514,12 +4387,15 @@ static int aji_create_client(char *label, struct ast_variable *var, int debug)
        } else {
                iks_filter_add_rule(client->f, aji_client_info_handler, client, IKS_RULE_NS, "http://jabber.org/protocol/disco#info", IKS_RULE_DONE);
        }
+
        iks_set_log_hook(client->p, aji_log_hook);
        ASTOBJ_UNLOCK(client);
-       ASTOBJ_CONTAINER_LINK(&clients,client);
+       ASTOBJ_CONTAINER_LINK(&clients, client);
        return 1;
 }
 
+
+
 #if 0
 /*!
  * \brief creates transport.
@@ -3582,7 +4458,7 @@ static int aji_create_buddy(char *label, struct aji_client *client)
 {
        struct aji_buddy *buddy = NULL;
        int flag = 0;
-       buddy = ASTOBJ_CONTAINER_FIND(&client->buddies,label);
+       buddy = ASTOBJ_CONTAINER_FIND(&client->buddies, label);
        if (!buddy) {
                flag = 1;
                buddy = ast_calloc(1, sizeof(*buddy));
@@ -3595,9 +4471,9 @@ static int aji_create_buddy(char *label, struct aji_client *client)
        ASTOBJ_WRLOCK(buddy);
        ast_copy_string(buddy->name, label, sizeof(buddy->name));
        ASTOBJ_UNLOCK(buddy);
-       if (flag)
+       if (flag) {
                ASTOBJ_CONTAINER_LINK(&client->buddies, buddy);
-       else {
+       } else {
                ASTOBJ_UNMARK(buddy);
                ASTOBJ_UNREF(buddy, aji_buddy_destroy);
        }
@@ -3613,8 +4489,9 @@ static int aji_load_config(int reload)
        struct ast_variable *var = NULL;
        struct ast_flags config_flags = { reload ? CONFIG_FLAG_FILEUNCHANGED : 0 };
 
-       if ((cfg = ast_config_load(JABBER_CONFIG, config_flags)) == CONFIG_STATUS_FILEUNCHANGED)
+       if ((cfg = ast_config_load(JABBER_CONFIG, config_flags)) == CONFIG_STATUS_FILEUNCHANGED) {
                return -1;
+       }
 
        /* Reset flags to default value */
        ast_set_flag(&globalflags, AJI_AUTOREGISTER | AJI_AUTOACCEPT);
@@ -3632,6 +4509,10 @@ static int aji_load_config(int reload)
                        ast_set2_flag(&globalflags, ast_true(var->value), AJI_AUTOPRUNE);
                } else if (!strcasecmp(var->name, "autoregister")) {
                        ast_set2_flag(&globalflags, ast_true(var->value), AJI_AUTOREGISTER);
+               } else if (!strcasecmp(var->name, "collection_nodes")) {
+                       ast_set2_flag(&pubsubflags, ast_true(var->value), AJI_XEP0248);
+               } else if (!strcasecmp(var->name, "pubsub_autocreate")) {
+                       ast_set2_flag(&pubsubflags, ast_true(var->value), AJI_PUBSUB_AUTOCREATE);
                } else if (!strcasecmp(var->name, "auth_policy")) {
                        if (!strcasecmp(var->value, "accept")) {
                                ast_set_flag(&globalflags, AJI_AUTOACCEPT);
@@ -3643,8 +4524,8 @@ static int aji_load_config(int reload)
 
        while (cat) {
                if (strcasecmp(cat, "general")) {
-                               var = ast_variable_browse(cfg, cat);
-                               aji_create_client(cat, var, debug);
+                       var = ast_variable_browse(cfg, cat);
+                       aji_create_client(cat, var, debug);
                }
                cat = ast_category_browse(cfg, cat);
        }
@@ -3653,9 +4534,9 @@ static int aji_load_config(int reload)
 }
 
 /*!
- * \brief grab a aji_client structure by label name or JID 
+ * \brief grab a aji_client structure by label name or JID
  * (without the resource string)
- * \param name label or JID 
+ * \param name label or JID
  * \return aji_client.
  */
 struct aji_client *ast_aji_get_client(const char *name)
@@ -3673,7 +4554,7 @@ struct aji_client *ast_aji_get_client(const char *name)
                        }
                        if (!strncasecmp(aux, name, strlen(aux))) {
                                client = iterator;
-                       }                               
+                       }
                });
        }
 
@@ -3685,9 +4566,9 @@ struct aji_client_container *ast_aji_get_clients(void)
        return &clients;
 }
 
-/*! 
+/*!
  * \internal
- * \brief  Send a Jabber Message via call from the Manager 
+ * \brief  Send a Jabber Message via call from the Manager
  * \param s mansession Manager session
  * \param m message Message to send
  * \return  0
@@ -3695,10 +4576,10 @@ struct aji_client_container *ast_aji_get_clients(void)
 static int manager_jabber_send(struct mansession *s, const struct message *m)
 {
        struct aji_client *client = NULL;
-       const char *id = astman_get_header(m,"ActionID");
-       const char *jabber = astman_get_header(m,"Jabber");
-       const char *screenname = astman_get_header(m,"ScreenName");
-       const char *message = astman_get_header(m,"Message");
+       const char *id = astman_get_header(m, "ActionID");
+       const char *jabber = astman_get_header(m, "Jabber");
+       const char *screenname = astman_get_header(m, "ScreenName");
+       const char *message = astman_get_header(m, "Message");
 
        if (ast_strlen_zero(jabber)) {
                astman_send_error(s, m, "No transport specified");
@@ -3726,13 +4607,13 @@ static int manager_jabber_send(struct mansession *s, const struct message *m)
                astman_append(s, "Response: Error\r\n");
        }
        if (!ast_strlen_zero(id)) {
-               astman_append(s, "ActionID: %s\r\n",id);
+               astman_append(s, "ActionID: %s\r\n", id);
        }
        astman_append(s, "\r\n");
        return 0;
 }
 
-/*! 
+/*!
  * \internal
  * \brief Reload the jabber module
  */
@@ -3753,11 +4634,16 @@ static int aji_reload(int reload)
                if (iterator->state == AJI_DISCONNECTED) {
                        if (!iterator->thread)
                                ast_pthread_create_background(&iterator->thread, NULL, aji_recv_loop, iterator);
-               } else if (iterator->state == AJI_CONNECTING)
+               } else if (iterator->state == AJI_CONNECTING) {
                        aji_get_roster(iterator);
+                       if (iterator->distribute_events) {
+                               aji_init_event_distribution(iterator);
+                       }
+               }
+
                ASTOBJ_UNLOCK(iterator);
        });
-       
+
        return 1;
 }
 
@@ -3776,6 +4662,12 @@ static int unload_module(void)
        ast_unregister_application(app_ajileave);
        ast_manager_unregister("JabberSend");
        ast_custom_function_unregister(&jabberstatus_function);
+       if (mwi_sub) {
+               ast_event_unsubscribe(mwi_sub);
+       }
+       if (device_state_sub) {
+               ast_event_unsubscribe(device_state_sub);
+       }
        ast_custom_function_unregister(&jabberreceive_function);
 
        ASTOBJ_CONTAINER_TRAVERSE(&clients, 1, {
@@ -3809,8 +4701,8 @@ static int load_module(void)
        ast_register_application_xml(app_ajisend, aji_send_exec);
        ast_register_application_xml(app_ajisendgroup, aji_sendgroup_exec);
        ast_register_application_xml(app_ajistatus, aji_status_exec);
-       ast_register_application_xml(app_ajijoin, aji_join_exec);
-       ast_register_application_xml(app_ajileave, aji_leave_exec);
+       ast_register_application_xml(app_ajijoin, aji_join_exec);
+       ast_register_application_xml(app_ajileave, aji_leave_exec);
        ast_cli_register_multiple(aji_cli, ARRAY_LEN(aji_cli));
        ast_custom_function_register(&jabberstatus_function);
        ast_custom_function_register(&jabberreceive_function);