res_pjsip: Fix leak on error in ast_sip_auth_vector_init.
[asterisk/asterisk.git] / res / res_pjsip_pubsub.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 /*!
19  * \brief Opaque structure representing an RFC 3265 SIP subscription
20  */
21
22 /*** MODULEINFO
23         <depend>pjproject</depend>
24         <depend>res_pjsip</depend>
25         <support_level>core</support_level>
26  ***/
27
28 #include "asterisk.h"
29
30 #include <pjsip.h>
31 #include <pjsip_simple.h>
32 #include <pjlib.h>
33
34 #include "asterisk/app.h"
35 #include "asterisk/res_pjsip_pubsub.h"
36 #include "asterisk/module.h"
37 #include "asterisk/linkedlists.h"
38 #include "asterisk/astobj2.h"
39 #include "asterisk/datastore.h"
40 #include "asterisk/uuid.h"
41 #include "asterisk/taskprocessor.h"
42 #include "asterisk/sched.h"
43 #include "asterisk/res_pjsip.h"
44 #include "asterisk/callerid.h"
45 #include "asterisk/manager.h"
46 #include "asterisk/cli.h"
47 #include "asterisk/test.h"
48 #include "res_pjsip/include/res_pjsip_private.h"
49 #include "asterisk/res_pjsip_presence_xml.h"
50
51 /*** DOCUMENTATION
52         <manager name="PJSIPShowSubscriptionsInbound" language="en_US">
53                 <synopsis>
54                         Lists subscriptions.
55                 </synopsis>
56                 <syntax />
57                 <description>
58                         <para>
59                         Provides a listing of all inbound subscriptions.  An event <literal>InboundSubscriptionDetail</literal>
60                         is issued for each subscription object.  Once all detail events are completed an
61                         <literal>InboundSubscriptionDetailComplete</literal> event is issued.
62                         </para>
63                 </description>
64         </manager>
65         <manager name="PJSIPShowSubscriptionsOutbound" language="en_US">
66                 <synopsis>
67                         Lists subscriptions.
68                 </synopsis>
69                 <syntax />
70                 <description>
71                         <para>
72                         Provides a listing of all outbound subscriptions.  An event <literal>OutboundSubscriptionDetail</literal>
73                         is issued for each subscription object.  Once all detail events are completed an
74                         <literal>OutboundSubscriptionDetailComplete</literal> event is issued.
75                         </para>
76                 </description>
77         </manager>
78         <manager name="PJSIPShowResourceLists" language="en_US">
79                 <synopsis>
80                         Displays settings for configured resource lists.
81                 </synopsis>
82                 <syntax />
83                 <description>
84                         <para>
85                         Provides a listing of all resource lists.  An event <literal>ResourceListDetail</literal>
86                         is issued for each resource list object.  Once all detail events are completed a
87                         <literal>ResourceListDetailComplete</literal> event is issued.
88                         </para>
89                 </description>
90         </manager>
91
92         <configInfo name="res_pjsip_pubsub" language="en_US">
93                 <synopsis>Module that implements publish and subscribe support.</synopsis>
94                 <configFile name="pjsip.conf">
95                         <configObject name="subscription_persistence">
96                                 <synopsis>Persists SIP subscriptions so they survive restarts.</synopsis>
97                                 <configOption name="packet">
98                                         <synopsis>Entire SIP SUBSCRIBE packet that created the subscription</synopsis>
99                                 </configOption>
100                                 <configOption name="src_name">
101                                         <synopsis>The source address of the subscription</synopsis>
102                                 </configOption>
103                                 <configOption name="src_port">
104                                         <synopsis>The source port of the subscription</synopsis>
105                                 </configOption>
106                                 <configOption name="transport_key">
107                                         <synopsis>The type of transport the subscription was received on</synopsis>
108                                 </configOption>
109                                 <configOption name="local_name">
110                                         <synopsis>The local address the subscription was received on</synopsis>
111                                 </configOption>
112                                 <configOption name="local_port">
113                                         <synopsis>The local port the subscription was received on</synopsis>
114                                 </configOption>
115                                 <configOption name="cseq">
116                                         <synopsis>The sequence number of the next NOTIFY to be sent</synopsis>
117                                 </configOption>
118                                 <configOption name="tag">
119                                         <synopsis>The local tag of the dialog for the subscription</synopsis>
120                                 </configOption>
121                                 <configOption name="endpoint">
122                                         <synopsis>The name of the endpoint that subscribed</synopsis>
123                                 </configOption>
124                                 <configOption name="expires">
125                                         <synopsis>The time at which the subscription expires</synopsis>
126                                 </configOption>
127                                 <configOption name="contact_uri">
128                                         <synopsis>The Contact URI of the dialog for the subscription</synopsis>
129                                 </configOption>
130                         </configObject>
131                         <configObject name="resource_list">
132                                 <synopsis>Resource list configuration parameters.</synopsis>
133                                 <description>
134                                         <para>This configuration object allows for RFC 4662 resource list subscriptions
135                                         to be specified. This can be useful to decrease the amount of subscription traffic
136                                         that a server has to process.</para>
137                                         <note>
138                                                 <para>Current limitations limit the size of SIP NOTIFY requests that Asterisk sends
139                                                 to 64000 bytes. If your resource list notifications are larger than this maximum, you
140                                                 will need to make adjustments.</para>
141                                         </note>
142                                 </description>
143                                 <configOption name="type">
144                                         <synopsis>Must be of type 'resource_list'</synopsis>
145                                 </configOption>
146                                 <configOption name="event">
147                                         <synopsis>The SIP event package that the list resource belongs to.</synopsis>
148                                         <description><para>
149                                                 The SIP event package describes the types of resources that Asterisk reports
150                                                 the state of.
151                                         </para>
152                                                 <enumlist>
153                                                         <enum name="presence"><para>
154                                                                 Device state and presence reporting.
155                                                         </para></enum>
156                                                         <enum name="dialog"><para>
157                                                                 This is identical to <replaceable>presence</replaceable>.
158                                                         </para></enum>
159                                                         <enum name="message-summary"><para>
160                                                                 Message-waiting indication (MWI) reporting.
161                                                         </para></enum>
162                                                 </enumlist>
163                                         </description>
164                                 </configOption>
165                                 <configOption name="list_item">
166                                         <synopsis>The name of a resource to report state on</synopsis>
167                                         <description>
168                                                 <para>In general Asterisk looks up list items in the following way:</para>
169                                                 <para>1. Check if the list item refers to another configured resource list.</para>
170                                                 <para>2. Pass the name of the resource off to event-package-specific handlers
171                                                    to find the specified resource.</para>
172                                                 <para>The second part means that the way the list item is specified depends
173                                                 on what type of list this is. For instance, if you have the <replaceable>event</replaceable>
174                                                 set to <literal>presence</literal>, then list items should be in the form of
175                                                 dialplan_extension@dialplan_context. For <literal>message-summary</literal> mailbox
176                                                 names should be listed.</para>
177                                         </description>
178                                 </configOption>
179                                 <configOption name="full_state" default="no">
180                                         <synopsis>Indicates if the entire list's state should be sent out.</synopsis>
181                                         <description>
182                                                 <para>If this option is enabled, and a resource changes state, then Asterisk will construct
183                                                 a notification that contains the state of all resources in the list. If the option is
184                                                 disabled, Asterisk will construct a notification that only contains the states of
185                                                 resources that have changed.</para>
186                                                 <note>
187                                                         <para>Even with this option disabled, there are certain situations where Asterisk is forced
188                                                         to send a notification with the states of all resources in the list. When a subscriber
189                                                         renews or terminates its subscription to the list, Asterisk MUST send a full state
190                                                         notification.</para>
191                                                 </note>
192                                         </description>
193                                 </configOption>
194                                 <configOption name="notification_batch_interval" default="0">
195                                         <synopsis>Time Asterisk should wait, in milliseconds, before sending notifications.</synopsis>
196                                         <description>
197                                                 <para>When a resource's state changes, it may be desired to wait a certain amount before Asterisk
198                                                 sends a notification to subscribers. This allows for other state changes to accumulate, so that
199                                                 Asterisk can communicate multiple state changes in a single notification instead of rapidly sending
200                                                 many notifications.</para>
201                                         </description>
202                                 </configOption>
203                         </configObject>
204                         <configObject name="inbound-publication">
205                                 <synopsis>The configuration for inbound publications</synopsis>
206                                 <configOption name="endpoint" default="">
207                                         <synopsis>Optional name of an endpoint that is only allowed to publish to this resource</synopsis>
208                                 </configOption>
209                                 <configOption name="type">
210                                         <synopsis>Must be of type 'inbound-publication'.</synopsis>
211                                 </configOption>
212                         </configObject>
213                 </configFile>
214         </configInfo>
215  ***/
216
217 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);
218
219 static struct pjsip_module pubsub_module = {
220         .name = { "PubSub Module", 13 },
221         .priority = PJSIP_MOD_PRIORITY_APPLICATION,
222         .on_rx_request = pubsub_on_rx_request,
223 };
224
225 #define MOD_DATA_PERSISTENCE "sub_persistence"
226 #define MOD_DATA_MSG "sub_msg"
227
228 static const pj_str_t str_event_name = { "Event", 5 };
229
230 /*! \brief Scheduler used for automatically expiring publications */
231 static struct ast_sched_context *sched;
232
233 /*! \brief Number of buckets for publications (on a per handler) */
234 #define PUBLICATIONS_BUCKETS 37
235
236 /*! \brief Default expiration time for PUBLISH if one is not specified */
237 #define DEFAULT_PUBLISH_EXPIRES 3600
238
239 /*! \brief Number of buckets for subscription datastore */
240 #define DATASTORE_BUCKETS 53
241
242 /*! \brief Default expiration for subscriptions */
243 #define DEFAULT_EXPIRES 3600
244
245 /*! \brief Defined method for PUBLISH */
246 const pjsip_method pjsip_publish_method =
247 {
248         PJSIP_OTHER_METHOD,
249         { "PUBLISH", 7 }
250 };
251
/*!
 * \brief Classification of incoming PUBLISH requests (RFC 3903)
 */
enum sip_publish_type {
        /*!
         * \brief Unknown
         *
         * \details
         * Not part of RFC 3903. Marks an incoming PUBLISH that matches
         * none of the categories below and is therefore invalid.
         */
        SIP_PUBLISH_UNKNOWN = 0,

        /*!
         * \brief Initial
         *
         * \details
         * The first PUBLISH sent. It carries a non-zero Expires header and
         * a body describing the current state of the sending endpoint. It
         * is the only PUBLISH type without a Sip-If-Match header.
         */
        SIP_PUBLISH_INITIAL = 1,

        /*!
         * \brief Refresh
         *
         * \details
         * Keeps previously published state from expiring. It carries a
         * non-zero Expires header and no body, since no state is updated.
         */
        SIP_PUBLISH_REFRESH = 2,

        /*!
         * \brief Modify
         *
         * \details
         * Replaces the previously published state. It carries a body with
         * the new state; an Expires header may or may not be present.
         */
        SIP_PUBLISH_MODIFY = 3,

        /*!
         * \brief Remove
         *
         * \details
         * Removes published state from an ESC. It carries an Expires
         * header set to 0 and most likely no body.
         */
        SIP_PUBLISH_REMOVE = 4,
};
306
307 /*!
308  * \brief A vector of strings commonly used throughout this module
309  */
310 AST_VECTOR(resources, const char *);
311
312 /*!
313  * \brief Resource list configuration item
314  */
315 struct resource_list {
316         SORCERY_OBJECT(details);
317         /*! SIP event package the list uses. */
318         char event[32];
319         /*! Strings representing resources in the list. */
320         struct resources items;
321         /*! Indicates if Asterisk sends full or partial state on notifications. */
322         unsigned int full_state;
323         /*! Time, in milliseconds Asterisk waits before sending a batched notification.*/
324         unsigned int notification_batch_interval;
325 };
326
/*!
 * Counter from which ESCs derive new entity IDs.
 */
static int esc_etag_counter;

/*!
 * \brief A single SIP publication
 */
struct ast_sip_publication {
        /*! Datastores attached to the publication by handlers */
        struct ao2_container *datastores;
        /*! \brief Entity tag identifying the publication */
        int entity_tag;
        /*! \brief Handler responsible for the publication */
        struct ast_sip_publish_handler *handler;
        /*! \brief Endpoint the publication is communicating with */
        struct ast_sip_endpoint *endpoint;
        /*! \brief When the publication expires */
        int expires;
        /*! \brief Scheduler id of the pending expiration for the publication */
        int sched_id;
        /*! \brief Resource being published to */
        char *resource;
        /*! \brief Name of the event type configuration */
        char *event_configuration_name;
        /*! \brief Trailing storage holding the data for the fields above */
        char data[0];
};
355
356
357 /*!
358  * \brief Structure used for persisting an inbound subscription
359  */
360 struct subscription_persistence {
361         /*! Sorcery object details */
362         SORCERY_OBJECT(details);
363         /*! The name of the endpoint involved in the subscription */
364         char *endpoint;
365         /*! SIP message that creates the subscription */
366         char packet[PJSIP_MAX_PKT_LEN];
367         /*! Source address of the message */
368         char src_name[PJ_INET6_ADDRSTRLEN];
369         /*! Source port of the message */
370         int src_port;
371         /*! Local transport key type */
372         char transport_key[32];
373         /*! Local transport address */
374         char local_name[PJ_INET6_ADDRSTRLEN];
375         /*! Local transport port */
376         int local_port;
377         /*! Next CSeq to use for message */
378         unsigned int cseq;
379         /*! Local tag of the dialog */
380         char *tag;
381         /*! When this subscription expires */
382         struct timeval expires;
383         /*! Contact URI */
384         char contact_uri[PJSIP_MAX_URL_SIZE];
385 };
386
/*!
 * \brief Lifecycle states of a subscription tree
 */
