1f24de050bc2783a65e311f1838a90bfa8d0f2b4
[asterisk/asterisk.git] / res / res_pjsip_pubsub.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 /*!
19  * \brief Opaque structure representing an RFC 3265 SIP subscription
20  */
21
22 /*** MODULEINFO
23         <depend>pjproject</depend>
24         <depend>res_pjsip</depend>
25         <support_level>core</support_level>
26  ***/
27
28 #include "asterisk.h"
29
30 #include <pjsip.h>
31 #include <pjsip_simple.h>
32 #include <pjlib.h>
33
34 #include "asterisk/app.h"
35 #include "asterisk/res_pjsip_pubsub.h"
36 #include "asterisk/module.h"
37 #include "asterisk/linkedlists.h"
38 #include "asterisk/astobj2.h"
39 #include "asterisk/datastore.h"
40 #include "asterisk/uuid.h"
41 #include "asterisk/taskprocessor.h"
42 #include "asterisk/sched.h"
43 #include "asterisk/res_pjsip.h"
44 #include "asterisk/callerid.h"
45 #include "asterisk/manager.h"
46 #include "asterisk/cli.h"
47 #include "asterisk/test.h"
48 #include "res_pjsip/include/res_pjsip_private.h"
49 #include "asterisk/res_pjsip_presence_xml.h"
50
51 /*** DOCUMENTATION
52         <manager name="PJSIPShowSubscriptionsInbound" language="en_US">
53                 <synopsis>
54                         Lists subscriptions.
55                 </synopsis>
56                 <syntax />
57                 <description>
58                         <para>
59                         Provides a listing of all inbound subscriptions.  An event <literal>InboundSubscriptionDetail</literal>
60                         is issued for each subscription object.  Once all detail events are completed an
61                         <literal>InboundSubscriptionDetailComplete</literal> event is issued.
62                         </para>
63                 </description>
64         </manager>
65         <manager name="PJSIPShowSubscriptionsOutbound" language="en_US">
66                 <synopsis>
67                         Lists subscriptions.
68                 </synopsis>
69                 <syntax />
70                 <description>
71                         <para>
72                         Provides a listing of all outbound subscriptions.  An event <literal>OutboundSubscriptionDetail</literal>
73                         is issued for each subscription object.  Once all detail events are completed an
74                         <literal>OutboundSubscriptionDetailComplete</literal> event is issued.
75                         </para>
76                 </description>
77         </manager>
78         <manager name="PJSIPShowResourceLists" language="en_US">
79                 <synopsis>
80                         Displays settings for configured resource lists.
81                 </synopsis>
82                 <syntax />
83                 <description>
84                         <para>
85                         Provides a listing of all resource lists.  An event <literal>ResourceListDetail</literal>
86                         is issued for each resource list object.  Once all detail events are completed a
87                         <literal>ResourceListDetailComplete</literal> event is issued.
88                         </para>
89                 </description>
90         </manager>
91
92         <configInfo name="res_pjsip_pubsub" language="en_US">
93                 <synopsis>Module that implements publish and subscribe support.</synopsis>
94                 <configFile name="pjsip.conf">
95                         <configObject name="subscription_persistence">
96                                 <synopsis>Persists SIP subscriptions so they survive restarts.</synopsis>
97                                 <configOption name="packet">
98                                         <synopsis>Entire SIP SUBSCRIBE packet that created the subscription</synopsis>
99                                 </configOption>
100                                 <configOption name="src_name">
101                                         <synopsis>The source address of the subscription</synopsis>
102                                 </configOption>
103                                 <configOption name="src_port">
104                                         <synopsis>The source port of the subscription</synopsis>
105                                 </configOption>
106                                 <configOption name="transport_key">
107                                         <synopsis>The type of transport the subscription was received on</synopsis>
108                                 </configOption>
109                                 <configOption name="local_name">
110                                         <synopsis>The local address the subscription was received on</synopsis>
111                                 </configOption>
112                                 <configOption name="local_port">
113                                         <synopsis>The local port the subscription was received on</synopsis>
114                                 </configOption>
115                                 <configOption name="cseq">
116                                         <synopsis>The sequence number of the next NOTIFY to be sent</synopsis>
117                                 </configOption>
118                                 <configOption name="tag">
119                                         <synopsis>The local tag of the dialog for the subscription</synopsis>
120                                 </configOption>
121                                 <configOption name="endpoint">
122                                         <synopsis>The name of the endpoint that subscribed</synopsis>
123                                 </configOption>
124                                 <configOption name="expires">
125                                         <synopsis>The time at which the subscription expires</synopsis>
126                                 </configOption>
127                                 <configOption name="contact_uri">
128                                         <synopsis>The Contact URI of the dialog for the subscription</synopsis>
129                                 </configOption>
130                         </configObject>
131                         <configObject name="resource_list">
132                                 <synopsis>Resource list configuration parameters.</synopsis>
133                                 <description>
134                                         <para>This configuration object allows for RFC 4662 resource list subscriptions
135                                         to be specified. This can be useful to decrease the amount of subscription traffic
136                                         that a server has to process.</para>
137                                         <note>
138                                                 <para>Current limitations limit the size of SIP NOTIFY requests that Asterisk sends
139                                                 to 64000 bytes. If your resource list notifications are larger than this maximum, you
140                                                 will need to make adjustments.</para>
141                                         </note>
142                                 </description>
143                                 <configOption name="type">
144                                         <synopsis>Must be of type 'resource_list'</synopsis>
145                                 </configOption>
146                                 <configOption name="event">
147                                         <synopsis>The SIP event package that the list resource belongs to.</synopsis>
148                                         <description><para>
149                                                 The SIP event package describes the types of resources that Asterisk reports
150                                                 the state of.
151                                         </para>
152                                                 <enumlist>
153                                                         <enum name="presence"><para>
154                                                                 Device state and presence reporting.
155                                                         </para></enum>
156                                                         <enum name="dialog"><para>
157                                                                 This is identical to <replaceable>presence</replaceable>.
158                                                         </para></enum>
159                                                         <enum name="message-summary"><para>
160                                                                 Message-waiting indication (MWI) reporting.
161                                                         </para></enum>
162                                                 </enumlist>
163                                         </description>
164                                 </configOption>
165                                 <configOption name="list_item">
166                                         <synopsis>The name of a resource to report state on</synopsis>
167                                         <description>
168                                                 <para>In general Asterisk looks up list items in the following way:</para>
169                                                 <para>1. Check if the list item refers to another configured resource list.</para>
170                                                 <para>2. Pass the name of the resource off to event-package-specific handlers
171                                                    to find the specified resource.</para>
172                                                 <para>The second part means that the way the list item is specified depends
173                                                 on what type of list this is. For instance, if you have the <replaceable>event</replaceable>
174                                                 set to <literal>presence</literal>, then list items should be in the form of
175                                                 dialplan_extension@dialplan_context. For <literal>message-summary</literal> mailbox
176                                                 names should be listed.</para>
177                                         </description>
178                                 </configOption>
179                                 <configOption name="full_state" default="no">
180                                         <synopsis>Indicates if the entire list's state should be sent out.</synopsis>
181                                         <description>
182                                                 <para>If this option is enabled, and a resource changes state, then Asterisk will construct
183                                                 a notification that contains the state of all resources in the list. If the option is
184                                                 disabled, Asterisk will construct a notification that only contains the states of
185                                                 resources that have changed.</para>
186                                                 <note>
187                                                         <para>Even with this option disabled, there are certain situations where Asterisk is forced
188                                                         to send a notification with the states of all resources in the list. When a subscriber
189                                                         renews or terminates its subscription to the list, Asterisk MUST send a full state
190                                                         notification.</para>
191                                                 </note>
192                                         </description>
193                                 </configOption>
194                                 <configOption name="notification_batch_interval" default="0">
195                                         <synopsis>Time Asterisk should wait, in milliseconds, before sending notifications.</synopsis>
196                                         <description>
197                                                 <para>When a resource's state changes, it may be desired to wait a certain amount before Asterisk
198                                                 sends a notification to subscribers. This allows for other state changes to accumulate, so that
199                                                 Asterisk can communicate multiple state changes in a single notification instead of rapidly sending
200                                                 many notifications.</para>
201                                         </description>
202                                 </configOption>
203                         </configObject>
204                         <configObject name="inbound-publication">
205                                 <synopsis>The configuration for inbound publications</synopsis>
206                                 <configOption name="endpoint" default="">
207                                         <synopsis>Optional name of an endpoint that is only allowed to publish to this resource</synopsis>
208                                 </configOption>
209                                 <configOption name="type">
210                                         <synopsis>Must be of type 'inbound-publication'.</synopsis>
211                                 </configOption>
212                         </configObject>
213                 </configFile>
214         </configInfo>
215  ***/
216
217 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);
218
219 static struct pjsip_module pubsub_module = {
220         .name = { "PubSub Module", 13 },
221         .priority = PJSIP_MOD_PRIORITY_APPLICATION,
222         .on_rx_request = pubsub_on_rx_request,
223 };
224
225 #define MOD_DATA_PERSISTENCE "sub_persistence"
226 #define MOD_DATA_MSG "sub_msg"
227
228 static const pj_str_t str_event_name = { "Event", 5 };
229
230 /*! \brief Scheduler used for automatically expiring publications */
231 static struct ast_sched_context *sched;
232
233 /*! \brief Number of buckets for publications (on a per handler) */
234 #define PUBLICATIONS_BUCKETS 37
235
236 /*! \brief Default expiration time for PUBLISH if one is not specified */
237 #define DEFAULT_PUBLISH_EXPIRES 3600
238
239 /*! \brief Number of buckets for subscription datastore */
240 #define DATASTORE_BUCKETS 53
241
242 /*! \brief Default expiration for subscriptions */
243 #define DEFAULT_EXPIRES 3600
244
245 /*! \brief Defined method for PUBLISH */
246 const pjsip_method pjsip_publish_method =
247 {
248         PJSIP_OTHER_METHOD,
249         { "PUBLISH", 7 }
250 };
251
/*!
 * \brief Classification of PUBLISH requests, following RFC 3903
 */
enum sip_publish_type {
        /*!
         * \brief Unknown
         *
         * \details
         * Not a category from RFC 3903. It marks an incoming PUBLISH that
         * matches none of the categories below and is therefore invalid.
         */
        SIP_PUBLISH_UNKNOWN = 0,

        /*!
         * \brief Initial
         *
         * \details
         * The very first PUBLISH. It carries a non-zero Expires header and
         * a body describing the current state of the sending endpoint. It
         * is the only kind of PUBLISH without a Sip-If-Match header.
         */
        SIP_PUBLISH_INITIAL,

        /*!
         * \brief Refresh
         *
         * \details
         * Keeps previously published state from expiring. It carries a
         * non-zero Expires header and no body, because no state is updated.
         */
        SIP_PUBLISH_REFRESH,

        /*!
         * \brief Modify
         *
         * \details
         * Replaces the previously published state. It carries a body with
         * the new state; an Expires header may or may not be present.
         */
        SIP_PUBLISH_MODIFY,

        /*!
         * \brief Remove
         *
         * \details
         * Withdraws published state from an ESC. It carries an Expires
         * header of 0 and most likely no body.
         */
        SIP_PUBLISH_REMOVE,
};
306
307 /*!
308  * \brief A vector of strings commonly used throughout this module
309  */
310 AST_VECTOR(resources, const char *);
311
312 /*!
313  * \brief Resource list configuration item
314  */
315 struct resource_list {
316         SORCERY_OBJECT(details);
317         /*! SIP event package the list uses. */
318         char event[32];
319         /*! Strings representing resources in the list. */
320         struct resources items;
321         /*! Indicates if Asterisk sends full or partial state on notifications. */
322         unsigned int full_state;
323         /*! Time, in milliseconds Asterisk waits before sending a batched notification.*/
324         unsigned int notification_batch_interval;
325 };
326
/*!
 * \brief Counter used by ESCs to create new entity IDs.
 */
static int esc_etag_counter;

/*!
 * \brief Structure representing a SIP publication
 *
 * The structure ends in a flexible array member, so it must be allocated
 * with extra room for the strings kept in \c data.
 */
struct ast_sip_publication {
        /*! \brief Publication datastores set up by handlers */
        struct ao2_container *datastores;
        /*! \brief Entity tag for the publication */
        int entity_tag;
        /*! \brief Handler for this publication */
        struct ast_sip_publish_handler *handler;
        /*! \brief The endpoint with which the publication is communicating */
        struct ast_sip_endpoint *endpoint;
        /*! \brief Expiration time of the publication */
        int expires;
        /*! \brief Scheduled item for expiration of publication */
        int sched_id;
        /*! \brief The resource the publication is to */
        char *resource;
        /*! \brief The name of the event type configuration */
        char *event_configuration_name;
        /*!
         * \brief Trailing storage containing the strings above
         *
         * Declared as a C99 flexible array member rather than a GNU
         * zero-length array; size and layout of the structure are unchanged.
         */
        char data[];
};
355
356
357 /*!
358  * \brief Structure used for persisting an inbound subscription
359  */
360 struct subscription_persistence {
361         /*! Sorcery object details */
362         SORCERY_OBJECT(details);
363         /*! The name of the endpoint involved in the subscription */
364         char *endpoint;
365         /*! SIP message that creates the subscription */
366         char packet[PJSIP_MAX_PKT_LEN];
367         /*! Source address of the message */
368         char src_name[PJ_INET6_ADDRSTRLEN];
369         /*! Source port of the message */
370         int src_port;
371         /*! Local transport key type */
372         char transport_key[32];
373         /*! Local transport address */
374         char local_name[PJ_INET6_ADDRSTRLEN];
375         /*! Local transport port */
376         int local_port;
377         /*! Next CSeq to use for message */
378         unsigned int cseq;
379         /*! Local tag of the dialog */
380         char *tag;
381         /*! When this subscription expires */
382         struct timeval expires;
383         /*! Contact URI */
384         char contact_uri[PJSIP_MAX_URL_SIZE];
385 };
386
/*!
 * \brief The state of the subscription tree
 */
enum sip_subscription_tree_state {
        /*! Normal operation */
        SIP_SUB_TREE_NORMAL = 0,
        /*! A terminate has been requested by Asterisk, the client, or pjproject */
        SIP_SUB_TREE_TERMINATE_PENDING,
        /*! The terminate is in progress */
        SIP_SUB_TREE_TERMINATE_IN_PROGRESS,
        /*! The terminate process has finished and the subscription tree is no longer valid */
        SIP_SUB_TREE_TERMINATED,
};

/*!
 * \brief Printable names for \ref sip_subscription_tree_state, indexed by state
 *
 * The entries are string literals, so the element type is const. Each entry
 * is keyed by its enumerator so the table cannot silently drift out of
 * order if the enum is ever rearranged.
 */