enum sip_subscription_tree_state {
        /*! The tree is operating normally */
        SIP_SUB_TREE_NORMAL = 0,
        /*! Asterisk, the client, or pjproject has asked for termination */
        SIP_SUB_TREE_TERMINATE_PENDING = 1,
        /*! Termination is under way */
        SIP_SUB_TREE_TERMINATE_IN_PROGRESS = 2,
        /*! Termination is complete; the subscription tree is no longer valid */
        SIP_SUB_TREE_TERMINATED = 3,
};
400
401 static char *sub_tree_state_description[] = {
402         "Normal",
403         "TerminatePending",
404         "TerminateInProgress",
405         "Terminated"
406 };
407
408 /*!
409  * \brief A tree of SIP subscriptions
410  *
411  * Because of the ability to subscribe to resource lists, a SIP
412  * subscription can result in a tree of subscriptions being created.
413  * This structure represents the information relevant to the subscription
414  * as a whole, to include the underlying PJSIP structure for the
415  * subscription.
416  */
417 struct sip_subscription_tree {
418         /*! The endpoint with which the subscription is communicating */
419         struct ast_sip_endpoint *endpoint;
420         /*! Serializer on which to place operations for this subscription */
421         struct ast_taskprocessor *serializer;
422         /*! The role for this subscription */
423         enum ast_sip_subscription_role role;
424         /*! Persistence information */
425         struct subscription_persistence *persistence;
426         /*! The underlying PJSIP event subscription structure */
427         pjsip_evsub *evsub;
428         /*! The underlying PJSIP dialog */
429         pjsip_dialog *dlg;
430         /*! Interval to use for batching notifications */
431         unsigned int notification_batch_interval;
432         /*! Scheduler ID for batched notification */
433         int notify_sched_id;
434         /*! Indicator if scheduled batched notification should be sent */
435         unsigned int send_scheduled_notify;
436         /*! The root of the subscription tree */
437         struct ast_sip_subscription *root;
438         /*! Is this subscription to a list? */
439         int is_list;
440         /*! Next item in the list */
441         AST_LIST_ENTRY(sip_subscription_tree) next;
442         /*! Subscription tree state */
443         enum sip_subscription_tree_state state;
444         /*! On asterisk restart, this is the task data used
445          * to restart the expiration timer if pjproject isn't
446          * capable of restarting the timer.
447          */
448         struct ast_sip_sched_task *expiration_task;
449 };
450
451 /*!
452  * \brief Structure representing a "virtual" SIP subscription.
453  *
454  * This structure serves a dual purpose. Structurally, it is
455  * the constructed tree of subscriptions based on the resources
456  * being subscribed to. API-wise, this serves as the handle that
457  * subscription handlers use in order to interact with the pubsub API.
458  */
459 struct ast_sip_subscription {
460         /*! Subscription datastores set up by handlers */
461         struct ao2_container *datastores;
462         /*! The handler for this subscription */
463         const struct ast_sip_subscription_handler *handler;
464         /*! Pointer to the base of the tree */
465         struct sip_subscription_tree *tree;
466         /*! Body generaator for NOTIFYs */
467         struct ast_sip_pubsub_body_generator *body_generator;
468         /*! Vector of child subscriptions */
469         AST_VECTOR(, struct ast_sip_subscription *) children;
470         /*! Saved NOTIFY body text for this subscription */
471         struct ast_str *body_text;
472         /*! Indicator that the body text has changed since the last notification */
473         int body_changed;
474         /*! The current state of the subscription */
475         pjsip_evsub_state subscription_state;
476         /*! For lists, the current version to place in the RLMI body */
477         unsigned int version;
478         /*! For lists, indicates if full state should always be communicated. */
479         unsigned int full_state;
480         /*! URI associated with the subscription */
481         pjsip_sip_uri *uri;
482         /*! Name of resource being subscribed to */
483         char resource[0];
484 };
485
486 /*!
487  * \brief Structure representing a publication resource
488  */
489 struct ast_sip_publication_resource {
490         /*! \brief Sorcery object details */
491         SORCERY_OBJECT(details);
492         /*! \brief Optional name of an endpoint that is only allowed to publish to this resource */
493         char *endpoint;
494         /*! \brief Mapping for event types to configuration */
495         struct ast_variable *events;
496 };
497
498 static const char *sip_subscription_roles_map[] = {
499         [AST_SIP_SUBSCRIBER] = "Subscriber",
500         [AST_SIP_NOTIFIER] = "Notifier"
501 };
502
/*! \brief Reasons for which a subscription's persistence gets updated */
enum sip_persistence_update_type {
        /*! Update requested from the send-request path */
        SUBSCRIPTION_PERSISTENCE_SEND_REQUEST = 0,
        /*! The initial client request created the subscription */
        SUBSCRIPTION_PERSISTENCE_CREATED = 1,
        /*! Asterisk recreated the subscription on startup */
        SUBSCRIPTION_PERSISTENCE_RECREATED = 2,
        /*! A client refresh created the subscription */
        SUBSCRIPTION_PERSISTENCE_REFRESHED = 3,
};
513
514 AST_RWLIST_HEAD_STATIC(subscriptions, sip_subscription_tree);
515
516 AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
517 AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
518
519 static pjsip_media_type rlmi_media_type;
520
521 static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
522 static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
523                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
524 static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
525                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
526 static void pubsub_on_client_refresh(pjsip_evsub *sub);
527 static void pubsub_on_server_timeout(pjsip_evsub *sub);
528  
529 static pjsip_evsub_user pubsub_cb = {
530         .on_evsub_state = pubsub_on_evsub_state,
531         .on_rx_refresh = pubsub_on_rx_refresh,
532         .on_rx_notify = pubsub_on_rx_notify,
533         .on_client_refresh = pubsub_on_client_refresh,
534         .on_server_timeout = pubsub_on_server_timeout,
535 };
536
537 /*! \brief Destructor for publication resource */
538 static void publication_resource_destroy(void *obj)
539 {
540         struct ast_sip_publication_resource *resource = obj;
541
542         ast_free(resource->endpoint);
543         ast_variables_destroy(resource->events);
544 }
545
546 /*! \brief Allocator for publication resource */
547 static void *publication_resource_alloc(const char *name)
548 {
549         return ast_sorcery_generic_alloc(sizeof(struct ast_sip_publication_resource), publication_resource_destroy);
550 }
551
552 /*! \brief Destructor for subscription persistence */
553 static void subscription_persistence_destroy(void *obj)
554 {
555         struct subscription_persistence *persistence = obj;
556
557         ast_free(persistence->endpoint);
558         ast_free(persistence->tag);
559 }
560
561 /*! \brief Allocator for subscription persistence */
562 static void *subscription_persistence_alloc(const char *name)
563 {
564         return ast_sorcery_generic_alloc(sizeof(struct subscription_persistence), subscription_persistence_destroy);
565 }
566
567 /*! \brief Function which creates initial persistence information of a subscription in sorcery */
568 static struct subscription_persistence *subscription_persistence_create(struct sip_subscription_tree *sub_tree)
569 {
570         char tag[PJ_GUID_STRING_LENGTH + 1];
571
572         /* The id of this persistence object doesn't matter as we keep it on the subscription and don't need to
573          * look it up by id at all.
574          */
575         struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(),
576                 "subscription_persistence", NULL);
577
578         pjsip_dialog *dlg = sub_tree->dlg;
579
580         if (!persistence) {
581                 return NULL;
582         }
583
584         persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub_tree->endpoint));
585         ast_copy_pj_str(tag, &dlg->local.info->tag, sizeof(tag));
586         persistence->tag = ast_strdup(tag);
587
588         ast_sorcery_create(ast_sip_get_sorcery(), persistence);
589         return persistence;
590 }
591
592 /*! \brief Function which updates persistence information of a subscription in sorcery */
593 static void subscription_persistence_update(struct sip_subscription_tree *sub_tree,
594         pjsip_rx_data *rdata, enum sip_persistence_update_type type)
595 {
596         pjsip_dialog *dlg;
597
598         if (!sub_tree->persistence) {
599                 return;
600         }
601
602         ast_debug(3, "Updating persistence for '%s->%s'\n", sub_tree->persistence->endpoint,
603                 sub_tree->root->resource);
604
605         dlg = sub_tree->dlg;
606         sub_tree->persistence->cseq = dlg->local.cseq;
607
608         if (rdata) {
609                 int expires;
610                 pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
611                 pjsip_contact_hdr *contact_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_CONTACT, NULL);
612
613                 expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
614                 sub_tree->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
615
616                 pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, contact_hdr->uri,
617                         sub_tree->persistence->contact_uri, sizeof(sub_tree->persistence->contact_uri));
618
619                 /* When receiving a packet on an streaming transport, it's possible to receive more than one SIP
620                  * message at a time into the rdata->pkt_info.packet buffer. However, the rdata->msg_info.msg_buf
621                  * will always point to the proper SIP message that is to be processed. When updating subscription
622                  * persistence that is pulled from persistent storage, though, the rdata->pkt_info.packet will
623                  * only ever have a single SIP message on it, and so we base persistence on that.
624                  */
625                 if (type == SUBSCRIPTION_PERSISTENCE_CREATED
626                         || type == SUBSCRIPTION_PERSISTENCE_RECREATED) {
627                         if (rdata->msg_info.msg_buf) {
628                                 ast_copy_string(sub_tree->persistence->packet, rdata->msg_info.msg_buf,
629                                                 MIN(sizeof(sub_tree->persistence->packet), rdata->msg_info.len));
630                         } else {
631                                 ast_copy_string(sub_tree->persistence->packet, rdata->pkt_info.packet,
632                                                 sizeof(sub_tree->persistence->packet));
633                         }
634                 }
635                 ast_copy_string(sub_tree->persistence->src_name, rdata->pkt_info.src_name,
636                                 sizeof(sub_tree->persistence->src_name));
637                 sub_tree->persistence->src_port = rdata->pkt_info.src_port;
638                 ast_copy_string(sub_tree->persistence->transport_key, rdata->tp_info.transport->type_name,
639                         sizeof(sub_tree->persistence->transport_key));
640                 ast_copy_pj_str(sub_tree->persistence->local_name, &rdata->tp_info.transport->local_name.host,
641                         sizeof(sub_tree->persistence->local_name));
642                 sub_tree->persistence->local_port = rdata->tp_info.transport->local_name.port;
643         }
644
645         ast_sorcery_update(ast_sip_get_sorcery(), sub_tree->persistence);
646 }
647
648 /*! \brief Function which removes persistence of a subscription from sorcery */
649 static void subscription_persistence_remove(struct sip_subscription_tree *sub_tree)
650 {
651         if (!sub_tree->persistence) {
652                 return;
653         }
654
655         ast_sorcery_delete(ast_sip_get_sorcery(), sub_tree->persistence);
656         ao2_ref(sub_tree->persistence, -1);
657         sub_tree->persistence = NULL;
658 }
659
660
661 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name);
662 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
663                 size_t num_accept, const char *body_type);
664
665 /*! \brief Retrieve a handler using the Event header of an rdata message */
666 static struct ast_sip_subscription_handler *subscription_get_handler_from_rdata(pjsip_rx_data *rdata)
667 {
668         pjsip_event_hdr *event_header;
669         char event[32];
670         struct ast_sip_subscription_handler *handler;
671
672         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
673         if (!event_header) {
674                 ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
675                 return NULL;
676         }
677         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
678
679         handler = find_sub_handler_for_event_name(event);
680         if (!handler) {
681                 ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
682         }
683
684         return handler;
685 }
686
/*!
 * \brief Accept header values that are skipped when choosing a body generator
 *
 * When a SUBSCRIBE arrives, a body generator is normally chosen by matching
 * one of the request's Accept header values. That works for a subscription
 * to a single resource. For a subscription to a resource list, a couple of
 * Accept values describe the list container itself rather than the bodies
 * of the individual resources, so they must be ignored while searching for
 * the body generator of the individual resources.
 */
const char *accept_exceptions[] =  {
	"multipart/related",
	"application/rlmi+xml",
};
703
704 /*!
705  * \brief Is the Accept header from the SUBSCRIBE in the list of exceptions?
706  *
707  * \retval 1 This Accept header value is an exception to the rule.
708  * \retval 0 This Accept header is not an exception to the rule.
709  */
710 static int exceptional_accept(const pj_str_t *accept)
711 {
712         int i;
713
714         for (i = 0; i < ARRAY_LEN(accept_exceptions); ++i) {
715                 if (!pj_strcmp2(accept, accept_exceptions[i])) {
716                         return 1;
717                 }
718         }
719
720         return 0;
721 }
722
723 /*! \brief Retrieve a body generator using the Accept header of an rdata message */
724 static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata,
725         const struct ast_sip_subscription_handler *handler)
726 {
727         pjsip_accept_hdr *accept_header = (pjsip_accept_hdr *) &rdata->msg_info.msg->hdr;
728         char accept[AST_SIP_MAX_ACCEPT][64];
729         size_t num_accept_headers = 0;
730
731         while ((accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, accept_header->next))) {
732                 int i;
733
734                 for (i = 0; i < accept_header->count; ++i) {
735                         if (!exceptional_accept(&accept_header->values[i])) {
736                                 ast_copy_pj_str(accept[num_accept_headers], &accept_header->values[i], sizeof(accept[num_accept_headers]));
737                                 ++num_accept_headers;
738                         }
739                 }
740         }
741
742         if (num_accept_headers == 0) {
743                 /* If a SUBSCRIBE contains no Accept headers, then we must assume that
744                  * the default accept type for the event package is to be used.
745                  */
746                 ast_copy_string(accept[0], handler->notifier->default_accept, sizeof(accept[0]));
747                 num_accept_headers = 1;
748         }
749
750         return find_body_generator(accept, num_accept_headers, handler->body_type);
751 }
752
753 /*! \brief Check if the rdata has a Supported header containing 'eventlist'
754  *
755  *  \retval 1 rdata has an eventlist containing supported header
756  *  \retval 0 rdata doesn't have an eventlist containing supported header
757  */
758 static int ast_sip_pubsub_has_eventlist_support(pjsip_rx_data *rdata)
759 {
760         pjsip_supported_hdr *supported_header = (pjsip_supported_hdr *) &rdata->msg_info.msg->hdr;
761
762         while ((supported_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_SUPPORTED, supported_header->next))) {
763                 int i;
764
765                 for (i = 0; i < supported_header->count; i++) {
766                         if (!pj_stricmp2(&supported_header->values[i], "eventlist")) {
767                                 return 1;
768                         }
769                 }
770         }
771
772         return 0;
773 }
774
775 struct resource_tree;
776
777 /*!
778  * \brief A node for a resource tree.
779  */
780 struct tree_node {
781         AST_VECTOR(, struct tree_node *) children;
782         unsigned int full_state;
783         char resource[0];
784 };
785
786 /*!
787  * \brief Helper function for retrieving a resource list for a given event.
788  *
789  * This will retrieve a resource list that corresponds to the resource and event provided.
790  *
791  * \param resource The name of the resource list to retrieve
792  * \param event The expected event name on the resource list
793  */
794 static struct resource_list *retrieve_resource_list(const char *resource, const char *event)
795 {
796         struct resource_list *list;
797
798         list = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "resource_list", resource);
799         if (!list) {
800                 return NULL;
801         }
802
803         if (strcmp(list->event, event)) {
804                 ast_log(LOG_WARNING, "Found resource list %s, but its event type (%s) does not match SUBSCRIBE's (%s)\n",
805                                 resource, list->event, event);
806                 ao2_cleanup(list);
807                 return NULL;
808         }
809
810         return list;
811 }
812
813 /*!
814  * \brief Allocate a tree node
815  *
816  * In addition to allocating and initializing the tree node, the node is also added
817  * to the vector of visited resources. See \ref build_resource_tree for more information
818  * on the visited resources.
819  *
820  * \param resource The name of the resource for this tree node.
821  * \param visited The vector of resources that have been visited.
822  * \param if allocating a list, indicate whether full state is requested in notifications.
823  * \retval NULL Allocation failure.
824  * \retval non-NULL The newly-allocated tree_node
825  */
826 static struct tree_node *tree_node_alloc(const char *resource, struct resources *visited, unsigned int full_state)
827 {
828         struct tree_node *node;
829
830         node = ast_calloc(1, sizeof(*node) + strlen(resource) + 1);
831         if (!node) {
832                 return NULL;
833         }
834
835         strcpy(node->resource, resource);
836         if (AST_VECTOR_INIT(&node->children, 4)) {
837                 ast_free(node);
838                 return NULL;
839         }
840         node->full_state = full_state;
841
842         if (visited) {
843                 AST_VECTOR_APPEND(visited, resource);
844         }
845         return node;
846 }
847
848 /*!
849  * \brief Destructor for a tree node
850  *
851  * This function calls recursively in order to destroy
852  * all nodes lower in the tree from the given node in
853  * addition to the node itself.
854  *
855  * \param node The node to destroy.
856  */
857 static void tree_node_destroy(struct tree_node *node)
858 {
859         int i;
860         if (!node) {
861                 return;
862         }
863
864         for (i = 0; i < AST_VECTOR_SIZE(&node->children); ++i) {
865                 tree_node_destroy(AST_VECTOR_GET(&node->children, i));
866         }
867         AST_VECTOR_FREE(&node->children);
868         ast_free(node);
869 }
870
/*!
 * \brief Determine if this resource has been visited already
 *
 * See \ref build_resource_tree for more information
 *
 * \param resource The resource currently being visited
 * \param visited The resources that have previously been visited
 *
 * \retval 1 A resource with the same name is already in \a visited.
 * \retval 0 The resource has not been visited.
 */