static const char *sub_tree_state_description[] = {
        [SIP_SUB_TREE_NORMAL] = "Normal",
        [SIP_SUB_TREE_TERMINATE_PENDING] = "TerminatePending",
        [SIP_SUB_TREE_TERMINATE_IN_PROGRESS] = "TerminateInProgress",
        [SIP_SUB_TREE_TERMINATED] = "Terminated",
};
407
408 /*!
409  * \brief A tree of SIP subscriptions
410  *
411  * Because of the ability to subscribe to resource lists, a SIP
412  * subscription can result in a tree of subscriptions being created.
413  * This structure represents the information relevant to the subscription
414  * as a whole, to include the underlying PJSIP structure for the
415  * subscription.
416  */
417 struct sip_subscription_tree {
418         /*! The endpoint with which the subscription is communicating */
419         struct ast_sip_endpoint *endpoint;
420         /*! Serializer on which to place operations for this subscription */
421         struct ast_taskprocessor *serializer;
422         /*! The role for this subscription */
423         enum ast_sip_subscription_role role;
424         /*! Persistence information */
425         struct subscription_persistence *persistence;
426         /*! The underlying PJSIP event subscription structure */
427         pjsip_evsub *evsub;
428         /*! The underlying PJSIP dialog */
429         pjsip_dialog *dlg;
430         /*! Interval to use for batching notifications */
431         unsigned int notification_batch_interval;
432         /*! Scheduler ID for batched notification */
433         int notify_sched_id;
434         /*! Indicator if scheduled batched notification should be sent */
435         unsigned int send_scheduled_notify;
436         /*! The root of the subscription tree */
437         struct ast_sip_subscription *root;
438         /*! Is this subscription to a list? */
439         int is_list;
440         /*! Next item in the list */
441         AST_LIST_ENTRY(sip_subscription_tree) next;
442         /*! Subscription tree state */
443         enum sip_subscription_tree_state state;
444         /*! On asterisk restart, this is the task data used
445          * to restart the expiration timer if pjproject isn't
446          * capable of restarting the timer.
447          */
448         struct ast_sip_sched_task *expiration_task;
449 };
450
451 /*!
452  * \brief Structure representing a "virtual" SIP subscription.
453  *
454  * This structure serves a dual purpose. Structurally, it is
455  * the constructed tree of subscriptions based on the resources
456  * being subscribed to. API-wise, this serves as the handle that
457  * subscription handlers use in order to interact with the pubsub API.
458  */
459 struct ast_sip_subscription {
460         /*! Subscription datastores set up by handlers */
461         struct ao2_container *datastores;
462         /*! The handler for this subscription */
463         const struct ast_sip_subscription_handler *handler;
464         /*! Pointer to the base of the tree */
465         struct sip_subscription_tree *tree;
466         /*! Body generaator for NOTIFYs */
467         struct ast_sip_pubsub_body_generator *body_generator;
468         /*! Vector of child subscriptions */
469         AST_VECTOR(, struct ast_sip_subscription *) children;
470         /*! Saved NOTIFY body text for this subscription */
471         struct ast_str *body_text;
472         /*! Indicator that the body text has changed since the last notification */
473         int body_changed;
474         /*! The current state of the subscription */
475         pjsip_evsub_state subscription_state;
476         /*! For lists, the current version to place in the RLMI body */
477         unsigned int version;
478         /*! For lists, indicates if full state should always be communicated. */
479         unsigned int full_state;
480         /*! URI associated with the subscription */
481         pjsip_sip_uri *uri;
482         /*! Name of resource being subscribed to */
483         char resource[0];
484 };
485
486 /*!
487  * \brief Structure representing a publication resource
488  */
489 struct ast_sip_publication_resource {
490         /*! \brief Sorcery object details */
491         SORCERY_OBJECT(details);
492         /*! \brief Optional name of an endpoint that is only allowed to publish to this resource */
493         char *endpoint;
494         /*! \brief Mapping for event types to configuration */
495         struct ast_variable *events;
496 };
497
498 static const char *sip_subscription_roles_map[] = {
499         [AST_SIP_SUBSCRIBER] = "Subscriber",
500         [AST_SIP_NOTIFIER] = "Notifier"
501 };
502
/*! \brief Reason a subscription's persistence record is being updated */
enum sip_persistence_update_type {
        /*! Update made from the send request path */
        SUBSCRIPTION_PERSISTENCE_SEND_REQUEST = 0,
        /*! The subscription was created by an initial client request */
        SUBSCRIPTION_PERSISTENCE_CREATED = 1,
        /*! The subscription was recreated by Asterisk on startup */
        SUBSCRIPTION_PERSISTENCE_RECREATED = 2,
        /*! The subscription was refreshed by the client */
        SUBSCRIPTION_PERSISTENCE_REFRESHED = 3,
};
513
514 AST_RWLIST_HEAD_STATIC(subscriptions, sip_subscription_tree);
515
516 AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
517 AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
518
519 static pjsip_media_type rlmi_media_type;
520
521 static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
522 static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
523                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
524 static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
525                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
526 static void pubsub_on_client_refresh(pjsip_evsub *sub);
527 static void pubsub_on_server_timeout(pjsip_evsub *sub);
528
529 static pjsip_evsub_user pubsub_cb = {
530         .on_evsub_state = pubsub_on_evsub_state,
531         .on_rx_refresh = pubsub_on_rx_refresh,
532         .on_rx_notify = pubsub_on_rx_notify,
533         .on_client_refresh = pubsub_on_client_refresh,
534         .on_server_timeout = pubsub_on_server_timeout,
535 };
536
537 /*! \brief Destructor for publication resource */
538 static void publication_resource_destroy(void *obj)
539 {
540         struct ast_sip_publication_resource *resource = obj;
541
542         ast_free(resource->endpoint);
543         ast_variables_destroy(resource->events);
544 }
545
546 /*! \brief Allocator for publication resource */
547 static void *publication_resource_alloc(const char *name)
548 {
549         return ast_sorcery_generic_alloc(sizeof(struct ast_sip_publication_resource), publication_resource_destroy);
550 }
551
552 /*! \brief Destructor for subscription persistence */
553 static void subscription_persistence_destroy(void *obj)
554 {
555         struct subscription_persistence *persistence = obj;
556
557         ast_free(persistence->endpoint);
558         ast_free(persistence->tag);
559 }
560
561 /*! \brief Allocator for subscription persistence */
562 static void *subscription_persistence_alloc(const char *name)
563 {
564         return ast_sorcery_generic_alloc(sizeof(struct subscription_persistence), subscription_persistence_destroy);
565 }
566
567 /*! \brief Function which creates initial persistence information of a subscription in sorcery */
568 static struct subscription_persistence *subscription_persistence_create(struct sip_subscription_tree *sub_tree)
569 {
570         char tag[PJ_GUID_STRING_LENGTH + 1];
571
572         /* The id of this persistence object doesn't matter as we keep it on the subscription and don't need to
573          * look it up by id at all.
574          */
575         struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(),
576                 "subscription_persistence", NULL);
577
578         pjsip_dialog *dlg = sub_tree->dlg;
579
580         if (!persistence) {
581                 return NULL;
582         }
583
584         persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub_tree->endpoint));
585         ast_copy_pj_str(tag, &dlg->local.info->tag, sizeof(tag));
586         persistence->tag = ast_strdup(tag);
587
588         ast_sorcery_create(ast_sip_get_sorcery(), persistence);
589         return persistence;
590 }
591
592 /*! \brief Function which updates persistence information of a subscription in sorcery */
593 static void subscription_persistence_update(struct sip_subscription_tree *sub_tree,
594         pjsip_rx_data *rdata, enum sip_persistence_update_type type)
595 {
596         pjsip_dialog *dlg;
597
598         if (!sub_tree->persistence) {
599                 return;
600         }
601
602         ast_debug(3, "Updating persistence for '%s->%s'\n", sub_tree->persistence->endpoint,
603                 sub_tree->root->resource);
604
605         dlg = sub_tree->dlg;
606         sub_tree->persistence->cseq = dlg->local.cseq;
607
608         if (rdata) {
609                 int expires;
610                 pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
611                 pjsip_contact_hdr *contact_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_CONTACT, NULL);
612
613                 expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
614                 sub_tree->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
615
616                 if (contact_hdr) {
617                         pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, contact_hdr->uri,
618                                         sub_tree->persistence->contact_uri, sizeof(sub_tree->persistence->contact_uri));
619                 } else {
620                         ast_log(LOG_WARNING, "Contact not updated due to missing contact header\n");
621                 }
622
623                 /* When receiving a packet on an streaming transport, it's possible to receive more than one SIP
624                  * message at a time into the rdata->pkt_info.packet buffer. However, the rdata->msg_info.msg_buf
625                  * will always point to the proper SIP message that is to be processed. When updating subscription
626                  * persistence that is pulled from persistent storage, though, the rdata->pkt_info.packet will
627                  * only ever have a single SIP message on it, and so we base persistence on that.
628                  */
629                 if (type == SUBSCRIPTION_PERSISTENCE_CREATED
630                         || type == SUBSCRIPTION_PERSISTENCE_RECREATED) {
631                         if (rdata->msg_info.msg_buf) {
632                                 ast_copy_string(sub_tree->persistence->packet, rdata->msg_info.msg_buf,
633                                                 MIN(sizeof(sub_tree->persistence->packet), rdata->msg_info.len));
634                         } else {
635                                 ast_copy_string(sub_tree->persistence->packet, rdata->pkt_info.packet,
636                                                 sizeof(sub_tree->persistence->packet));
637                         }
638                 }
639                 ast_copy_string(sub_tree->persistence->src_name, rdata->pkt_info.src_name,
640                                 sizeof(sub_tree->persistence->src_name));
641                 sub_tree->persistence->src_port = rdata->pkt_info.src_port;
642                 ast_copy_string(sub_tree->persistence->transport_key, rdata->tp_info.transport->type_name,
643                         sizeof(sub_tree->persistence->transport_key));
644                 ast_copy_pj_str(sub_tree->persistence->local_name, &rdata->tp_info.transport->local_name.host,
645                         sizeof(sub_tree->persistence->local_name));
646                 sub_tree->persistence->local_port = rdata->tp_info.transport->local_name.port;
647         }
648
649         ast_sorcery_update(ast_sip_get_sorcery(), sub_tree->persistence);
650 }
651
652 /*! \brief Function which removes persistence of a subscription from sorcery */
653 static void subscription_persistence_remove(struct sip_subscription_tree *sub_tree)
654 {
655         if (!sub_tree->persistence) {
656                 return;
657         }
658
659         ast_sorcery_delete(ast_sip_get_sorcery(), sub_tree->persistence);
660         ao2_ref(sub_tree->persistence, -1);
661         sub_tree->persistence = NULL;
662 }
663
664
/*
 * Forward declarations of file-local lookup helpers that are called by the
 * functions below before their definitions appear.
 */
static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name);
static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
                size_t num_accept, const char *body_type);
668
669 /*! \brief Retrieve a handler using the Event header of an rdata message */
670 static struct ast_sip_subscription_handler *subscription_get_handler_from_rdata(pjsip_rx_data *rdata)
671 {
672         pjsip_event_hdr *event_header;
673         char event[32];
674         struct ast_sip_subscription_handler *handler;
675
676         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
677         if (!event_header) {
678                 ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
679                 return NULL;
680         }
681         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
682
683         handler = find_sub_handler_for_event_name(event);
684         if (!handler) {
685                 ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
686         }
687
688         return handler;
689 }
690
/*!
 * \brief Accept header values that are exceptions to the rule
 *
 * Typically, when a SUBSCRIBE arrives, we attempt to find a
 * body generator that matches one of the Accept headers in
 * the request. When subscribing to a single resource, this works
 * great. However, when subscribing to a list, things work
 * differently. Most Accept header values are fine, but there
 * are a couple that are specific to resource lists and need
 * to be ignored when searching for a body generator to use
 * for the individual resources of the subscription.
 *
 * Values listed here are skipped by exceptional_accept().
 */
const char *accept_exceptions[] =  {
        "multipart/related",
        "application/rlmi+xml",
};
707
708 /*!
709  * \brief Is the Accept header from the SUBSCRIBE in the list of exceptions?
710  *
711  * \retval 1 This Accept header value is an exception to the rule.
712  * \retval 0 This Accept header is not an exception to the rule.
713  */
714 static int exceptional_accept(const pj_str_t *accept)
715 {
716         int i;
717
718         for (i = 0; i < ARRAY_LEN(accept_exceptions); ++i) {
719                 if (!pj_strcmp2(accept, accept_exceptions[i])) {
720                         return 1;
721                 }
722         }
723
724         return 0;
725 }
726
727 /*! \brief Retrieve a body generator using the Accept header of an rdata message */
728 static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata,
729         const struct ast_sip_subscription_handler *handler)
730 {
731         pjsip_accept_hdr *accept_header = (pjsip_accept_hdr *) &rdata->msg_info.msg->hdr;
732         char accept[AST_SIP_MAX_ACCEPT][64];
733         size_t num_accept_headers = 0;
734
735         while ((accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, accept_header->next))) {
736                 int i;
737
738                 for (i = 0; i < accept_header->count; ++i) {
739                         if (!exceptional_accept(&accept_header->values[i])) {
740                                 ast_copy_pj_str(accept[num_accept_headers], &accept_header->values[i], sizeof(accept[num_accept_headers]));
741                                 ++num_accept_headers;
742                         }
743                 }
744         }
745
746         if (num_accept_headers == 0) {
747                 /* If a SUBSCRIBE contains no Accept headers, then we must assume that
748                  * the default accept type for the event package is to be used.
749                  */
750                 ast_copy_string(accept[0], handler->notifier->default_accept, sizeof(accept[0]));
751                 num_accept_headers = 1;
752         }
753
754         return find_body_generator(accept, num_accept_headers, handler->body_type);
755 }
756
757 /*! \brief Check if the rdata has a Supported header containing 'eventlist'
758  *
759  *  \retval 1 rdata has an eventlist containing supported header
760  *  \retval 0 rdata doesn't have an eventlist containing supported header
761  */
762 static int ast_sip_pubsub_has_eventlist_support(pjsip_rx_data *rdata)
763 {
764         pjsip_supported_hdr *supported_header = (pjsip_supported_hdr *) &rdata->msg_info.msg->hdr;
765
766         while ((supported_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_SUPPORTED, supported_header->next))) {
767                 int i;
768
769                 for (i = 0; i < supported_header->count; i++) {
770                         if (!pj_stricmp2(&supported_header->values[i], "eventlist")) {
771                                 return 1;
772                         }
773                 }
774         }
775
776         return 0;
777 }
778
/* Forward declaration; the structure itself is defined further down in this file. */
struct resource_tree;
780
/*!
 * \brief A node for a resource tree.
 *
 * The resource name is stored inline after the fixed members, so a node
 * has to be allocated with room for the name and its terminator (see
 * tree_node_alloc()).
 */
struct tree_node {
        /*! Child nodes of this node */
        AST_VECTOR(, struct tree_node *) children;
        /*! Value passed to tree_node_alloc(): the list's full_state for list nodes, 0 for other nodes */
        unsigned int full_state;
        /*! Name of the resource, stored inline at the end of the node */
        char resource[0];
};
789
790 /*!
791  * \brief Helper function for retrieving a resource list for a given event.
792  *
793  * This will retrieve a resource list that corresponds to the resource and event provided.
794  *
795  * \param resource The name of the resource list to retrieve
796  * \param event The expected event name on the resource list
797  */
798 static struct resource_list *retrieve_resource_list(const char *resource, const char *event)
799 {
800         struct resource_list *list;
801
802         list = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "resource_list", resource);
803         if (!list) {
804                 return NULL;
805         }
806
807         if (strcmp(list->event, event)) {
808                 ast_log(LOG_WARNING, "Found resource list %s, but its event type (%s) does not match SUBSCRIBE's (%s)\n",
809                                 resource, list->event, event);
810                 ao2_cleanup(list);
811                 return NULL;
812         }
813
814         return list;
815 }
816
817 /*!
818  * \brief Allocate a tree node
819  *
820  * In addition to allocating and initializing the tree node, the node is also added
821  * to the vector of visited resources. See \ref build_resource_tree for more information
822  * on the visited resources.
823  *
824  * \param resource The name of the resource for this tree node.
825  * \param visited The vector of resources that have been visited.
826  * \param if allocating a list, indicate whether full state is requested in notifications.
827  * \retval NULL Allocation failure.
828  * \retval non-NULL The newly-allocated tree_node
829  */
830 static struct tree_node *tree_node_alloc(const char *resource, struct resources *visited, unsigned int full_state)
831 {
832         struct tree_node *node;
833
834         node = ast_calloc(1, sizeof(*node) + strlen(resource) + 1);
835         if (!node) {
836                 return NULL;
837         }
838
839         strcpy(node->resource, resource);
840         if (AST_VECTOR_INIT(&node->children, 4)) {
841                 ast_free(node);
842                 return NULL;
843         }
844         node->full_state = full_state;
845
846         if (visited) {
847                 AST_VECTOR_APPEND(visited, resource);
848         }
849         return node;
850 }
851
852 /*!
853  * \brief Destructor for a tree node
854  *
855  * This function calls recursively in order to destroy
856  * all nodes lower in the tree from the given node in
857  * addition to the node itself.
858  *
859  * \param node The node to destroy.
860  */
861 static void tree_node_destroy(struct tree_node *node)
862 {
863         int i;
864         if (!node) {
865                 return;
866         }
867
868         for (i = 0; i < AST_VECTOR_SIZE(&node->children); ++i) {
869                 tree_node_destroy(AST_VECTOR_GET(&node->children, i));
870         }
871         AST_VECTOR_FREE(&node->children);
872         ast_free(node);
873 }
874
/*!
 * \brief Determine if this resource has been visited already
 *
 * The visited vector is scanned for an entry that compares equal
 * (strcmp) to the resource name.
 *
 * See \ref build_resource_tree for more information
 *
 * \param resource The resource currently being visited
 * \param visited The resources that have previously been visited
 *
 * \retval 1 The resource is already in the visited vector.
 * \retval 0 The resource has not been visited.
 */
static int have_visited(const char *resource, struct resources *visited)
{
        int seen = 0;
        int idx;

        for (idx = 0; !seen && idx < AST_VECTOR_SIZE(visited); ++idx) {
                seen = (strcmp(resource, AST_VECTOR_GET(visited, idx)) == 0);
        }

        return seen;
}
895
896 /*!
897  * \brief Build child nodes for a given parent.
898  *
899  * This iterates through the items on a resource list and creates tree nodes for each one. The
900  * tree nodes created are children of the supplied parent node. If an item in the resource
901  * list is itself a list, then this function is called recursively to provide children for
902  * the new node.
903  *
904  * If an item in a resource list is not a list, then the supplied subscription handler is
905  * called into as if a new SUBSCRIBE for the list item were presented. The handler's response
906  * is used to determine if the node can be added to the tree or not.
907  *
908  * If a parent node ends up having no child nodes added under it, then the parent node is
909  * pruned from the tree.
910  *
911  * \param endpoint The endpoint that sent the inbound SUBSCRIBE.
912  * \param handler The subscription handler for leaf nodes in the tree.
913  * \param list The configured resource list from which the child node is being built.
914  * \param parent The parent node for these children.
915  * \param visited The resources that have already been visited.
916  */
917 static void build_node_children(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
918                 struct resource_list *list, struct tree_node *parent, struct resources *visited)
919 {
920         int i;
921
922         for (i = 0; i < AST_VECTOR_SIZE(&list->items); ++i) {
923                 struct tree_node *current;
924                 struct resource_list *child_list;
925                 const char *resource = AST_VECTOR_GET(&list->items, i);
926
927                 if (have_visited(resource, visited)) {
928                         ast_debug(1, "Already visited resource %s. Avoiding duplicate resource or potential loop.\n", resource);
929                         continue;
930                 }
931
932                 child_list = retrieve_resource_list(resource, list->event);
933                 if (!child_list) {
934                         int resp = handler->notifier->new_subscribe(endpoint, resource);
935                         if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
936                                 current = tree_node_alloc(resource, visited, 0);
937                                 if (!current) {
938                                         ast_debug(1,
939                                                 "Subscription to leaf resource %s was successful, but encountered allocation error afterwards\n",
940                                                 resource);
941                                         continue;
942                                 }
943                                 ast_debug(2, "Subscription to leaf resource %s resulted in success. Adding to parent %s\n",
944                                                 resource, parent->resource);
945                                 if (AST_VECTOR_APPEND(&parent->children, current)) {
946                                         tree_node_destroy(current);
947                                 }
948                         } else {
949                                 ast_debug(2, "Subscription to leaf resource %s resulted in error response %d\n",
950                                                 resource, resp);
951                         }
952                 } else {
953                         ast_debug(2, "Resource %s (child of %s) is a list\n", resource, parent->resource);
954                         current = tree_node_alloc(resource, visited, child_list->full_state);
955                         if (!current) {
956                                 ast_debug(1, "Cannot build children of resource %s due to allocation failure\n", resource);
957                                 continue;
958                         }
959                         build_node_children(endpoint, handler, child_list, current, visited);
960                         if (AST_VECTOR_SIZE(&current->children) > 0) {
961                                 ast_debug(1, "List %s had no successful children.\n", resource);
962                                 if (AST_VECTOR_APPEND(&parent->children, current)) {
963                                         tree_node_destroy(current);
964                                 }
965                         } else {
966                                 ast_debug(2, "List %s had successful children. Adding to parent %s\n",
967                                                 resource, parent->resource);
968                                 tree_node_destroy(current);
969                         }
970                         ao2_cleanup(child_list);
971                 }
972         }
973 }
974
/*!
 * \brief A resource tree
 *
 * When an inbound SUBSCRIBE arrives, the resource being subscribed to may
 * be a resource list. If this is the case, the resource list may contain resources
 * that are themselves lists. The structure needed to hold the resources is
 * a tree.
 *
 * Upon receipt of the SUBSCRIBE, the tree is built by determining if subscriptions
 * to the individual resources in the tree would be successful or not. Any successful
 * subscriptions result in a node in the tree being created. Any unsuccessful subscriptions
 * result in no node being created.
 *
 * This tree can be seen as a bare-bones analog of the tree of ast_sip_subscriptions that
 * will end up being created to actually carry out the duties of a SIP SUBSCRIBE dialog.
 */
struct resource_tree {
        /*! Top node of the tree; corresponds to the resource named in the SUBSCRIBE */
        struct tree_node *root;
        /*! Copied from the resource list's notification_batch_interval when the root is a list */
        unsigned int notification_batch_interval;
};
995
996 /*!
997  * \brief Destroy a resource tree.
998  *
999  * This function makes no assumptions about how the tree itself was
1000  * allocated and does not attempt to free the tree itself. Callers
1001  * of this function are responsible for freeing the tree.
1002  *
1003  * \param tree The tree to destroy.
1004  */
1005 static void resource_tree_destroy(struct resource_tree *tree)
1006 {
1007         if (tree) {
1008                 tree_node_destroy(tree->root);
1009         }
1010 }
1011
1012 /*!
1013  * \brief Build a resource tree
1014  *
1015  * This function builds a resource tree based on the requested resource in a SUBSCRIBE request.
1016  *
1017  * This function also creates a container that has all resources that have been visited during
1018  * creation of the tree, whether those resources resulted in a tree node being created or not.
1019  * Keeping this container of visited resources allows for misconfigurations such as loops in
1020  * the tree or duplicated resources to be detected.
1021  *
1022  * \param endpoint The endpoint that sent the SUBSCRIBE request.
1023  * \param handler The subscription handler for leaf nodes in the tree.
1024  * \param resource The resource requested in the SUBSCRIBE request.
1025  * \param tree The tree that is to be built.
1026  * \param has_eventlist_support
1027  *
1028  * \retval 200-299 Successfully subscribed to at least one resource.
1029  * \retval 300-699 Failure to subscribe to requested resource.
1030  */
1031 static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
1032                 const char *resource, struct resource_tree *tree, int has_eventlist_support)
1033 {
1034         RAII_VAR(struct resource_list *, list, NULL, ao2_cleanup);
1035         struct resources visited;
1036
1037         if (!has_eventlist_support || !(list = retrieve_resource_list(resource, handler->event_name))) {
1038                 ast_debug(2, "Subscription '%s->%s' is not to a list\n",
1039                         ast_sorcery_object_get_id(endpoint), resource);
1040                 tree->root = tree_node_alloc(resource, NULL, 0);
1041                 if (!tree->root) {
1042                         return 500;
1043                 }
1044                 return handler->notifier->new_subscribe(endpoint, resource);
1045         }
1046
1047         ast_debug(2, "Subscription '%s->%s' is a list\n",
1048                 ast_sorcery_object_get_id(endpoint), resource);
1049         if (AST_VECTOR_INIT(&visited, AST_VECTOR_SIZE(&list->items))) {
1050                 return 500;
1051         }
1052
1053         tree->root = tree_node_alloc(resource, &visited, list->full_state);
1054         if (!tree->root) {
1055                 AST_VECTOR_FREE(&visited);
1056                 return 500;
1057         }
1058
1059         tree->notification_batch_interval = list->notification_batch_interval;
1060
1061         build_node_children(endpoint, handler, list, tree->root, &visited);
1062         AST_VECTOR_FREE(&visited);
1063
1064         if (AST_VECTOR_SIZE(&tree->root->children) > 0) {
1065                 return 200;
1066         } else {
1067                 return 500;
1068         }
1069 }
1070
/*!
 * \brief Append a subscription tree to the subscriptions list.
 *
 * The list is write-locked for the duration of the insert.
 *
 * \param obj The subscription tree to add.
 */
static void add_subscription(struct sip_subscription_tree *obj)
{
        AST_RWLIST_WRLOCK(&subscriptions);
        AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
        AST_RWLIST_UNLOCK(&subscriptions);
}
1077
1078 static void remove_subscription(struct sip_subscription_tree *obj)
1079 {
1080         struct sip_subscription_tree *i;
1081
1082         AST_RWLIST_WRLOCK(&subscriptions);
1083         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
1084                 if (i == obj) {
1085                         AST_RWLIST_REMOVE_CURRENT(next);
1086                         if (i->root) {
1087                                 ast_debug(2, "Removing subscription '%s->%s' from list of subscriptions\n",
1088                                         ast_sorcery_object_get_id(i->endpoint), ast_sip_subscription_get_resource_name(i->root));
1089                         }
1090                         break;
1091                 }
1092         }
1093         AST_RWLIST_TRAVERSE_SAFE_END;
1094         AST_RWLIST_UNLOCK(&subscriptions);
1095 }
1096
/*!
 * \brief Free a single subscription structure.
 *
 * Releases the body text buffer, the storage of the children vector, the
 * datastores container reference, and the structure itself. Child
 * subscriptions are not destroyed here (see destroy_subscriptions()), and
 * the reference held on sub->tree is not released by this function.
 *
 * \param sub The subscription to free. Must not be NULL.
 */
static void destroy_subscription(struct ast_sip_subscription *sub)
{
        /* sub->tree may not be set yet when this runs on a partially built subscription. */
        ast_debug(3, "Destroying SIP subscription from '%s->%s'\n",
                sub->tree && sub->tree->endpoint ? ast_sorcery_object_get_id(sub->tree->endpoint) : "Unknown",
                sub->resource);

        ast_free(sub->body_text);

        AST_VECTOR_FREE(&sub->children);
        ao2_cleanup(sub->datastores);
        ast_free(sub);
}
1109
1110 static void destroy_subscriptions(struct ast_sip_subscription *root)
1111 {
1112         int i;
1113
1114         if (!root) {
1115                 return;
1116         }
1117
1118         for (i = 0; i < AST_VECTOR_SIZE(&root->children); ++i) {
1119                 struct ast_sip_subscription *child;
1120
1121                 child = AST_VECTOR_GET(&root->children, i);
1122                 destroy_subscriptions(child);
1123         }
1124
1125         destroy_subscription(root);
1126 }
1127
1128 static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler,
1129                 const char *resource, struct sip_subscription_tree *tree)
1130 {
1131         struct ast_sip_subscription *sub;
1132         pjsip_sip_uri *contact_uri;
1133
1134         sub = ast_calloc(1, sizeof(*sub) + strlen(resource) + 1);
1135         if (!sub) {
1136                 return NULL;
1137         }
1138         strcpy(sub->resource, resource); /* Safe */
1139
1140         sub->datastores = ast_datastores_alloc();
1141         if (!sub->datastores) {
1142                 destroy_subscription(sub);
1143                 return NULL;
1144         }
1145
1146         sub->body_text = ast_str_create(128);
1147         if (!sub->body_text) {
1148                 destroy_subscription(sub);
1149                 return NULL;
1150         }
1151
1152         sub->uri = pjsip_sip_uri_create(tree->dlg->pool, PJ_FALSE);
1153         contact_uri = pjsip_uri_get_uri(tree->dlg->local.contact->uri);
1154         pjsip_sip_uri_assign(tree->dlg->pool, sub->uri, contact_uri);
1155         pj_strdup2(tree->dlg->pool, &sub->uri->user, resource);
1156
1157         sub->handler = handler;
1158         sub->subscription_state = PJSIP_EVSUB_STATE_ACTIVE;
1159         sub->tree = ao2_bump(tree);
1160
1161         return sub;
1162 }
1163
1164 /*!
1165  * \brief Create a tree of virtual subscriptions based on a resource tree node.
1166  *
1167  * \param handler The handler to supply to leaf subscriptions.
1168  * \param resource The requested resource for this subscription.
1169  * \param generator Body generator to use for leaf subscriptions.
1170  * \param tree The root of the subscription tree.
1171  * \param current The tree node that corresponds to the subscription being created.
1172  */
1173 static struct ast_sip_subscription *create_virtual_subscriptions(const struct ast_sip_subscription_handler *handler,
1174                 const char *resource, struct ast_sip_pubsub_body_generator *generator,
1175                 struct sip_subscription_tree *tree, struct tree_node *current)
1176 {
1177         int i;
1178         struct ast_sip_subscription *sub;
1179
1180         sub = allocate_subscription(handler, resource, tree);
1181         if (!sub) {
1182                 return NULL;
1183         }
1184
1185         sub->full_state = current->full_state;
1186         sub->body_generator = generator;
1187         AST_VECTOR_INIT(&sub->children, AST_VECTOR_SIZE(&current->children));
1188
1189         for (i = 0; i < AST_VECTOR_SIZE(&current->children); ++i) {
1190                 struct ast_sip_subscription *child;
1191                 struct tree_node *child_node = AST_VECTOR_GET(&current->children, i);
1192
1193                 child = create_virtual_subscriptions(handler, child_node->resource, generator,
1194                                 tree, child_node);
1195
1196                 if (!child) {
1197                         ast_debug(1, "Child subscription to resource %s could not be created\n",
1198                                         child_node->resource);
1199                         continue;
1200                 }
1201
1202                 if (AST_VECTOR_APPEND(&sub->children, child)) {
1203                         ast_debug(1, "Child subscription to resource %s could not be appended\n",
1204                                         child_node->resource);
1205                         destroy_subscription(child);
1206                         /* Have to release tree here too because a ref was added
1207                          * to child that destroy_subscription() doesn't release. */
1208                         ao2_cleanup(tree);
1209                 }
1210         }
1211
1212         return sub;
1213 }
1214
1215 static void shutdown_subscriptions(struct ast_sip_subscription *sub)
1216 {
1217         int i;
1218
1219         if (!sub) {
1220                 return;
1221         }
1222
1223         if (AST_VECTOR_SIZE(&sub->children) > 0) {
1224                 for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
1225                         shutdown_subscriptions(AST_VECTOR_GET(&sub->children, i));
1226                 }
1227                 return;
1228         }
1229
1230         /* We notify subscription shutdown only on the tree leaves. */
1231         if (sub->handler->subscription_shutdown) {
1232                 sub->handler->subscription_shutdown(sub);
1233         }
1234 }
/*!
 * \brief Task that releases a subscription tree's session reference on its dialog.
 *
 * The dialog session count is decremented for the pubsub module and the
 * tree's dialog pointer is cleared.
 *
 * \param obj The sip_subscription_tree whose dialog is to be unreferenced.
 *
 * \retval 0 Always.
 */
static int subscription_unreference_dialog(void *obj)
{
        struct sip_subscription_tree *sub_tree = obj;

        /* This is why we keep the dialog on the subscription. When the subscription
         * is destroyed, there is no guarantee that the underlying dialog is ready
         * to be destroyed. Furthermore, there's no guarantee in the opposite direction
         * either. The dialog could be destroyed before our subscription is. We fix
         * this problem by keeping a reference to the dialog until it is time to
         * destroy the subscription. We need to have the dialog available when the
         * subscription is destroyed so that we can guarantee that our attempt to
         * remove the serializer will be successful.
         */
        pjsip_dlg_dec_session(sub_tree->dlg, &pubsub_module);
        sub_tree->dlg = NULL;

        return 0;
}
1253
/*!
 * \brief Destructor for a subscription tree (registered in allocate_subscription_tree()).
 *
 * Destroys the subscriptions under the root, releases the dialog session
 * reference if a dialog is still attached, and then drops the endpoint,
 * serializer, and module references held by the tree.
 *
 * \param obj The sip_subscription_tree being destroyed.
 */
static void subscription_tree_destructor(void *obj)
{
        struct sip_subscription_tree *sub_tree = obj;

        ast_debug(3, "Destroying subscription tree %p '%s->%s'\n",
                sub_tree,
                sub_tree->endpoint ? ast_sorcery_object_get_id(sub_tree->endpoint) : "Unknown",
                sub_tree->root ? sub_tree->root->resource : "Unknown");

        destroy_subscriptions(sub_tree->root);

        /* The dialog is unreferenced through the tree's serializer, which is
         * why the serializer is only released further down. */
        if (sub_tree->dlg) {
                ast_sip_push_task_synchronous(sub_tree->serializer, subscription_unreference_dialog, sub_tree);
        }

        ao2_cleanup(sub_tree->endpoint);

        ast_taskprocessor_unreference(sub_tree->serializer);
        /* Balances the ast_module_ref() taken when the tree was allocated. */
        ast_module_unref(ast_module_info->self);
}
1274
/*!
 * \brief Release a subscription's reference to its subscription tree.
 *
 * Only the reference on sub->tree is dropped here; the subscription
 * structure itself is not freed by this function.
 *
 * \param sub The subscription. Both sub and sub->tree are dereferenced
 *        for the debug message, so neither may be NULL.
 */
void ast_sip_subscription_destroy(struct ast_sip_subscription *sub)
{
        ast_debug(3, "Removing subscription %p '%s->%s' reference to subscription tree %p\n",
                sub, ast_sorcery_object_get_id(sub->tree->endpoint), sub->resource, sub->tree);
        ao2_cleanup(sub->tree);
}
1281
/*!
 * \brief Tie a dialog to a subscription tree.
 *
 * Stores the dialog on the tree, assigns the tree's serializer and endpoint
 * to the dialog, stores the tree as the pubsub module data on the tree's
 * evsub, and takes a session reference on the dialog. That session
 * reference is the one given back by subscription_unreference_dialog().
 *
 * \param sub_tree The subscription tree; its evsub, serializer and endpoint
 *        are used here and so have to be set already.
 * \param dlg The dialog to associate with the tree.
 */
static void subscription_setup_dialog(struct sip_subscription_tree *sub_tree, pjsip_dialog *dlg)
{
        sub_tree->dlg = dlg;
        ast_sip_dialog_set_serializer(dlg, sub_tree->serializer);
        ast_sip_dialog_set_endpoint(dlg, sub_tree->endpoint);
        pjsip_evsub_set_mod_data(sub_tree->evsub, pubsub_module.id, sub_tree);
        pjsip_dlg_inc_session(dlg, &pubsub_module);
}
1290
1291 static struct sip_subscription_tree *allocate_subscription_tree(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
1292 {
1293         struct sip_subscription_tree *sub_tree;
1294
1295         sub_tree = ao2_alloc(sizeof *sub_tree, subscription_tree_destructor);
1296         if (!sub_tree) {
1297                 return NULL;
1298         }
1299
1300         ast_module_ref(ast_module_info->self);
1301
1302         if (rdata) {
1303                 /*
1304                  * We must continue using the serializer that the original
1305                  * SUBSCRIBE came in on for the dialog.  There may be
1306                  * retransmissions already enqueued in the original
1307                  * serializer that can result in reentrancy and message
1308                  * sequencing problems.
1309                  */
1310                 sub_tree->serializer = ast_sip_get_distributor_serializer(rdata);
1311         } else {
1312                 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
1313
1314                 /* Create name with seq number appended. */
1315                 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/pubsub/%s",
1316                         ast_sorcery_object_get_id(endpoint));
1317
1318                 sub_tree->serializer = ast_sip_create_serializer(tps_name);
1319         }
1320         if (!sub_tree->serializer) {
1321                 ao2_ref(sub_tree, -1);
1322                 return NULL;
1323         }
1324
1325         sub_tree->endpoint = ao2_bump(endpoint);
1326         sub_tree->notify_sched_id = -1;
1327
1328         return sub_tree;
1329 }
1330
/*!
 * \brief Create a subscription tree based on a resource tree.
 *
 * Using the previously-determined valid resources in the provided resource tree,
 * a corresponding tree of ast_sip_subscriptions are created. The root of the
 * subscription tree is a real subscription, and the rest in the tree are
 * virtual subscriptions.
 *
 * A UAS dialog and evsub are created for the request. If persistence data is
 * attached to the rdata, the dialog's local tag and CSeq are overwritten with
 * the persisted values. The finished tree is added to the subscriptions list.
 *
 * \param handler The handler to use for leaf subscriptions
 * \param endpoint The endpoint that sent the SUBSCRIBE request
 * \param rdata The SUBSCRIBE content
 * \param resource The requested resource in the SUBSCRIBE request
 * \param generator The body generator to use in leaf subscriptions
 * \param tree The resource tree on which the subscription tree is based
 * \param dlg_status[out] The result of attempting to create a dialog.
 *        Set to PJ_ENOMEM if the subscription tree itself cannot be allocated.
 *
 * \retval NULL Could not create the subscription tree
 * \retval non-NULL The created subscription tree
 */