static int have_visited(const char *resource, struct resources *visited)
{
	int idx = 0;

	while (idx < AST_VECTOR_SIZE(visited)) {
		if (strcmp(resource, AST_VECTOR_GET(visited, idx)) == 0) {
			return 1;
		}
		++idx;
	}

	return 0;
}
891
892 /*!
893  * \brief Build child nodes for a given parent.
894  *
895  * This iterates through the items on a resource list and creates tree nodes for each one. The
896  * tree nodes created are children of the supplied parent node. If an item in the resource
897  * list is itself a list, then this function is called recursively to provide children for
898  * the the new node.
899  *
900  * If an item in a resource list is not a list, then the supplied subscription handler is
901  * called into as if a new SUBSCRIBE for the list item were presented. The handler's response
902  * is used to determine if the node can be added to the tree or not.
903  *
904  * If a parent node ends up having no child nodes added under it, then the parent node is
905  * pruned from the tree.
906  *
907  * \param endpoint The endpoint that sent the inbound SUBSCRIBE.
908  * \param handler The subscription handler for leaf nodes in the tree.
909  * \param list The configured resource list from which the child node is being built.
910  * \param parent The parent node for these children.
911  * \param visited The resources that have already been visited.
912  */
913 static void build_node_children(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
914                 struct resource_list *list, struct tree_node *parent, struct resources *visited)
915 {
916         int i;
917
918         for (i = 0; i < AST_VECTOR_SIZE(&list->items); ++i) {
919                 struct tree_node *current;
920                 struct resource_list *child_list;
921                 const char *resource = AST_VECTOR_GET(&list->items, i);
922
923                 if (have_visited(resource, visited)) {
924                         ast_debug(1, "Already visited resource %s. Avoiding duplicate resource or potential loop.\n", resource);
925                         continue;
926                 }
927
928                 child_list = retrieve_resource_list(resource, list->event);
929                 if (!child_list) {
930                         int resp = handler->notifier->new_subscribe(endpoint, resource);
931                         if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
932                                 current = tree_node_alloc(resource, visited, 0);
933                                 if (!current) {
934                                         ast_debug(1,
935                                                 "Subscription to leaf resource %s was successful, but encountered allocation error afterwards\n",
936                                                 resource);
937                                         continue;
938                                 }
939                                 ast_debug(2, "Subscription to leaf resource %s resulted in success. Adding to parent %s\n",
940                                                 resource, parent->resource);
941                                 AST_VECTOR_APPEND(&parent->children, current);
942                         } else {
943                                 ast_debug(2, "Subscription to leaf resource %s resulted in error response %d\n",
944                                                 resource, resp);
945                         }
946                 } else {
947                         ast_debug(2, "Resource %s (child of %s) is a list\n", resource, parent->resource);
948                         current = tree_node_alloc(resource, visited, child_list->full_state);
949                         if (!current) {
950                                 ast_debug(1, "Cannot build children of resource %s due to allocation failure\n", resource);
951                                 continue;
952                         }
953                         build_node_children(endpoint, handler, child_list, current, visited);
954                         if (AST_VECTOR_SIZE(&current->children) > 0) {
955                                 ast_debug(1, "List %s had no successful children.\n", resource);
956                                 AST_VECTOR_APPEND(&parent->children, current);
957                         } else {
958                                 ast_debug(2, "List %s had successful children. Adding to parent %s\n",
959                                                 resource, parent->resource);
960                                 tree_node_destroy(current);
961                         }
962                         ao2_cleanup(child_list);
963                 }
964         }
965 }
966
/*!
 * \brief A resource tree
 *
 * The resource named in an inbound SUBSCRIBE may be a resource list, and
 * the resources on that list may themselves be lists. A tree is therefore
 * the structure needed to hold the requested resources.
 *
 * Upon receipt of the SUBSCRIBE, the tree is built by determining whether a
 * subscription to each individual resource would be successful. Each
 * successful subscription results in a node in the tree; an unsuccessful one
 * results in no node being created.
 *
 * This tree can be seen as a bare-bones analog of the tree of ast_sip_subscriptions that
 * will end up being created to actually carry out the duties of a SIP SUBSCRIBE dialog.
 */
struct resource_tree {
	/*! The top node of the tree, for the resource named in the SUBSCRIBE. */
	struct tree_node *root;
	/*! Notification batch interval taken from the resource list, when the root is a list. */
	unsigned int notification_batch_interval;
};
987
988 /*!
989  * \brief Destroy a resource tree.
990  *
991  * This function makes no assumptions about how the tree itself was
992  * allocated and does not attempt to free the tree itself. Callers
993  * of this function are responsible for freeing the tree.
994  *
995  * \param tree The tree to destroy.
996  */
997 static void resource_tree_destroy(struct resource_tree *tree)
998 {
999         if (tree) {
1000                 tree_node_destroy(tree->root);
1001         }
1002 }
1003
1004 /*!
1005  * \brief Build a resource tree
1006  *
1007  * This function builds a resource tree based on the requested resource in a SUBSCRIBE request.
1008  *
1009  * This function also creates a container that has all resources that have been visited during
1010  * creation of the tree, whether those resources resulted in a tree node being created or not.
1011  * Keeping this container of visited resources allows for misconfigurations such as loops in
1012  * the tree or duplicated resources to be detected.
1013  *
1014  * \param endpoint The endpoint that sent the SUBSCRIBE request.
1015  * \param handler The subscription handler for leaf nodes in the tree.
1016  * \param resource The resource requested in the SUBSCRIBE request.
1017  * \param tree The tree that is to be built.
1018  * \param has_eventlist_support
1019  *
1020  * \retval 200-299 Successfully subscribed to at least one resource.
1021  * \retval 300-699 Failure to subscribe to requested resource.
1022  */
1023 static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
1024                 const char *resource, struct resource_tree *tree, int has_eventlist_support)
1025 {
1026         RAII_VAR(struct resource_list *, list, NULL, ao2_cleanup);
1027         struct resources visited;
1028
1029         if (!has_eventlist_support || !(list = retrieve_resource_list(resource, handler->event_name))) {
1030                 ast_debug(2, "Subscription '%s->%s' is not to a list\n",
1031                         ast_sorcery_object_get_id(endpoint), resource);
1032                 tree->root = tree_node_alloc(resource, NULL, 0);
1033                 if (!tree->root) {
1034                         return 500;
1035                 }
1036                 return handler->notifier->new_subscribe(endpoint, resource);
1037         }
1038
1039         ast_debug(2, "Subscription '%s->%s' is a list\n",
1040                 ast_sorcery_object_get_id(endpoint), resource);
1041         if (AST_VECTOR_INIT(&visited, AST_VECTOR_SIZE(&list->items))) {
1042                 return 500;
1043         }
1044
1045         tree->root = tree_node_alloc(resource, &visited, list->full_state);
1046         if (!tree->root) {
1047                 AST_VECTOR_FREE(&visited);
1048                 return 500;
1049         }
1050
1051         tree->notification_batch_interval = list->notification_batch_interval;
1052
1053         build_node_children(endpoint, handler, list, tree->root, &visited);
1054         AST_VECTOR_FREE(&visited);
1055
1056         if (AST_VECTOR_SIZE(&tree->root->children) > 0) {
1057                 return 200;
1058         } else {
1059                 return 500;
1060         }
1061 }
1062
1063 static void add_subscription(struct sip_subscription_tree *obj)
1064 {
1065         AST_RWLIST_WRLOCK(&subscriptions);
1066         AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
1067         AST_RWLIST_UNLOCK(&subscriptions);
1068 }
1069
1070 static void remove_subscription(struct sip_subscription_tree *obj)
1071 {
1072         struct sip_subscription_tree *i;
1073
1074         AST_RWLIST_WRLOCK(&subscriptions);
1075         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
1076                 if (i == obj) {
1077                         AST_RWLIST_REMOVE_CURRENT(next);
1078                         if (i->root) {
1079                                 ast_debug(2, "Removing subscription '%s->%s' from list of subscriptions\n",
1080                                         ast_sorcery_object_get_id(i->endpoint), ast_sip_subscription_get_resource_name(i->root));
1081                         }
1082                         break;
1083                 }
1084         }
1085         AST_RWLIST_TRAVERSE_SAFE_END;
1086         AST_RWLIST_UNLOCK(&subscriptions);
1087 }
1088
1089 static void destroy_subscription(struct ast_sip_subscription *sub)
1090 {
1091         ast_debug(3, "Destroying SIP subscription from '%s->%s'\n",
1092                 sub->tree && sub->tree->endpoint ? ast_sorcery_object_get_id(sub->tree->endpoint) : "Unknown",
1093                 sub->resource);
1094
1095         ast_free(sub->body_text);
1096
1097         AST_VECTOR_FREE(&sub->children);
1098         ao2_cleanup(sub->datastores);
1099         ast_free(sub);
1100 }
1101
1102 static void destroy_subscriptions(struct ast_sip_subscription *root)
1103 {
1104         int i;
1105
1106         if (!root) {
1107                 return;
1108         }
1109
1110         for (i = 0; i < AST_VECTOR_SIZE(&root->children); ++i) {
1111                 struct ast_sip_subscription *child;
1112
1113                 child = AST_VECTOR_GET(&root->children, i);
1114                 destroy_subscriptions(child);
1115         }
1116
1117         destroy_subscription(root);
1118 }
1119
1120 static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler,
1121                 const char *resource, struct sip_subscription_tree *tree)
1122 {
1123         struct ast_sip_subscription *sub;
1124         pjsip_sip_uri *contact_uri;
1125
1126         sub = ast_calloc(1, sizeof(*sub) + strlen(resource) + 1);
1127         if (!sub) {
1128                 return NULL;
1129         }
1130         strcpy(sub->resource, resource); /* Safe */
1131
1132         sub->datastores = ast_datastores_alloc();
1133         if (!sub->datastores) {
1134                 destroy_subscription(sub);
1135                 return NULL;
1136         }
1137
1138         sub->body_text = ast_str_create(128);
1139         if (!sub->body_text) {
1140                 destroy_subscription(sub);
1141                 return NULL;
1142         }
1143
1144         sub->uri = pjsip_sip_uri_create(tree->dlg->pool, PJ_FALSE);
1145         contact_uri = pjsip_uri_get_uri(tree->dlg->local.contact->uri);
1146         pjsip_sip_uri_assign(tree->dlg->pool, sub->uri, contact_uri);
1147         pj_strdup2(tree->dlg->pool, &sub->uri->user, resource);
1148
1149         sub->handler = handler;
1150         sub->subscription_state = PJSIP_EVSUB_STATE_ACTIVE;
1151         sub->tree = ao2_bump(tree);
1152
1153         return sub;
1154 }
1155
1156 /*!
1157  * \brief Create a tree of virtual subscriptions based on a resource tree node.
1158  *
1159  * \param handler The handler to supply to leaf subscriptions.
1160  * \param resource The requested resource for this subscription.
1161  * \param generator Body generator to use for leaf subscriptions.
1162  * \param tree The root of the subscription tree.
1163  * \param current The tree node that corresponds to the subscription being created.
1164  */
1165 static struct ast_sip_subscription *create_virtual_subscriptions(const struct ast_sip_subscription_handler *handler,
1166                 const char *resource, struct ast_sip_pubsub_body_generator *generator,
1167                 struct sip_subscription_tree *tree, struct tree_node *current)
1168 {
1169         int i;
1170         struct ast_sip_subscription *sub;
1171
1172         sub = allocate_subscription(handler, resource, tree);
1173         if (!sub) {
1174                 return NULL;
1175         }
1176
1177         sub->full_state = current->full_state;
1178         sub->body_generator = generator;
1179         AST_VECTOR_INIT(&sub->children, AST_VECTOR_SIZE(&current->children));
1180
1181         for (i = 0; i < AST_VECTOR_SIZE(&current->children); ++i) {
1182                 struct ast_sip_subscription *child;
1183                 struct tree_node *child_node = AST_VECTOR_GET(&current->children, i);
1184
1185                 child = create_virtual_subscriptions(handler, child_node->resource, generator,
1186                                 tree, child_node);
1187
1188                 if (!child) {
1189                         ast_debug(1, "Child subscription to resource %s could not be created\n",
1190                                         child_node->resource);
1191                         continue;
1192                 }
1193
1194                 if (AST_VECTOR_APPEND(&sub->children, child)) {
1195                         ast_debug(1, "Child subscription to resource %s could not be appended\n",
1196                                         child_node->resource);
1197                 }
1198         }
1199
1200         return sub;
1201 }
1202
1203 static void shutdown_subscriptions(struct ast_sip_subscription *sub)
1204 {
1205         int i;
1206
1207         if (!sub) {
1208                 return;
1209         }
1210
1211         if (AST_VECTOR_SIZE(&sub->children) > 0) {
1212                 for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
1213                         shutdown_subscriptions(AST_VECTOR_GET(&sub->children, i));
1214                 }
1215                 return;
1216         }
1217
1218         /* We notify subscription shutdown only on the tree leaves. */
1219         if (sub->handler->subscription_shutdown) {
1220                 sub->handler->subscription_shutdown(sub);
1221         }
1222 }
1223 static int subscription_unreference_dialog(void *obj)
1224 {
1225         struct sip_subscription_tree *sub_tree = obj;
1226
1227         /* This is why we keep the dialog on the subscription. When the subscription
1228          * is destroyed, there is no guarantee that the underlying dialog is ready
1229          * to be destroyed. Furthermore, there's no guarantee in the opposite direction
1230          * either. The dialog could be destroyed before our subscription is. We fix
1231          * this problem by keeping a reference to the dialog until it is time to
1232          * destroy the subscription. We need to have the dialog available when the
1233          * subscription is destroyed so that we can guarantee that our attempt to
1234          * remove the serializer will be successful.
1235          */
1236         pjsip_dlg_dec_session(sub_tree->dlg, &pubsub_module);
1237         sub_tree->dlg = NULL;
1238
1239         return 0;
1240 }
1241
1242 static void subscription_tree_destructor(void *obj)
1243 {
1244         struct sip_subscription_tree *sub_tree = obj;
1245
1246         ast_debug(3, "Destroying subscription tree %p '%s->%s'\n",
1247                 sub_tree,
1248                 sub_tree->endpoint ? ast_sorcery_object_get_id(sub_tree->endpoint) : "Unknown",
1249                 sub_tree->root ? sub_tree->root->resource : "Unknown");
1250
1251         destroy_subscriptions(sub_tree->root);
1252
1253         if (sub_tree->dlg) {
1254                 ast_sip_push_task_synchronous(sub_tree->serializer, subscription_unreference_dialog, sub_tree);
1255         }
1256
1257         ao2_cleanup(sub_tree->endpoint);
1258
1259         ast_taskprocessor_unreference(sub_tree->serializer);
1260         ast_module_unref(ast_module_info->self);
1261 }
1262
1263 void ast_sip_subscription_destroy(struct ast_sip_subscription *sub)
1264 {
1265         ast_debug(3, "Removing subscription %p '%s->%s' reference to subscription tree %p\n",
1266                 sub, ast_sorcery_object_get_id(sub->tree->endpoint), sub->resource, sub->tree);
1267         ao2_cleanup(sub->tree);
1268 }
1269
1270 static void subscription_setup_dialog(struct sip_subscription_tree *sub_tree, pjsip_dialog *dlg)
1271 {
1272         sub_tree->dlg = dlg;
1273         ast_sip_dialog_set_serializer(dlg, sub_tree->serializer);
1274         ast_sip_dialog_set_endpoint(dlg, sub_tree->endpoint);
1275         pjsip_evsub_set_mod_data(sub_tree->evsub, pubsub_module.id, sub_tree);
1276         pjsip_dlg_inc_session(dlg, &pubsub_module);
1277 }
1278
1279 static struct sip_subscription_tree *allocate_subscription_tree(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
1280 {
1281         struct sip_subscription_tree *sub_tree;
1282
1283         sub_tree = ao2_alloc(sizeof *sub_tree, subscription_tree_destructor);
1284         if (!sub_tree) {
1285                 return NULL;
1286         }
1287
1288         ast_module_ref(ast_module_info->self);
1289
1290         if (rdata) {
1291                 /*
1292                  * We must continue using the serializer that the original
1293                  * SUBSCRIBE came in on for the dialog.  There may be
1294                  * retransmissions already enqueued in the original
1295                  * serializer that can result in reentrancy and message
1296                  * sequencing problems.
1297                  */
1298                 sub_tree->serializer = ast_sip_get_distributor_serializer(rdata);
1299         } else {
1300                 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
1301
1302                 /* Create name with seq number appended. */
1303                 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/pubsub/%s",
1304                         ast_sorcery_object_get_id(endpoint));
1305
1306                 sub_tree->serializer = ast_sip_create_serializer(tps_name);
1307         }
1308         if (!sub_tree->serializer) {
1309                 ao2_ref(sub_tree, -1);
1310                 return NULL;
1311         }
1312
1313         sub_tree->endpoint = ao2_bump(endpoint);
1314         sub_tree->notify_sched_id = -1;
1315
1316         return sub_tree;
1317 }
1318
1319 /*!
1320  * \brief Create a subscription tree based on a resource tree.
1321  *
1322  * Using the previously-determined valid resources in the provided resource tree,
1323  * a corresponding tree of ast_sip_subscriptions are created. The root of the
1324  * subscription tree is a real subscription, and the rest in the tree are
1325  * virtual subscriptions.
1326  *
1327  * \param handler The handler to use for leaf subscriptions
1328  * \param endpoint The endpoint that sent the SUBSCRIBE request
1329  * \param rdata The SUBSCRIBE content
1330  * \param resource The requested resource in the SUBSCRIBE request
1331  * \param generator The body generator to use in leaf subscriptions
1332  * \param tree The resource tree on which the subscription tree is based
1333  * \param dlg_status[out] The result of attempting to create a dialog.
1334  *
1335  * \retval NULL Could not create the subscription tree
1336  * \retval non-NULL The root of the created subscription tree
1337  */
1338
1339 static struct sip_subscription_tree *create_subscription_tree(const struct ast_sip_subscription_handler *handler,
1340                 struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
1341                 struct ast_sip_pubsub_body_generator *generator, struct resource_tree *tree,
1342                 pj_status_t *dlg_status)
1343 {
1344         struct sip_subscription_tree *sub_tree;
1345         pjsip_dialog *dlg;
1346         struct subscription_persistence *persistence;
1347
1348         sub_tree = allocate_subscription_tree(endpoint, rdata);
1349         if (!sub_tree) {
1350                 *dlg_status = PJ_ENOMEM;
1351                 return NULL;
1352         }
1353         sub_tree->role = AST_SIP_NOTIFIER;
1354
1355         dlg = ast_sip_create_dialog_uas(endpoint, rdata, dlg_status);
1356         if (!dlg) {
1357                 if (*dlg_status != PJ_EEXISTS) {
1358                         ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
1359                 }
1360                 ao2_ref(sub_tree, -1);
1361                 return NULL;
1362         }
1363
1364         persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
1365                         pubsub_module.id, MOD_DATA_PERSISTENCE);
1366         if (persistence) {
1367                 /* Update the created dialog with the persisted information */
1368                 pjsip_ua_unregister_dlg(pjsip_ua_instance(), dlg);
1369                 pj_strdup2(dlg->pool, &dlg->local.info->tag, persistence->tag);
1370                 dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag);
1371                 pjsip_ua_register_dlg(pjsip_ua_instance(), dlg);
1372                 dlg->local.cseq = persistence->cseq;
1373         }
1374
1375         pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &sub_tree->evsub);
1376         subscription_setup_dialog(sub_tree, dlg);
1377
1378 #ifdef HAVE_PJSIP_EVSUB_GRP_LOCK
1379         pjsip_evsub_add_ref(sub_tree->evsub);
1380 #endif
1381
1382         ast_sip_mod_data_set(dlg->pool, dlg->mod_data, pubsub_module.id, MOD_DATA_MSG,
1383                         pjsip_msg_clone(dlg->pool, rdata->msg_info.msg));
1384
1385         sub_tree->notification_batch_interval = tree->notification_batch_interval;
1386
1387         sub_tree->root = create_virtual_subscriptions(handler, resource, generator, sub_tree, tree->root);
1388         if (AST_VECTOR_SIZE(&sub_tree->root->children) > 0) {
1389                 sub_tree->is_list = 1;
1390         }
1391
1392         add_subscription(sub_tree);
1393
1394         return sub_tree;
1395 }
1396
/*!
 * \brief Argument bundle handed to initial_notify_task
 */