static struct sip_subscription_tree *create_subscription_tree(const struct ast_sip_subscription_handler *handler,
                struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
                struct ast_sip_pubsub_body_generator *generator, struct resource_tree *tree,
                pj_status_t *dlg_status)
{
        struct sip_subscription_tree *sub_tree;
        pjsip_dialog *dlg;
        struct subscription_persistence *persistence;

        sub_tree = allocate_subscription_tree(endpoint, rdata);
        if (!sub_tree) {
                *dlg_status = PJ_ENOMEM;
                return NULL;
        }
        sub_tree->role = AST_SIP_NOTIFIER;

        dlg = ast_sip_create_dialog_uas(endpoint, rdata, dlg_status);
        if (!dlg) {
                /* PJ_EEXISTS is the one failure that is not worth a warning here. */
                if (*dlg_status != PJ_EEXISTS) {
                        ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
                }
                ao2_ref(sub_tree, -1);
                return NULL;
        }

        persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
                        pubsub_module.id, MOD_DATA_PERSISTENCE);
        if (persistence) {
                /* Update the created dialog with the persisted information */
                /* The dialog is unregistered while its local tag (and the tag hash)
                 * is replaced, then registered again under the new tag. */
                pjsip_ua_unregister_dlg(pjsip_ua_instance(), dlg);
                pj_strdup2(dlg->pool, &dlg->local.info->tag, persistence->tag);
                dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag);
                pjsip_ua_register_dlg(pjsip_ua_instance(), dlg);
                dlg->local.cseq = persistence->cseq;
        }

        /* NOTE(review): the result of pjsip_evsub_create_uas() is not checked, and
         * sub_tree->evsub is used right away by subscription_setup_dialog(). */
        pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &sub_tree->evsub);
        subscription_setup_dialog(sub_tree, dlg);

#ifdef HAVE_PJSIP_EVSUB_GRP_LOCK
        pjsip_evsub_add_ref(sub_tree->evsub);
#endif

        /* Keep a copy of the request on the dialog, allocated from the dialog pool. */
        ast_sip_mod_data_set(dlg->pool, dlg->mod_data, pubsub_module.id, MOD_DATA_MSG,
                        pjsip_msg_clone(dlg->pool, rdata->msg_info.msg));

        sub_tree->notification_batch_interval = tree->notification_batch_interval;

        /* NOTE(review): create_virtual_subscriptions() returns NULL when the root
         * subscription cannot be allocated; sub_tree->root is dereferenced below
         * without a check - confirm how that failure should be unwound. */
        sub_tree->root = create_virtual_subscriptions(handler, resource, generator, sub_tree, tree->root);
        if (AST_VECTOR_SIZE(&sub_tree->root->children) > 0) {
                sub_tree->is_list = 1;
        }

        add_subscription(sub_tree);

        return sub_tree;
}
1408
/*!
 * \brief Wrapper structure for initial_notify_task
 *
 * Carries the arguments of initial_notify_task when that task is pushed
 * to a subscription tree's serializer.
 */
struct initial_notify_data {
        /*! Subscription tree the initial NOTIFY is for (the pusher stores a bumped reference) */
        struct sip_subscription_tree *sub_tree;
        /*! Value of the Expires header at the time the task was queued */
        int expires;
};
1414
1415 static int initial_notify_task(void *obj);
1416 static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state);
1417
/*!
 * \brief Persistent subscription recreation continuation under distributor serializer data
 *
 * Filled in by subscription_persistence_recreate() and handed to
 * sub_persistence_recreate() through a synchronous task push.
 */
struct persistence_recreate_data {
        /*! Persisted subscription record being recreated */
        struct subscription_persistence *persistence;
        /*! SUBSCRIBE request rebuilt from the persisted packet */
        pjsip_rx_data *rdata;
};
1423
1424 /*!
1425  * \internal
1426  * \brief subscription_persistence_recreate continuation under distributor serializer.
1427  * \since 13.10.0
1428  *
1429  * \retval 0 on success.
1430  * \retval -1 on error.
1431  */
1432 static int sub_persistence_recreate(void *obj)
1433 {
1434         struct persistence_recreate_data *recreate_data = obj;
1435         struct subscription_persistence *persistence = recreate_data->persistence;
1436         pjsip_rx_data *rdata = recreate_data->rdata;
1437         struct ast_sip_endpoint *endpoint;
1438         struct sip_subscription_tree *sub_tree;
1439         struct ast_sip_pubsub_body_generator *generator;
1440         struct ast_sip_subscription_handler *handler;
1441         char *resource;
1442         pjsip_sip_uri *request_uri;
1443         size_t resource_size;
1444         int resp;
1445         struct resource_tree tree;
1446         pjsip_expires_hdr *expires_header;
1447
1448         request_uri = pjsip_uri_get_uri(rdata->msg_info.msg->line.req.uri);
1449         resource_size = pj_strlen(&request_uri->user) + 1;
1450         resource = ast_alloca(resource_size);
1451         ast_copy_pj_str(resource, &request_uri->user, resource_size);
1452
1453         /*
1454          * We may want to match without any user options getting
1455          * in the way.
1456          */
1457         AST_SIP_USER_OPTIONS_TRUNCATE_CHECK(resource);
1458
1459         handler = subscription_get_handler_from_rdata(rdata);
1460         if (!handler || !handler->notifier) {
1461                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not get subscription handler.\n",
1462                         persistence->endpoint);
1463                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1464                 return 0;
1465         }
1466
1467         generator = subscription_get_generator_from_rdata(rdata, handler);
1468         if (!generator) {
1469                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Body generator not available.\n",
1470                         persistence->endpoint);
1471                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1472                 return 0;
1473         }
1474
1475         ast_sip_mod_data_set(rdata->tp_info.pool, rdata->endpt_info.mod_data,
1476                 pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
1477
1478         /* Getting the endpoint may take some time that can affect the expiration. */
1479         endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint",
1480                 persistence->endpoint);
1481         if (!endpoint) {
1482                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: The endpoint was not found\n",
1483                         persistence->endpoint);
1484                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1485                 return 0;
1486         }
1487
1488         /* Update the expiration header with the new expiration */
1489         expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES,
1490                 rdata->msg_info.msg->hdr.next);
1491         if (!expires_header) {
1492                 expires_header = pjsip_expires_hdr_create(rdata->tp_info.pool, 0);
1493                 if (!expires_header) {
1494                         ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not update expires header.\n",
1495                                 persistence->endpoint);
1496                         ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1497                         ao2_ref(endpoint, -1);
1498                         return 0;
1499                 }
1500                 pjsip_msg_add_hdr(rdata->msg_info.msg, (pjsip_hdr *) expires_header);
1501         }
1502
1503         expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
1504         if (expires_header->ivalue <= 0) {
1505                 /* The subscription expired since we started recreating the subscription. */
1506                 ast_debug(3, "Expired subscription retrived from persistent store '%s' %s\n",
1507                         persistence->endpoint, persistence->tag);
1508                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1509                 ao2_ref(endpoint, -1);
1510                 return 0;
1511         }
1512
1513         memset(&tree, 0, sizeof(tree));
1514         resp = build_resource_tree(endpoint, handler, resource, &tree,
1515                 ast_sip_pubsub_has_eventlist_support(rdata));
1516         if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
1517                 pj_status_t dlg_status;
1518
1519                 sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator,
1520                         &tree, &dlg_status);
1521                 if (!sub_tree) {
1522                         if (dlg_status != PJ_EEXISTS) {
1523                                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not create subscription tree.\n",
1524                                         persistence->endpoint);
1525                                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1526                         }
1527                 } else {
1528                         struct initial_notify_data *ind = ast_malloc(sizeof(*ind));
1529
1530                         if (!ind) {
1531                                 pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
1532                                 goto error;
1533                         }
1534
1535                         ind->sub_tree = ao2_bump(sub_tree);
1536                         ind->expires = expires_header->ivalue;
1537
1538                         sub_tree->persistence = ao2_bump(persistence);
1539                         subscription_persistence_update(sub_tree, rdata, SUBSCRIPTION_PERSISTENCE_RECREATED);
1540                         if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ind)) {
1541                                 /* Could not send initial subscribe NOTIFY */
1542                                 pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
1543                                 ao2_ref(sub_tree, -1);
1544                                 ast_free(ind);
1545                         }
1546                 }
1547         } else {
1548                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1549         }
1550
1551 error:
1552         resource_tree_destroy(&tree);
1553         ao2_ref(endpoint, -1);
1554
1555         return 0;
1556 }
1557
1558 /*! \brief Callback function to perform the actual recreation of a subscription */
1559 static int subscription_persistence_recreate(void *obj, void *arg, int flags)
1560 {
1561         struct subscription_persistence *persistence = obj;
1562         pj_pool_t *pool = arg;
1563         struct ast_taskprocessor *serializer;
1564         pjsip_rx_data rdata;
1565         struct persistence_recreate_data recreate_data;
1566
1567         /* If this subscription has already expired remove it */
1568         if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
1569                 ast_debug(3, "Expired subscription retrived from persistent store '%s' %s\n",
1570                         persistence->endpoint, persistence->tag);
1571                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1572                 return 0;
1573         }
1574
1575         memset(&rdata, 0, sizeof(rdata));
1576         pj_pool_reset(pool);
1577         rdata.tp_info.pool = pool;
1578
1579         if (ast_sip_create_rdata_with_contact(&rdata, persistence->packet, persistence->src_name,
1580                 persistence->src_port, persistence->transport_key, persistence->local_name,
1581                 persistence->local_port, persistence->contact_uri)) {
1582                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: The message could not be parsed\n",
1583                         persistence->endpoint);
1584                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1585                 return 0;
1586         }
1587
1588         if (rdata.msg_info.msg->type != PJSIP_REQUEST_MSG) {
1589                 ast_log(LOG_NOTICE, "Failed recreating '%s' subscription: Stored a SIP response instead of a request.\n",
1590                         persistence->endpoint);
1591                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1592                 return 0;
1593         }
1594
1595         /* Continue the remainder in the distributor serializer */
1596         serializer = ast_sip_get_distributor_serializer(&rdata);
1597         if (!serializer) {
1598                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not get distributor serializer.\n",
1599                         persistence->endpoint);
1600                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1601                 return 0;
1602         }
1603         recreate_data.persistence = persistence;
1604         recreate_data.rdata = &rdata;
1605         if (ast_sip_push_task_synchronous(serializer, sub_persistence_recreate, &recreate_data)) {
1606                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not continue under distributor serializer.\n",
1607                         persistence->endpoint);
1608                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1609         }
1610         ast_taskprocessor_unreference(serializer);
1611
1612         return 0;
1613 }
1614
1615 /*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
1616 static int subscription_persistence_load(void *data)
1617 {
1618         struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
1619                 "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
1620         pj_pool_t *pool;
1621
1622         pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
1623                 PJSIP_POOL_RDATA_INC);
1624         if (!pool) {
1625                 ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
1626                 return 0;
1627         }
1628
1629         ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
1630
1631         pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
1632
1633         ao2_ref(persisted_subscriptions, -1);
1634         return 0;
1635 }
1636
1637 /*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
1638 static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
1639 {
1640         struct ast_json_payload *payload;
1641         const char *type;
1642
1643         if (stasis_message_type(message) != ast_manager_get_generic_type()) {
1644                 return;
1645         }
1646
1647         payload = stasis_message_data(message);
1648         type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
1649
1650         /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
1651          * recreate SIP subscriptions.
1652          */
1653         if (strcmp(type, "FullyBooted")) {
1654                 return;
1655         }
1656
1657         /* This has to be here so the subscription is recreated when the body generator is available */
1658         ast_sip_push_task(NULL, subscription_persistence_load, NULL);
1659
1660         /* Once the system is fully booted we don't care anymore */
1661         stasis_unsubscribe(sub);
1662 }
1663
1664 typedef int (*on_subscription_t)(struct sip_subscription_tree *sub, void *arg);
1665
1666 static int for_each_subscription(on_subscription_t on_subscription, void *arg)
1667 {
1668         int num = 0;
1669         struct sip_subscription_tree *i;
1670
1671         if (!on_subscription) {
1672                 return num;
1673         }
1674
1675         AST_RWLIST_RDLOCK(&subscriptions);
1676         AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
1677                 if (on_subscription(i, arg)) {
1678                         break;
1679                 }
1680                 ++num;
1681         }
1682         AST_RWLIST_UNLOCK(&subscriptions);
1683         return num;
1684 }
1685
1686 static void sip_subscription_to_ami(struct sip_subscription_tree *sub_tree,
1687                                     struct ast_str **buf)
1688 {
1689         char str[256];
1690         struct ast_sip_endpoint_id_configuration *id = &sub_tree->endpoint->id;
1691
1692         ast_str_append(buf, 0, "Role: %s\r\n",
1693                        sip_subscription_roles_map[sub_tree->role]);
1694         ast_str_append(buf, 0, "Endpoint: %s\r\n",
1695                        ast_sorcery_object_get_id(sub_tree->endpoint));
1696
1697         if (sub_tree->dlg) {
1698                 ast_copy_pj_str(str, &sub_tree->dlg->call_id->id, sizeof(str));
1699         } else {
1700                 ast_copy_string(str, "<unknown>", sizeof(str));
1701         }
1702         ast_str_append(buf, 0, "Callid: %s\r\n", str);
1703
1704         ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(sub_tree->evsub));
1705
1706         ast_callerid_merge(str, sizeof(str),
1707                            S_COR(id->self.name.valid, id->self.name.str, NULL),
1708                            S_COR(id->self.number.valid, id->self.number.str, NULL),
1709                            "Unknown");
1710
1711         ast_str_append(buf, 0, "Callerid: %s\r\n", str);
1712
1713         /* XXX This needs to be done recursively for lists */
1714         if (sub_tree->root->handler->to_ami) {
1715                 sub_tree->root->handler->to_ami(sub_tree->root, buf);
1716         }
1717 }
1718
1719
1720 void *ast_sip_subscription_get_header(const struct ast_sip_subscription *sub, const char *header)
1721 {
1722         pjsip_dialog *dlg;
1723         pjsip_msg *msg;
1724         pj_str_t name;
1725
1726         dlg = sub->tree->dlg;
1727         msg = ast_sip_mod_data_get(dlg->mod_data, pubsub_module.id, MOD_DATA_MSG);
1728         pj_cstr(&name, header);
1729
1730         return pjsip_msg_find_hdr_by_name(msg, &name, NULL);
1731 }
1732
1733 /* XXX This function is not used. */
1734 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
1735                 struct ast_sip_endpoint *endpoint, const char *resource)
1736 {
1737         struct ast_sip_subscription *sub;
1738         pjsip_dialog *dlg;
1739         struct ast_sip_contact *contact;
1740         pj_str_t event;
1741         pjsip_tx_data *tdata;
1742         pjsip_evsub *evsub;
1743         struct sip_subscription_tree *sub_tree = NULL;
1744
1745         sub_tree = allocate_subscription_tree(endpoint, NULL);
1746         if (!sub_tree) {
1747                 return NULL;
1748         }
1749
1750         sub = allocate_subscription(handler, resource, sub_tree);
1751         if (!sub) {
1752                 ao2_cleanup(sub_tree);
1753                 return NULL;
1754         }
1755
1756         contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors);
1757         if (!contact || ast_strlen_zero(contact->uri)) {
1758                 ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n",
1759                                 ast_sorcery_object_get_id(endpoint));
1760                 ao2_ref(sub_tree, -1);
1761                 ao2_cleanup(contact);
1762                 return NULL;
1763         }
1764
1765         dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL);
1766         ao2_cleanup(contact);
1767         if (!dlg) {
1768                 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
1769                 ao2_ref(sub_tree, -1);
1770                 return NULL;
1771         }
1772
1773         pj_cstr(&event, handler->event_name);
1774         pjsip_evsub_create_uac(dlg, &pubsub_cb, &event, 0, &sub_tree->evsub);
1775         subscription_setup_dialog(sub_tree, dlg);
1776
1777         evsub = sub_tree->evsub;
1778
1779         if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
1780                 pjsip_evsub_send_request(sub_tree->evsub, tdata);
1781         } else {
1782                 /* pjsip_evsub_terminate will result in pubsub_on_evsub_state,
1783                  * being called and terminating the subscription. Therefore, we don't
1784                  * need to decrease the reference count of sub here.
1785                  */
1786                 pjsip_evsub_terminate(evsub, PJ_TRUE);
1787                 ao2_ref(sub_tree, -1);
1788                 return NULL;
1789         }
1790
1791         add_subscription(sub_tree);
1792
1793         return sub;
1794 }
1795
1796 pjsip_dialog *ast_sip_subscription_get_dialog(struct ast_sip_subscription *sub)
1797 {
1798         ast_assert(sub->tree->dlg != NULL);
1799         return sub->tree->dlg;
1800 }
1801
1802 struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
1803 {
1804         ast_assert(sub->tree->endpoint != NULL);
1805         return ao2_bump(sub->tree->endpoint);
1806 }
1807
1808 struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
1809 {
1810         ast_assert(sub->tree->serializer != NULL);
1811         return sub->tree->serializer;
1812 }
1813
1814 /*!
1815  * \brief Pre-allocate a buffer for the transmission
1816  *
1817  * Typically, we let PJSIP do this step for us when we send a request. PJSIP's buffer
1818  * allocation algorithm is to allocate a buffer of PJSIP_MAX_PKT_LEN bytes and attempt
1819  * to write the packet to the allocated buffer. If the buffer is too small to hold the
1820  * packet, then we get told the message is too long to be sent.
1821  *
1822  * When dealing with SIP NOTIFY, especially with RLS, it is possible to exceed
1823  * PJSIP_MAX_PKT_LEN. Rather than accepting the limitation imposed on us by default,
1824  * we instead take the strategy of pre-allocating the buffer, testing for ourselves
1825  * if the message will fit, and resizing the buffer as required.
1826  *
1827  * RFC 3261 says that a SIP UDP request can be up to 65535 bytes long. We're capping
1828  * it at 64000 for a couple of reasons:
1829  * 1) Allocating more than 64K at a time is hard to justify
1830  * 2) If the message goes through proxies, those proxies will want to add Via and
1831  *    Record-Route headers, making the message even larger. Giving some space for
1832  *    those headers is a nice thing to do.
1833  *
1834  * RFC 3261 does not place an upper limit on the size of TCP requests, but we are
1835  * going to impose the same 64K limit as a memory savings.
1836  *
1837  * \param tdata The tdata onto which to allocate a buffer
1838  * \retval 0 Success
1839  * \retval -1 The message is too large
1840  */
1841 static int allocate_tdata_buffer(pjsip_tx_data *tdata)
1842 {
1843         int buf_size;
1844         int size = -1;
1845         char *buf;
1846
1847         for (buf_size = PJSIP_MAX_PKT_LEN; size == -1 && buf_size < 64000; buf_size *= 2) {
1848                 buf = pj_pool_alloc(tdata->pool, buf_size);
1849                 size = pjsip_msg_print(tdata->msg, buf, buf_size);
1850         }
1851
1852         if (size == -1) {
1853                 return -1;
1854         }
1855
1856         tdata->buf.start = buf;
1857         tdata->buf.cur = tdata->buf.start;
1858         tdata->buf.end = tdata->buf.start + buf_size;
1859
1860         return 0;
1861 }
1862
1863 static int sip_subscription_send_request(struct sip_subscription_tree *sub_tree, pjsip_tx_data *tdata)
1864 {
1865 #ifdef TEST_FRAMEWORK
1866         struct ast_sip_endpoint *endpoint = sub_tree->endpoint;
1867         pjsip_evsub *evsub = sub_tree->evsub;
1868 #endif
1869         int res;
1870
1871         if (allocate_tdata_buffer(tdata)) {
1872                 ast_log(LOG_ERROR, "SIP request %s is too large to send.\n", tdata->info);
1873                 pjsip_tx_data_dec_ref(tdata);
1874                 return -1;
1875         }
1876
1877         res = pjsip_evsub_send_request(sub_tree->evsub, tdata);
1878
1879         subscription_persistence_update(sub_tree, NULL, SUBSCRIPTION_PERSISTENCE_SEND_REQUEST);
1880
1881         ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
1882                 "StateText: %s\r\n"
1883                 "Endpoint: %s\r\n",
1884                 pjsip_evsub_get_state_name(evsub),
1885                 ast_sorcery_object_get_id(endpoint));
1886
1887         return (res == PJ_SUCCESS ? 0 : -1);
1888 }
1889
1890 /*!
1891  * \brief Add a resource XML element to an RLMI body
1892  *
1893  * Each resource element represents a subscribed resource in the list. This function currently
1894  * will unconditionally add an instance element to each created resource element. Instance
1895  * elements refer to later parts in the multipart body.
1896  *
1897  * \param pool PJLIB allocation pool
1898  * \param cid Content-ID header of the resource
1899  * \param resource_name Name of the resource
1900  * \param resource_uri URI of the resource
1901  * \param state State of the subscribed resource
1902  */
1903 static void add_rlmi_resource(pj_pool_t *pool, pj_xml_node *rlmi, const pjsip_generic_string_hdr *cid,
1904                 const char *resource_name, const pjsip_sip_uri *resource_uri, pjsip_evsub_state state)
1905 {
1906         static pj_str_t cid_name = { "cid", 3 };
1907         pj_xml_node *resource;
1908         pj_xml_node *name;
1909         pj_xml_node *instance;
1910         pj_xml_attr *cid_attr;
1911         char id[6];
1912         char uri[PJSIP_MAX_URL_SIZE];
1913
1914         /* This creates a string representing the Content-ID without the enclosing < > */
1915         const pj_str_t cid_stripped = {
1916                 .ptr = cid->hvalue.ptr + 1,
1917                 .slen = cid->hvalue.slen - 2,
1918         };
1919
1920         resource = ast_sip_presence_xml_create_node(pool, rlmi, "resource");
1921         name = ast_sip_presence_xml_create_node(pool, resource, "name");
1922         instance = ast_sip_presence_xml_create_node(pool, resource, "instance");
1923
1924         pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, resource_uri, uri, sizeof(uri));
1925         ast_sip_presence_xml_create_attr(pool, resource, "uri", uri);
1926
1927         pj_strdup2(pool, &name->content, resource_name);
1928
1929         ast_generate_random_string(id, sizeof(id));
1930
1931         ast_sip_presence_xml_create_attr(pool, instance, "id", id);
1932         ast_sip_presence_xml_create_attr(pool, instance, "state",
1933                         state == PJSIP_EVSUB_STATE_TERMINATED ? "terminated" : "active");
1934
1935         /* Use the PJLIB-util XML library directly here since we are using a
1936          * pj_str_t
1937          */
1938
1939         cid_attr = pj_xml_attr_new(pool, &cid_name, &cid_stripped);
1940         pj_xml_add_attr(instance, cid_attr);
1941 }
1942
/*!
 * \brief A multipart body part and meta-information
 *
 * When creating a multipart body part, the end result (the
 * pjsip_multipart_part) is hard to inspect without undoing
 * a lot of what was done to create it. Therefore, we use this
 * structure to store meta-information about the body part.
 *
 * The main consumer of this is the creator of the RLMI body
 * part of a multipart resource list body.
 *
 * The structure itself is heap allocated (see allocate_body_part) and
 * released by free_body_parts; the resource and uri members point at the
 * subscription's own data rather than at copies.
 */
struct body_part {
        /*! Content-ID header for the body part */
        pjsip_generic_string_hdr *cid;
        /*! Subscribed resource represented in the body part */
        const char *resource;
        /*! URI for the subscribed body part */
        pjsip_sip_uri *uri;
        /*! Subscription state of the resource represented in the body part */
        pjsip_evsub_state state;
        /*! The actual body part that will be present in the multipart body */
        pjsip_multipart_part *part;
};
1966
/*!
 * \brief Type declaration for container of body part structures
 *
 * Elements are pointers to individually allocated body_part structures;
 * free_body_parts releases both the elements and the vector storage.
 */
AST_VECTOR(body_part_list, struct body_part *);
1971
1972 /*!
1973  * \brief Create a Content-ID header
1974  *
1975  * Content-ID headers are required by RFC2387 for multipart/related
1976  * bodies. They serve as identifiers for each part of the multipart body.
1977  *
1978  * \param pool PJLIB allocation pool
1979  * \param sub Subscription to a resource
1980  */
1981 static pjsip_generic_string_hdr *generate_content_id_hdr(pj_pool_t *pool,
1982                 const struct ast_sip_subscription *sub)
1983 {
1984         static const pj_str_t cid_name = { "Content-ID", 10 };
1985         pjsip_generic_string_hdr *cid;
1986         char id[6];
1987         size_t alloc_size;
1988         pj_str_t cid_value;
1989
1990         /* '<' + '@' + '>' = 3. pj_str_t does not require a null-terminator */
1991         alloc_size = sizeof(id) + pj_strlen(&sub->uri->host) + 3;
1992         cid_value.ptr = pj_pool_alloc(pool, alloc_size);
1993         cid_value.slen = sprintf(cid_value.ptr, "<%s@%.*s>",
1994                         ast_generate_random_string(id, sizeof(id)),
1995                         (int) pj_strlen(&sub->uri->host), pj_strbuf(&sub->uri->host));
1996         cid = pjsip_generic_string_hdr_create(pool, &cid_name, &cid_value);
1997
1998         return cid;
1999 }
2000
2001 static int rlmi_print_body(struct pjsip_msg_body *msg_body, char *buf, pj_size_t size)
2002 {
2003         int num_printed;
2004         pj_xml_node *rlmi = msg_body->data;
2005
2006         num_printed = pj_xml_print(rlmi, buf, size, PJ_TRUE);
2007         if (num_printed <= AST_PJSIP_XML_PROLOG_LEN) {
2008                 return -1;
2009         }
2010
2011         return num_printed;
2012 }
2013
2014 static void *rlmi_clone_data(pj_pool_t *pool, const void *data, unsigned len)
2015 {
2016         const pj_xml_node *rlmi = data;
2017
2018         return pj_xml_clone(pool, rlmi);
2019 }
2020
2021 /*!
2022  * \brief Create an RLMI body part for a multipart resource list body
2023  *
2024  * RLMI (Resource list meta information) is a special body type that lists
2025  * the subscribed resources and tells subscribers the number of subscribed
2026  * resources and what other body parts are in the multipart body. The
2027  * RLMI body also has a version number that a subscriber can use to ensure
2028  * that the locally-stored state corresponds to server state.
2029  *
2030  * \param pool The allocation pool
2031  * \param sub The subscription representing the subscribed resource list
2032  * \param body_parts A container of body parts that RLMI will refer to
2033  * \param full_state Indicates whether this is a full or partial state notification
2034  * \return The multipart part representing the RLMI body
2035  */
2036 static pjsip_multipart_part *build_rlmi_body(pj_pool_t *pool, struct ast_sip_subscription *sub,
2037                 struct body_part_list *body_parts, unsigned int full_state)
2038 {
2039         pj_xml_node *rlmi;
2040         pj_xml_node *name;
2041         pjsip_multipart_part *rlmi_part;
2042         char version_str[32];
2043         char uri[PJSIP_MAX_URL_SIZE];
2044         pjsip_generic_string_hdr *cid;
2045         int i;
2046
2047         rlmi = ast_sip_presence_xml_create_node(pool, NULL, "list");
2048         ast_sip_presence_xml_create_attr(pool, rlmi, "xmlns", "urn:ietf:params:xml:ns:rlmi");
2049
2050         ast_sip_subscription_get_local_uri(sub, uri, sizeof(uri));
2051         ast_sip_presence_xml_create_attr(pool, rlmi, "uri", uri);
2052
2053         snprintf(version_str, sizeof(version_str), "%u", sub->version++);
2054         ast_sip_presence_xml_create_attr(pool, rlmi, "version", version_str);
2055         ast_sip_presence_xml_create_attr(pool, rlmi, "fullState", full_state ? "true" : "false");
2056
2057         name = ast_sip_presence_xml_create_node(pool, rlmi, "name");
2058         pj_strdup2(pool, &name->content, ast_sip_subscription_get_resource_name(sub));
2059
2060         for (i = 0; i < AST_VECTOR_SIZE(body_parts); ++i) {
2061                 const struct body_part *part = AST_VECTOR_GET(body_parts, i);
2062
2063                 add_rlmi_resource(pool, rlmi, part->cid, part->resource, part->uri, part->state);
2064         }
2065
2066         rlmi_part = pjsip_multipart_create_part(pool);
2067
2068         rlmi_part->body = PJ_POOL_ZALLOC_T(pool, pjsip_msg_body);
2069         pjsip_media_type_cp(pool, &rlmi_part->body->content_type, &rlmi_media_type);
2070
2071         rlmi_part->body->data = pj_xml_clone(pool, rlmi);
2072         rlmi_part->body->clone_data = rlmi_clone_data;
2073         rlmi_part->body->print_body = rlmi_print_body;
2074
2075         cid = generate_content_id_hdr(pool, sub);
2076         pj_list_insert_before(&rlmi_part->hdr, cid);
2077
2078         return rlmi_part;
2079 }
2080
2081 static pjsip_msg_body *generate_notify_body(pj_pool_t *pool, struct ast_sip_subscription *root,
2082                 unsigned int force_full_state);
2083
/*!
 * \brief Destroy a list of body parts
 *
 * Frees every body_part structure held in the vector and then the
 * vector's own storage.
 *
 * \param parts The container of parts to destroy
 */