struct initial_notify_data {
	struct sip_subscription_tree *sub_tree;	/*!< Subscription tree the task works on */
	int expires;				/*!< Expiration value recorded when the task was queued */
};
1402
1403 static int initial_notify_task(void *obj);
1404 static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state);
1405
1406 /*! Persistent subscription recreation continuation under distributor serializer data */
1407 struct persistence_recreate_data {
1408         struct subscription_persistence *persistence;
1409         pjsip_rx_data *rdata;
1410 };
1411
1412 /*!
1413  * \internal
1414  * \brief subscription_persistence_recreate continuation under distributor serializer.
1415  * \since 13.10.0
1416  *
1417  * \retval 0 on success.
1418  * \retval -1 on error.
1419  */
1420 static int sub_persistence_recreate(void *obj)
1421 {
1422         struct persistence_recreate_data *recreate_data = obj;
1423         struct subscription_persistence *persistence = recreate_data->persistence;
1424         pjsip_rx_data *rdata = recreate_data->rdata;
1425         struct ast_sip_endpoint *endpoint;
1426         struct sip_subscription_tree *sub_tree;
1427         struct ast_sip_pubsub_body_generator *generator;
1428         struct ast_sip_subscription_handler *handler;
1429         char *resource;
1430         pjsip_sip_uri *request_uri;
1431         size_t resource_size;
1432         int resp;
1433         struct resource_tree tree;
1434         pjsip_expires_hdr *expires_header;
1435
1436         request_uri = pjsip_uri_get_uri(rdata->msg_info.msg->line.req.uri);
1437         resource_size = pj_strlen(&request_uri->user) + 1;
1438         resource = ast_alloca(resource_size);
1439         ast_copy_pj_str(resource, &request_uri->user, resource_size);
1440
1441         /*
1442          * We may want to match without any user options getting
1443          * in the way.
1444          */
1445         AST_SIP_USER_OPTIONS_TRUNCATE_CHECK(resource);
1446
1447         handler = subscription_get_handler_from_rdata(rdata);
1448         if (!handler || !handler->notifier) {
1449                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not get subscription handler.\n",
1450                         persistence->endpoint);
1451                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1452                 return 0;
1453         }
1454
1455         generator = subscription_get_generator_from_rdata(rdata, handler);
1456         if (!generator) {
1457                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Body generator not available.\n",
1458                         persistence->endpoint);
1459                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1460                 return 0;
1461         }
1462
1463         ast_sip_mod_data_set(rdata->tp_info.pool, rdata->endpt_info.mod_data,
1464                 pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
1465
1466         /* Getting the endpoint may take some time that can affect the expiration. */
1467         endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint",
1468                 persistence->endpoint);
1469         if (!endpoint) {
1470                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: The endpoint was not found\n",
1471                         persistence->endpoint);
1472                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1473                 return 0;
1474         }
1475
1476         /* Update the expiration header with the new expiration */
1477         expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES,
1478                 rdata->msg_info.msg->hdr.next);
1479         if (!expires_header) {
1480                 expires_header = pjsip_expires_hdr_create(rdata->tp_info.pool, 0);
1481                 if (!expires_header) {
1482                         ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not update expires header.\n",
1483                                 persistence->endpoint);
1484                         ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1485                         ao2_ref(endpoint, -1);
1486                         return 0;
1487                 }
1488                 pjsip_msg_add_hdr(rdata->msg_info.msg, (pjsip_hdr *) expires_header);
1489         }
1490
1491         expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
1492         if (expires_header->ivalue <= 0) {
1493                 /* The subscription expired since we started recreating the subscription. */
1494                 ast_debug(3, "Expired subscription retrived from persistent store '%s' %s\n",
1495                         persistence->endpoint, persistence->tag);
1496                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1497                 ao2_ref(endpoint, -1);
1498                 return 0;
1499         }
1500
1501         memset(&tree, 0, sizeof(tree));
1502         resp = build_resource_tree(endpoint, handler, resource, &tree,
1503                 ast_sip_pubsub_has_eventlist_support(rdata));
1504         if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
1505                 pj_status_t dlg_status;
1506
1507                 sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator,
1508                         &tree, &dlg_status);
1509                 if (!sub_tree) {
1510                         if (dlg_status != PJ_EEXISTS) {
1511                                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not create subscription tree.\n",
1512                                         persistence->endpoint);
1513                                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1514                         }
1515                 } else {
1516                         struct initial_notify_data *ind = ast_malloc(sizeof(*ind));
1517
1518                         if (!ind) {
1519                                 pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
1520                                 goto error;
1521                         }
1522
1523                         ind->sub_tree = ao2_bump(sub_tree);
1524                         ind->expires = expires_header->ivalue;
1525
1526                         sub_tree->persistence = ao2_bump(persistence);
1527                         subscription_persistence_update(sub_tree, rdata, SUBSCRIPTION_PERSISTENCE_RECREATED);
1528                         if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ind)) {
1529                                 /* Could not send initial subscribe NOTIFY */
1530                                 pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
1531                                 ao2_ref(sub_tree, -1);
1532                                 ast_free(ind);
1533                         }
1534                 }
1535         } else {
1536                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1537         }
1538
1539 error:
1540         resource_tree_destroy(&tree);
1541         ao2_ref(endpoint, -1);
1542
1543         return 0;
1544 }
1545
1546 /*! \brief Callback function to perform the actual recreation of a subscription */
1547 static int subscription_persistence_recreate(void *obj, void *arg, int flags)
1548 {
1549         struct subscription_persistence *persistence = obj;
1550         pj_pool_t *pool = arg;
1551         struct ast_taskprocessor *serializer;
1552         pjsip_rx_data rdata;
1553         struct persistence_recreate_data recreate_data;
1554
1555         /* If this subscription has already expired remove it */
1556         if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
1557                 ast_debug(3, "Expired subscription retrived from persistent store '%s' %s\n",
1558                         persistence->endpoint, persistence->tag);
1559                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1560                 return 0;
1561         }
1562
1563         memset(&rdata, 0, sizeof(rdata));
1564         pj_pool_reset(pool);
1565         rdata.tp_info.pool = pool;
1566
1567         if (ast_sip_create_rdata_with_contact(&rdata, persistence->packet, persistence->src_name,
1568                 persistence->src_port, persistence->transport_key, persistence->local_name,
1569                 persistence->local_port, persistence->contact_uri)) {
1570                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: The message could not be parsed\n",
1571                         persistence->endpoint);
1572                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1573                 return 0;
1574         }
1575
1576         if (rdata.msg_info.msg->type != PJSIP_REQUEST_MSG) {
1577                 ast_log(LOG_NOTICE, "Failed recreating '%s' subscription: Stored a SIP response instead of a request.\n",
1578                         persistence->endpoint);
1579                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1580                 return 0;
1581         }
1582
1583         /* Continue the remainder in the distributor serializer */
1584         serializer = ast_sip_get_distributor_serializer(&rdata);
1585         if (!serializer) {
1586                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not get distributor serializer.\n",
1587                         persistence->endpoint);
1588                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1589                 return 0;
1590         }
1591         recreate_data.persistence = persistence;
1592         recreate_data.rdata = &rdata;
1593         if (ast_sip_push_task_synchronous(serializer, sub_persistence_recreate, &recreate_data)) {
1594                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not continue under distributor serializer.\n",
1595                         persistence->endpoint);
1596                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1597         }
1598         ast_taskprocessor_unreference(serializer);
1599
1600         return 0;
1601 }
1602
1603 /*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
1604 static int subscription_persistence_load(void *data)
1605 {
1606         struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
1607                 "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
1608         pj_pool_t *pool;
1609
1610         pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
1611                 PJSIP_POOL_RDATA_INC);
1612         if (!pool) {
1613                 ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
1614                 return 0;
1615         }
1616
1617         ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
1618
1619         pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
1620
1621         ao2_ref(persisted_subscriptions, -1);
1622         return 0;
1623 }
1624
1625 /*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
1626 static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
1627 {
1628         struct ast_json_payload *payload;
1629         const char *type;
1630
1631         if (stasis_message_type(message) != ast_manager_get_generic_type()) {
1632                 return;
1633         }
1634
1635         payload = stasis_message_data(message);
1636         type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
1637
1638         /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
1639          * recreate SIP subscriptions.
1640          */
1641         if (strcmp(type, "FullyBooted")) {
1642                 return;
1643         }
1644
1645         /* This has to be here so the subscription is recreated when the body generator is available */
1646         ast_sip_push_task(NULL, subscription_persistence_load, NULL);
1647
1648         /* Once the system is fully booted we don't care anymore */
1649         stasis_unsubscribe(sub);
1650 }
1651
1652 typedef int (*on_subscription_t)(struct sip_subscription_tree *sub, void *arg);
1653
1654 static int for_each_subscription(on_subscription_t on_subscription, void *arg)
1655 {
1656         int num = 0;
1657         struct sip_subscription_tree *i;
1658
1659         if (!on_subscription) {
1660                 return num;
1661         }
1662
1663         AST_RWLIST_RDLOCK(&subscriptions);
1664         AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
1665                 if (on_subscription(i, arg)) {
1666                         break;
1667                 }
1668                 ++num;
1669         }
1670         AST_RWLIST_UNLOCK(&subscriptions);
1671         return num;
1672 }
1673
1674 static void sip_subscription_to_ami(struct sip_subscription_tree *sub_tree,
1675                                     struct ast_str **buf)
1676 {
1677         char str[256];
1678         struct ast_sip_endpoint_id_configuration *id = &sub_tree->endpoint->id;
1679
1680         ast_str_append(buf, 0, "Role: %s\r\n",
1681                        sip_subscription_roles_map[sub_tree->role]);
1682         ast_str_append(buf, 0, "Endpoint: %s\r\n",
1683                        ast_sorcery_object_get_id(sub_tree->endpoint));
1684
1685         if (sub_tree->dlg) {
1686                 ast_copy_pj_str(str, &sub_tree->dlg->call_id->id, sizeof(str));
1687         } else {
1688                 ast_copy_string(str, "<unknown>", sizeof(str));
1689         }
1690         ast_str_append(buf, 0, "Callid: %s\r\n", str);
1691
1692         ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(sub_tree->evsub));
1693
1694         ast_callerid_merge(str, sizeof(str),
1695                            S_COR(id->self.name.valid, id->self.name.str, NULL),
1696                            S_COR(id->self.number.valid, id->self.number.str, NULL),
1697                            "Unknown");
1698
1699         ast_str_append(buf, 0, "Callerid: %s\r\n", str);
1700
1701         /* XXX This needs to be done recursively for lists */
1702         if (sub_tree->root->handler->to_ami) {
1703                 sub_tree->root->handler->to_ami(sub_tree->root, buf);
1704         }
1705 }
1706
1707
1708 void *ast_sip_subscription_get_header(const struct ast_sip_subscription *sub, const char *header)
1709 {
1710         pjsip_dialog *dlg;
1711         pjsip_msg *msg;
1712         pj_str_t name;
1713
1714         dlg = sub->tree->dlg;
1715         msg = ast_sip_mod_data_get(dlg->mod_data, pubsub_module.id, MOD_DATA_MSG);
1716         pj_cstr(&name, header);
1717
1718         return pjsip_msg_find_hdr_by_name(msg, &name, NULL);
1719 }
1720
1721 /* XXX This function is not used. */
1722 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
1723                 struct ast_sip_endpoint *endpoint, const char *resource)
1724 {
1725         struct ast_sip_subscription *sub;
1726         pjsip_dialog *dlg;
1727         struct ast_sip_contact *contact;
1728         pj_str_t event;
1729         pjsip_tx_data *tdata;
1730         pjsip_evsub *evsub;
1731         struct sip_subscription_tree *sub_tree = NULL;
1732
1733         sub_tree = allocate_subscription_tree(endpoint, NULL);
1734         if (!sub_tree) {
1735                 return NULL;
1736         }
1737
1738         sub = allocate_subscription(handler, resource, sub_tree);
1739         if (!sub) {
1740                 ao2_cleanup(sub_tree);
1741                 return NULL;
1742         }
1743
1744         contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors);
1745         if (!contact || ast_strlen_zero(contact->uri)) {
1746                 ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n",
1747                                 ast_sorcery_object_get_id(endpoint));
1748                 ao2_ref(sub_tree, -1);
1749                 ao2_cleanup(contact);
1750                 return NULL;
1751         }
1752
1753         dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL);
1754         ao2_cleanup(contact);
1755         if (!dlg) {
1756                 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
1757                 ao2_ref(sub_tree, -1);
1758                 return NULL;
1759         }
1760
1761         pj_cstr(&event, handler->event_name);
1762         pjsip_evsub_create_uac(dlg, &pubsub_cb, &event, 0, &sub_tree->evsub);
1763         subscription_setup_dialog(sub_tree, dlg);
1764
1765         evsub = sub_tree->evsub;
1766
1767         if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
1768                 pjsip_evsub_send_request(sub_tree->evsub, tdata);
1769         } else {
1770                 /* pjsip_evsub_terminate will result in pubsub_on_evsub_state,
1771                  * being called and terminating the subscription. Therefore, we don't
1772                  * need to decrease the reference count of sub here.
1773                  */
1774                 pjsip_evsub_terminate(evsub, PJ_TRUE);
1775                 ao2_ref(sub_tree, -1);
1776                 return NULL;
1777         }
1778
1779         add_subscription(sub_tree);
1780
1781         return sub;
1782 }
1783
1784 pjsip_dialog *ast_sip_subscription_get_dialog(struct ast_sip_subscription *sub)
1785 {
1786         ast_assert(sub->tree->dlg != NULL);
1787         return sub->tree->dlg;
1788 }
1789
1790 struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
1791 {
1792         ast_assert(sub->tree->endpoint != NULL);
1793         return ao2_bump(sub->tree->endpoint);
1794 }
1795
1796 struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
1797 {
1798         ast_assert(sub->tree->serializer != NULL);
1799         return sub->tree->serializer;
1800 }
1801
1802 /*!
1803  * \brief Pre-allocate a buffer for the transmission
1804  *
1805  * Typically, we let PJSIP do this step for us when we send a request. PJSIP's buffer
1806  * allocation algorithm is to allocate a buffer of PJSIP_MAX_PKT_LEN bytes and attempt
1807  * to write the packet to the allocated buffer. If the buffer is too small to hold the
1808  * packet, then we get told the message is too long to be sent.
1809  *
1810  * When dealing with SIP NOTIFY, especially with RLS, it is possible to exceed
1811  * PJSIP_MAX_PKT_LEN. Rather than accepting the limitation imposed on us by default,
1812  * we instead take the strategy of pre-allocating the buffer, testing for ourselves
1813  * if the message will fit, and resizing the buffer as required.
1814  *
1815  * RFC 3261 says that a SIP UDP request can be up to 65535 bytes long. We're capping
1816  * it at 64000 for a couple of reasons:
1817  * 1) Allocating more than 64K at a time is hard to justify
1818  * 2) If the message goes through proxies, those proxies will want to add Via and
1819  *    Record-Route headers, making the message even larger. Giving some space for
1820  *    those headers is a nice thing to do.
1821  *
1822  * RFC 3261 does not place an upper limit on the size of TCP requests, but we are
1823  * going to impose the same 64K limit as a memory savings.
1824  *
1825  * \param tdata The tdata onto which to allocate a buffer
1826  * \retval 0 Success
1827  * \retval -1 The message is too large
1828  */
1829 static int allocate_tdata_buffer(pjsip_tx_data *tdata)
1830 {
1831         int buf_size;
1832         int size = -1;
1833         char *buf;
1834
1835         for (buf_size = PJSIP_MAX_PKT_LEN; size == -1 && buf_size < 64000; buf_size *= 2) {
1836                 buf = pj_pool_alloc(tdata->pool, buf_size);
1837                 size = pjsip_msg_print(tdata->msg, buf, buf_size);
1838         }
1839
1840         if (size == -1) {
1841                 return -1;
1842         }
1843
1844         tdata->buf.start = buf;
1845         tdata->buf.cur = tdata->buf.start;
1846         tdata->buf.end = tdata->buf.start + buf_size;
1847
1848         return 0;
1849 }
1850
1851 static int sip_subscription_send_request(struct sip_subscription_tree *sub_tree, pjsip_tx_data *tdata)
1852 {
1853 #ifdef TEST_FRAMEWORK
1854         struct ast_sip_endpoint *endpoint = sub_tree->endpoint;
1855         pjsip_evsub *evsub = sub_tree->evsub;
1856 #endif
1857         int res;
1858
1859         if (allocate_tdata_buffer(tdata)) {
1860                 ast_log(LOG_ERROR, "SIP request %s is too large to send.\n", tdata->info);
1861                 pjsip_tx_data_dec_ref(tdata);
1862                 return -1;
1863         }
1864
1865         res = pjsip_evsub_send_request(sub_tree->evsub, tdata);
1866
1867         subscription_persistence_update(sub_tree, NULL, SUBSCRIPTION_PERSISTENCE_SEND_REQUEST);
1868
1869         ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
1870                 "StateText: %s\r\n"
1871                 "Endpoint: %s\r\n",
1872                 pjsip_evsub_get_state_name(evsub),
1873                 ast_sorcery_object_get_id(endpoint));
1874
1875         return (res == PJ_SUCCESS ? 0 : -1);
1876 }
1877
1878 /*!
1879  * \brief Add a resource XML element to an RLMI body
1880  *
1881  * Each resource element represents a subscribed resource in the list. This function currently
1882  * will unconditionally add an instance element to each created resource element. Instance
1883  * elements refer to later parts in the multipart body.
1884  *
1885  * \param pool PJLIB allocation pool
1886  * \param cid Content-ID header of the resource
1887  * \param resource_name Name of the resource
1888  * \param resource_uri URI of the resource
1889  * \param state State of the subscribed resource
1890  */
1891 static void add_rlmi_resource(pj_pool_t *pool, pj_xml_node *rlmi, const pjsip_generic_string_hdr *cid,
1892                 const char *resource_name, const pjsip_sip_uri *resource_uri, pjsip_evsub_state state)
1893 {
1894         static pj_str_t cid_name = { "cid", 3 };
1895         pj_xml_node *resource;
1896         pj_xml_node *name;
1897         pj_xml_node *instance;
1898         pj_xml_attr *cid_attr;
1899         char id[6];
1900         char uri[PJSIP_MAX_URL_SIZE];
1901
1902         /* This creates a string representing the Content-ID without the enclosing < > */
1903         const pj_str_t cid_stripped = {
1904                 .ptr = cid->hvalue.ptr + 1,
1905                 .slen = cid->hvalue.slen - 2,
1906         };
1907
1908         resource = ast_sip_presence_xml_create_node(pool, rlmi, "resource");
1909         name = ast_sip_presence_xml_create_node(pool, resource, "name");
1910         instance = ast_sip_presence_xml_create_node(pool, resource, "instance");
1911
1912         pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, resource_uri, uri, sizeof(uri));
1913         ast_sip_presence_xml_create_attr(pool, resource, "uri", uri);
1914
1915         pj_strdup2(pool, &name->content, resource_name);
1916
1917         ast_generate_random_string(id, sizeof(id));
1918
1919         ast_sip_presence_xml_create_attr(pool, instance, "id", id);
1920         ast_sip_presence_xml_create_attr(pool, instance, "state",
1921                         state == PJSIP_EVSUB_STATE_TERMINATED ? "terminated" : "active");
1922
1923         /* Use the PJLIB-util XML library directly here since we are using a
1924          * pj_str_t
1925          */
1926
1927         cid_attr = pj_xml_attr_new(pool, &cid_name, &cid_stripped);
1928         pj_xml_add_attr(instance, cid_attr);
1929 }
1930
1931 /*!
1932  * \brief A multipart body part and meta-information
1933  *
1934  * When creating a multipart body part, the end result (the
1935  * pjsip_multipart_part) is hard to inspect without undoing
1936  * a lot of what was done to create it. Therefore, we use this
1937  * structure to store meta-information about the body part.
1938  *
1939  * The main consumer of this is the creator of the RLMI body
1940  * part of a multipart resource list body.
1941  */
1942 struct body_part {
1943         /*! Content-ID header for the body part */
1944         pjsip_generic_string_hdr *cid;
1945         /*! Subscribed resource represented in the body part */
1946         const char *resource;
1947         /*! URI for the subscribed body part */
1948         pjsip_sip_uri *uri;
1949         /*! Subscription state of the resource represented in the body part */
1950         pjsip_evsub_state state;
1951         /*! The actual body part that will be present in the multipart body */
1952         pjsip_multipart_part *part;
1953 };
1954
1955 /*!
1956  * \brief Type declaration for container of body part structures
1957  */
1958 AST_VECTOR(body_part_list, struct body_part *);
1959
1960 /*!
1961  * \brief Create a Content-ID header
1962  *
1963  * Content-ID headers are required by RFC2387 for multipart/related
1964  * bodies. They serve as identifiers for each part of the multipart body.
1965  *
1966  * \param pool PJLIB allocation pool
1967  * \param sub Subscription to a resource
1968  */
1969 static pjsip_generic_string_hdr *generate_content_id_hdr(pj_pool_t *pool,
1970                 const struct ast_sip_subscription *sub)
1971 {
1972         static const pj_str_t cid_name = { "Content-ID", 10 };
1973         pjsip_generic_string_hdr *cid;
1974         char id[6];
1975         size_t alloc_size;
1976         pj_str_t cid_value;
1977
1978         /* '<' + '@' + '>' = 3. pj_str_t does not require a null-terminator */
1979         alloc_size = sizeof(id) + pj_strlen(&sub->uri->host) + 3;
1980         cid_value.ptr = pj_pool_alloc(pool, alloc_size);
1981         cid_value.slen = sprintf(cid_value.ptr, "<%s@%.*s>",
1982                         ast_generate_random_string(id, sizeof(id)),
1983                         (int) pj_strlen(&sub->uri->host), pj_strbuf(&sub->uri->host));
1984         cid = pjsip_generic_string_hdr_create(pool, &cid_name, &cid_value);
1985
1986         return cid;
1987 }
1988
1989 static int rlmi_print_body(struct pjsip_msg_body *msg_body, char *buf, pj_size_t size)
1990 {
1991         int num_printed;
1992         pj_xml_node *rlmi = msg_body->data;
1993
1994         num_printed = pj_xml_print(rlmi, buf, size, PJ_TRUE);
1995         if (num_printed <= AST_PJSIP_XML_PROLOG_LEN) {
1996                 return -1;
1997         }
1998
1999         return num_printed;
2000 }
2001
2002 static void *rlmi_clone_data(pj_pool_t *pool, const void *data, unsigned len)
2003 {
2004         const pj_xml_node *rlmi = data;
2005
2006         return pj_xml_clone(pool, rlmi);
2007 }
2008
2009 /*!
2010  * \brief Create an RLMI body part for a multipart resource list body
2011  *
2012  * RLMI (Resource list meta information) is a special body type that lists
2013  * the subscribed resources and tells subscribers the number of subscribed
2014  * resources and what other body parts are in the multipart body. The
2015  * RLMI body also has a version number that a subscriber can use to ensure
2016  * that the locally-stored state corresponds to server state.
2017  *
2018  * \param pool The allocation pool
2019  * \param sub The subscription representing the subscribed resource list
2020  * \param body_parts A container of body parts that RLMI will refer to
2021  * \param full_state Indicates whether this is a full or partial state notification
2022  * \return The multipart part representing the RLMI body
2023  */
2024 static pjsip_multipart_part *build_rlmi_body(pj_pool_t *pool, struct ast_sip_subscription *sub,
2025                 struct body_part_list *body_parts, unsigned int full_state)
2026 {
2027         pj_xml_node *rlmi;
2028         pj_xml_node *name;
2029         pjsip_multipart_part *rlmi_part;
2030         char version_str[32];
2031         char uri[PJSIP_MAX_URL_SIZE];
2032         pjsip_generic_string_hdr *cid;
2033         int i;
2034
2035         rlmi = ast_sip_presence_xml_create_node(pool, NULL, "list");
2036         ast_sip_presence_xml_create_attr(pool, rlmi, "xmlns", "urn:ietf:params:xml:ns:rlmi");
2037
2038         ast_sip_subscription_get_local_uri(sub, uri, sizeof(uri));
2039         ast_sip_presence_xml_create_attr(pool, rlmi, "uri", uri);
2040
2041         snprintf(version_str, sizeof(version_str), "%u", sub->version++);
2042         ast_sip_presence_xml_create_attr(pool, rlmi, "version", version_str);
2043         ast_sip_presence_xml_create_attr(pool, rlmi, "fullState", full_state ? "true" : "false");
2044
2045         name = ast_sip_presence_xml_create_node(pool, rlmi, "name");
2046         pj_strdup2(pool, &name->content, ast_sip_subscription_get_resource_name(sub));
2047
2048         for (i = 0; i < AST_VECTOR_SIZE(body_parts); ++i) {
2049                 const struct body_part *part = AST_VECTOR_GET(body_parts, i);
2050
2051                 add_rlmi_resource(pool, rlmi, part->cid, part->resource, part->uri, part->state);
2052         }
2053
2054         rlmi_part = pjsip_multipart_create_part(pool);
2055
2056         rlmi_part->body = PJ_POOL_ZALLOC_T(pool, pjsip_msg_body);
2057         pjsip_media_type_cp(pool, &rlmi_part->body->content_type, &rlmi_media_type);
2058
2059         rlmi_part->body->data = pj_xml_clone(pool, rlmi);
2060         rlmi_part->body->clone_data = rlmi_clone_data;
2061         rlmi_part->body->print_body = rlmi_print_body;
2062
2063         cid = generate_content_id_hdr(pool, sub);
2064         pj_list_insert_before(&rlmi_part->hdr, cid);
2065
2066         return rlmi_part;
2067 }
2068
2069 static pjsip_msg_body *generate_notify_body(pj_pool_t *pool, struct ast_sip_subscription *root,
2070                 unsigned int force_full_state);
2071
2072 /*!
2073  * \brief Destroy a list of body parts
2074  *
2075  * \param parts The container of parts to destroy
2076  */
2077 static void free_body_parts(struct body_part_list *parts)
2078 {
2079         int i;
2080
2081         for (i = 0; i < AST_VECTOR_SIZE(parts); ++i) {
2082                 struct body_part *part = AST_VECTOR_GET(parts, i);
2083                 ast_free(part);
2084         }
2085
2086         AST_VECTOR_FREE(parts);
2087 }
2088
2089 /*!
2090  * \brief Allocate and initialize a body part structure
2091  *
2092  * \param pool PJLIB allocation pool
2093  * \param sub Subscription representing a subscribed resource
2094  */
2095 static struct body_part *allocate_body_part(pj_pool_t *pool, const struct ast_sip_subscription *sub)
2096 {
2097         struct body_part *bp;
2098
2099         bp = ast_calloc(1, sizeof(*bp));
2100         if (!bp) {
2101                 return NULL;
2102         }
2103
2104         bp->cid = generate_content_id_hdr(pool, sub);
2105         bp->resource = sub->resource;
2106         bp->state = sub->subscription_state;
2107         bp->uri = sub->uri;
2108
2109         return bp;
2110 }
2111
2112 /*!
2113  * \brief Create a multipart body part for a subscribed resource
2114  *
2115  * \param pool PJLIB allocation pool
2116  * \param sub The subscription representing a subscribed resource
2117  * \param parts A vector of parts to append the created part to.
2118  * \param use_full_state Unused locally, but may be passed to other functions
2119  */
2120 static void build_body_part(pj_pool_t *pool, struct ast_sip_subscription *sub,
2121                 struct body_part_list *parts, unsigned int use_full_state)
2122 {
2123         struct body_part *bp;
2124         pjsip_msg_body *body;
2125
2126         bp = allocate_body_part(pool, sub);
2127         if (!bp) {
2128                 return;
2129         }
2130
2131         body = generate_notify_body(pool, sub, use_full_state);
2132         if (!body) {
2133                 /* Partial state was requested and the resource has not changed state */
2134                 ast_free(bp);
2135                 return;
2136         }
2137
2138         bp->part = pjsip_multipart_create_part(pool);
2139         bp->part->body = body;
2140         pj_list_insert_before(&bp->part->hdr, bp->cid);
2141
2142         AST_VECTOR_APPEND(parts, bp);
2143 }
2144
2145 /*!
2146  * \brief Create and initialize the PJSIP multipart body structure for a resource list subscription
2147  *
2148  * \param pool
2149  * \return The multipart message body
2150  */
2151 static pjsip_msg_body *create_multipart_body(pj_pool_t *pool)
2152 {
2153         pjsip_media_type media_type;
2154         pjsip_param *media_type_param;
2155         char boundary[6];
2156         pj_str_t pj_boundary;
2157
2158         pjsip_media_type_init2(&media_type, "multipart", "related");
2159
2160         media_type_param = pj_pool_alloc(pool, sizeof(*media_type_param));
2161         pj_list_init(media_type_param);
2162
2163         pj_strdup2(pool, &media_type_param->name, "type");
2164         pj_strdup2(pool, &media_type_param->value, "\"application/rlmi+xml\"");
2165
2166         pj_list_insert_before(&media_type.param, media_type_param);
2167
2168         pj_cstr(&pj_boundary, ast_generate_random_string(boundary, sizeof(boundary)));
2169         return pjsip_multipart_create(pool, &media_type, &pj_boundary);
2170 }
2171
2172 /*!
2173  * \brief Create a resource list body for NOTIFY requests
2174  *
2175  * Resource list bodies are multipart/related bodies. The first part of the multipart body
2176  * is an RLMI body that describes the rest of the parts to come. The other parts of the body
2177  * convey state of individual subscribed resources.
2178  *
2179  * \param pool PJLIB allocation pool
2180  * \param sub Subscription details from which to generate body
2181  * \param force_full_state If true, ignore resource list settings and send a full state notification
2182  * \return The generated multipart/related body
2183  */
2184 static pjsip_msg_body *generate_list_body(pj_pool_t *pool, struct ast_sip_subscription *sub,
2185                 unsigned int force_full_state)
2186 {
2187         int i;
2188         pjsip_multipart_part *rlmi_part;
2189         pjsip_msg_body *multipart;
2190         struct body_part_list body_parts;
2191         unsigned int use_full_state = force_full_state ? 1 : sub->full_state;
2192
2193         if (AST_VECTOR_INIT(&body_parts, AST_VECTOR_SIZE(&sub->children))) {
2194                 return NULL;
2195         }
2196
2197         for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
2198                 build_body_part(pool, AST_VECTOR_GET(&sub->children, i), &body_parts, use_full_state);
2199         }
2200
2201         /* This can happen if issuing partial state and no children of the list have changed state */
2202         if (AST_VECTOR_SIZE(&body_parts) == 0) {
2203                 return NULL;
2204         }
2205
2206         multipart = create_multipart_body(pool);
2207
2208         rlmi_part = build_rlmi_body(pool, sub, &body_parts, use_full_state);
2209         if (!rlmi_part) {
2210                 return NULL;
2211         }
2212         pjsip_multipart_add_part(pool, multipart, rlmi_part);
2213
2214         for (i = 0; i < AST_VECTOR_SIZE(&body_parts); ++i) {
2215                 pjsip_multipart_add_part(pool, multipart, AST_VECTOR_GET(&body_parts, i)->part);
2216         }
2217
2218         free_body_parts(&body_parts);
2219         return multipart;
2220 }
2221
2222 /*!
2223  * \brief Create the body for a NOTIFY request.
2224  *
2225  * \param pool The pool used for allocations
2226  * \param root The root of the subscription tree
2227  * \param force_full_state If true, ignore resource list settings and send a full state notification
2228  */
2229 static pjsip_msg_body *generate_notify_body(pj_pool_t *pool, struct ast_sip_subscription *root,
2230                 unsigned int force_full_state)
2231 {
2232         pjsip_msg_body *body;
2233
2234         if (AST_VECTOR_SIZE(&root->children) == 0) {
2235                 if (force_full_state || root->body_changed) {
2236                         /* Not a list. We've already generated the body and saved it on the subscription.
2237                          * Use that directly.
2238                          */
2239                         pj_str_t type;
2240                         pj_str_t subtype;
2241                         pj_str_t text;
2242
2243                         pj_cstr(&type, ast_sip_subscription_get_body_type(root));
2244                         pj_cstr(&subtype, ast_sip_subscription_get_body_subtype(root));
2245                         pj_cstr(&text, ast_str_buffer(root->body_text));
2246
2247                         body = pjsip_msg_body_create(pool, &type, &subtype, &text);
2248                         root->body_changed = 0;
2249                 } else {
2250                         body = NULL;
2251                 }
2252         } else {
2253                 body = generate_list_body(pool, root, force_full_state);
2254         }
2255
2256         return body;
2257 }
2258
2259 /*!
2260  * \brief Shortcut method to create a Require: eventlist header
2261  */
2262 static pjsip_require_hdr *create_require_eventlist(pj_pool_t *pool)
2263 {
2264         pjsip_require_hdr *require;
2265
2266         require = pjsip_require_hdr_create(pool);
2267         pj_strdup2(pool, &require->values[0], "eventlist");
2268         require->count = 1;
2269
2270         return require;
2271 }
2272
2273 /*!
2274  * \brief Send a NOTIFY request to a subscriber
2275  *
2276  * \pre sub_tree->dlg is locked
2277  *
2278  * \param sub_tree The subscription tree representing the subscription
2279  * \param force_full_state If true, ignore resource list settings and send full resource list state.
2280  * \retval 0 Success
2281  * \retval non-zero Failure
2282  */
2283 static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state)
2284 {
2285         pjsip_evsub *evsub = sub_tree->evsub;
2286         pjsip_tx_data *tdata;
2287
2288         if (ast_shutdown_final()
2289                 && sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED
2290                 && sub_tree->persistence) {
2291                 return 0;
2292         }
2293
2294         if (pjsip_evsub_notify(evsub, sub_tree->root->subscription_state,
2295                                 NULL, NULL, &tdata) != PJ_SUCCESS) {
2296                 return -1;
2297         }
2298
2299         tdata->msg->body = generate_notify_body(tdata->pool, sub_tree->root, force_full_state);
2300         if (!tdata->msg->body) {
2301                 pjsip_tx_data_dec_ref(tdata);
2302                 return -1;
2303         }
2304
2305         if (sub_tree->is_list) {
2306                 pjsip_require_hdr *require = create_require_eventlist(tdata->pool);
2307                 pjsip_msg_add_hdr(tdata->msg, (pjsip_hdr *) require);
2308         }
2309
2310         if (sip_subscription_send_request(sub_tree, tdata)) {
2311                 /* do not call pjsip_tx_data_dec_ref(tdata). The pjsip_dlg_send_request deletes the message on error */
2312                 return -1;
2313         }
2314
2315         sub_tree->send_scheduled_notify = 0;
2316
2317         return 0;
2318 }
2319
2320 static int serialized_send_notify(void *userdata)
2321 {
2322         struct sip_subscription_tree *sub_tree = userdata;
2323         pjsip_dialog *dlg = sub_tree->dlg;
2324
2325         pjsip_dlg_inc_lock(dlg);
2326
2327         /* It's possible that between when the notification was scheduled
2328          * and now a new SUBSCRIBE arrived requiring full state to be
2329          * sent out in an immediate NOTIFY. It's also possible that we're
2330          * already processing a terminate.  If that has happened, we need to
2331          * bail out here instead of sending the batched NOTIFY.
2332          */
2333
2334         if (sub_tree->state >= SIP_SUB_TREE_TERMINATE_IN_PROGRESS
2335                 || !sub_tree->send_scheduled_notify) {
2336                 pjsip_dlg_dec_lock(dlg);
2337                 ao2_cleanup(sub_tree);
2338                 return 0;
2339         }
2340
2341         if (sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED) {
2342                 sub_tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;
2343         }
2344
2345         send_notify(sub_tree, 0);
2346
2347         ast_test_suite_event_notify(
2348                 sub_tree->state == SIP_SUB_TREE_TERMINATED
2349                         ? "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_STATE_CHANGED",
2350                 "Resource: %s", sub_tree->root->resource);
2351
2352         sub_tree->notify_sched_id = -1;
2353         pjsip_dlg_dec_lock(dlg);
2354         ao2_cleanup(sub_tree);
2355         return 0;
2356 }
2357
2358 static int sched_cb(const void *data)
2359 {
2360         struct sip_subscription_tree *sub_tree = (struct sip_subscription_tree *) data;
2361
2362         /* We don't need to bump the refcount of sub_tree since we bumped it when scheduling this task */
2363         if (ast_sip_push_task(sub_tree->serializer, serialized_send_notify, sub_tree)) {
2364                 ao2_cleanup(sub_tree);
2365         }
2366
2367         return 0;
2368 }
2369
2370 static int schedule_notification(struct sip_subscription_tree *sub_tree)
2371 {
2372         /* There's already a notification scheduled */
2373         if (sub_tree->notify_sched_id > -1) {
2374                 return 0;
2375         }
2376
2377         sub_tree->send_scheduled_notify = 1;
2378         sub_tree->notify_sched_id = ast_sched_add(sched, sub_tree->notification_batch_interval, sched_cb, ao2_bump(sub_tree));
2379         if (sub_tree->notify_sched_id < 0) {
2380                 ao2_cleanup(sub_tree);
2381                 return -1;
2382         }
2383
2384         return 0;
2385 }
2386
2387 int ast_sip_subscription_notify(struct ast_sip_subscription *sub, struct ast_sip_body_data *notify_data,
2388                 int terminate)
2389 {
2390         int res;
2391         pjsip_dialog *dlg = sub->tree->dlg;
2392
2393         pjsip_dlg_inc_lock(dlg);
2394
2395         if (sub->tree->state != SIP_SUB_TREE_NORMAL) {
2396                 pjsip_dlg_dec_lock(dlg);
2397                 return 0;
2398         }
2399
2400         if (ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub),
2401                                 ast_sip_subscription_get_body_subtype(sub), notify_data, &sub->body_text)) {
2402                 pjsip_dlg_dec_lock(dlg);
2403                 return -1;
2404         }
2405
2406         sub->body_changed = 1;
2407         if (terminate) {
2408                 sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
2409                 sub->tree->state = SIP_SUB_TREE_TERMINATE_PENDING;
2410         }
2411
2412         if (sub->tree->notification_batch_interval) {
2413                 res = schedule_notification(sub->tree);
2414         } else {
2415                 /* See the note in pubsub_on_rx_refresh() for why sub->tree is refbumped here */
2416                 ao2_ref(sub->tree, +1);
2417                 if (terminate) {
2418                         sub->tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;
2419                 }
2420                 res = send_notify(sub->tree, 0);
2421                 ast_test_suite_event_notify(terminate ? "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_STATE_CHANGED",
2422                                 "Resource: %s",
2423                                 sub->tree->root->resource);
2424                 ao2_ref(sub->tree, -1);
2425         }
2426
2427         pjsip_dlg_dec_lock(dlg);
2428         return res;
2429 }
2430
2431 pjsip_sip_uri *ast_sip_subscription_get_sip_uri(struct ast_sip_subscription *sub)
2432 {
2433         return sub->uri;
2434 }
2435
2436 void ast_sip_subscription_get_local_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
2437 {
2438         pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, sub->uri, buf, size);
2439 }
2440
2441 void ast_sip_subscription_get_remote_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
2442 {
2443         pjsip_dialog *dlg;
2444
2445         dlg = sub->tree->dlg;
2446         ast_copy_pj_str(buf, &dlg->remote.info_str, size);
2447 }
2448
2449 const char *ast_sip_subscription_get_resource_name(struct ast_sip_subscription *sub)
2450 {
2451         return sub->resource;
2452 }
2453
2454 int ast_sip_subscription_is_terminated(const struct ast_sip_subscription *sub)
2455 {
2456         return sub->subscription_state == PJSIP_EVSUB_STATE_TERMINATED ? 1 : 0;
2457 }
2458
2459 static int sip_subscription_accept(struct sip_subscription_tree *sub_tree, pjsip_rx_data *rdata, int response)
2460 {
2461         pjsip_hdr res_hdr;
2462
2463         /* If this is a persistence recreation the subscription has already been accepted */
2464         if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) {
2465                 return 0;
2466         }
2467
2468         pj_list_init(&res_hdr);
2469         if (sub_tree->is_list) {
2470                 /* If subscribing to a list, our response has to have a Require: eventlist header in it */
2471                 pj_list_insert_before(&res_hdr, create_require_eventlist(rdata->tp_info.pool));
2472         }
2473
2474         return pjsip_evsub_accept(sub_tree->evsub, rdata, response, &res_hdr) == PJ_SUCCESS ? 0 : -1;
2475 }
2476
/*!
 * \brief Allocate a datastore for use with subscriptions
 *
 * Thin wrapper around ast_datastores_alloc_datastore().
 *
 * \param info Datastore information
 * \param uid Identifier for the datastore
 * \return Whatever ast_datastores_alloc_datastore() returns
 */
struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
{
	struct ast_datastore *datastore = ast_datastores_alloc_datastore(info, uid);

	return datastore;
}
2481
2482 int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
2483 {
2484         return ast_datastores_add(subscription->datastores, datastore);
2485 }
2486
2487 struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
2488 {
2489         return ast_datastores_find(subscription->datastores, name);
2490 }
2491
2492 void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
2493 {
2494         ast_datastores_remove(subscription->datastores, name);
2495 }
2496
2497 struct ao2_container *ast_sip_subscription_get_datastores(const struct ast_sip_subscription *subscription)
2498 {
2499         return subscription->datastores;
2500 }
2501
2502 int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
2503 {
2504         return ast_datastores_add(publication->datastores, datastore);
2505 }
2506
2507 struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
2508 {
2509         return ast_datastores_find(publication->datastores, name);
2510 }
2511
2512 void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
2513 {
2514         ast_datastores_remove(publication->datastores, name);
2515 }
2516
2517 struct ao2_container *ast_sip_publication_get_datastores(const struct ast_sip_publication *publication)
2518 {
2519         return publication->datastores;
2520 }
2521
/*! \brief List of registered PUBLISH handlers; traversed and modified under the list's read/write lock */
2522 AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
2523
2524 static int publication_hash_fn(const void *obj, const int flags)
2525 {
2526         const struct ast_sip_publication *publication = obj;
2527         const int *entity_tag = obj;
2528
2529         return flags & OBJ_KEY ? *entity_tag : publication->entity_tag;
2530 }
2531
2532 static int publication_cmp_fn(void *obj, void *arg, int flags)
2533 {
2534         const struct ast_sip_publication *publication1 = obj;
2535         const struct ast_sip_publication *publication2 = arg;
2536         const int *entity_tag = arg;
2537
2538         return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ?
2539                 CMP_MATCH | CMP_STOP : 0);
2540 }
2541
2542 static void publish_add_handler(struct ast_sip_publish_handler *handler)
2543 {
2544         AST_RWLIST_WRLOCK(&publish_handlers);
2545         AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
2546         AST_RWLIST_UNLOCK(&publish_handlers);
2547 }
2548
2549 int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
2550 {
2551         if (ast_strlen_zero(handler->event_name)) {
2552                 ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
2553                 return -1;
2554         }
2555
2556         if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS,
2557                 publication_hash_fn, publication_cmp_fn))) {
2558                 ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n",
2559                         handler->event_name);
2560                 return -1;
2561         }
2562
2563         publish_add_handler(handler);
2564
2565         ast_module_ref(ast_module_info->self);
2566
2567         return 0;
2568 }
2569
2570 void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
2571 {
2572         struct ast_sip_publish_handler *iter;
2573
2574         AST_RWLIST_WRLOCK(&publish_handlers);
2575         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
2576                 if (handler == iter) {
2577                         AST_RWLIST_REMOVE_CURRENT(next);
2578                         ao2_cleanup(handler->publications);
2579                         ast_module_unref(ast_module_info->self);
2580                         break;
2581                 }
2582         }
2583         AST_RWLIST_TRAVERSE_SAFE_END;
2584         AST_RWLIST_UNLOCK(&publish_handlers);
2585 }
2586
/*! \brief List of registered subscription handlers; traversed and modified under the list's read/write lock */
2587 AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
2588
2589 static void sub_add_handler(struct ast_sip_subscription_handler *handler)
2590 {
2591         AST_RWLIST_WRLOCK(&subscription_handlers);
2592         AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
2593         ast_module_ref(ast_module_info->self);
2594         AST_RWLIST_UNLOCK(&subscription_handlers);
2595 }
2596
2597 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name)
2598 {
2599         struct ast_sip_subscription_handler *iter;
2600
2601         AST_RWLIST_RDLOCK(&subscription_handlers);
2602         AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
2603                 if (!strcmp(iter->event_name, event_name)) {
2604                         break;
2605                 }
2606         }
2607         AST_RWLIST_UNLOCK(&subscription_handlers);
2608         return iter;
2609 }
2610
2611 int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler)
2612 {
2613         pj_str_t event;
2614         pj_str_t accept[AST_SIP_MAX_ACCEPT] = { {0, }, };
2615         struct ast_sip_subscription_handler *existing;
2616         int i = 0;
2617
2618         if (ast_strlen_zero(handler->event_name)) {
2619                 ast_log(LOG_ERROR, "No event package specified for subscription handler. Cannot register\n");
2620                 return -1;
2621         }
2622
2623         existing = find_sub_handler_for_event_name(handler->event_name);
2624         if (existing) {
2625                 ast_log(LOG_ERROR,
2626                         "Unable to register subscription handler for event %s.  A handler is already registered\n",
2627                         handler->event_name);
2628                 return -1;
2629         }
2630
2631         for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) {
2632                 pj_cstr(&accept[i], handler->accept[i]);
2633         }
2634
2635         pj_cstr(&event, handler->event_name);
2636
2637         pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
2638
2639         sub_add_handler(handler);
2640
2641         return 0;
2642 }
2643
2644 void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
2645 {
2646         struct ast_sip_subscription_handler *iter;
2647
2648         AST_RWLIST_WRLOCK(&subscription_handlers);
2649         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
2650                 if (handler == iter) {
2651                         AST_RWLIST_REMOVE_CURRENT(next);
2652                         ast_module_unref(ast_module_info->self);
2653                         break;
2654                 }
2655         }
2656         AST_RWLIST_TRAVERSE_SAFE_END;
2657         AST_RWLIST_UNLOCK(&subscription_handlers);
2658 }
2659
2660 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype_nolock(const char *type, const char *subtype)
2661 {
2662         struct ast_sip_pubsub_body_generator *gen;
2663
2664         AST_LIST_TRAVERSE(&body_generators, gen, list) {
2665                 if (!strcmp(gen->type, type)
2666                         && !strcmp(gen->subtype, subtype)) {
2667                         break;
2668                 }
2669         }
2670
2671         return gen;
2672 }
2673
2674 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype(const char *type, const char *subtype)
2675 {
2676         struct ast_sip_pubsub_body_generator *gen;
2677
2678         AST_RWLIST_RDLOCK(&body_generators);
2679         gen = find_body_generator_type_subtype_nolock(type, subtype);
2680         AST_RWLIST_UNLOCK(&body_generators);
2681         return gen;
2682 }
2683
/*!
 * \brief Find a body generator for a "type/subtype" accept string
 *
 * The string is split at its first '/' on a stack copy, so \a accept is
 * left untouched.
 *
 * \param accept The accept string, e.g. "type/subtype"
 * \return The matching body generator, or NULL if either half of the
 *         string is empty or no generator matches
 */