static void free_body_parts(struct body_part_list *parts)
{
        int idx = 0;

        while (idx < AST_VECTOR_SIZE(parts)) {
                struct body_part *entry = AST_VECTOR_GET(parts, idx);

                ast_free(entry);
                idx++;
        }

        AST_VECTOR_FREE(parts);
}
2100
2101 /*!
2102  * \brief Allocate and initialize a body part structure
2103  *
2104  * \param pool PJLIB allocation pool
2105  * \param sub Subscription representing a subscribed resource
2106  */
2107 static struct body_part *allocate_body_part(pj_pool_t *pool, const struct ast_sip_subscription *sub)
2108 {
2109         struct body_part *bp;
2110
2111         bp = ast_calloc(1, sizeof(*bp));
2112         if (!bp) {
2113                 return NULL;
2114         }
2115
2116         bp->cid = generate_content_id_hdr(pool, sub);
2117         bp->resource = sub->resource;
2118         bp->state = sub->subscription_state;
2119         bp->uri = sub->uri;
2120
2121         return bp;
2122 }
2123
2124 /*!
2125  * \brief Create a multipart body part for a subscribed resource
2126  *
2127  * \param pool PJLIB allocation pool
2128  * \param sub The subscription representing a subscribed resource
2129  * \param parts A vector of parts to append the created part to.
2130  * \param use_full_state Unused locally, but may be passed to other functions
2131  */
2132 static void build_body_part(pj_pool_t *pool, struct ast_sip_subscription *sub,
2133                 struct body_part_list *parts, unsigned int use_full_state)
2134 {
2135         struct body_part *bp;
2136         pjsip_msg_body *body;
2137
2138         bp = allocate_body_part(pool, sub);
2139         if (!bp) {
2140                 return;
2141         }
2142
2143         body = generate_notify_body(pool, sub, use_full_state);
2144         if (!body) {
2145                 /* Partial state was requested and the resource has not changed state */
2146                 ast_free(bp);
2147                 return;
2148         }
2149
2150         bp->part = pjsip_multipart_create_part(pool);
2151         bp->part->body = body;
2152         pj_list_insert_before(&bp->part->hdr, bp->cid);
2153
2154         if (AST_VECTOR_APPEND(parts, bp)) {
2155                 ast_free(bp);
2156         }
2157 }
2158
2159 /*!
2160  * \brief Create and initialize the PJSIP multipart body structure for a resource list subscription
2161  *
2162  * \param pool
2163  * \return The multipart message body
2164  */
2165 static pjsip_msg_body *create_multipart_body(pj_pool_t *pool)
2166 {
2167         pjsip_media_type media_type;
2168         pjsip_param *media_type_param;
2169         char boundary[6];
2170         pj_str_t pj_boundary;
2171
2172         pjsip_media_type_init2(&media_type, "multipart", "related");
2173
2174         media_type_param = pj_pool_alloc(pool, sizeof(*media_type_param));
2175         pj_list_init(media_type_param);
2176
2177         pj_strdup2(pool, &media_type_param->name, "type");
2178         pj_strdup2(pool, &media_type_param->value, "\"application/rlmi+xml\"");
2179
2180         pj_list_insert_before(&media_type.param, media_type_param);
2181
2182         pj_cstr(&pj_boundary, ast_generate_random_string(boundary, sizeof(boundary)));
2183         return pjsip_multipart_create(pool, &media_type, &pj_boundary);
2184 }
2185
2186 /*!
2187  * \brief Create a resource list body for NOTIFY requests
2188  *
2189  * Resource list bodies are multipart/related bodies. The first part of the multipart body
2190  * is an RLMI body that describes the rest of the parts to come. The other parts of the body
2191  * convey state of individual subscribed resources.
2192  *
2193  * \param pool PJLIB allocation pool
2194  * \param sub Subscription details from which to generate body
2195  * \param force_full_state If true, ignore resource list settings and send a full state notification
2196  * \return The generated multipart/related body
2197  */
2198 static pjsip_msg_body *generate_list_body(pj_pool_t *pool, struct ast_sip_subscription *sub,
2199                 unsigned int force_full_state)
2200 {
2201         int i;
2202         pjsip_multipart_part *rlmi_part;
2203         pjsip_msg_body *multipart;
2204         struct body_part_list body_parts;
2205         unsigned int use_full_state = force_full_state ? 1 : sub->full_state;
2206
2207         if (AST_VECTOR_INIT(&body_parts, AST_VECTOR_SIZE(&sub->children))) {
2208                 return NULL;
2209         }
2210
2211         for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
2212                 build_body_part(pool, AST_VECTOR_GET(&sub->children, i), &body_parts, use_full_state);
2213         }
2214
2215         /* This can happen if issuing partial state and no children of the list have changed state */
2216         if (AST_VECTOR_SIZE(&body_parts) == 0) {
2217                 free_body_parts(&body_parts);
2218                 return NULL;
2219         }
2220
2221         multipart = create_multipart_body(pool);
2222
2223         rlmi_part = build_rlmi_body(pool, sub, &body_parts, use_full_state);
2224         if (!rlmi_part) {
2225                 free_body_parts(&body_parts);
2226                 return NULL;
2227         }
2228         pjsip_multipart_add_part(pool, multipart, rlmi_part);
2229
2230         for (i = 0; i < AST_VECTOR_SIZE(&body_parts); ++i) {
2231                 pjsip_multipart_add_part(pool, multipart, AST_VECTOR_GET(&body_parts, i)->part);
2232         }
2233
2234         free_body_parts(&body_parts);
2235         return multipart;
2236 }
2237
2238 /*!
2239  * \brief Create the body for a NOTIFY request.
2240  *
2241  * \param pool The pool used for allocations
2242  * \param root The root of the subscription tree
2243  * \param force_full_state If true, ignore resource list settings and send a full state notification
2244  */
2245 static pjsip_msg_body *generate_notify_body(pj_pool_t *pool, struct ast_sip_subscription *root,
2246                 unsigned int force_full_state)
2247 {
2248         pjsip_msg_body *body;
2249
2250         if (AST_VECTOR_SIZE(&root->children) == 0) {
2251                 if (force_full_state || root->body_changed) {
2252                         /* Not a list. We've already generated the body and saved it on the subscription.
2253                          * Use that directly.
2254                          */
2255                         pj_str_t type;
2256                         pj_str_t subtype;
2257                         pj_str_t text;
2258
2259                         pj_cstr(&type, ast_sip_subscription_get_body_type(root));
2260                         pj_cstr(&subtype, ast_sip_subscription_get_body_subtype(root));
2261                         pj_cstr(&text, ast_str_buffer(root->body_text));
2262
2263                         body = pjsip_msg_body_create(pool, &type, &subtype, &text);
2264                         root->body_changed = 0;
2265                 } else {
2266                         body = NULL;
2267                 }
2268         } else {
2269                 body = generate_list_body(pool, root, force_full_state);
2270         }
2271
2272         return body;
2273 }
2274
2275 /*!
2276  * \brief Shortcut method to create a Require: eventlist header
2277  */
2278 static pjsip_require_hdr *create_require_eventlist(pj_pool_t *pool)
2279 {
2280         pjsip_require_hdr *require;
2281
2282         require = pjsip_require_hdr_create(pool);
2283         pj_strdup2(pool, &require->values[0], "eventlist");
2284         require->count = 1;
2285
2286         return require;
2287 }
2288
2289 /*!
2290  * \brief Send a NOTIFY request to a subscriber
2291  *
2292  * \pre sub_tree->dlg is locked
2293  *
2294  * \param sub_tree The subscription tree representing the subscription
2295  * \param force_full_state If true, ignore resource list settings and send full resource list state.
2296  * \retval 0 Success
2297  * \retval non-zero Failure
2298  */
2299 static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state)
2300 {
2301         pjsip_evsub *evsub = sub_tree->evsub;
2302         pjsip_tx_data *tdata;
2303
2304         if (ast_shutdown_final()
2305                 && sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED
2306                 && sub_tree->persistence) {
2307                 return 0;
2308         }
2309
2310         if (pjsip_evsub_notify(evsub, sub_tree->root->subscription_state,
2311                                 NULL, NULL, &tdata) != PJ_SUCCESS) {
2312                 return -1;
2313         }
2314
2315         tdata->msg->body = generate_notify_body(tdata->pool, sub_tree->root, force_full_state);
2316         if (!tdata->msg->body) {
2317                 pjsip_tx_data_dec_ref(tdata);
2318                 return -1;
2319         }
2320
2321         if (sub_tree->is_list) {
2322                 pjsip_require_hdr *require = create_require_eventlist(tdata->pool);
2323                 pjsip_msg_add_hdr(tdata->msg, (pjsip_hdr *) require);
2324         }
2325
2326         if (sip_subscription_send_request(sub_tree, tdata)) {
2327                 /* do not call pjsip_tx_data_dec_ref(tdata). The pjsip_dlg_send_request deletes the message on error */
2328                 return -1;
2329         }
2330
2331         sub_tree->send_scheduled_notify = 0;
2332
2333         return 0;
2334 }
2335
2336 static int serialized_send_notify(void *userdata)
2337 {
2338         struct sip_subscription_tree *sub_tree = userdata;
2339         pjsip_dialog *dlg = sub_tree->dlg;
2340
2341         pjsip_dlg_inc_lock(dlg);
2342
2343         /* It's possible that between when the notification was scheduled
2344          * and now a new SUBSCRIBE arrived requiring full state to be
2345          * sent out in an immediate NOTIFY. It's also possible that we're
2346          * already processing a terminate.  If that has happened, we need to
2347          * bail out here instead of sending the batched NOTIFY.
2348          */
2349
2350         if (sub_tree->state >= SIP_SUB_TREE_TERMINATE_IN_PROGRESS
2351                 || !sub_tree->send_scheduled_notify) {
2352                 pjsip_dlg_dec_lock(dlg);
2353                 ao2_cleanup(sub_tree);
2354                 return 0;
2355         }
2356
2357         if (sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED) {
2358                 sub_tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;
2359         }
2360
2361         send_notify(sub_tree, 0);
2362
2363         ast_test_suite_event_notify(
2364                 sub_tree->state == SIP_SUB_TREE_TERMINATED
2365                         ? "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_STATE_CHANGED",
2366                 "Resource: %s", sub_tree->root->resource);
2367
2368         sub_tree->notify_sched_id = -1;
2369         pjsip_dlg_dec_lock(dlg);
2370         ao2_cleanup(sub_tree);
2371         return 0;
2372 }
2373
2374 static int sched_cb(const void *data)
2375 {
2376         struct sip_subscription_tree *sub_tree = (struct sip_subscription_tree *) data;
2377
2378         /* We don't need to bump the refcount of sub_tree since we bumped it when scheduling this task */
2379         if (ast_sip_push_task(sub_tree->serializer, serialized_send_notify, sub_tree)) {
2380                 ao2_cleanup(sub_tree);
2381         }
2382
2383         return 0;
2384 }
2385
2386 static int schedule_notification(struct sip_subscription_tree *sub_tree)
2387 {
2388         /* There's already a notification scheduled */
2389         if (sub_tree->notify_sched_id > -1) {
2390                 return 0;
2391         }
2392
2393         sub_tree->send_scheduled_notify = 1;
2394         sub_tree->notify_sched_id = ast_sched_add(sched, sub_tree->notification_batch_interval, sched_cb, ao2_bump(sub_tree));
2395         if (sub_tree->notify_sched_id < 0) {
2396                 ao2_cleanup(sub_tree);
2397                 return -1;
2398         }
2399
2400         return 0;
2401 }
2402
2403 int ast_sip_subscription_notify(struct ast_sip_subscription *sub, struct ast_sip_body_data *notify_data,
2404                 int terminate)
2405 {
2406         int res;
2407         pjsip_dialog *dlg = sub->tree->dlg;
2408
2409         pjsip_dlg_inc_lock(dlg);
2410
2411         if (sub->tree->state != SIP_SUB_TREE_NORMAL) {
2412                 pjsip_dlg_dec_lock(dlg);
2413                 return 0;
2414         }
2415
2416         if (ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub),
2417                                 ast_sip_subscription_get_body_subtype(sub), notify_data, &sub->body_text)) {
2418                 pjsip_dlg_dec_lock(dlg);
2419                 return -1;
2420         }
2421
2422         sub->body_changed = 1;
2423         if (terminate) {
2424                 sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
2425                 sub->tree->state = SIP_SUB_TREE_TERMINATE_PENDING;
2426         }
2427
2428         if (sub->tree->notification_batch_interval) {
2429                 res = schedule_notification(sub->tree);
2430         } else {
2431                 /* See the note in pubsub_on_rx_refresh() for why sub->tree is refbumped here */
2432                 ao2_ref(sub->tree, +1);
2433                 if (terminate) {
2434                         sub->tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;
2435                 }
2436                 res = send_notify(sub->tree, 0);
2437                 ast_test_suite_event_notify(terminate ? "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_STATE_CHANGED",
2438                                 "Resource: %s",
2439                                 sub->tree->root->resource);
2440                 ao2_ref(sub->tree, -1);
2441         }
2442
2443         pjsip_dlg_dec_lock(dlg);
2444         return res;
2445 }
2446
2447 pjsip_sip_uri *ast_sip_subscription_get_sip_uri(struct ast_sip_subscription *sub)
2448 {
2449         return sub->uri;
2450 }
2451
2452 void ast_sip_subscription_get_local_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
2453 {
2454         pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, sub->uri, buf, size);
2455 }
2456
2457 void ast_sip_subscription_get_remote_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
2458 {
2459         pjsip_dialog *dlg;
2460         pjsip_sip_uri *uri;
2461
2462         dlg = sub->tree->dlg;
2463         uri = pjsip_uri_get_uri(dlg->remote.info->uri);
2464
2465         if (pjsip_uri_print(PJSIP_URI_IN_FROMTO_HDR, uri, buf, size) < 0) {
2466                 *buf = '\0';
2467         }
2468 }
2469
2470 const char *ast_sip_subscription_get_resource_name(struct ast_sip_subscription *sub)
2471 {
2472         return sub->resource;
2473 }
2474
2475 int ast_sip_subscription_is_terminated(const struct ast_sip_subscription *sub)
2476 {
2477         return sub->subscription_state == PJSIP_EVSUB_STATE_TERMINATED ? 1 : 0;
2478 }
2479
2480 static int sip_subscription_accept(struct sip_subscription_tree *sub_tree, pjsip_rx_data *rdata, int response)
2481 {
2482         pjsip_hdr res_hdr;
2483
2484         /* If this is a persistence recreation the subscription has already been accepted */
2485         if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) {
2486                 return 0;
2487         }
2488
2489         pj_list_init(&res_hdr);
2490         if (sub_tree->is_list) {
2491                 /* If subscribing to a list, our response has to have a Require: eventlist header in it */
2492                 pj_list_insert_before(&res_hdr, create_require_eventlist(rdata->tp_info.pool));
2493         }
2494
2495         return pjsip_evsub_accept(sub_tree->evsub, rdata, response, &res_hdr) == PJ_SUCCESS ? 0 : -1;
2496 }
2497
/*!
 * \brief Allocate a datastore for use with subscriptions
 *
 * Thin wrapper around ast_datastores_alloc_datastore().
 *
 * \param info Datastore information passed through unchanged
 * \param uid Identifier passed through unchanged
 * \return Whatever ast_datastores_alloc_datastore() returns
 */
struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
{
	struct ast_datastore *datastore = ast_datastores_alloc_datastore(info, uid);

	return datastore;
}
2502
2503 int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
2504 {
2505         return ast_datastores_add(subscription->datastores, datastore);
2506 }
2507
2508 struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
2509 {
2510         return ast_datastores_find(subscription->datastores, name);
2511 }
2512
2513 void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
2514 {
2515         ast_datastores_remove(subscription->datastores, name);
2516 }
2517
2518 struct ao2_container *ast_sip_subscription_get_datastores(const struct ast_sip_subscription *subscription)
2519 {
2520         return subscription->datastores;
2521 }
2522
2523 int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
2524 {
2525         return ast_datastores_add(publication->datastores, datastore);
2526 }
2527
2528 struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
2529 {
2530         return ast_datastores_find(publication->datastores, name);
2531 }
2532
2533 void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
2534 {
2535         ast_datastores_remove(publication->datastores, name);
2536 }
2537
2538 struct ao2_container *ast_sip_publication_get_datastores(const struct ast_sip_publication *publication)
2539 {
2540         return publication->datastores;
2541 }
2542
2543 AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
2544
2545 static int publication_hash_fn(const void *obj, const int flags)
2546 {
2547         const struct ast_sip_publication *publication = obj;
2548         const int *entity_tag = obj;
2549
2550         return flags & OBJ_KEY ? *entity_tag : publication->entity_tag;
2551 }
2552
2553 static int publication_cmp_fn(void *obj, void *arg, int flags)
2554 {
2555         const struct ast_sip_publication *publication1 = obj;
2556         const struct ast_sip_publication *publication2 = arg;
2557         const int *entity_tag = arg;
2558
2559         return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ?
2560                 CMP_MATCH | CMP_STOP : 0);
2561 }
2562
2563 static void publish_add_handler(struct ast_sip_publish_handler *handler)
2564 {
2565         AST_RWLIST_WRLOCK(&publish_handlers);
2566         AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
2567         AST_RWLIST_UNLOCK(&publish_handlers);
2568 }
2569
2570 int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
2571 {
2572         if (ast_strlen_zero(handler->event_name)) {
2573                 ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
2574                 return -1;
2575         }
2576
2577         if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS,
2578                 publication_hash_fn, publication_cmp_fn))) {
2579                 ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n",
2580                         handler->event_name);
2581                 return -1;
2582         }
2583
2584         publish_add_handler(handler);
2585
2586         ast_module_ref(ast_module_info->self);
2587
2588         return 0;
2589 }
2590
2591 void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
2592 {
2593         struct ast_sip_publish_handler *iter;
2594
2595         AST_RWLIST_WRLOCK(&publish_handlers);
2596         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
2597                 if (handler == iter) {
2598                         AST_RWLIST_REMOVE_CURRENT(next);
2599                         ao2_cleanup(handler->publications);
2600                         ast_module_unref(ast_module_info->self);
2601                         break;
2602                 }
2603         }
2604         AST_RWLIST_TRAVERSE_SAFE_END;
2605         AST_RWLIST_UNLOCK(&publish_handlers);
2606 }
2607
2608 AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
2609
2610 static void sub_add_handler(struct ast_sip_subscription_handler *handler)
2611 {
2612         AST_RWLIST_WRLOCK(&subscription_handlers);
2613         AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
2614         ast_module_ref(ast_module_info->self);
2615         AST_RWLIST_UNLOCK(&subscription_handlers);
2616 }
2617
2618 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name)
2619 {
2620         struct ast_sip_subscription_handler *iter;
2621
2622         AST_RWLIST_RDLOCK(&subscription_handlers);
2623         AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
2624                 if (!strcmp(iter->event_name, event_name)) {
2625                         break;
2626                 }
2627         }
2628         AST_RWLIST_UNLOCK(&subscription_handlers);
2629         return iter;
2630 }
2631
2632 int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler)
2633 {
2634         pj_str_t event;
2635         pj_str_t accept[AST_SIP_MAX_ACCEPT] = { {0, }, };
2636         struct ast_sip_subscription_handler *existing;
2637         int i = 0;
2638
2639         if (ast_strlen_zero(handler->event_name)) {
2640                 ast_log(LOG_ERROR, "No event package specified for subscription handler. Cannot register\n");
2641                 return -1;
2642         }
2643
2644         existing = find_sub_handler_for_event_name(handler->event_name);
2645         if (existing) {
2646                 ast_log(LOG_ERROR,
2647                         "Unable to register subscription handler for event %s.  A handler is already registered\n",
2648                         handler->event_name);
2649                 return -1;
2650         }
2651
2652         for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) {
2653                 pj_cstr(&accept[i], handler->accept[i]);
2654         }
2655
2656         pj_cstr(&event, handler->event_name);
2657
2658         pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
2659
2660         sub_add_handler(handler);
2661
2662         return 0;
2663 }
2664
2665 void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
2666 {
2667         struct ast_sip_subscription_handler *iter;
2668
2669         AST_RWLIST_WRLOCK(&subscription_handlers);
2670         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
2671                 if (handler == iter) {
2672                         AST_RWLIST_REMOVE_CURRENT(next);
2673                         ast_module_unref(ast_module_info->self);
2674                         break;
2675                 }
2676         }
2677         AST_RWLIST_TRAVERSE_SAFE_END;
2678         AST_RWLIST_UNLOCK(&subscription_handlers);
2679 }
2680
2681 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype_nolock(const char *type, const char *subtype)
2682 {
2683         struct ast_sip_pubsub_body_generator *gen;
2684
2685         AST_LIST_TRAVERSE(&body_generators, gen, list) {
2686                 if (!strcmp(gen->type, type)
2687                         && !strcmp(gen->subtype, subtype)) {
2688                         break;
2689                 }
2690         }
2691
2692         return gen;
2693 }
2694
2695 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype(const char *type, const char *subtype)
2696 {
2697         struct ast_sip_pubsub_body_generator *gen;
2698
2699         AST_RWLIST_RDLOCK(&body_generators);
2700         gen = find_body_generator_type_subtype_nolock(type, subtype);
2701         AST_RWLIST_UNLOCK(&body_generators);
2702         return gen;
2703 }
2704
/*!
 * \brief Find a body generator for a "type/subtype" accept string
 *
 * A stack copy of \a accept is split at the first '/'. Both halves must be
 * non-empty for a lookup to be attempted.
 *
 * \param accept The accept string, e.g. as found in an Accept header
 * \retval NULL the string has no type or no subtype, or no generator matches
 * \retval non-NULL the matching body generator
 */
static struct ast_sip_pubsub_body_generator *find_body_generator_accept(const char *accept)
{
	/* After strsep(), type is the text before the '/' and subtype the text after it */
	char *subtype = ast_strdupa(accept);
	char *type = strsep(&subtype, "/");

	if (ast_strlen_zero(type) || ast_strlen_zero(subtype)) {
		return NULL;
	}

	return find_body_generator_type_subtype(type, subtype);
}
2717
2718 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
2719                 size_t num_accept, const char *body_type)
2720 {
2721         int i;
2722         struct ast_sip_pubsub_body_generator *generator = NULL;
2723
2724         for (i = 0; i < num_accept; ++i) {
2725                 generator = find_body_generator_accept(accept[i]);
2726                 if (generator) {
2727                         ast_debug(3, "Body generator %p found for accept type %s\n", generator, accept[i]);
2728                         if (strcmp(generator->body_type, body_type)) {
2729                                 ast_log(LOG_WARNING, "Body generator '%s/%s'(%p) does not accept the type of data this event generates\n",
2730                                                 generator->type, generator->subtype, generator);
2731                                 generator = NULL;
2732                                 continue;
2733                         }
2734                         break;
2735                 } else {
2736                         ast_debug(3, "No body generator found for accept type %s\n", accept[i]);
2737                 }
2738         }
2739
2740         return generator;
2741 }
2742
2743 static int generate_initial_notify(struct ast_sip_subscription *sub)
2744 {
2745         void *notify_data;
2746         int res;
2747         struct ast_sip_body_data data = {
2748                 .body_type = sub->handler->body_type,
2749         };
2750
2751         if (AST_VECTOR_SIZE(&sub->children) > 0) {
2752                 int i;
2753
2754                 for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
2755                         if (generate_initial_notify(AST_VECTOR_GET(&sub->children, i))) {
2756                                 return -1;
2757                         }
2758                 }
2759
2760                 return 0;
2761         }
2762
2763         /* We notify subscription establishment only on the tree leaves. */
2764         if (sub->handler->notifier->subscription_established(sub)) {
2765                 return -1;
2766         }
2767
2768         notify_data = sub->handler->notifier->get_notify_data(sub);
2769         if (!notify_data) {
2770                 return -1;
2771         }
2772
2773         data.body_data = notify_data;
2774
2775         res = ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub),
2776                         ast_sip_subscription_get_body_subtype(sub), &data, &sub->body_text);
2777
2778         ao2_cleanup(notify_data);
2779
2780         return res;
2781 }
2782
2783 static int pubsub_on_refresh_timeout(void *userdata);
2784
2785 static int initial_notify_task(void * obj)
2786 {
2787         struct initial_notify_data *ind = obj;
2788
2789         if (generate_initial_notify(ind->sub_tree->root)) {
2790                 pjsip_evsub_terminate(ind->sub_tree->evsub, PJ_TRUE);
2791         } else {
2792                 send_notify(ind->sub_tree, 1);
2793                 ast_test_suite_event_notify("SUBSCRIPTION_ESTABLISHED",
2794                         "Resource: %s",
2795                         ind->sub_tree->root->resource);
2796         }
2797
2798         if (ind->expires > -1) {
2799                 char *name = ast_alloca(strlen("->/ ") +
2800                         strlen(ind->sub_tree->persistence->endpoint) +
2801                         strlen(ind->sub_tree->root->resource) +
2802                         strlen(ind->sub_tree->root->handler->event_name) +
2803                         ind->sub_tree->dlg->call_id->id.slen + 1);
2804
2805                 sprintf(name, "%s->%s/%s %.*s", ind->sub_tree->persistence->endpoint,
2806                         ind->sub_tree->root->resource, ind->sub_tree->root->handler->event_name,
2807                         (int)ind->sub_tree->dlg->call_id->id.slen, ind->sub_tree->dlg->call_id->id.ptr);
2808
2809                 ast_debug(3, "Scheduling timer: %s\n", name);
2810                 ind->sub_tree->expiration_task = ast_sip_schedule_task(ind->sub_tree->serializer,
2811                         ind->expires * 1000, pubsub_on_refresh_timeout, name,
2812                         ind->sub_tree, AST_SIP_SCHED_TASK_FIXED | AST_SIP_SCHED_TASK_DATA_AO2);
2813                 if (!ind->sub_tree->expiration_task) {
2814                         ast_log(LOG_ERROR, "Unable to create expiration timer of %d seconds for %s\n",
2815                                 ind->expires, name);
2816                 }
2817         }
2818
2819         ao2_ref(ind->sub_tree, -1);
2820         ast_free(ind);
2821
2822         return 0;
2823 }
2824
2825 static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
2826 {
2827         pjsip_expires_hdr *expires_header;
2828         struct ast_sip_subscription_handler *handler;
2829         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
2830         struct sip_subscription_tree *sub_tree;
2831         struct ast_sip_pubsub_body_generator *generator;
2832         char *resource;
2833         pjsip_uri *request_uri;
2834         pjsip_sip_uri *request_uri_sip;
2835         size_t resource_size;
2836         int resp;
2837         struct resource_tree tree;
2838         pj_status_t dlg_status;
2839
2840         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
2841         ast_assert(endpoint != NULL);
2842
2843         if (!endpoint->subscription.allow) {
2844                 ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint));
2845                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL);
2846                 return PJ_TRUE;
2847         }
2848
2849         request_uri = rdata->msg_info.msg->line.req.uri;
2850
2851         if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
2852                 char uri_str[PJSIP_MAX_URL_SIZE];
2853
2854                 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
2855                 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
2856                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
2857                 return PJ_TRUE;
2858         }
2859
2860         request_uri_sip = pjsip_uri_get_uri(request_uri);
2861         resource_size = pj_strlen(&request_uri_sip->user) + 1;
2862         resource = ast_alloca(resource_size);
2863         ast_copy_pj_str(resource, &request_uri_sip->user, resource_size);
2864
2865         /*
2866          * We may want to match without any user options getting
2867          * in the way.
2868          */
2869         AST_SIP_USER_OPTIONS_TRUNCATE_CHECK(resource);
2870
2871         expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next);
2872         if (expires_header) {
2873                 if (expires_header->ivalue == 0) {
2874                         ast_log(LOG_WARNING, "Subscription request from endpoint %s rejected. Expiration of 0 is invalid\n",
2875                                 ast_sorcery_object_get_id(endpoint));
2876                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
2877                                 return PJ_TRUE;
2878                 }
2879                 if (expires_header->ivalue < endpoint->subscription.minexpiry) {
2880                         ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %u\n",
2881                                 expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry);
2882                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL);
2883                         return PJ_TRUE;
2884                 }
2885         }
2886
2887         handler = subscription_get_handler_from_rdata(rdata);
2888         if (!handler) {
2889                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2890                 return PJ_TRUE;
2891         }
2892
2893         generator = subscription_get_generator_from_rdata(rdata, handler);
2894         if (!generator) {
2895                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2896                 return PJ_TRUE;
2897         }
2898
2899         memset(&tree, 0, sizeof(tree));
2900         resp = build_resource_tree(endpoint, handler, resource, &tree,
2901                 ast_sip_pubsub_has_eventlist_support(rdata));
2902         if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
2903                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
2904                 resource_tree_destroy(&tree);
2905                 return PJ_TRUE;
2906         }
2907
2908         sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator, &tree, &dlg_status);
2909         if (!sub_tree) {
2910                 if (dlg_status != PJ_EEXISTS) {
2911                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
2912                 }
2913         } else {
2914                 struct initial_notify_data *ind = ast_malloc(sizeof(*ind));
2915
2916                 if (!ind) {
2917                         pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
2918                         resource_tree_destroy(&tree);
2919                         return PJ_TRUE;
2920                 }
2921
2922                 ind->sub_tree = ao2_bump(sub_tree);
2923                 /* Since this is a normal subscribe, pjproject takes care of the timer */
2924                 ind->expires = -1;
2925
2926                 sub_tree->persistence = subscription_persistence_create(sub_tree);
2927                 subscription_persistence_update(sub_tree, rdata, SUBSCRIPTION_PERSISTENCE_CREATED);
2928                 sip_subscription_accept(sub_tree, rdata, resp);
2929                 if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ind)) {
2930                         pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
2931                         ao2_ref(sub_tree, -1);
2932                         ast_free(ind);
2933                 }
2934         }
2935
2936         resource_tree_destroy(&tree);
2937         return PJ_TRUE;
2938 }
2939
2940 static struct ast_sip_publish_handler *find_pub_handler(const char *event)
2941 {
2942         struct ast_sip_publish_handler *iter = NULL;
2943
2944         AST_RWLIST_RDLOCK(&publish_handlers);
2945         AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) {
2946                 if (strcmp(event, iter->event_name)) {
2947                         ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
2948                         continue;
2949                 }
2950                 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
2951                 break;
2952         }
2953         AST_RWLIST_UNLOCK(&publish_handlers);
2954
2955         return iter;
2956 }
2957
2958 static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata,
2959         pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id)
2960 {
2961         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
2962
2963         if (etag_hdr) {
2964                 char etag[pj_strlen(&etag_hdr->hvalue) + 1];
2965
2966                 ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag));
2967
2968                 if (sscanf(etag, "%30d", entity_id) != 1) {
2969                         return SIP_PUBLISH_UNKNOWN;
2970                 }
2971         }
2972
2973         *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
2974
2975         if (!(*expires)) {
2976                 return SIP_PUBLISH_REMOVE;
2977         } else if (!etag_hdr && rdata->msg_info.msg->body) {
2978                 return SIP_PUBLISH_INITIAL;
2979         } else if (etag_hdr && !rdata->msg_info.msg->body) {
2980                 return SIP_PUBLISH_REFRESH;
2981         } else if (etag_hdr && rdata->msg_info.msg->body) {
2982                 return SIP_PUBLISH_MODIFY;
2983         }
2984
2985         return SIP_PUBLISH_UNKNOWN;
2986 }
2987
2988 /*! \brief Internal destructor for publications */
2989 static void publication_destroy_fn(void *obj)
2990 {
2991         struct ast_sip_publication *publication = obj;
2992
2993         ast_debug(3, "Destroying SIP publication\n");
2994
2995         ao2_cleanup(publication->datastores);
2996         ao2_cleanup(publication->endpoint);
2997 }
2998
2999 static struct ast_sip_publication *sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
3000         const char *resource, const char *event_configuration_name)
3001 {
3002         struct ast_sip_publication *publication;
3003         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
3004         size_t resource_len = strlen(resource) + 1, event_configuration_name_len = strlen(event_configuration_name) + 1;
3005         char *dst;
3006
3007         ast_assert(endpoint != NULL);
3008
3009         if (!(publication = ao2_alloc(sizeof(*publication) + resource_len + event_configuration_name_len, publication_destroy_fn))) {
3010                 return NULL;
3011         }
3012
3013         if (!(publication->datastores = ast_datastores_alloc())) {
3014                 ao2_ref(publication, -1);
3015                 return NULL;
3016         }
3017
3018         publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
3019         ao2_ref(endpoint, +1);
3020         publication->endpoint = endpoint;
3021         publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
3022         publication->sched_id = -1;
3023         dst = publication->data;
3024         publication->resource = strcpy(dst, resource);
3025         dst += resource_len;
3026         publication->event_configuration_name = strcpy(dst, event_configuration_name);
3027
3028         return publication;
3029 }
3030
3031 static int sip_publication_respond(struct ast_sip_publication *pub, int status_code,
3032                 pjsip_rx_data *rdata)
3033 {
3034         pjsip_tx_data *tdata;
3035         pjsip_transaction *tsx;
3036
3037         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, &tdata) != PJ_SUCCESS) {
3038                 return -1;
3039         }
3040
3041         if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) {
3042                 char buf[30];
3043
3044                 snprintf(buf, sizeof(buf), "%d", pub->entity_tag);
3045                 ast_sip_add_header(tdata, "SIP-ETag", buf);
3046
3047                 snprintf(buf, sizeof(buf), "%d", pub->expires);
3048                 ast_sip_add_header(tdata, "Expires", buf);
3049         }
3050
3051         if (pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx) != PJ_SUCCESS) {
3052                 pjsip_tx_data_dec_ref(tdata);
3053                 return -1;
3054         }
3055
3056         pjsip_tsx_recv_msg(tsx, rdata);
3057
3058         if (pjsip_tsx_send_msg(tsx, tdata) != PJ_SUCCESS) {
3059                 pjsip_tx_data_dec_ref(tdata);
3060                 return -1;
3061         }
3062
3063         return 0;
3064 }
3065
3066 static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
3067         struct ast_sip_publish_handler *handler)
3068 {
3069         struct ast_sip_publication *publication;
3070         char *resource_name;
3071         size_t resource_size;
3072         RAII_VAR(struct ast_sip_publication_resource *, resource, NULL, ao2_cleanup);
3073         struct ast_variable *event_configuration_name = NULL;
3074         pjsip_uri *request_uri;
3075         pjsip_sip_uri *request_uri_sip;
3076         int resp;
3077
3078         request_uri = rdata->msg_info.msg->line.req.uri;
3079
3080         if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
3081                 char uri_str[PJSIP_MAX_URL_SIZE];
3082
3083                 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
3084                 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
3085                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
3086                 return NULL;
3087         }
3088
3089         request_uri_sip = pjsip_uri_get_uri(request_uri);
3090         resource_size = pj_strlen(&request_uri_sip->user) + 1;
3091         resource_name = ast_alloca(resource_size);
3092         ast_copy_pj_str(resource_name, &request_uri_sip->user, resource_size);
3093
3094         /*
3095          * We may want to match without any user options getting
3096          * in the way.
3097          */
3098         AST_SIP_USER_OPTIONS_TRUNCATE_CHECK(resource_name);
3099
3100         resource = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "inbound-publication", resource_name);
3101         if (!resource) {
3102                 ast_debug(1, "No 'inbound-publication' defined for resource '%s'\n", resource_name);
3103                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
3104                 return NULL;
3105         }
3106
3107         if (!ast_strlen_zero(resource->endpoint) && strcmp(resource->endpoint, ast_sorcery_object_get_id(endpoint))) {
3108                 ast_debug(1, "Resource %s has a defined endpoint '%s', but does not match endpoint '%s' that received the request\n",
3109                         resource_name, resource->endpoint, ast_sorcery_object_get_id(endpoint));
3110                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL);
3111                 return NULL;
3112         }
3113
3114         for (event_configuration_name = resource->events; event_configuration_name; event_configuration_name = event_configuration_name->next) {
3115                 if (!strcmp(event_configuration_name->name, handler->event_name)) {
3116                         break;
3117                 }
3118         }
3119
3120         if (!event_configuration_name) {
3121                 ast_debug(1, "Event '%s' is not configured for '%s'\n", handler->event_name, resource_name);
3122                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
3123                 return NULL;
3124         }
3125
3126         resp = handler->new_publication(endpoint, resource_name, event_configuration_name->value);
3127
3128         if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
3129                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
3130                 return NULL;
3131         }
3132
3133         publication = sip_create_publication(endpoint, rdata, S_OR(resource_name, ""), event_configuration_name->value);
3134
3135         if (!publication) {
3136                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
3137                 return NULL;
3138         }
3139
3140         publication->handler = handler;
3141         if (publication->handler->publication_state_change(publication, rdata->msg_info.msg->body,
3142                         AST_SIP_PUBLISH_STATE_INITIALIZED)) {
3143                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
3144                 ao2_cleanup(publication);
3145                 return NULL;
3146         }
3147
3148         sip_publication_respond(publication, resp, rdata);
3149
3150         return publication;
3151 }
3152
3153 static int publish_expire_callback(void *data)
3154 {
3155         RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup);
3156
3157         if (publication->handler->publish_expire) {
3158                 publication->handler->publish_expire(publication);
3159         }
3160
3161         return 0;
3162 }
3163
3164 static int publish_expire(const void *data)
3165 {
3166         struct ast_sip_publication *publication = (struct ast_sip_publication*)data;
3167
3168         ao2_unlink(publication->handler->publications, publication);
3169         publication->sched_id = -1;
3170
3171         if (ast_sip_push_task(NULL, publish_expire_callback, publication)) {
3172                 ao2_cleanup(publication);
3173         }
3174
3175         return 0;
3176 }
3177
3178 static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata)
3179 {
3180         pjsip_event_hdr *event_header;
3181         struct ast_sip_publish_handler *handler;
3182         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
3183         char event[32];
3184         static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 };
3185         pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL);
3186         enum sip_publish_type publish_type;
3187         RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup);
3188         int expires = 0, entity_id, response = 0;
3189
3190         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
3191         ast_assert(endpoint != NULL);
3192
3193         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
3194         if (!event_header) {
3195                 ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n");
3196                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
3197                 return PJ_TRUE;
3198         }
3199         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
3200
3201         handler = find_pub_handler(event);
3202         if (!handler) {
3203                 ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event);
3204                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
3205                 return PJ_TRUE;
3206         }
3207
3208         publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id);
3209
3210         /* If this is not an initial publish ensure that a publication is present */
3211         if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) {
3212                 if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) {
3213                         static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 };
3214
3215                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed,
3216                                 NULL, NULL);
3217                         return PJ_TRUE;
3218                 }
3219
3220                 /* Per the RFC every response has to have a new entity tag */
3221                 publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
3222
3223                 /* Update the expires here so that the created responses will contain the correct value */
3224                 publication->expires = expires;
3225         }
3226
3227         switch (publish_type) {
3228                 case SIP_PUBLISH_INITIAL:
3229                         publication = publish_request_initial(endpoint, rdata, handler);
3230                         break;
3231                 case SIP_PUBLISH_REFRESH:
3232                 case SIP_PUBLISH_MODIFY:
3233                         if (handler->publication_state_change(publication, rdata->msg_info.msg->body,
3234                                                 AST_SIP_PUBLISH_STATE_ACTIVE)) {
3235                                 /* If an error occurs we want to terminate the publication */
3236                                 expires = 0;
3237                         }
3238                         response = 200;
3239                         break;
3240                 case SIP_PUBLISH_REMOVE:
3241                         handler->publication_state_change(publication, rdata->msg_info.msg->body,
3242                                         AST_SIP_PUBLISH_STATE_TERMINATED);
3243                         response = 200;
3244                         break;
3245                 case SIP_PUBLISH_UNKNOWN:
3246                 default:
3247                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
3248                         break;
3249         }
3250
3251         if (publication) {
3252                 if (expires) {
3253                         ao2_link(handler->publications, publication);
3254
3255                         AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication,
3256                                                 ao2_ref(publication, -1), ao2_ref(publication, -1), ao2_ref(publication, +1));
3257                 } else {
3258                         AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1));
3259                 }
3260         }
3261
3262         if (response) {
3263                 sip_publication_respond(publication, response, rdata);
3264         }
3265
3266         return PJ_TRUE;
3267 }
3268
3269 struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub)
3270 {
3271         return pub->endpoint;
3272 }
3273
3274 const char *ast_sip_publication_get_resource(const struct ast_sip_publication *pub)
3275 {
3276         return pub->resource;
3277 }
3278
3279 const char *ast_sip_publication_get_event_configuration(const struct ast_sip_publication *pub)
3280 {
3281         return pub->event_configuration_name;
3282 }
3283
/*!
 * \brief Report whether a body generator exists for a content type.
 *
 * \param type Content type (the part before the slash).
 * \param subtype Content subtype (the part after the slash).
 *
 * \retval 1 find_body_generator_type_subtype() found a generator.
 * \retval 0 no generator was found.
 */