static struct ast_sip_pubsub_body_generator *find_body_generator_accept(const char *accept)
{
	/* After strsep(), type is the text before the '/', subtype what follows */
	char *subtype = ast_strdupa(accept);
	char *type = strsep(&subtype, "/");

	if (ast_strlen_zero(type) || ast_strlen_zero(subtype)) {
		return NULL;
	}

	return find_body_generator_type_subtype(type, subtype);
}
2696
2697 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
2698                 size_t num_accept, const char *body_type)
2699 {
2700         int i;
2701         struct ast_sip_pubsub_body_generator *generator = NULL;
2702
2703         for (i = 0; i < num_accept; ++i) {
2704                 generator = find_body_generator_accept(accept[i]);
2705                 if (generator) {
2706                         ast_debug(3, "Body generator %p found for accept type %s\n", generator, accept[i]);
2707                         if (strcmp(generator->body_type, body_type)) {
2708                                 ast_log(LOG_WARNING, "Body generator '%s/%s'(%p) does not accept the type of data this event generates\n",
2709                                                 generator->type, generator->subtype, generator);
2710                                 generator = NULL;
2711                                 continue;
2712                         }
2713                         break;
2714                 } else {
2715                         ast_debug(3, "No body generator found for accept type %s\n", accept[i]);
2716                 }
2717         }
2718
2719         return generator;
2720 }
2721
2722 static int generate_initial_notify(struct ast_sip_subscription *sub)
2723 {
2724         void *notify_data;
2725         int res;
2726         struct ast_sip_body_data data = {
2727                 .body_type = sub->handler->body_type,
2728         };
2729
2730         if (AST_VECTOR_SIZE(&sub->children) > 0) {
2731                 int i;
2732
2733                 for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
2734                         if (generate_initial_notify(AST_VECTOR_GET(&sub->children, i))) {
2735                                 return -1;
2736                         }
2737                 }
2738
2739                 return 0;
2740         }
2741
2742         /* We notify subscription establishment only on the tree leaves. */
2743         if (sub->handler->notifier->subscription_established(sub)) {
2744                 return -1;
2745         }
2746
2747         notify_data = sub->handler->notifier->get_notify_data(sub);
2748         if (!notify_data) {
2749                 return -1;
2750         }
2751
2752         data.body_data = notify_data;
2753
2754         res = ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub),
2755                         ast_sip_subscription_get_body_subtype(sub), &data, &sub->body_text);
2756
2757         ao2_cleanup(notify_data);
2758
2759         return res;
2760 }
2761
2762 static int pubsub_on_refresh_timeout(void *userdata);
2763
2764 static int initial_notify_task(void * obj)
2765 {
2766         struct initial_notify_data *ind = obj;
2767
2768         if (generate_initial_notify(ind->sub_tree->root)) {
2769                 pjsip_evsub_terminate(ind->sub_tree->evsub, PJ_TRUE);
2770         } else {
2771                 send_notify(ind->sub_tree, 1);
2772                 ast_test_suite_event_notify("SUBSCRIPTION_ESTABLISHED",
2773                         "Resource: %s",
2774                         ind->sub_tree->root->resource);
2775         }
2776
2777         if (ind->expires > -1) {
2778                 char *name = ast_alloca(strlen("->/ ") +
2779                         strlen(ind->sub_tree->persistence->endpoint) +
2780                         strlen(ind->sub_tree->root->resource) +
2781                         strlen(ind->sub_tree->root->handler->event_name) +
2782                         ind->sub_tree->dlg->call_id->id.slen + 1);
2783
2784                 sprintf(name, "%s->%s/%s %.*s", ind->sub_tree->persistence->endpoint,
2785                         ind->sub_tree->root->resource, ind->sub_tree->root->handler->event_name,
2786                         (int)ind->sub_tree->dlg->call_id->id.slen, ind->sub_tree->dlg->call_id->id.ptr);
2787
2788                 ast_debug(3, "Scheduling timer: %s\n", name);
2789                 ind->sub_tree->expiration_task = ast_sip_schedule_task(ind->sub_tree->serializer,
2790                         ind->expires * 1000, pubsub_on_refresh_timeout, name,
2791                         ind->sub_tree, AST_SIP_SCHED_TASK_FIXED | AST_SIP_SCHED_TASK_DATA_AO2);
2792                 if (!ind->sub_tree->expiration_task) {
2793                         ast_log(LOG_ERROR, "Unable to create expiration timer of %d seconds for %s\n",
2794                                 ind->expires, name);
2795                 }
2796         }
2797
2798         ao2_ref(ind->sub_tree, -1);
2799         ast_free(ind);
2800
2801         return 0;
2802 }
2803
2804 static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
2805 {
2806         pjsip_expires_hdr *expires_header;
2807         struct ast_sip_subscription_handler *handler;
2808         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
2809         struct sip_subscription_tree *sub_tree;
2810         struct ast_sip_pubsub_body_generator *generator;
2811         char *resource;
2812         pjsip_uri *request_uri;
2813         pjsip_sip_uri *request_uri_sip;
2814         size_t resource_size;
2815         int resp;
2816         struct resource_tree tree;
2817         pj_status_t dlg_status;
2818
2819         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
2820         ast_assert(endpoint != NULL);
2821
2822         if (!endpoint->subscription.allow) {
2823                 ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint));
2824                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL);
2825                 return PJ_TRUE;
2826         }
2827
2828         request_uri = rdata->msg_info.msg->line.req.uri;
2829
2830         if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
2831                 char uri_str[PJSIP_MAX_URL_SIZE];
2832
2833                 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
2834                 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
2835                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
2836                 return PJ_TRUE;
2837         }
2838
2839         request_uri_sip = pjsip_uri_get_uri(request_uri);
2840         resource_size = pj_strlen(&request_uri_sip->user) + 1;
2841         resource = ast_alloca(resource_size);
2842         ast_copy_pj_str(resource, &request_uri_sip->user, resource_size);
2843
2844         /*
2845          * We may want to match without any user options getting
2846          * in the way.
2847          */
2848         AST_SIP_USER_OPTIONS_TRUNCATE_CHECK(resource);
2849
2850         expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next);
2851         if (expires_header) {
2852                 if (expires_header->ivalue == 0) {
2853                         ast_log(LOG_WARNING, "Subscription request from endpoint %s rejected. Expiration of 0 is invalid\n",
2854                                 ast_sorcery_object_get_id(endpoint));
2855                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
2856                                 return PJ_TRUE;
2857                 }
2858                 if (expires_header->ivalue < endpoint->subscription.minexpiry) {
2859                         ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %u\n",
2860                                 expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry);
2861                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL);
2862                         return PJ_TRUE;
2863                 }
2864         }
2865
2866         handler = subscription_get_handler_from_rdata(rdata);
2867         if (!handler) {
2868                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2869                 return PJ_TRUE;
2870         }
2871
2872         generator = subscription_get_generator_from_rdata(rdata, handler);
2873         if (!generator) {
2874                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2875                 return PJ_TRUE;
2876         }
2877
2878         memset(&tree, 0, sizeof(tree));
2879         resp = build_resource_tree(endpoint, handler, resource, &tree,
2880                 ast_sip_pubsub_has_eventlist_support(rdata));
2881         if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
2882                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
2883                 resource_tree_destroy(&tree);
2884                 return PJ_TRUE;
2885         }
2886
2887         sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator, &tree, &dlg_status);
2888         if (!sub_tree) {
2889                 if (dlg_status != PJ_EEXISTS) {
2890                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
2891                 }
2892         } else {
2893                 struct initial_notify_data *ind = ast_malloc(sizeof(*ind));
2894
2895                 if (!ind) {
2896                         pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
2897                         resource_tree_destroy(&tree);
2898                         return PJ_TRUE;
2899                 }
2900
2901                 ind->sub_tree = ao2_bump(sub_tree);
2902                 /* Since this is a normal subscribe, pjproject takes care of the timer */
2903                 ind->expires = -1;
2904
2905                 sub_tree->persistence = subscription_persistence_create(sub_tree);
2906                 subscription_persistence_update(sub_tree, rdata, SUBSCRIPTION_PERSISTENCE_CREATED);
2907                 sip_subscription_accept(sub_tree, rdata, resp);
2908                 if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ind)) {
2909                         pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
2910                         ao2_ref(sub_tree, -1);
2911                         ast_free(ind);
2912                 }
2913         }
2914
2915         resource_tree_destroy(&tree);
2916         return PJ_TRUE;
2917 }
2918
2919 static struct ast_sip_publish_handler *find_pub_handler(const char *event)
2920 {
2921         struct ast_sip_publish_handler *iter = NULL;
2922
2923         AST_RWLIST_RDLOCK(&publish_handlers);
2924         AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) {
2925                 if (strcmp(event, iter->event_name)) {
2926                         ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
2927                         continue;
2928                 }
2929                 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
2930                 break;
2931         }
2932         AST_RWLIST_UNLOCK(&publish_handlers);
2933
2934         return iter;
2935 }
2936
2937 static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata,
2938         pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id)
2939 {
2940         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
2941
2942         if (etag_hdr) {
2943                 char etag[pj_strlen(&etag_hdr->hvalue) + 1];
2944
2945                 ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag));
2946
2947                 if (sscanf(etag, "%30d", entity_id) != 1) {
2948                         return SIP_PUBLISH_UNKNOWN;
2949                 }
2950         }
2951
2952         *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
2953
2954         if (!(*expires)) {
2955                 return SIP_PUBLISH_REMOVE;
2956         } else if (!etag_hdr && rdata->msg_info.msg->body) {
2957                 return SIP_PUBLISH_INITIAL;
2958         } else if (etag_hdr && !rdata->msg_info.msg->body) {
2959                 return SIP_PUBLISH_REFRESH;
2960         } else if (etag_hdr && rdata->msg_info.msg->body) {
2961                 return SIP_PUBLISH_MODIFY;
2962         }
2963
2964         return SIP_PUBLISH_UNKNOWN;
2965 }
2966
2967 /*! \brief Internal destructor for publications */
2968 static void publication_destroy_fn(void *obj)
2969 {
2970         struct ast_sip_publication *publication = obj;
2971
2972         ast_debug(3, "Destroying SIP publication\n");
2973
2974         ao2_cleanup(publication->datastores);
2975         ao2_cleanup(publication->endpoint);
2976 }
2977
2978 static struct ast_sip_publication *sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
2979         const char *resource, const char *event_configuration_name)
2980 {
2981         struct ast_sip_publication *publication;
2982         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
2983         size_t resource_len = strlen(resource) + 1, event_configuration_name_len = strlen(event_configuration_name) + 1;
2984         char *dst;
2985
2986         ast_assert(endpoint != NULL);
2987
2988         if (!(publication = ao2_alloc(sizeof(*publication) + resource_len + event_configuration_name_len, publication_destroy_fn))) {
2989                 return NULL;
2990         }
2991
2992         if (!(publication->datastores = ast_datastores_alloc())) {
2993                 ao2_ref(publication, -1);
2994                 return NULL;
2995         }
2996
2997         publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
2998         ao2_ref(endpoint, +1);
2999         publication->endpoint = endpoint;
3000         publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
3001         publication->sched_id = -1;
3002         dst = publication->data;
3003         publication->resource = strcpy(dst, resource);
3004         dst += resource_len;
3005         publication->event_configuration_name = strcpy(dst, event_configuration_name);
3006
3007         return publication;
3008 }
3009
3010 static int sip_publication_respond(struct ast_sip_publication *pub, int status_code,
3011                 pjsip_rx_data *rdata)
3012 {
3013         pjsip_tx_data *tdata;
3014         pjsip_transaction *tsx;
3015
3016         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, &tdata) != PJ_SUCCESS) {
3017                 return -1;
3018         }
3019
3020         if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) {
3021                 char buf[30];
3022
3023                 snprintf(buf, sizeof(buf), "%d", pub->entity_tag);
3024                 ast_sip_add_header(tdata, "SIP-ETag", buf);
3025
3026                 snprintf(buf, sizeof(buf), "%d", pub->expires);
3027                 ast_sip_add_header(tdata, "Expires", buf);
3028         }
3029
3030         if (pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx) != PJ_SUCCESS) {
3031                 pjsip_tx_data_dec_ref(tdata);
3032                 return -1;
3033         }
3034
3035         pjsip_tsx_recv_msg(tsx, rdata);
3036
3037         if (pjsip_tsx_send_msg(tsx, tdata) != PJ_SUCCESS) {
3038                 pjsip_tx_data_dec_ref(tdata);
3039                 return -1;
3040         }
3041
3042         return 0;
3043 }
3044
3045 static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
3046         struct ast_sip_publish_handler *handler)
3047 {
3048         struct ast_sip_publication *publication;
3049         char *resource_name;
3050         size_t resource_size;
3051         RAII_VAR(struct ast_sip_publication_resource *, resource, NULL, ao2_cleanup);
3052         struct ast_variable *event_configuration_name = NULL;
3053         pjsip_uri *request_uri;
3054         pjsip_sip_uri *request_uri_sip;
3055         int resp;
3056
3057         request_uri = rdata->msg_info.msg->line.req.uri;
3058
3059         if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
3060                 char uri_str[PJSIP_MAX_URL_SIZE];
3061
3062                 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
3063                 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
3064                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
3065                 return NULL;
3066         }
3067
3068         request_uri_sip = pjsip_uri_get_uri(request_uri);
3069         resource_size = pj_strlen(&request_uri_sip->user) + 1;
3070         resource_name = ast_alloca(resource_size);
3071         ast_copy_pj_str(resource_name, &request_uri_sip->user, resource_size);
3072
3073         /*
3074          * We may want to match without any user options getting
3075          * in the way.
3076          */
3077         AST_SIP_USER_OPTIONS_TRUNCATE_CHECK(resource_name);
3078
3079         resource = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "inbound-publication", resource_name);
3080         if (!resource) {
3081                 ast_debug(1, "No 'inbound-publication' defined for resource '%s'\n", resource_name);
3082                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
3083                 return NULL;
3084         }
3085
3086         if (!ast_strlen_zero(resource->endpoint) && strcmp(resource->endpoint, ast_sorcery_object_get_id(endpoint))) {
3087                 ast_debug(1, "Resource %s has a defined endpoint '%s', but does not match endpoint '%s' that received the request\n",
3088                         resource_name, resource->endpoint, ast_sorcery_object_get_id(endpoint));
3089                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL);
3090                 return NULL;
3091         }
3092
3093         for (event_configuration_name = resource->events; event_configuration_name; event_configuration_name = event_configuration_name->next) {
3094                 if (!strcmp(event_configuration_name->name, handler->event_name)) {
3095                         break;
3096                 }
3097         }
3098
3099         if (!event_configuration_name) {
3100                 ast_debug(1, "Event '%s' is not configured for '%s'\n", handler->event_name, resource_name);
3101                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
3102                 return NULL;
3103         }
3104
3105         resp = handler->new_publication(endpoint, resource_name, event_configuration_name->value);
3106
3107         if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
3108                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
3109                 return NULL;
3110         }
3111
3112         publication = sip_create_publication(endpoint, rdata, S_OR(resource_name, ""), event_configuration_name->value);
3113
3114         if (!publication) {
3115                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
3116                 return NULL;
3117         }
3118
3119         publication->handler = handler;
3120         if (publication->handler->publication_state_change(publication, rdata->msg_info.msg->body,
3121                         AST_SIP_PUBLISH_STATE_INITIALIZED)) {
3122                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
3123                 ao2_cleanup(publication);
3124                 return NULL;
3125         }
3126
3127         sip_publication_respond(publication, resp, rdata);
3128
3129         return publication;
3130 }
3131
3132 static int publish_expire_callback(void *data)
3133 {
3134         RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup);
3135
3136         if (publication->handler->publish_expire) {
3137                 publication->handler->publish_expire(publication);
3138         }
3139
3140         return 0;
3141 }
3142
3143 static int publish_expire(const void *data)
3144 {
3145         struct ast_sip_publication *publication = (struct ast_sip_publication*)data;
3146
3147         ao2_unlink(publication->handler->publications, publication);
3148         publication->sched_id = -1;
3149
3150         if (ast_sip_push_task(NULL, publish_expire_callback, publication)) {
3151                 ao2_cleanup(publication);
3152         }
3153
3154         return 0;
3155 }
3156
3157 static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata)
3158 {
3159         pjsip_event_hdr *event_header;
3160         struct ast_sip_publish_handler *handler;
3161         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
3162         char event[32];
3163         static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 };
3164         pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL);
3165         enum sip_publish_type publish_type;
3166         RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup);
3167         int expires = 0, entity_id, response = 0;
3168
3169         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
3170         ast_assert(endpoint != NULL);
3171
3172         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
3173         if (!event_header) {
3174                 ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n");
3175                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
3176                 return PJ_TRUE;
3177         }
3178         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
3179
3180         handler = find_pub_handler(event);
3181         if (!handler) {
3182                 ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event);
3183                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
3184                 return PJ_TRUE;
3185         }
3186
3187         publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id);
3188
3189         /* If this is not an initial publish ensure that a publication is present */
3190         if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) {
3191                 if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) {
3192                         static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 };
3193
3194                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed,
3195                                 NULL, NULL);
3196                         return PJ_TRUE;
3197                 }
3198
3199                 /* Per the RFC every response has to have a new entity tag */
3200                 publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
3201
3202                 /* Update the expires here so that the created responses will contain the correct value */
3203                 publication->expires = expires;
3204         }
3205
3206         switch (publish_type) {
3207                 case SIP_PUBLISH_INITIAL:
3208                         publication = publish_request_initial(endpoint, rdata, handler);
3209                         break;
3210                 case SIP_PUBLISH_REFRESH:
3211                 case SIP_PUBLISH_MODIFY:
3212                         if (handler->publication_state_change(publication, rdata->msg_info.msg->body,
3213                                                 AST_SIP_PUBLISH_STATE_ACTIVE)) {
3214                                 /* If an error occurs we want to terminate the publication */
3215                                 expires = 0;
3216                         }
3217                         response = 200;
3218                         break;
3219                 case SIP_PUBLISH_REMOVE:
3220                         handler->publication_state_change(publication, rdata->msg_info.msg->body,
3221                                         AST_SIP_PUBLISH_STATE_TERMINATED);
3222                         response = 200;
3223                         break;
3224                 case SIP_PUBLISH_UNKNOWN:
3225                 default:
3226                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
3227                         break;
3228         }
3229
3230         if (publication) {
3231