int ast_sip_pubsub_is_body_generator_registered(const char *type, const char *subtype)
{
	if (find_body_generator_type_subtype(type, subtype)) {
		return 1;
	}

	return 0;
}
3288
3289 int ast_sip_pubsub_register_body_generator(struct ast_sip_pubsub_body_generator *generator)
3290 {
3291         struct ast_sip_pubsub_body_generator *existing;
3292         pj_str_t accept;
3293         pj_size_t accept_len;
3294
3295         AST_RWLIST_WRLOCK(&body_generators);
3296         existing = find_body_generator_type_subtype_nolock(generator->type, generator->subtype);
3297         if (existing) {
3298                 AST_RWLIST_UNLOCK(&body_generators);
3299                 ast_log(LOG_WARNING, "A body generator for %s/%s is already registered.\n",
3300                         generator->type, generator->subtype);
3301                 return -1;
3302         }
3303         AST_LIST_INSERT_HEAD(&body_generators, generator, list);
3304         AST_RWLIST_UNLOCK(&body_generators);
3305
3306         /* Lengths of type and subtype plus a slash. */
3307         accept_len = strlen(generator->type) + strlen(generator->subtype) + 1;
3308
3309         /* Add room for null terminator that sprintf() will set. */
3310         pj_strset(&accept, ast_alloca(accept_len + 1), accept_len);
3311         sprintf((char *) pj_strbuf(&accept), "%s/%s", generator->type, generator->subtype);/* Safe */
3312
3313         pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module,
3314                         PJSIP_H_ACCEPT, NULL, 1, &accept);
3315
3316         return 0;
3317 }
3318
3319 void ast_sip_pubsub_unregister_body_generator(struct ast_sip_pubsub_body_generator *generator)
3320 {
3321         struct ast_sip_pubsub_body_generator *iter;
3322
3323         AST_RWLIST_WRLOCK(&body_generators);
3324         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_generators, iter, list) {
3325                 if (iter == generator) {
3326                         AST_LIST_REMOVE_CURRENT(list);
3327                         break;
3328                 }
3329         }
3330         AST_RWLIST_TRAVERSE_SAFE_END;
3331         AST_RWLIST_UNLOCK(&body_generators);
3332 }
3333
3334 int ast_sip_pubsub_register_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
3335 {
3336         AST_RWLIST_WRLOCK(&body_supplements);
3337         AST_RWLIST_INSERT_TAIL(&body_supplements, supplement, list);
3338         AST_RWLIST_UNLOCK(&body_supplements);
3339
3340         return 0;
3341 }
3342
3343 void ast_sip_pubsub_unregister_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
3344 {
3345         struct ast_sip_pubsub_body_supplement *iter;
3346
3347         AST_RWLIST_WRLOCK(&body_supplements);
3348         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_supplements, iter, list) {
3349                 if (iter == supplement) {
3350                         AST_LIST_REMOVE_CURRENT(list);
3351                         break;
3352                 }
3353         }
3354         AST_RWLIST_TRAVERSE_SAFE_END;
3355         AST_RWLIST_UNLOCK(&body_supplements);
3356 }
3357
3358 const char *ast_sip_subscription_get_body_type(struct ast_sip_subscription *sub)
3359 {
3360         return sub->body_generator->type;
3361 }
3362
3363 const char *ast_sip_subscription_get_body_subtype(struct ast_sip_subscription *sub)
3364 {
3365         return sub->body_generator->subtype;
3366 }
3367
3368 int ast_sip_pubsub_generate_body_content(const char *type, const char *subtype,
3369                 struct ast_sip_body_data *data, struct ast_str **str)
3370 {
3371         struct ast_sip_pubsub_body_supplement *supplement;
3372         struct ast_sip_pubsub_body_generator *generator;
3373         int res = 0;
3374         void *body;
3375
3376         generator = find_body_generator_type_subtype(type, subtype);
3377         if (!generator) {
3378                 ast_log(LOG_WARNING, "Unable to find a body generator for %s/%s\n",
3379                                 type, subtype);
3380                 return -1;
3381         }
3382
3383         if (strcmp(data->body_type, generator->body_type)) {
3384                 ast_log(LOG_WARNING, "%s/%s body generator does not accept the type of data provided\n",
3385                         type, subtype);
3386                 return -1;
3387         }
3388
3389         body = generator->allocate_body(data->body_data);
3390         if (!body) {
3391                 ast_log(LOG_WARNING, "%s/%s body generator could not to allocate a body\n",
3392                         type, subtype);
3393                 return -1;
3394         }
3395
3396         if (generator->generate_body_content(body, data->body_data)) {
3397                 res = -1;
3398                 goto end;
3399         }
3400
3401         AST_RWLIST_RDLOCK(&body_supplements);
3402         AST_RWLIST_TRAVERSE(&body_supplements, supplement, list) {
3403                 if (!strcmp(generator->type, supplement->type) &&
3404                                 !strcmp(generator->subtype, supplement->subtype)) {
3405                         res = supplement->supplement_body(body, data->body_data);
3406                         if (res) {
3407                                 break;
3408                         }
3409                 }
3410         }
3411         AST_RWLIST_UNLOCK(&body_supplements);
3412
3413         if (!res) {
3414                 generator->to_string(body, str);
3415         }
3416
3417 end:
3418         if (generator->destroy_body) {
3419                 generator->destroy_body(body);
3420         }
3421
3422         return res;
3423 }
3424
3425 struct simple_message_summary {
3426         int messages_waiting;
3427         int voice_messages_new;
3428         int voice_messages_old;
3429         int voice_messages_urgent_new;
3430         int voice_messages_urgent_old;
3431         char message_account[PJSIP_MAX_URL_SIZE];
3432 };
3433
3434 static int parse_simple_message_summary(char *body,
3435         struct simple_message_summary *summary)
3436 {
3437         char *line;
3438         char *buffer;
3439         int found_counts = 0;
3440
3441         if (ast_strlen_zero(body) || !summary) {
3442                 return -1;
3443         }
3444
3445         buffer = ast_strdupa(body);
3446         memset(summary, 0, sizeof(*summary));
3447
3448         while ((line = ast_read_line_from_buffer(&buffer))) {
3449                 line = ast_str_to_lower(line);
3450
3451                 if (sscanf(line, "voice-message: %d/%d (%d/%d)",
3452                         &summary->voice_messages_new, &summary->voice_messages_old,
3453                         &summary->voice_messages_urgent_new, &summary->voice_messages_urgent_old)) {
3454                         found_counts = 1;
3455                 } else {
3456                         sscanf(line, "message-account: %s", summary->message_account);
3457                 }
3458         }
3459
3460         return !found_counts;
3461 }
3462
3463 static pj_bool_t pubsub_on_rx_mwi_notify_request(pjsip_rx_data *rdata)
3464 {
3465         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
3466         struct simple_message_summary summary;
3467         const char *endpoint_name;
3468         char *atsign;
3469         char *context;
3470         char *body;
3471         char *mailbox;
3472         int rc;
3473
3474         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
3475         if (!endpoint) {
3476                 ast_debug(1, "Incoming MWI: Endpoint not found in rdata (%p)\n", rdata);
3477                 rc = 404;
3478                 goto error;
3479         }
3480
3481         endpoint_name = ast_sorcery_object_get_id(endpoint);
3482         ast_debug(1, "Incoming MWI: Found endpoint: %s\n", endpoint_name);
3483         if (ast_strlen_zero(endpoint->incoming_mwi_mailbox)) {
3484                 ast_debug(1, "Incoming MWI: No incoming mailbox specified for endpoint '%s'\n", endpoint_name);
3485                 ast_test_suite_event_notify("PUBSUB_NO_INCOMING_MWI_MAILBOX",
3486                         "Endpoint: %s", endpoint_name);
3487                 rc = 404;
3488                 goto error;
3489         }
3490
3491         mailbox = ast_strdupa(endpoint->incoming_mwi_mailbox);
3492         atsign = strchr(mailbox, '@');
3493         if (!atsign) {
3494                 ast_debug(1, "Incoming MWI: No '@' found in endpoint %s's incoming mailbox '%s'.  Can't parse context\n",
3495                         endpoint_name, endpoint->incoming_mwi_mailbox);
3496                 rc = 404;
3497                 goto error;
3498         }
3499
3500         *atsign = '\0';
3501         context = atsign + 1;
3502
3503         body = ast_alloca(rdata->msg_info.msg->body->len + 1);
3504         rdata->msg_info.msg->body->print_body(rdata->msg_info.msg->body, body,
3505                 rdata->msg_info.msg->body->len + 1);
3506
3507         if (parse_simple_message_summary(body, &summary) != 0) {
3508                 ast_debug(1, "Incoming MWI: Endpoint: '%s' There was an issue getting message info from body '%s'\n",
3509                         ast_sorcery_object_get_id(endpoint), body);
3510                 rc = 404;
3511                 goto error;
3512         }
3513
3514         if (ast_publish_mwi_state(mailbox, context,
3515                 summary.voice_messages_new, summary.voice_messages_old)) {
3516                 ast_log(LOG_ERROR, "Incoming MWI: Endpoint: '%s' Could not publish MWI to stasis.  "
3517                         "Mailbox: %s Message-Account: %s Voice-Messages: %d/%d (%d/%d)\n",
3518                         endpoint_name, endpoint->incoming_mwi_mailbox, summary.message_account,
3519                         summary.voice_messages_new, summary.voice_messages_old,
3520                         summary.voice_messages_urgent_new, summary.voice_messages_urgent_old);
3521                 rc = 404;
3522         } else {
3523                 ast_debug(1, "Incoming MWI: Endpoint: '%s' Mailbox: %s Message-Account: %s Voice-Messages: %d/%d (%d/%d)\n",
3524                         endpoint_name, endpoint->incoming_mwi_mailbox, summary.message_account,
3525                         summary.voice_messages_new, summary.voice_messages_old,
3526                         summary.voice_messages_urgent_new, summary.voice_messages_urgent_old);
3527                 ast_test_suite_event_notify("PUBSUB_INCOMING_MWI_PUBLISH",
3528                         "Endpoint: %s\r\n"
3529                         "Mailbox: %s\r\n"
3530                         "MessageAccount: %s\r\n"
3531                         "VoiceMessagesNew: %d\r\n"
3532                         "VoiceMessagesOld: %d\r\n"
3533                         "VoiceMessagesUrgentNew: %d\r\n"
3534                         "VoiceMessagesUrgentOld: %d",
3535                                 endpoint_name, endpoint->incoming_mwi_mailbox, summary.message_account,
3536                                 summary.voice_messages_new, summary.voice_messages_old,
3537                                 summary.voice_messages_urgent_new, summary.voice_messages_urgent_old);
3538                 rc = 200;
3539         }
3540
3541 error:
3542         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, rc, NULL, NULL, NULL);
3543         return PJ_TRUE;
3544 }
3545
3546 static pj_bool_t pubsub_on_rx_notify_request(pjsip_rx_data *rdata)
3547 {
3548         if (rdata->msg_info.msg->body &&
3549                 ast_sip_is_content_type(&rdata->msg_info.msg->body->content_type,
3550                                                                 "application", "simple-message-summary")) {
3551                 return pubsub_on_rx_mwi_notify_request(rdata);
3552         }
3553         return PJ_FALSE;
3554 }
3555
3556 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata)
3557 {
3558         if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) {
3559                 return pubsub_on_rx_subscribe_request(rdata);
3560         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) {
3561                 return pubsub_on_rx_publish_request(rdata);
3562         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_notify_method)) {
3563                 return pubsub_on_rx_notify_request(rdata);
3564         }
3565
3566         return PJ_FALSE;
3567 }
3568
3569 static void set_state_terminated(struct ast_sip_subscription *sub)
3570 {
3571         int i;
3572
3573         sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
3574         for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
3575                 set_state_terminated(AST_VECTOR_GET(&sub->children, i));
3576         }
3577 }
3578
3579 /*!
3580  * \brief Callback sequence for subscription terminate:
3581  *
3582  * * Client initiated:
3583  *     pjproject receives SUBSCRIBE on the subscription's serializer thread
3584  *         calls pubsub_on_rx_refresh with dialog locked
3585  *             pubsub_on_rx_refresh sets TERMINATE_PENDING
3586  *             pushes serialized_pubsub_on_refresh_timeout
3587  *             returns to pjproject
3588  *         pjproject calls pubsub_on_evsub_state
3589  *             pubsub_evsub_set_state checks state == TERMINATE_IN_PROGRESS (no)
3590  *             ignore and return
3591  *         pjproject unlocks dialog
3592  *     serialized_pubsub_on_refresh_timeout starts (1)
3593  *       locks dialog
3594  *       checks state == TERMINATE_PENDING
3595  *       sets TERMINATE_IN_PROGRESS
3596  *       calls send_notify (2)
3597  *           send_notify ultimately calls pjsip_evsub_send_request
3598  *               pjsip_evsub_send_request calls evsub's set_state
3599  *                   set_state calls pubsub_evsub_set_state
3600  *                       pubsub_on_evsub_state checks state == TERMINATE_IN_PROGRESS
3601  *                       removes the subscriptions
3602  *                       cleans up references to evsub
3603  *                       sets state = TERMINATED
3604  *       serialized_pubsub_on_refresh_timeout unlocks dialog
3605  *
3606  * * Subscription timer expires:
3607  *     pjproject timer expires
3608  *         locks dialog
3609  *         calls pubsub_on_server_timeout
3610  *             pubsub_on_server_timeout checks state == NORMAL
3611  *             sets TERMINATE_PENDING
3612  *             pushes serialized_pubsub_on_refresh_timeout
3613  *             returns to pjproject
3614  *         pjproject unlocks dialog
3615  *     serialized_pubsub_on_refresh_timeout starts
3616  *         See (1) Above
3617  *
3618  * * Transmission failure sending NOTIFY or error response from client
3619  *     pjproject transaction timer expires or non OK response
3620  *         pjproject locks dialog
3621  *         calls pubsub_on_evsub_state with event TSX_STATE
3622  *             pubsub_on_evsub_state checks event == TSX_STATE
3623  *             removes the subscriptions
3624  *             cleans up references to evsub
3625  *             sets state = TERMINATED
3626  *         pjproject unlocks dialog
3627  *
3628  * * ast_sip_subscription_notify is called
3629  *       checks state == NORMAL
3630  *       if not batched...
3631  *           sets TERMINATE_IN_PROGRESS (if terminate is requested)
3632  *           calls send_notify
3633  *               See (2) Above
3634  *       if batched...
3635  *           sets TERMINATE_PENDING
3636  *           schedules task
3637  *       scheduler runs sched_task
3638  *           sched_task pushes serialized_send_notify
3639  *       serialized_send_notify starts
3640  *           checks state <= TERMINATE_PENDING
3641  *           if state == TERMINATE_PENDING set state = TERMINATE_IN_PROGRESS
3642  *           call send_notify
3643  *               See (2) Above
3644  *
3645  */
3646
3647 /*!
3648  * \brief PJSIP callback when underlying SIP subscription changes state
3649  *
3650  * Although this function is called for every state change, we only care
3651  * about the TERMINATED state, and only when we're actually processing the final
3652  * notify (SIP_SUB_TREE_TERMINATE_IN_PROGRESS) OR when a transmission failure
3653  * occurs (PJSIP_EVENT_TSX_STATE).  In this case, we do all the subscription tree
3654  * cleanup tasks and decrement the evsub reference.
3655  */
3656 static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
3657 {
3658         struct sip_subscription_tree *sub_tree =
3659                 pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
3660
3661         ast_debug(3, "evsub %p state %s event %s sub_tree %p sub_tree state %s\n", evsub,
3662                 pjsip_evsub_get_state_name(evsub), pjsip_event_str(event->type), sub_tree,
3663                 (sub_tree ? sub_tree_state_description[sub_tree->state] : "UNKNOWN"));
3664
3665         if (!sub_tree || pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) {
3666                 return;
3667         }
3668
3669         /* It's easier to write this as what we WANT to process, then negate it. */
3670         if (!(sub_tree->state == SIP_SUB_TREE_TERMINATE_IN_PROGRESS
3671                 || (event->type == PJSIP_EVENT_TSX_STATE && sub_tree->state == SIP_SUB_TREE_NORMAL)
3672                 )) {
3673                 ast_debug(3, "Do nothing.\n");
3674                 return;
3675         }
3676
3677         if (sub_tree->expiration_task) {
3678                 char task_name[256];
3679
3680                 ast_sip_sched_task_get_name(sub_tree->expiration_task, task_name, sizeof(task_name));
3681                 ast_debug(3, "Cancelling timer: %s\n", task_name);
3682                 ast_sip_sched_task_cancel(sub_tree->expiration_task);
3683                 ao2_cleanup(sub_tree->expiration_task);
3684                 sub_tree->expiration_task = NULL;
3685         }
3686
3687         remove_subscription(sub_tree);
3688
3689         pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL);
3690
3691 #ifdef HAVE_PJSIP_EVSUB_GRP_LOCK
3692         pjsip_evsub_dec_ref(sub_tree->evsub);
3693 #endif
3694
3695         sub_tree->evsub = NULL;
3696
3697         ast_sip_dialog_set_serializer(sub_tree->dlg, NULL);
3698         ast_sip_dialog_set_endpoint(sub_tree->dlg, NULL);
3699
3700         subscription_persistence_remove(sub_tree);
3701         shutdown_subscriptions(sub_tree->root);
3702
3703         sub_tree->state = SIP_SUB_TREE_TERMINATED;
3704         /* Remove evsub's reference to the sub_tree */
3705         ao2_ref(sub_tree, -1);
3706 }
3707
3708 static int pubsub_on_refresh_timeout(void *userdata)
3709 {
3710         struct sip_subscription_tree *sub_tree = userdata;
3711         pjsip_dialog *dlg = sub_tree->dlg;
3712
3713         ast_debug(3, "sub_tree %p sub_tree state %s\n", sub_tree,
3714                 (sub_tree ? sub_tree_state_description[sub_tree->state] : "UNKNOWN"));
3715
3716         pjsip_dlg_inc_lock(dlg);
3717         if (sub_tree->state >= SIP_SUB_TREE_TERMINATE_IN_PROGRESS) {
3718                 pjsip_dlg_dec_lock(dlg);
3719                 return 0;
3720         }
3721
3722         if (sub_tree->state == SIP_SUB_TREE_TERMINATE_PENDING) {
3723                 sub_tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;
3724                 set_state_terminated(sub_tree->root);
3725         }
3726
3727         send_notify(sub_tree, 1);
3728
3729         ast_test_suite_event_notify(sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED ?
3730                                 "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_REFRESHED",
3731                                 "Resource: %s", sub_tree->root->resource);
3732
3733         pjsip_dlg_dec_lock(dlg);
3734
3735         return 0;
3736 }
3737
3738 static int serialized_pubsub_on_refresh_timeout(void *userdata)
3739 {
3740         struct sip_subscription_tree *sub_tree = userdata;
3741
3742         ast_debug(3, "sub_tree %p sub_tree state %s\n", sub_tree,
3743                 (sub_tree ? sub_tree_state_description[sub_tree->state] : "UNKNOWN"));
3744
3745         pubsub_on_refresh_timeout(userdata);
3746         ao2_cleanup(sub_tree);
3747