Merge "rtp_engine/res_rtp_asterisk: Fix RTP struct reentrancy crashes."
[asterisk/asterisk.git] / res / res_pjsip_pubsub.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 /*!
19  * \brief Opaque structure representing an RFC 3265 SIP subscription
20  */
21
22 /*** MODULEINFO
23         <depend>pjproject</depend>
24         <depend>res_pjsip</depend>
25         <support_level>core</support_level>
26  ***/
27
28 #include "asterisk.h"
29
30 #include <pjsip.h>
31 #include <pjsip_simple.h>
32 #include <pjlib.h>
33
34 #include "asterisk/res_pjsip_pubsub.h"
35 #include "asterisk/module.h"
36 #include "asterisk/linkedlists.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/datastore.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/taskprocessor.h"
41 #include "asterisk/sched.h"
42 #include "asterisk/res_pjsip.h"
43 #include "asterisk/callerid.h"
44 #include "asterisk/manager.h"
45 #include "asterisk/cli.h"
46 #include "asterisk/test.h"
47 #include "res_pjsip/include/res_pjsip_private.h"
48 #include "asterisk/res_pjsip_presence_xml.h"
49
50 /*** DOCUMENTATION
51         <manager name="PJSIPShowSubscriptionsInbound" language="en_US">
52                 <synopsis>
53                         Lists subscriptions.
54                 </synopsis>
55                 <syntax />
56                 <description>
57                         <para>
58                         Provides a listing of all inbound subscriptions.  An event <literal>InboundSubscriptionDetail</literal>
59                         is issued for each subscription object.  Once all detail events are completed an
60                         <literal>InboundSubscriptionDetailComplete</literal> event is issued.
61                         </para>
62                 </description>
63         </manager>
64         <manager name="PJSIPShowSubscriptionsOutbound" language="en_US">
65                 <synopsis>
66                         Lists subscriptions.
67                 </synopsis>
68                 <syntax />
69                 <description>
70                         <para>
71                         Provides a listing of all outbound subscriptions.  An event <literal>OutboundSubscriptionDetail</literal>
72                         is issued for each subscription object.  Once all detail events are completed an
73                         <literal>OutboundSubscriptionDetailComplete</literal> event is issued.
74                         </para>
75                 </description>
76         </manager>
77         <manager name="PJSIPShowResourceLists" language="en_US">
78                 <synopsis>
79                         Displays settings for configured resource lists.
80                 </synopsis>
81                 <syntax />
82                 <description>
83                         <para>
84                         Provides a listing of all resource lists.  An event <literal>ResourceListDetail</literal>
85                         is issued for each resource list object.  Once all detail events are completed a
86                         <literal>ResourceListDetailComplete</literal> event is issued.
87                         </para>
88                 </description>
89         </manager>
90
91         <configInfo name="res_pjsip_pubsub" language="en_US">
92                 <synopsis>Module that implements publish and subscribe support.</synopsis>
93                 <configFile name="pjsip.conf">
94                         <configObject name="subscription_persistence">
95                                 <synopsis>Persists SIP subscriptions so they survive restarts.</synopsis>
96                                 <configOption name="packet">
97                                         <synopsis>Entire SIP SUBSCRIBE packet that created the subscription</synopsis>
98                                 </configOption>
99                                 <configOption name="src_name">
100                                         <synopsis>The source address of the subscription</synopsis>
101                                 </configOption>
102                                 <configOption name="src_port">
103                                         <synopsis>The source port of the subscription</synopsis>
104                                 </configOption>
105                                 <configOption name="transport_key">
106                                         <synopsis>The type of transport the subscription was received on</synopsis>
107                                 </configOption>
108                                 <configOption name="local_name">
109                                         <synopsis>The local address the subscription was received on</synopsis>
110                                 </configOption>
111                                 <configOption name="local_port">
112                                         <synopsis>The local port the subscription was received on</synopsis>
113                                 </configOption>
114                                 <configOption name="cseq">
115                                         <synopsis>The sequence number of the next NOTIFY to be sent</synopsis>
116                                 </configOption>
117                                 <configOption name="tag">
118                                         <synopsis>The local tag of the dialog for the subscription</synopsis>
119                                 </configOption>
120                                 <configOption name="endpoint">
121                                         <synopsis>The name of the endpoint that subscribed</synopsis>
122                                 </configOption>
123                                 <configOption name="expires">
124                                         <synopsis>The time at which the subscription expires</synopsis>
125                                 </configOption>
126                                 <configOption name="contact_uri">
127                                         <synopsis>The Contact URI of the dialog for the subscription</synopsis>
128                                 </configOption>
129                         </configObject>
130                         <configObject name="resource_list">
131                                 <synopsis>Resource list configuration parameters.</synopsis>
132                                 <description>
133                                         <para>This configuration object allows for RFC 4662 resource list subscriptions
134                                         to be specified. This can be useful to decrease the amount of subscription traffic
135                                         that a server has to process.</para>
136                                         <note>
137                                                 <para>Current limitations limit the size of SIP NOTIFY requests that Asterisk sends
138                                                 to 64000 bytes. If your resource list notifications are larger than this maximum, you
139                                                 will need to make adjustments.</para>
140                                         </note>
141                                 </description>
142                                 <configOption name="type">
143                                         <synopsis>Must be of type 'resource_list'</synopsis>
144                                 </configOption>
145                                 <configOption name="event">
146                                         <synopsis>The SIP event package that the list resource belongs to.</synopsis>
147                                         <description><para>
148                                                 The SIP event package describes the types of resources that Asterisk reports
149                                                 the state of.
150                                         </para>
151                                                 <enumlist>
152                                                         <enum name="presence"><para>
153                                                                 Device state and presence reporting.
154                                                         </para></enum>
155                                                         <enum name="dialog"><para>
156                                                                 This is identical to <replaceable>presence</replaceable>.
157                                                         </para></enum>
158                                                         <enum name="message-summary"><para>
159                                                                 Message-waiting indication (MWI) reporting.
160                                                         </para></enum>
161                                                 </enumlist>
162                                         </description>
163                                 </configOption>
164                                 <configOption name="list_item">
165                                         <synopsis>The name of a resource to report state on</synopsis>
166                                         <description>
167                                                 <para>In general Asterisk looks up list items in the following way:</para>
168                                                 <para>1. Check if the list item refers to another configured resource list.</para>
169                                                 <para>2. Pass the name of the resource off to event-package-specific handlers
170                                                    to find the specified resource.</para>
171                                                 <para>The second part means that the way the list item is specified depends
172                                                 on what type of list this is. For instance, if you have the <replaceable>event</replaceable>
173                                                 set to <literal>presence</literal>, then list items should be in the form of
174                                                 dialplan_extension@dialplan_context. For <literal>message-summary</literal> mailbox
175                                                 names should be listed.</para>
176                                         </description>
177                                 </configOption>
178                                 <configOption name="full_state" default="no">
179                                         <synopsis>Indicates if the entire list's state should be sent out.</synopsis>
180                                         <description>
181                                                 <para>If this option is enabled, and a resource changes state, then Asterisk will construct
182                                                 a notification that contains the state of all resources in the list. If the option is
183                                                 disabled, Asterisk will construct a notification that only contains the states of
184                                                 resources that have changed.</para>
185                                                 <note>
186                                                         <para>Even with this option disabled, there are certain situations where Asterisk is forced
187                                                         to send a notification with the states of all resources in the list. When a subscriber
188                                                         renews or terminates its subscription to the list, Asterisk MUST send a full state
189                                                         notification.</para>
190                                                 </note>
191                                         </description>
192                                 </configOption>
193                                 <configOption name="notification_batch_interval" default="0">
194                                         <synopsis>Time Asterisk should wait, in milliseconds, before sending notifications.</synopsis>
195                                         <description>
196                                                 <para>When a resource's state changes, it may be desired to wait a certain amount before Asterisk
197                                                 sends a notification to subscribers. This allows for other state changes to accumulate, so that
198                                                 Asterisk can communicate multiple state changes in a single notification instead of rapidly sending
199                                                 many notifications.</para>
200                                         </description>
201                                 </configOption>
202                         </configObject>
203                         <configObject name="inbound-publication">
204                                 <synopsis>The configuration for inbound publications</synopsis>
205                                 <configOption name="endpoint" default="">
206                                         <synopsis>Optional name of an endpoint that is only allowed to publish to this resource</synopsis>
207                                 </configOption>
208                                 <configOption name="type">
209                                         <synopsis>Must be of type 'inbound-publication'.</synopsis>
210                                 </configOption>
211                         </configObject>
212                 </configFile>
213         </configInfo>
214  ***/
215
216 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);
217
218 static struct pjsip_module pubsub_module = {
219         .name = { "PubSub Module", 13 },
220         .priority = PJSIP_MOD_PRIORITY_APPLICATION,
221         .on_rx_request = pubsub_on_rx_request,
222 };
223
224 #define MOD_DATA_PERSISTENCE "sub_persistence"
225 #define MOD_DATA_MSG "sub_msg"
226
227 static const pj_str_t str_event_name = { "Event", 5 };
228
229 /*! \brief Scheduler used for automatically expiring publications */
230 static struct ast_sched_context *sched;
231
232 /*! \brief Number of buckets for publications (on a per handler) */
233 #define PUBLICATIONS_BUCKETS 37
234
235 /*! \brief Default expiration time for PUBLISH if one is not specified */
236 #define DEFAULT_PUBLISH_EXPIRES 3600
237
238 /*! \brief Number of buckets for subscription datastore */
239 #define DATASTORE_BUCKETS 53
240
241 /*! \brief Default expiration for subscriptions */
242 #define DEFAULT_EXPIRES 3600
243
244 /*! \brief Defined method for PUBLISH */
245 const pjsip_method pjsip_publish_method =
246 {
247         PJSIP_OTHER_METHOD,
248         { "PUBLISH", 7 }
249 };
250
/*!
 * \brief Classification of incoming PUBLISH requests, following RFC 3903
 */
enum sip_publish_type {
        /*!
         * \brief Unknown
         *
         * \details
         * Not an RFC 3903 category. Marks an incoming PUBLISH that matches
         * none of the categories below and is therefore invalid.
         */
        SIP_PUBLISH_UNKNOWN,

        /*!
         * \brief Initial
         *
         * \details
         * The first PUBLISH sent. It carries a non-zero Expires header and a
         * body describing the sender's current state. It is the only PUBLISH
         * type without a Sip-If-Match header.
         */
        SIP_PUBLISH_INITIAL,

        /*!
         * \brief Refresh
         *
         * \details
         * Keeps published state from expiring. It carries a non-zero Expires
         * header and no body, because it does not update state.
         */
        SIP_PUBLISH_REFRESH,

        /*!
         * \brief Modify
         *
         * \details
         * Replaces the previously published state. It carries a body with the
         * new state; an Expires header may or may not be present.
         */
        SIP_PUBLISH_MODIFY,

        /*!
         * \brief Remove
         *
         * \details
         * Removes published state from an ESC. It carries an Expires header
         * of 0 and most likely no body.
         */
        SIP_PUBLISH_REMOVE,
};
305
306 /*!
307  * \brief A vector of strings commonly used throughout this module
308  */
309 AST_VECTOR(resources, const char *);
310
311 /*!
312  * \brief Resource list configuration item
313  */
314 struct resource_list {
315         SORCERY_OBJECT(details);
316         /*! SIP event package the list uses. */
317         char event[32];
318         /*! Strings representing resources in the list. */
319         struct resources items;
320         /*! Indicates if Asterisk sends full or partial state on notifications. */
321         unsigned int full_state;
322         /*! Time, in milliseconds Asterisk waits before sending a batched notification.*/
323         unsigned int notification_batch_interval;
324 };
325
/*!
 * \brief Counter from which ESCs derive new entity IDs
 */
static int esc_etag_counter;
330
/*!
 * \brief State kept for a single SIP publication
 */
struct ast_sip_publication {
        /*! \brief Datastores attached to the publication by handlers */
        struct ao2_container *datastores;
        /*! \brief Entity tag identifying the publication */
        int entity_tag;
        /*! \brief Publish handler responsible for this publication */
        struct ast_sip_publish_handler *handler;
        /*! \brief Endpoint this publication is communicating with */
        struct ast_sip_endpoint *endpoint;
        /*! \brief When the publication expires */
        int expires;
        /*! \brief Scheduler item that expires the publication */
        int sched_id;
        /*! \brief Resource being published to */
        char *resource;
        /*! \brief Name of the event type configuration */
        char *event_configuration_name;
        /*! \brief Trailing storage holding the data for the members above */
        char data[0];
};
354
355
356 /*!
357  * \brief Structure used for persisting an inbound subscription
358  */
359 struct subscription_persistence {
360         /*! Sorcery object details */
361         SORCERY_OBJECT(details);
362         /*! The name of the endpoint involved in the subscription */
363         char *endpoint;
364         /*! SIP message that creates the subscription */
365         char packet[PJSIP_MAX_PKT_LEN];
366         /*! Source address of the message */
367         char src_name[PJ_INET6_ADDRSTRLEN];
368         /*! Source port of the message */
369         int src_port;
370         /*! Local transport key type */
371         char transport_key[32];
372         /*! Local transport address */
373         char local_name[PJ_INET6_ADDRSTRLEN];
374         /*! Local transport port */
375         int local_port;
376         /*! Next CSeq to use for message */
377         unsigned int cseq;
378         /*! Local tag of the dialog */
379         char *tag;
380         /*! When this subscription expires */
381         struct timeval expires;
382         /*! Contact URI */
383         char contact_uri[PJSIP_MAX_URL_SIZE];
384 };
385
/*!
 * \brief Lifecycle states of a subscription tree
 */
enum sip_subscription_tree_state {
        /*! The tree is operating normally */
        SIP_SUB_TREE_NORMAL = 0,
        /*! Asterisk, the client, or pjproject has asked for termination */
        SIP_SUB_TREE_TERMINATE_PENDING,
        /*! Termination is under way */
        SIP_SUB_TREE_TERMINATE_IN_PROGRESS,
        /*! Termination has completed; the subscription tree is no longer valid */
        SIP_SUB_TREE_TERMINATED,
};
399
400 static char *sub_tree_state_description[] = {
401         "Normal",
402         "TerminatePending",
403         "TerminateInProgress",
404         "Terminated"
405 };
406
407 /*!
408  * \brief A tree of SIP subscriptions
409  *
410  * Because of the ability to subscribe to resource lists, a SIP
411  * subscription can result in a tree of subscriptions being created.
412  * This structure represents the information relevant to the subscription
413  * as a whole, to include the underlying PJSIP structure for the
414  * subscription.
415  */
416 struct sip_subscription_tree {
417         /*! The endpoint with which the subscription is communicating */
418         struct ast_sip_endpoint *endpoint;
419         /*! Serializer on which to place operations for this subscription */
420         struct ast_taskprocessor *serializer;
421         /*! The role for this subscription */
422         enum ast_sip_subscription_role role;
423         /*! Persistence information */
424         struct subscription_persistence *persistence;
425         /*! The underlying PJSIP event subscription structure */
426         pjsip_evsub *evsub;
427         /*! The underlying PJSIP dialog */
428         pjsip_dialog *dlg;
429         /*! Interval to use for batching notifications */
430         unsigned int notification_batch_interval;
431         /*! Scheduler ID for batched notification */
432         int notify_sched_id;
433         /*! Indicator if scheduled batched notification should be sent */
434         unsigned int send_scheduled_notify;
435         /*! The root of the subscription tree */
436         struct ast_sip_subscription *root;
437         /*! Is this subscription to a list? */
438         int is_list;
439         /*! Next item in the list */
440         AST_LIST_ENTRY(sip_subscription_tree) next;
441         /*! Subscription tree state */
442         enum sip_subscription_tree_state state;
443         /*! On asterisk restart, this is the task data used
444          * to restart the expiration timer if pjproject isn't
445          * capable of restarting the timer.
446          */
447         struct ast_sip_sched_task *expiration_task;
448 };
449
450 /*!
451  * \brief Structure representing a "virtual" SIP subscription.
452  *
453  * This structure serves a dual purpose. Structurally, it is
454  * the constructed tree of subscriptions based on the resources
455  * being subscribed to. API-wise, this serves as the handle that
456  * subscription handlers use in order to interact with the pubsub API.
457  */
458 struct ast_sip_subscription {
459         /*! Subscription datastores set up by handlers */
460         struct ao2_container *datastores;
461         /*! The handler for this subscription */
462         const struct ast_sip_subscription_handler *handler;
463         /*! Pointer to the base of the tree */
464         struct sip_subscription_tree *tree;
465         /*! Body generaator for NOTIFYs */
466         struct ast_sip_pubsub_body_generator *body_generator;
467         /*! Vector of child subscriptions */
468         AST_VECTOR(, struct ast_sip_subscription *) children;
469         /*! Saved NOTIFY body text for this subscription */
470         struct ast_str *body_text;
471         /*! Indicator that the body text has changed since the last notification */
472         int body_changed;
473         /*! The current state of the subscription */
474         pjsip_evsub_state subscription_state;
475         /*! For lists, the current version to place in the RLMI body */
476         unsigned int version;
477         /*! For lists, indicates if full state should always be communicated. */
478         unsigned int full_state;
479         /*! URI associated with the subscription */
480         pjsip_sip_uri *uri;
481         /*! Name of resource being subscribed to */
482         char resource[0];
483 };
484
485 /*!
486  * \brief Structure representing a publication resource
487  */
488 struct ast_sip_publication_resource {
489         /*! \brief Sorcery object details */
490         SORCERY_OBJECT(details);
491         /*! \brief Optional name of an endpoint that is only allowed to publish to this resource */
492         char *endpoint;
493         /*! \brief Mapping for event types to configuration */
494         struct ast_variable *events;
495 };
496
497 static const char *sip_subscription_roles_map[] = {
498         [AST_SIP_SUBSCRIBER] = "Subscriber",
499         [AST_SIP_NOTIFIER] = "Notifier"
500 };
501
/*! \brief Reason a subscription's persistence record is being updated */
enum sip_persistence_update_type {
        /*! Update triggered from sending a request */
        SUBSCRIPTION_PERSISTENCE_SEND_REQUEST = 0,
        /*! The subscription was created by an initial client request */
        SUBSCRIPTION_PERSISTENCE_CREATED,
        /*! The subscription was recreated by Asterisk on startup */
        SUBSCRIPTION_PERSISTENCE_RECREATED,
        /*! The subscription was refreshed by the client */
        SUBSCRIPTION_PERSISTENCE_REFRESHED,
};
512
/*! \brief Module-wide list of subscription trees */
AST_RWLIST_HEAD_STATIC(subscriptions, sip_subscription_tree);

/*! \brief Module-wide list of body generators */
AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
/*! \brief Module-wide list of body supplements */
AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
517
518 static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
519 static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
520                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
521 static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
522                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
523 static void pubsub_on_client_refresh(pjsip_evsub *sub);
524 static void pubsub_on_server_timeout(pjsip_evsub *sub);
525  
526 static pjsip_evsub_user pubsub_cb = {
527         .on_evsub_state = pubsub_on_evsub_state,
528         .on_rx_refresh = pubsub_on_rx_refresh,
529         .on_rx_notify = pubsub_on_rx_notify,
530         .on_client_refresh = pubsub_on_client_refresh,
531         .on_server_timeout = pubsub_on_server_timeout,
532 };
533
534 /*! \brief Destructor for publication resource */
535 static void publication_resource_destroy(void *obj)
536 {
537         struct ast_sip_publication_resource *resource = obj;
538
539         ast_free(resource->endpoint);
540         ast_variables_destroy(resource->events);
541 }
542
543 /*! \brief Allocator for publication resource */
544 static void *publication_resource_alloc(const char *name)
545 {
546         return ast_sorcery_generic_alloc(sizeof(struct ast_sip_publication_resource), publication_resource_destroy);
547 }
548
549 /*! \brief Destructor for subscription persistence */
550 static void subscription_persistence_destroy(void *obj)
551 {
552         struct subscription_persistence *persistence = obj;
553
554         ast_free(persistence->endpoint);
555         ast_free(persistence->tag);
556 }
557
558 /*! \brief Allocator for subscription persistence */
559 static void *subscription_persistence_alloc(const char *name)
560 {
561         return ast_sorcery_generic_alloc(sizeof(struct subscription_persistence), subscription_persistence_destroy);
562 }
563
564 /*! \brief Function which creates initial persistence information of a subscription in sorcery */
565 static struct subscription_persistence *subscription_persistence_create(struct sip_subscription_tree *sub_tree)
566 {
567         char tag[PJ_GUID_STRING_LENGTH + 1];
568
569         /* The id of this persistence object doesn't matter as we keep it on the subscription and don't need to
570          * look it up by id at all.
571          */
572         struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(),
573                 "subscription_persistence", NULL);
574
575         pjsip_dialog *dlg = sub_tree->dlg;
576
577         if (!persistence) {
578                 return NULL;
579         }
580
581         persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub_tree->endpoint));
582         ast_copy_pj_str(tag, &dlg->local.info->tag, sizeof(tag));
583         persistence->tag = ast_strdup(tag);
584
585         ast_sorcery_create(ast_sip_get_sorcery(), persistence);
586         return persistence;
587 }
588
589 /*! \brief Function which updates persistence information of a subscription in sorcery */
590 static void subscription_persistence_update(struct sip_subscription_tree *sub_tree,
591         pjsip_rx_data *rdata, enum sip_persistence_update_type type)
592 {
593         pjsip_dialog *dlg;
594
595         if (!sub_tree->persistence) {
596                 return;
597         }
598
599         ast_debug(3, "Updating persistence for '%s->%s'\n", sub_tree->persistence->endpoint,
600                 sub_tree->root->resource);
601
602         dlg = sub_tree->dlg;
603         sub_tree->persistence->cseq = dlg->local.cseq;
604
605         if (rdata) {
606                 int expires;
607                 pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
608                 pjsip_contact_hdr *contact_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_CONTACT, NULL);
609
610                 expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
611                 sub_tree->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
612
613                 pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, contact_hdr->uri,
614                         sub_tree->persistence->contact_uri, sizeof(sub_tree->persistence->contact_uri));
615
616                 /* When receiving a packet on an streaming transport, it's possible to receive more than one SIP
617                  * message at a time into the rdata->pkt_info.packet buffer. However, the rdata->msg_info.msg_buf
618                  * will always point to the proper SIP message that is to be processed. When updating subscription
619                  * persistence that is pulled from persistent storage, though, the rdata->pkt_info.packet will
620                  * only ever have a single SIP message on it, and so we base persistence on that.
621                  */
622                 if (type == SUBSCRIPTION_PERSISTENCE_CREATED
623                         || type == SUBSCRIPTION_PERSISTENCE_RECREATED) {
624                         if (rdata->msg_info.msg_buf) {
625                                 ast_copy_string(sub_tree->persistence->packet, rdata->msg_info.msg_buf,
626                                                 MIN(sizeof(sub_tree->persistence->packet), rdata->msg_info.len));
627                         } else {
628                                 ast_copy_string(sub_tree->persistence->packet, rdata->pkt_info.packet,
629                                                 sizeof(sub_tree->persistence->packet));
630                         }
631                 }
632                 ast_copy_string(sub_tree->persistence->src_name, rdata->pkt_info.src_name,
633                                 sizeof(sub_tree->persistence->src_name));
634                 sub_tree->persistence->src_port = rdata->pkt_info.src_port;
635                 ast_copy_string(sub_tree->persistence->transport_key, rdata->tp_info.transport->type_name,
636                         sizeof(sub_tree->persistence->transport_key));
637                 ast_copy_pj_str(sub_tree->persistence->local_name, &rdata->tp_info.transport->local_name.host,
638                         sizeof(sub_tree->persistence->local_name));
639                 sub_tree->persistence->local_port = rdata->tp_info.transport->local_name.port;
640         }
641
642         ast_sorcery_update(ast_sip_get_sorcery(), sub_tree->persistence);
643 }
644
645 /*! \brief Function which removes persistence of a subscription from sorcery */
646 static void subscription_persistence_remove(struct sip_subscription_tree *sub_tree)
647 {
648         if (!sub_tree->persistence) {
649                 return;
650         }
651
652         ast_sorcery_delete(ast_sip_get_sorcery(), sub_tree->persistence);
653         ao2_ref(sub_tree->persistence, -1);
654         sub_tree->persistence = NULL;
655 }
656
657
658 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name);
659 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
660                 size_t num_accept, const char *body_type);
661
662 /*! \brief Retrieve a handler using the Event header of an rdata message */
663 static struct ast_sip_subscription_handler *subscription_get_handler_from_rdata(pjsip_rx_data *rdata)
664 {
665         pjsip_event_hdr *event_header;
666         char event[32];
667         struct ast_sip_subscription_handler *handler;
668
669         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
670         if (!event_header) {
671                 ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
672                 return NULL;
673         }
674         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
675
676         handler = find_sub_handler_for_event_name(event);
677         if (!handler) {
678                 ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
679         }
680
681         return handler;
682 }
683
/*!
 * \brief Accept header values that are skipped when picking a body generator
 *
 * A SUBSCRIBE normally selects its body generator by matching one of the
 * values in its Accept headers. For a subscription to a resource list, a
 * couple of Accept values describe the list itself rather than the bodies
 * of the individual resources. Those values are collected here so they can
 * be ignored when searching for the generator used for the individual
 * resources of the subscription.
 */
const char *accept_exceptions[] = {
	"multipart/related",
	"application/rlmi+xml",
};
700
701 /*!
702  * \brief Is the Accept header from the SUBSCRIBE in the list of exceptions?
703  *
704  * \retval 1 This Accept header value is an exception to the rule.
705  * \retval 0 This Accept header is not an exception to the rule.
706  */
707 static int exceptional_accept(const pj_str_t *accept)
708 {
709         int i;
710
711         for (i = 0; i < ARRAY_LEN(accept_exceptions); ++i) {
712                 if (!pj_strcmp2(accept, accept_exceptions[i])) {
713                         return 1;
714                 }
715         }
716
717         return 0;
718 }
719
720 /*! \brief Retrieve a body generator using the Accept header of an rdata message */
721 static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata,
722         const struct ast_sip_subscription_handler *handler)
723 {
724         pjsip_accept_hdr *accept_header = (pjsip_accept_hdr *) &rdata->msg_info.msg->hdr;
725         char accept[AST_SIP_MAX_ACCEPT][64];
726         size_t num_accept_headers = 0;
727
728         while ((accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, accept_header->next))) {
729                 int i;
730
731                 for (i = 0; i < accept_header->count; ++i) {
732                         if (!exceptional_accept(&accept_header->values[i])) {
733                                 ast_copy_pj_str(accept[num_accept_headers], &accept_header->values[i], sizeof(accept[num_accept_headers]));
734                                 ++num_accept_headers;
735                         }
736                 }
737         }
738
739         if (num_accept_headers == 0) {
740                 /* If a SUBSCRIBE contains no Accept headers, then we must assume that
741                  * the default accept type for the event package is to be used.
742                  */
743                 ast_copy_string(accept[0], handler->notifier->default_accept, sizeof(accept[0]));
744                 num_accept_headers = 1;
745         }
746
747         return find_body_generator(accept, num_accept_headers, handler->body_type);
748 }
749
750 /*! \brief Check if the rdata has a Supported header containing 'eventlist'
751  *
752  *  \retval 1 rdata has an eventlist containing supported header
753  *  \retval 0 rdata doesn't have an eventlist containing supported header
754  */
755 static int ast_sip_pubsub_has_eventlist_support(pjsip_rx_data *rdata)
756 {
757         pjsip_supported_hdr *supported_header = (pjsip_supported_hdr *) &rdata->msg_info.msg->hdr;
758
759         while ((supported_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_SUPPORTED, supported_header->next))) {
760                 int i;
761
762                 for (i = 0; i < supported_header->count; i++) {
763                         if (!pj_stricmp2(&supported_header->values[i], "eventlist")) {
764                                 return 1;
765                         }
766                 }
767         }
768
769         return 0;
770 }
771
772 struct resource_tree;
773
774 /*!
775  * \brief A node for a resource tree.
776  */
777 struct tree_node {
778         AST_VECTOR(, struct tree_node *) children;
779         unsigned int full_state;
780         char resource[0];
781 };
782
783 /*!
784  * \brief Helper function for retrieving a resource list for a given event.
785  *
786  * This will retrieve a resource list that corresponds to the resource and event provided.
787  *
788  * \param resource The name of the resource list to retrieve
789  * \param event The expected event name on the resource list
790  */
791 static struct resource_list *retrieve_resource_list(const char *resource, const char *event)
792 {
793         struct resource_list *list;
794
795         list = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "resource_list", resource);
796         if (!list) {
797                 return NULL;
798         }
799
800         if (strcmp(list->event, event)) {
801                 ast_log(LOG_WARNING, "Found resource list %s, but its event type (%s) does not match SUBSCRIBE's (%s)\n",
802                                 resource, list->event, event);
803                 ao2_cleanup(list);
804                 return NULL;
805         }
806
807         return list;
808 }
809
810 /*!
811  * \brief Allocate a tree node
812  *
813  * In addition to allocating and initializing the tree node, the node is also added
814  * to the vector of visited resources. See \ref build_resource_tree for more information
815  * on the visited resources.
816  *
817  * \param resource The name of the resource for this tree node.
818  * \param visited The vector of resources that have been visited.
819  * \param if allocating a list, indicate whether full state is requested in notifications.
820  * \retval NULL Allocation failure.
821  * \retval non-NULL The newly-allocated tree_node
822  */
823 static struct tree_node *tree_node_alloc(const char *resource, struct resources *visited, unsigned int full_state)
824 {
825         struct tree_node *node;
826
827         node = ast_calloc(1, sizeof(*node) + strlen(resource) + 1);
828         if (!node) {
829                 return NULL;
830         }
831
832         strcpy(node->resource, resource);
833         if (AST_VECTOR_INIT(&node->children, 4)) {
834                 ast_free(node);
835                 return NULL;
836         }
837         node->full_state = full_state;
838
839         if (visited) {
840                 AST_VECTOR_APPEND(visited, resource);
841         }
842         return node;
843 }
844
845 /*!
846  * \brief Destructor for a tree node
847  *
848  * This function calls recursively in order to destroy
849  * all nodes lower in the tree from the given node in
850  * addition to the node itself.
851  *
852  * \param node The node to destroy.
853  */
854 static void tree_node_destroy(struct tree_node *node)
855 {
856         int i;
857         if (!node) {
858                 return;
859         }
860
861         for (i = 0; i < AST_VECTOR_SIZE(&node->children); ++i) {
862                 tree_node_destroy(AST_VECTOR_GET(&node->children, i));
863         }
864         AST_VECTOR_FREE(&node->children);
865         ast_free(node);
866 }
867
/*!
 * \brief Tell whether a resource is already in the visited vector
 *
 * Resources are compared by name with strcmp(). See \ref build_resource_tree
 * for the purpose of the visited vector.
 *
 * \param resource The resource currently being visited
 * \param visited The resources that have previously been visited
 *
 * \retval 1 The resource has been visited before.
 * \retval 0 The resource has not been visited before.
 */
static int have_visited(const char *resource, struct resources *visited)
{
	int idx = 0;

	while (idx < AST_VECTOR_SIZE(visited)) {
		if (strcmp(resource, AST_VECTOR_GET(visited, idx)) == 0) {
			return 1;
		}
		++idx;
	}

	return 0;
}
888
889 /*!
890  * \brief Build child nodes for a given parent.
891  *
892  * This iterates through the items on a resource list and creates tree nodes for each one. The
893  * tree nodes created are children of the supplied parent node. If an item in the resource
894  * list is itself a list, then this function is called recursively to provide children for
895  * the the new node.
896  *
897  * If an item in a resource list is not a list, then the supplied subscription handler is
898  * called into as if a new SUBSCRIBE for the list item were presented. The handler's response
899  * is used to determine if the node can be added to the tree or not.
900  *
901  * If a parent node ends up having no child nodes added under it, then the parent node is
902  * pruned from the tree.
903  *
904  * \param endpoint The endpoint that sent the inbound SUBSCRIBE.
905  * \param handler The subscription handler for leaf nodes in the tree.
906  * \param list The configured resource list from which the child node is being built.
907  * \param parent The parent node for these children.
908  * \param visited The resources that have already been visited.
909  */
910 static void build_node_children(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
911                 struct resource_list *list, struct tree_node *parent, struct resources *visited)
912 {
913         int i;
914
915         for (i = 0; i < AST_VECTOR_SIZE(&list->items); ++i) {
916                 struct tree_node *current;
917                 struct resource_list *child_list;
918                 const char *resource = AST_VECTOR_GET(&list->items, i);
919
920                 if (have_visited(resource, visited)) {
921                         ast_debug(1, "Already visited resource %s. Avoiding duplicate resource or potential loop.\n", resource);
922                         continue;
923                 }
924
925                 child_list = retrieve_resource_list(resource, list->event);
926                 if (!child_list) {
927                         int resp = handler->notifier->new_subscribe(endpoint, resource);
928                         if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
929                                 current = tree_node_alloc(resource, visited, 0);
930                                 if (!current) {
931                                         ast_debug(1,
932                                                 "Subscription to leaf resource %s was successful, but encountered allocation error afterwards\n",
933                                                 resource);
934                                         continue;
935                                 }
936                                 ast_debug(2, "Subscription to leaf resource %s resulted in success. Adding to parent %s\n",
937                                                 resource, parent->resource);
938                                 AST_VECTOR_APPEND(&parent->children, current);
939                         } else {
940                                 ast_debug(2, "Subscription to leaf resource %s resulted in error response %d\n",
941                                                 resource, resp);
942                         }
943                 } else {
944                         ast_debug(2, "Resource %s (child of %s) is a list\n", resource, parent->resource);
945                         current = tree_node_alloc(resource, visited, child_list->full_state);
946                         if (!current) {
947                                 ast_debug(1, "Cannot build children of resource %s due to allocation failure\n", resource);
948                                 continue;
949                         }
950                         build_node_children(endpoint, handler, child_list, current, visited);
951                         if (AST_VECTOR_SIZE(&current->children) > 0) {
952                                 ast_debug(1, "List %s had no successful children.\n", resource);
953                                 AST_VECTOR_APPEND(&parent->children, current);
954                         } else {
955                                 ast_debug(2, "List %s had successful children. Adding to parent %s\n",
956                                                 resource, parent->resource);
957                                 tree_node_destroy(current);
958                         }
959                         ao2_cleanup(child_list);
960                 }
961         }
962 }
963
/*!
 * \brief A resource tree
 *
 * The resource named in an inbound SUBSCRIBE may be a resource list, and
 * the items of that list may be lists themselves, so the resources covered
 * by one SUBSCRIBE are held in a tree.
 *
 * The tree is built when the SUBSCRIBE is received by checking, resource by
 * resource, whether a subscription to it would succeed. A resource whose
 * subscription would succeed gets a node in the tree; one whose subscription
 * would fail gets none.
 *
 * The tree is a bare-bones counterpart of the tree of ast_sip_subscriptions
 * that is later created to actually carry out the duties of the SIP
 * SUBSCRIBE dialog.
 */
struct resource_tree {
	/*! Top node of the tree: the resource requested in the SUBSCRIBE */
	struct tree_node *root;
	/*! Notification batch interval taken from the requested resource list */
	unsigned int notification_batch_interval;
};
984
985 /*!
986  * \brief Destroy a resource tree.
987  *
988  * This function makes no assumptions about how the tree itself was
989  * allocated and does not attempt to free the tree itself. Callers
990  * of this function are responsible for freeing the tree.
991  *
992  * \param tree The tree to destroy.
993  */
994 static void resource_tree_destroy(struct resource_tree *tree)
995 {
996         if (tree) {
997                 tree_node_destroy(tree->root);
998         }
999 }
1000
1001 /*!
1002  * \brief Build a resource tree
1003  *
1004  * This function builds a resource tree based on the requested resource in a SUBSCRIBE request.
1005  *
1006  * This function also creates a container that has all resources that have been visited during
1007  * creation of the tree, whether those resources resulted in a tree node being created or not.
1008  * Keeping this container of visited resources allows for misconfigurations such as loops in
1009  * the tree or duplicated resources to be detected.
1010  *
1011  * \param endpoint The endpoint that sent the SUBSCRIBE request.
1012  * \param handler The subscription handler for leaf nodes in the tree.
1013  * \param resource The resource requested in the SUBSCRIBE request.
1014  * \param tree The tree that is to be built.
1015  * \param has_eventlist_support
1016  *
1017  * \retval 200-299 Successfully subscribed to at least one resource.
1018  * \retval 300-699 Failure to subscribe to requested resource.
1019  */
1020 static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
1021                 const char *resource, struct resource_tree *tree, int has_eventlist_support)
1022 {
1023         RAII_VAR(struct resource_list *, list, NULL, ao2_cleanup);
1024         struct resources visited;
1025
1026         if (!has_eventlist_support || !(list = retrieve_resource_list(resource, handler->event_name))) {
1027                 ast_debug(2, "Subscription '%s->%s' is not to a list\n",
1028                         ast_sorcery_object_get_id(endpoint), resource);
1029                 tree->root = tree_node_alloc(resource, NULL, 0);
1030                 if (!tree->root) {
1031                         return 500;
1032                 }
1033                 return handler->notifier->new_subscribe(endpoint, resource);
1034         }
1035
1036         ast_debug(2, "Subscription '%s->%s' is a list\n",
1037                 ast_sorcery_object_get_id(endpoint), resource);
1038         if (AST_VECTOR_INIT(&visited, AST_VECTOR_SIZE(&list->items))) {
1039                 return 500;
1040         }
1041
1042         tree->root = tree_node_alloc(resource, &visited, list->full_state);
1043         if (!tree->root) {
1044                 AST_VECTOR_FREE(&visited);
1045                 return 500;
1046         }
1047
1048         tree->notification_batch_interval = list->notification_batch_interval;
1049
1050         build_node_children(endpoint, handler, list, tree->root, &visited);
1051         AST_VECTOR_FREE(&visited);
1052
1053         if (AST_VECTOR_SIZE(&tree->root->children) > 0) {
1054                 return 200;
1055         } else {
1056                 return 500;
1057         }
1058 }
1059
1060 static void add_subscription(struct sip_subscription_tree *obj)
1061 {
1062         AST_RWLIST_WRLOCK(&subscriptions);
1063         AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
1064         AST_RWLIST_UNLOCK(&subscriptions);
1065 }
1066
1067 static void remove_subscription(struct sip_subscription_tree *obj)
1068 {
1069         struct sip_subscription_tree *i;
1070
1071         AST_RWLIST_WRLOCK(&subscriptions);
1072         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
1073                 if (i == obj) {
1074                         AST_RWLIST_REMOVE_CURRENT(next);
1075                         if (i->root) {
1076                                 ast_debug(2, "Removing subscription '%s->%s' from list of subscriptions\n",
1077                                         ast_sorcery_object_get_id(i->endpoint), ast_sip_subscription_get_resource_name(i->root));
1078                         }
1079                         break;
1080                 }
1081         }
1082         AST_RWLIST_TRAVERSE_SAFE_END;
1083         AST_RWLIST_UNLOCK(&subscriptions);
1084 }
1085
1086 static void destroy_subscription(struct ast_sip_subscription *sub)
1087 {
1088         ast_debug(3, "Destroying SIP subscription from '%s->%s'\n",
1089                 ast_sorcery_object_get_id(sub->tree->endpoint), sub->resource);
1090         ast_free(sub->body_text);
1091
1092         AST_VECTOR_FREE(&sub->children);
1093         ao2_cleanup(sub->datastores);
1094         ast_free(sub);
1095 }
1096
1097 static void destroy_subscriptions(struct ast_sip_subscription *root)
1098 {
1099         int i;
1100
1101         if (!root) {
1102                 return;
1103         }
1104
1105         for (i = 0; i < AST_VECTOR_SIZE(&root->children); ++i) {
1106                 struct ast_sip_subscription *child;
1107
1108                 child = AST_VECTOR_GET(&root->children, i);
1109                 destroy_subscriptions(child);
1110         }
1111
1112         destroy_subscription(root);
1113 }
1114
1115 static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler,
1116                 const char *resource, struct sip_subscription_tree *tree)
1117 {
1118         struct ast_sip_subscription *sub;
1119         pjsip_sip_uri *contact_uri;
1120
1121         sub = ast_calloc(1, sizeof(*sub) + strlen(resource) + 1);
1122         if (!sub) {
1123                 return NULL;
1124         }
1125         strcpy(sub->resource, resource); /* Safe */
1126
1127         sub->datastores = ast_datastores_alloc();
1128         if (!sub->datastores) {
1129                 destroy_subscription(sub);
1130                 return NULL;
1131         }
1132
1133         sub->body_text = ast_str_create(128);
1134         if (!sub->body_text) {
1135                 destroy_subscription(sub);
1136                 return NULL;
1137         }
1138
1139         sub->uri = pjsip_sip_uri_create(tree->dlg->pool, PJ_FALSE);
1140         contact_uri = pjsip_uri_get_uri(tree->dlg->local.contact->uri);
1141         pjsip_sip_uri_assign(tree->dlg->pool, sub->uri, contact_uri);
1142         pj_strdup2(tree->dlg->pool, &sub->uri->user, resource);
1143
1144         sub->handler = handler;
1145         sub->subscription_state = PJSIP_EVSUB_STATE_ACTIVE;
1146         sub->tree = ao2_bump(tree);
1147
1148         return sub;
1149 }
1150
1151 /*!
1152  * \brief Create a tree of virtual subscriptions based on a resource tree node.
1153  *
1154  * \param handler The handler to supply to leaf subscriptions.
1155  * \param resource The requested resource for this subscription.
1156  * \param generator Body generator to use for leaf subscriptions.
1157  * \param tree The root of the subscription tree.
1158  * \param current The tree node that corresponds to the subscription being created.
1159  */
1160 static struct ast_sip_subscription *create_virtual_subscriptions(const struct ast_sip_subscription_handler *handler,
1161                 const char *resource, struct ast_sip_pubsub_body_generator *generator,
1162                 struct sip_subscription_tree *tree, struct tree_node *current)
1163 {
1164         int i;
1165         struct ast_sip_subscription *sub;
1166
1167         sub = allocate_subscription(handler, resource, tree);
1168         if (!sub) {
1169                 return NULL;
1170         }
1171
1172         sub->full_state = current->full_state;
1173         sub->body_generator = generator;
1174         AST_VECTOR_INIT(&sub->children, AST_VECTOR_SIZE(&current->children));
1175
1176         for (i = 0; i < AST_VECTOR_SIZE(&current->children); ++i) {
1177                 struct ast_sip_subscription *child;
1178                 struct tree_node *child_node = AST_VECTOR_GET(&current->children, i);
1179
1180                 child = create_virtual_subscriptions(handler, child_node->resource, generator,
1181                                 tree, child_node);
1182
1183                 if (!child) {
1184                         ast_debug(1, "Child subscription to resource %s could not be created\n",
1185                                         child_node->resource);
1186                         continue;
1187                 }
1188
1189                 if (AST_VECTOR_APPEND(&sub->children, child)) {
1190                         ast_debug(1, "Child subscription to resource %s could not be appended\n",
1191                                         child_node->resource);
1192                 }
1193         }
1194
1195         return sub;
1196 }
1197
1198 static void shutdown_subscriptions(struct ast_sip_subscription *sub)
1199 {
1200         int i;
1201
1202         if (!sub) {
1203                 return;
1204         }
1205
1206         if (AST_VECTOR_SIZE(&sub->children) > 0) {
1207                 for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
1208                         shutdown_subscriptions(AST_VECTOR_GET(&sub->children, i));
1209                 }
1210                 return;
1211         }
1212
1213         /* We notify subscription shutdown only on the tree leaves. */
1214         if (sub->handler->subscription_shutdown) {
1215                 sub->handler->subscription_shutdown(sub);
1216         }
1217 }
1218 static int subscription_unreference_dialog(void *obj)
1219 {
1220         struct sip_subscription_tree *sub_tree = obj;
1221
1222         /* This is why we keep the dialog on the subscription. When the subscription
1223          * is destroyed, there is no guarantee that the underlying dialog is ready
1224          * to be destroyed. Furthermore, there's no guarantee in the opposite direction
1225          * either. The dialog could be destroyed before our subscription is. We fix
1226          * this problem by keeping a reference to the dialog until it is time to
1227          * destroy the subscription. We need to have the dialog available when the
1228          * subscription is destroyed so that we can guarantee that our attempt to
1229          * remove the serializer will be successful.
1230          */
1231         pjsip_dlg_dec_session(sub_tree->dlg, &pubsub_module);
1232         sub_tree->dlg = NULL;
1233
1234         return 0;
1235 }
1236
1237 static void subscription_tree_destructor(void *obj)
1238 {
1239         struct sip_subscription_tree *sub_tree = obj;
1240
1241         ast_debug(3, "Destroying subscription tree %p '%s->%s'\n",
1242                 sub_tree,
1243                 sub_tree->endpoint ? ast_sorcery_object_get_id(sub_tree->endpoint) : "Unknown",
1244                 sub_tree->root ? sub_tree->root->resource : "Unknown");
1245
1246         ao2_cleanup(sub_tree->endpoint);
1247
1248         destroy_subscriptions(sub_tree->root);
1249
1250         if (sub_tree->dlg) {
1251                 ast_sip_push_task_synchronous(sub_tree->serializer, subscription_unreference_dialog, sub_tree);
1252         }
1253
1254         ast_taskprocessor_unreference(sub_tree->serializer);
1255         ast_module_unref(ast_module_info->self);
1256 }
1257
1258 void ast_sip_subscription_destroy(struct ast_sip_subscription *sub)
1259 {
1260         ast_debug(3, "Removing subscription %p '%s->%s' reference to subscription tree %p\n",
1261                 sub, ast_sorcery_object_get_id(sub->tree->endpoint), sub->resource, sub->tree);
1262         ao2_cleanup(sub->tree);
1263 }
1264
1265 static void subscription_setup_dialog(struct sip_subscription_tree *sub_tree, pjsip_dialog *dlg)
1266 {
1267         sub_tree->dlg = dlg;
1268         ast_sip_dialog_set_serializer(dlg, sub_tree->serializer);
1269         ast_sip_dialog_set_endpoint(dlg, sub_tree->endpoint);
1270         pjsip_evsub_set_mod_data(sub_tree->evsub, pubsub_module.id, sub_tree);
1271         pjsip_dlg_inc_session(dlg, &pubsub_module);
1272 }
1273
1274 static struct sip_subscription_tree *allocate_subscription_tree(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
1275 {
1276         struct sip_subscription_tree *sub_tree;
1277
1278         sub_tree = ao2_alloc(sizeof *sub_tree, subscription_tree_destructor);
1279         if (!sub_tree) {
1280                 return NULL;
1281         }
1282
1283         ast_module_ref(ast_module_info->self);
1284
1285         if (rdata) {
1286                 /*
1287                  * We must continue using the serializer that the original
1288                  * SUBSCRIBE came in on for the dialog.  There may be
1289                  * retransmissions already enqueued in the original
1290                  * serializer that can result in reentrancy and message
1291                  * sequencing problems.
1292                  */
1293                 sub_tree->serializer = ast_sip_get_distributor_serializer(rdata);
1294         } else {
1295                 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
1296
1297                 /* Create name with seq number appended. */
1298                 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/pubsub/%s",
1299                         ast_sorcery_object_get_id(endpoint));
1300
1301                 sub_tree->serializer = ast_sip_create_serializer(tps_name);
1302         }
1303         if (!sub_tree->serializer) {
1304                 ao2_ref(sub_tree, -1);
1305                 return NULL;
1306         }
1307
1308         sub_tree->endpoint = ao2_bump(endpoint);
1309         sub_tree->notify_sched_id = -1;
1310
1311         return sub_tree;
1312 }
1313
1314 /*!
1315  * \brief Create a subscription tree based on a resource tree.
1316  *
1317  * Using the previously-determined valid resources in the provided resource tree,
1318  * a corresponding tree of ast_sip_subscriptions are created. The root of the
1319  * subscription tree is a real subscription, and the rest in the tree are
1320  * virtual subscriptions.
1321  *
1322  * \param handler The handler to use for leaf subscriptions
1323  * \param endpoint The endpoint that sent the SUBSCRIBE request
1324  * \param rdata The SUBSCRIBE content
1325  * \param resource The requested resource in the SUBSCRIBE request
1326  * \param generator The body generator to use in leaf subscriptions
1327  * \param tree The resource tree on which the subscription tree is based
1328  * \param dlg_status[out] The result of attempting to create a dialog.
1329  *
1330  * \retval NULL Could not create the subscription tree
1331  * \retval non-NULL The root of the created subscription tree
1332  */
1333
1334 static struct sip_subscription_tree *create_subscription_tree(const struct ast_sip_subscription_handler *handler,
1335                 struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
1336                 struct ast_sip_pubsub_body_generator *generator, struct resource_tree *tree,
1337                 pj_status_t *dlg_status)
1338 {
1339         struct sip_subscription_tree *sub_tree;
1340         pjsip_dialog *dlg;
1341         struct subscription_persistence *persistence;
1342
1343         sub_tree = allocate_subscription_tree(endpoint, rdata);
1344         if (!sub_tree) {
1345                 *dlg_status = PJ_ENOMEM;
1346                 return NULL;
1347         }
1348         sub_tree->role = AST_SIP_NOTIFIER;
1349
1350         dlg = ast_sip_create_dialog_uas(endpoint, rdata, dlg_status);
1351         if (!dlg) {
1352                 if (*dlg_status != PJ_EEXISTS) {
1353                         ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
1354                 }
1355                 ao2_ref(sub_tree, -1);
1356                 return NULL;
1357         }
1358
1359         persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
1360                         pubsub_module.id, MOD_DATA_PERSISTENCE);
1361         if (persistence) {
1362                 /* Update the created dialog with the persisted information */
1363                 pjsip_ua_unregister_dlg(pjsip_ua_instance(), dlg);
1364                 pj_strdup2(dlg->pool, &dlg->local.info->tag, persistence->tag);
1365                 dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag);
1366                 pjsip_ua_register_dlg(pjsip_ua_instance(), dlg);
1367                 dlg->local.cseq = persistence->cseq;
1368         }
1369
1370         pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &sub_tree->evsub);
1371         subscription_setup_dialog(sub_tree, dlg);
1372
1373 #ifdef HAVE_PJSIP_EVSUB_GRP_LOCK
1374         pjsip_evsub_add_ref(sub_tree->evsub);
1375 #endif
1376
1377         ast_sip_mod_data_set(dlg->pool, dlg->mod_data, pubsub_module.id, MOD_DATA_MSG,
1378                         pjsip_msg_clone(dlg->pool, rdata->msg_info.msg));
1379
1380         sub_tree->notification_batch_interval = tree->notification_batch_interval;
1381
1382         sub_tree->root = create_virtual_subscriptions(handler, resource, generator, sub_tree, tree->root);
1383         if (AST_VECTOR_SIZE(&sub_tree->root->children) > 0) {
1384                 sub_tree->is_list = 1;
1385         }
1386
1387         add_subscription(sub_tree);
1388
1389         return sub_tree;
1390 }
1391
1392 /*!
      * \brief Wrapper structure for initial_notify_task
      *
      * Bundles the arguments handed to initial_notify_task() when that task is
      * pushed onto a subscription tree's serializer.
      */
1393 struct initial_notify_data {
             /*! Subscription tree the task operates on. sub_persistence_recreate() stores an ao2_bump()ed reference here. */
1394         struct sip_subscription_tree *sub_tree;
             /*! Expiration value. sub_persistence_recreate() fills this from the Expires header ivalue (remaining whole seconds). */
1395         int expires;
1396 };
1397
/* Forward declarations. initial_notify_task() is pushed as a task by sub_persistence_recreate() below. */
1398 static int initial_notify_task(void *obj);
1399 static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state);
1400
1401 /*!
      * \brief Persistent subscription recreation continuation under distributor serializer data
      *
      * Argument block passed from subscription_persistence_recreate() to
      * sub_persistence_recreate(). Both pointers are borrowed: the caller fills
      * the structure on its own stack and hands over its address.
      */
1402 struct persistence_recreate_data {
             /*! Persisted subscription record being recreated */
1403         struct subscription_persistence *persistence;
             /*! rdata rebuilt from the persisted packet (lives on the caller's stack) */
1404         pjsip_rx_data *rdata;
1405 };
1406
1407 /*!
1408  * \internal
1409  * \brief subscription_persistence_recreate continuation under distributor serializer.
1410  * \since 13.10.0
1411  *
      * Rebuilds one inbound subscription from a persisted SUBSCRIBE request. The
      * resource name is taken from the user part of the request URI, the handler
      * and body generator are looked up from the rdata, the Expires header is
      * rewritten with the time remaining, and on success the initial NOTIFY task
      * is queued on the new subscription tree's serializer.
      *
      * \param obj Pointer to a struct persistence_recreate_data holding the
      *        persistence record and the rdata to work from.
      *
      * \note Most failure paths log a warning and delete the persistence record
      *       from sorcery; the branches that do not are commented below.
      *
1412  * \retval 0 on every path through this function, including failures.
1413  *         (No return statement in this function yields -1.)
1414  */
1415 static int sub_persistence_recreate(void *obj)
1416 {
1417         struct persistence_recreate_data *recreate_data = obj;
1418         struct subscription_persistence *persistence = recreate_data->persistence;
1419         pjsip_rx_data *rdata = recreate_data->rdata;
1420         struct ast_sip_endpoint *endpoint;
1421         struct sip_subscription_tree *sub_tree;
1422         struct ast_sip_pubsub_body_generator *generator;
1423         struct ast_sip_subscription_handler *handler;
1424         char *resource;
1425         pjsip_sip_uri *request_uri;
1426         size_t resource_size;
1427         int resp;
1428         struct resource_tree tree;
1429         pjsip_expires_hdr *expires_header;
1430
             /* The subscribed resource is the user part of the request URI, copied to a stack buffer. */
1431         request_uri = pjsip_uri_get_uri(rdata->msg_info.msg->line.req.uri);
1432         resource_size = pj_strlen(&request_uri->user) + 1;
1433         resource = ast_alloca(resource_size);
1434         ast_copy_pj_str(resource, &request_uri->user, resource_size);
1435
1436         /*
1437          * We may want to match without any user options getting
1438          * in the way.
1439          */
1440         AST_SIP_USER_OPTIONS_TRUNCATE_CHECK(resource);
1441
1442         handler = subscription_get_handler_from_rdata(rdata);
1443         if (!handler || !handler->notifier) {
1444                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not get subscription handler.\n",
1445                         persistence->endpoint);
1446                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1447                 return 0;
1448         }
1449
1450         generator = subscription_get_generator_from_rdata(rdata, handler);
1451         if (!generator) {
1452                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Body generator not available.\n",
1453                         persistence->endpoint);
1454                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1455                 return 0;
1456         }
1457
             /*
              * Attach the persistence record to the rdata. create_subscription_tree()
              * reads it back under MOD_DATA_PERSISTENCE to restore the dialog's local
              * tag and CSeq.
              */
1458         ast_sip_mod_data_set(rdata->tp_info.pool, rdata->endpt_info.mod_data,
1459                 pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
1460
1461         /* Getting the endpoint may take some time that can affect the expiration. */
1462         endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint",
1463                 persistence->endpoint);
1464         if (!endpoint) {
1465                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: The endpoint was not found\n",
1466                         persistence->endpoint);
1467                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1468                 return 0;
1469         }
1470
1471         /* Update the expiration header with the new expiration */
1472         expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES,
1473                 rdata->msg_info.msg->hdr.next);
1474         if (!expires_header) {
                     /* The stored request carried no Expires header, so add one to the message. */
1475                 expires_header = pjsip_expires_hdr_create(rdata->tp_info.pool, 0);
1476                 if (!expires_header) {
1477                         ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not update expires header.\n",
1478                                 persistence->endpoint);
1479                         ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1480                         ao2_ref(endpoint, -1);
1481                         return 0;
1482                 }
1483                 pjsip_msg_add_hdr(rdata->msg_info.msg, (pjsip_hdr *) expires_header);
1484         }
1485
             /* Remaining lifetime, converted from milliseconds to whole seconds. */
1486         expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
1487         if (expires_header->ivalue <= 0) {
1488                 /* The subscription expired since we started recreating the subscription. */
1489                 ast_debug(3, "Expired subscription retrived from persistent store '%s' %s\n",
1490                         persistence->endpoint, persistence->tag);
1491                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1492                 ao2_ref(endpoint, -1);
1493                 return 0;
1494         }
1495
             /* From here on, every path leaves through the cleanup at the error label. */
1496         memset(&tree, 0, sizeof(tree));
1497         resp = build_resource_tree(endpoint, handler, resource, &tree,
1498                 ast_sip_pubsub_has_eventlist_support(rdata));
1499         if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
1500                 pj_status_t dlg_status;
1501
1502                 sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator,
1503                         &tree, &dlg_status);
1504                 if (!sub_tree) {
                             /* A PJ_EEXISTS status is neither logged nor causes the record to be deleted. */
1505                         if (dlg_status != PJ_EEXISTS) {
1506                                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not create subscription tree.\n",
1507                                         persistence->endpoint);
1508                                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1509                         }
1510                 } else {
1511                         struct initial_notify_data *ind = ast_malloc(sizeof(*ind));
1512
1513                         if (!ind) {
                                     /* Allocation failed: terminate the evsub. The persistence record is not deleted on this path. */
1514                                 pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
1515                                 goto error;
1516                         }
1517
                             /* The queued task gets its own reference to the tree. */
1518                         ind->sub_tree = ao2_bump(sub_tree);
1519                         ind->expires = expires_header->ivalue;
1520
1521                         sub_tree->persistence = ao2_bump(persistence);
1522                         subscription_persistence_update(sub_tree, rdata, SUBSCRIPTION_PERSISTENCE_RECREATED);
1523                         if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ind)) {
1524                                 /* Could not send initial subscribe NOTIFY */
1525                                 pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
                                     /* Release the reference taken for ind, then the wrapper itself. */
1526                                 ao2_ref(sub_tree, -1);
1527                                 ast_free(ind);
1528                         }
1529                 }
1530         } else {
                     /* build_resource_tree() did not yield a 2xx response, so drop the stored record. */
1531                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1532         }
1533
1534 error:
1535         resource_tree_destroy(&tree);
1536         ao2_ref(endpoint, -1);
1537
1538         return 0;
1539 }
1540
1541 /*!
      * \brief Callback function to perform the actual recreation of a subscription
      *
      * Passed to ao2_callback() by subscription_persistence_load(), so it runs
      * once per persisted record. It drops records that have already expired,
      * rebuilds an rdata from the stored packet, and then runs
      * sub_persistence_recreate() on the distributor serializer chosen for that
      * rdata.
      *
      * \param obj The struct subscription_persistence record to recreate
      * \param arg The pj_pool_t used as the rdata pool; it is reset for every record
      * \param flags Not used
      *
      * \retval 0 on every path; failures are logged and the record is deleted from sorcery.
      */
1542 static int subscription_persistence_recreate(void *obj, void *arg, int flags)
1543 {
1544         struct subscription_persistence *persistence = obj;
1545         pj_pool_t *pool = arg;
1546         struct ast_taskprocessor *serializer;
1547         pjsip_rx_data rdata;
1548         struct persistence_recreate_data recreate_data;
1549
1550         /* If this subscription has already expired remove it */
1551         if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
1552                 ast_debug(3, "Expired subscription retrived from persistent store '%s' %s\n",
1553                         persistence->endpoint, persistence->tag);
1554                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1555                 return 0;
1556         }
1557
             /* Start from a zeroed rdata and a freshly reset pool shared across records. */
1558         memset(&rdata, 0, sizeof(rdata));
1559         pj_pool_reset(pool);
1560         rdata.tp_info.pool = pool;
1561
1562         if (ast_sip_create_rdata_with_contact(&rdata, persistence->packet, persistence->src_name,
1563                 persistence->src_port, persistence->transport_key, persistence->local_name,
1564                 persistence->local_port, persistence->contact_uri)) {
1565                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: The message could not be parsed\n",
1566                         persistence->endpoint);
1567                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1568                 return 0;
1569         }
1570
1571         if (rdata.msg_info.msg->type != PJSIP_REQUEST_MSG) {
1572                 ast_log(LOG_NOTICE, "Failed recreating '%s' subscription: Stored a SIP response instead of a request.\n",
1573                         persistence->endpoint);
1574                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1575                 return 0;
1576         }
1577
1578         /* Continue the remainder in the distributor serializer */
1579         serializer = ast_sip_get_distributor_serializer(&rdata);
1580         if (!serializer) {
1581                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not get distributor serializer.\n",
1582                         persistence->endpoint);
1583                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1584                 return 0;
1585         }
             /*
              * recreate_data and rdata both live on this stack frame.
              * NOTE(review): this relies on ast_sip_push_task_synchronous() not
              * returning until the task has run - confirm against its documentation.
              */
1586         recreate_data.persistence = persistence;
1587         recreate_data.rdata = &rdata;
1588         if (ast_sip_push_task_synchronous(serializer, sub_persistence_recreate, &recreate_data)) {
1589                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not continue under distributor serializer.\n",
1590                         persistence->endpoint);
1591                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1592         }
             /* Release the serializer reference obtained above. */
1593         ast_taskprocessor_unreference(serializer);
1594
1595         return 0;
1596 }
1597
1598 /*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
1599 static int subscription_persistence_load(void *data)
1600 {
1601         struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
1602                 "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
1603         pj_pool_t *pool;
1604
1605         pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
1606                 PJSIP_POOL_RDATA_INC);
1607         if (!pool) {
1608                 ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
1609                 return 0;
1610         }
1611
1612         ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
1613
1614         pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
1615
1616         ao2_ref(persisted_subscriptions, -1);
1617         return 0;
1618 }
1619
1620 /*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
1621 static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
1622 {
1623         struct ast_json_payload *payload;
1624         const char *type;
1625
1626         if (stasis_message_type(message) != ast_manager_get_generic_type()) {
1627                 return;
1628         }
1629
1630         payload = stasis_message_data(message);
1631         type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
1632
1633         /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
1634          * recreate SIP subscriptions.
1635          */
1636         if (strcmp(type, "FullyBooted")) {
1637                 return;
1638         }
1639
1640         /* This has to be here so the subscription is recreated when the body generator is available */
1641         ast_sip_push_task(NULL, subscription_persistence_load, NULL);
1642
1643         /* Once the system is fully booted we don't care anymore */
1644         stasis_unsubscribe(sub);
1645 }
1646
/*!
 * \brief Callback type used by for_each_subscription()
 *
 * Called once per subscription tree with the caller-supplied \a arg.
 * A non-zero return value stops the traversal.
 */
1647 typedef int (*on_subscription_t)(struct sip_subscription_tree *sub, void *arg);
1648
1649 static int for_each_subscription(on_subscription_t on_subscription, void *arg)
1650 {
1651         int num = 0;
1652         struct sip_subscription_tree *i;
1653
1654         if (!on_subscription) {
1655                 return num;
1656         }
1657
1658         AST_RWLIST_RDLOCK(&subscriptions);
1659         AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
1660                 if (on_subscription(i, arg)) {
1661                         break;
1662                 }
1663                 ++num;
1664         }
1665         AST_RWLIST_UNLOCK(&subscriptions);
1666         return num;
1667 }
1668
1669 static void sip_subscription_to_ami(struct sip_subscription_tree *sub_tree,
1670                                     struct ast_str **buf)
1671 {
1672         char str[256];
1673         struct ast_sip_endpoint_id_configuration *id = &sub_tree->endpoint->id;
1674
1675         ast_str_append(buf, 0, "Role: %s\r\n",
1676                        sip_subscription_roles_map[sub_tree->role]);
1677         ast_str_append(buf, 0, "Endpoint: %s\r\n",
1678                        ast_sorcery_object_get_id(sub_tree->endpoint));
1679
1680         if (sub_tree->dlg) {
1681                 ast_copy_pj_str(str, &sub_tree->dlg->call_id->id, sizeof(str));
1682         } else {
1683                 ast_copy_string(str, "<unknown>", sizeof(str));
1684         }
1685         ast_str_append(buf, 0, "Callid: %s\r\n", str);
1686
1687         ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(sub_tree->evsub));
1688
1689         ast_callerid_merge(str, sizeof(str),
1690                            S_COR(id->self.name.valid, id->self.name.str, NULL),
1691                            S_COR(id->self.number.valid, id->self.number.str, NULL),
1692                            "Unknown");
1693
1694         ast_str_append(buf, 0, "Callerid: %s\r\n", str);
1695
1696         /* XXX This needs to be done recursively for lists */
1697         if (sub_tree->root->handler->to_ami) {
1698                 sub_tree->root->handler->to_ami(sub_tree->root, buf);
1699         }
1700 }
1701
1702
1703 void *ast_sip_subscription_get_header(const struct ast_sip_subscription *sub, const char *header)
1704 {
1705         pjsip_dialog *dlg;
1706         pjsip_msg *msg;
1707         pj_str_t name;
1708
1709         dlg = sub->tree->dlg;
1710         msg = ast_sip_mod_data_get(dlg->mod_data, pubsub_module.id, MOD_DATA_MSG);
1711         pj_cstr(&name, header);
1712
1713         return pjsip_msg_find_hdr_by_name(msg, &name, NULL);
1714 }
1715
1716 /* XXX This function is not used. */
/*!
 * \brief Create an outbound (UAC) subscription to a resource on an endpoint
 *
 * Allocates a subscription tree and a subscription, picks a contact from the
 * endpoint's AOR list, creates a UAC dialog and event subscription towards
 * that contact, and sends the initial request.
 *
 * \param handler Subscription handler; its event_name is used for the evsub
 * \param endpoint Endpoint to subscribe to
 * \param resource Resource name passed to allocate_subscription()
 *
 * \return The new subscription, or NULL on any failure
 *
 * NOTE(review): on the failure paths after allocate_subscription() succeeds,
 * only sub_tree is released here - confirm that \c sub is owned by sub_tree.
 * NOTE(review): the return value of pjsip_evsub_create_uac() is not checked.
 */
1717 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
1718                 struct ast_sip_endpoint *endpoint, const char *resource)
1719 {
1720         struct ast_sip_subscription *sub;
1721         pjsip_dialog *dlg;
1722         struct ast_sip_contact *contact;
1723         pj_str_t event;
1724         pjsip_tx_data *tdata;
1725         pjsip_evsub *evsub;
1726         struct sip_subscription_tree *sub_tree = NULL;
1727
             /* No rdata exists for an outbound subscription, hence the NULL. */
1728         sub_tree = allocate_subscription_tree(endpoint, NULL);
1729         if (!sub_tree) {
1730                 return NULL;
1731         }
1732
1733         sub = allocate_subscription(handler, resource, sub_tree);
1734         if (!sub) {
1735                 ao2_cleanup(sub_tree);
1736                 return NULL;
1737         }
1738
1739         contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors);
1740         if (!contact || ast_strlen_zero(contact->uri)) {
1741                 ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n",
1742                                 ast_sorcery_object_get_id(endpoint));
1743                 ao2_ref(sub_tree, -1);
1744                 ao2_cleanup(contact);
1745                 return NULL;
1746         }
1747
             /* The contact is only needed for its URI; release it once the dialog attempt is made. */
1748         dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL);
1749         ao2_cleanup(contact);
1750         if (!dlg) {
1751                 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
1752                 ao2_ref(sub_tree, -1);
1753                 return NULL;
1754         }
1755
1756         pj_cstr(&event, handler->event_name);
1757         pjsip_evsub_create_uac(dlg, &pubsub_cb, &event, 0, &sub_tree->evsub);
1758         subscription_setup_dialog(sub_tree, dlg);
1759
1760         evsub = sub_tree->evsub;
1761
1762         if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
1763                 pjsip_evsub_send_request(sub_tree->evsub, tdata);
1764         } else {
1765                 /* pjsip_evsub_terminate will result in pubsub_on_evsub_state,
1766                  * being called and terminating the subscription. Therefore, we don't
1767                  * need to decrease the reference count of sub here.
                      *
                      * NOTE(review): the code below still drops a reference on sub_tree
                      * (not sub) - confirm this matches the ownership described above.
1768                  */
1769                 pjsip_evsub_terminate(evsub, PJ_TRUE);
1770                 ao2_ref(sub_tree, -1);
1771                 return NULL;
1772         }
1773
             /* Only a subscription whose initial request was handed to PJSIP is added to the list. */
1774         add_subscription(sub_tree);
1775
1776         return sub;
1777 }
1778
/*!
 * \brief Get the PJSIP dialog of the tree a subscription belongs to
 *
 * \param sub The subscription
 *
 * \return The tree's dialog pointer, returned as stored (asserted non-NULL)
 */
1779 pjsip_dialog *ast_sip_subscription_get_dialog(struct ast_sip_subscription *sub)
1780 {
1781         ast_assert(sub->tree->dlg != NULL);
1782         return sub->tree->dlg;
1783 }
1784
/*!
 * \brief Get the endpoint of the tree a subscription belongs to
 *
 * \param sub The subscription
 *
 * \return The tree's endpoint, passed through ao2_bump() so the caller
 *         receives its own reference to release
 */
1785 struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
1786 {
1787         ast_assert(sub->tree->endpoint != NULL);
1788         return ao2_bump(sub->tree->endpoint);
1789 }
1790
/*!
 * \brief Get the serializer of the tree a subscription belongs to
 *
 * \param sub The subscription
 *
 * \return The tree's serializer pointer, returned as stored; no reference
 *         is taken on the caller's behalf (asserted non-NULL)
 */
1791 struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
1792 {
1793         ast_assert(sub->tree->serializer != NULL);
1794         return sub->tree->serializer;
1795 }
1796
1797 /*!
1798  * \brief Pre-allocate a buffer for the transmission
1799  *
1800  * Typically, we let PJSIP do this step for us when we send a request. PJSIP's buffer
1801  * allocation algorithm is to allocate a buffer of PJSIP_MAX_PKT_LEN bytes and attempt
1802  * to write the packet to the allocated buffer. If the buffer is too small to hold the
1803  * packet, then we get told the message is too long to be sent.
1804  *
1805  * When dealing with SIP NOTIFY, especially with RLS, it is possible to exceed
1806  * PJSIP_MAX_PKT_LEN. Rather than accepting the limitation imposed on us by default,
1807  * we instead take the strategy of pre-allocating the buffer, testing for ourselves
1808  * if the message will fit, and resizing the buffer as required.
1809  *
1810  * RFC 3261 says that a SIP UDP request can be up to 65535 bytes long. We're capping
1811  * it at 64000 for a couple of reasons:
1812  * 1) Allocating more than 64K at a time is hard to justify
1813  * 2) If the message goes through proxies, those proxies will want to add Via and
1814  *    Record-Route headers, making the message even larger. Giving some space for
1815  *    those headers is a nice thing to do.
1816  *
1817  * RFC 3261 does not place an upper limit on the size of TCP requests, but we are
1818  * going to impose the same 64K limit as a memory savings.
1819  *
1820  * \param tdata The tdata onto which to allocate a buffer
1821  * \retval 0 Success
1822  * \retval -1 The message is too large
1823  */
1824 static int allocate_tdata_buffer(pjsip_tx_data *tdata)
1825 {
1826         int buf_size;
1827         int size = -1;
1828         char *buf;
1829
1830         for (buf_size = PJSIP_MAX_PKT_LEN; size == -1 && buf_size < 64000; buf_size *= 2) {
1831                 buf = pj_pool_alloc(tdata->pool, buf_size);
1832                 size = pjsip_msg_print(tdata->msg, buf, buf_size);
1833         }
1834
1835         if (size == -1) {
1836                 return -1;
1837         }
1838
1839         tdata->buf.start = buf;
1840         tdata->buf.cur = tdata->buf.start;
1841         tdata->buf.end = tdata->buf.start + buf_size;
1842
1843         return 0;
1844 }
1845
1846 static int sip_subscription_send_request(struct sip_subscription_tree *sub_tree, pjsip_tx_data *tdata)
1847 {
1848 #ifdef TEST_FRAMEWORK
1849         struct ast_sip_endpoint *endpoint = sub_tree->endpoint;
1850         pjsip_evsub *evsub = sub_tree->evsub;
1851 #endif
1852         int res;
1853
1854         if (allocate_tdata_buffer(tdata)) {
1855                 ast_log(LOG_ERROR, "SIP request %s is too large to send.\n", tdata->info);
1856                 pjsip_tx_data_dec_ref(tdata);
1857                 return -1;
1858         }
1859
1860         res = pjsip_evsub_send_request(sub_tree->evsub, tdata);
1861
1862         subscription_persistence_update(sub_tree, NULL, SUBSCRIPTION_PERSISTENCE_SEND_REQUEST);
1863
1864         ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
1865                 "StateText: %s\r\n"
1866                 "Endpoint: %s\r\n",
1867                 pjsip_evsub_get_state_name(evsub),
1868                 ast_sorcery_object_get_id(endpoint));
1869
1870         return (res == PJ_SUCCESS ? 0 : -1);
1871 }
1872
1873 /*!
1874  * \brief Add a resource XML element to an RLMI body
1875  *
1876  * Each resource element represents a subscribed resource in the list. This function currently
1877  * will unconditionally add an instance element to each created resource element. Instance
1878  * elements refer to later parts in the multipart body.
1879  *
1880  * \param pool PJLIB allocation pool
1881  * \param cid Content-ID header of the resource
1882  * \param resource_name Name of the resource
1883  * \param resource_uri URI of the resource
1884  * \param state State of the subscribed resource
1885  */
1886 static void add_rlmi_resource(pj_pool_t *pool, pj_xml_node *rlmi, const pjsip_generic_string_hdr *cid,
1887                 const char *resource_name, const pjsip_sip_uri *resource_uri, pjsip_evsub_state state)
1888 {
1889         static pj_str_t cid_name = { "cid", 3 };
1890         pj_xml_node *resource;
1891         pj_xml_node *name;
1892         pj_xml_node *instance;
1893         pj_xml_attr *cid_attr;
1894         char id[6];
1895         char uri[PJSIP_MAX_URL_SIZE];
1896
1897         /* This creates a string representing the Content-ID without the enclosing < > */
1898         const pj_str_t cid_stripped = {
1899                 .ptr = cid->hvalue.ptr + 1,
1900                 .slen = cid->hvalue.slen - 2,
1901         };
1902
1903         resource = ast_sip_presence_xml_create_node(pool, rlmi, "resource");
1904         name = ast_sip_presence_xml_create_node(pool, resource, "name");
1905         instance = ast_sip_presence_xml_create_node(pool, resource, "instance");
1906
1907         pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, resource_uri, uri, sizeof(uri));
1908         ast_sip_presence_xml_create_attr(pool, resource, "uri", uri);
1909
1910         pj_strdup2(pool, &name->content, resource_name);
1911
1912         ast_generate_random_string(id, sizeof(id));
1913
1914         ast_sip_presence_xml_create_attr(pool, instance, "id", id);
1915         ast_sip_presence_xml_create_attr(pool, instance, "state",
1916                         state == PJSIP_EVSUB_STATE_TERMINATED ? "terminated" : "active");
1917
1918         /* Use the PJLIB-util XML library directly here since we are using a
1919          * pj_str_t
1920          */
1921
1922         cid_attr = pj_xml_attr_new(pool, &cid_name, &cid_stripped);
1923         pj_xml_add_attr(instance, cid_attr);
1924 }
1925
1926 /*!
1927  * \brief A multipart body part and meta-information
1928  *
1929  * When creating a multipart body part, the end result (the
1930  * pjsip_multipart_part) is hard to inspect without undoing
1931  * a lot of what was done to create it. Therefore, we use this
1932  * structure to store meta-information about the body part.
1933  *
1934  * The main consumer of this is the creator of the RLMI body
1935  * part of a multipart resource list body.
      *
      * Instances are heap-allocated by allocate_body_part() and released with
      * ast_free(). The resource and uri members are pointers copied from the
      * subscription, not duplicates.
1936  */
1937 struct body_part {
1938         /*! Content-ID header for the body part */
1939         pjsip_generic_string_hdr *cid;
1940         /*! Subscribed resource represented in the body part (copied pointer from the subscription) */
1941         const char *resource;
1942         /*! URI for the subscribed body part (copied pointer from the subscription) */
1943         pjsip_sip_uri *uri;
1944         /*! Subscription state of the resource represented in the body part */
1945         pjsip_evsub_state state;
1946         /*! The actual body part that will be present in the multipart body */
1947         pjsip_multipart_part *part;
1948 };
1949
1950 /*!
1951  * \brief Type declaration for container of body part structures
      *
      * Holds pointers to struct body_part. free_body_parts() frees every
      * element and then the vector storage itself.
1952  */
1953 AST_VECTOR(body_part_list, struct body_part *);
1954
1955 /*!
1956  * \brief Create a Content-ID header
1957  *
1958  * Content-ID headers are required by RFC2387 for multipart/related
1959  * bodies. They serve as identifiers for each part of the multipart body.
1960  *
1961  * \param pool PJLIB allocation pool
1962  * \param sub Subscription to a resource
1963  */
1964 static pjsip_generic_string_hdr *generate_content_id_hdr(pj_pool_t *pool,
1965                 const struct ast_sip_subscription *sub)
1966 {
1967         static const pj_str_t cid_name = { "Content-ID", 10 };
1968         pjsip_generic_string_hdr *cid;
1969         char id[6];
1970         size_t alloc_size;
1971         pj_str_t cid_value;
1972
1973         /* '<' + '@' + '>' = 3. pj_str_t does not require a null-terminator */
1974         alloc_size = sizeof(id) + pj_strlen(&sub->uri->host) + 3;
1975         cid_value.ptr = pj_pool_alloc(pool, alloc_size);
1976         cid_value.slen = sprintf(cid_value.ptr, "<%s@%.*s>",
1977                         ast_generate_random_string(id, sizeof(id)),
1978                         (int) pj_strlen(&sub->uri->host), pj_strbuf(&sub->uri->host));
1979         cid = pjsip_generic_string_hdr_create(pool, &cid_name, &cid_value);
1980
1981         return cid;
1982 }
1983
1984 static int rlmi_print_body(struct pjsip_msg_body *msg_body, char *buf, pj_size_t size)
1985 {
1986         int num_printed;
1987         pj_xml_node *rlmi = msg_body->data;
1988
1989         num_printed = pj_xml_print(rlmi, buf, size, PJ_TRUE);
1990         if (num_printed <= AST_PJSIP_XML_PROLOG_LEN) {
1991                 return -1;
1992         }
1993
1994         return num_printed;
1995 }
1996
1997 static void *rlmi_clone_data(pj_pool_t *pool, const void *data, unsigned len)
1998 {
1999         const pj_xml_node *rlmi = data;
2000
2001         return pj_xml_clone(pool, rlmi);
2002 }
2003
2004 /*!
2005  * \brief Create an RLMI body part for a multipart resource list body
2006  *
2007  * RLMI (Resource list meta information) is a special body type that lists
2008  * the subscribed resources and tells subscribers the number of subscribed
2009  * resources and what other body parts are in the multipart body. The
2010  * RLMI body also has a version number that a subscriber can use to ensure
2011  * that the locally-stored state corresponds to server state.
2012  *
2013  * \param pool The allocation pool
2014  * \param sub The subscription representing the subscribed resource list
2015  * \param body_parts A container of body parts that RLMI will refer to
2016  * \param full_state Indicates whether this is a full or partial state notification
2017  * \return The multipart part representing the RLMI body
2018  */
2019 static pjsip_multipart_part *build_rlmi_body(pj_pool_t *pool, struct ast_sip_subscription *sub,
2020                 struct body_part_list *body_parts, unsigned int full_state)
2021 {
2022         static const pj_str_t rlmi_type = { "application", 11 };
2023         static const pj_str_t rlmi_subtype = { "rlmi+xml", 8 };
2024         pj_xml_node *rlmi;
2025         pj_xml_node *name;
2026         pjsip_multipart_part *rlmi_part;
2027         char version_str[32];
2028         char uri[PJSIP_MAX_URL_SIZE];
2029         pjsip_generic_string_hdr *cid;
2030         int i;
2031
2032         rlmi = ast_sip_presence_xml_create_node(pool, NULL, "list");
2033         ast_sip_presence_xml_create_attr(pool, rlmi, "xmlns", "urn:ietf:params:xml:ns:rlmi");
2034
2035         ast_sip_subscription_get_local_uri(sub, uri, sizeof(uri));
2036         ast_sip_presence_xml_create_attr(pool, rlmi, "uri", uri);
2037
2038         snprintf(version_str, sizeof(version_str), "%u", sub->version++);
2039         ast_sip_presence_xml_create_attr(pool, rlmi, "version", version_str);
2040         ast_sip_presence_xml_create_attr(pool, rlmi, "fullState", full_state ? "true" : "false");
2041
2042         name = ast_sip_presence_xml_create_node(pool, rlmi, "name");
2043         pj_strdup2(pool, &name->content, ast_sip_subscription_get_resource_name(sub));
2044
2045         for (i = 0; i < AST_VECTOR_SIZE(body_parts); ++i) {
2046                 const struct body_part *part = AST_VECTOR_GET(body_parts, i);
2047
2048                 add_rlmi_resource(pool, rlmi, part->cid, part->resource, part->uri, part->state);
2049         }
2050
2051         rlmi_part = pjsip_multipart_create_part(pool);
2052
2053         rlmi_part->body = PJ_POOL_ZALLOC_T(pool, pjsip_msg_body);
2054         pj_strdup(pool, &rlmi_part->body->content_type.type, &rlmi_type);
2055         pj_strdup(pool, &rlmi_part->body->content_type.subtype, &rlmi_subtype);
2056         pj_list_init(&rlmi_part->body->content_type.param);
2057
2058         rlmi_part->body->data = pj_xml_clone(pool, rlmi);
2059         rlmi_part->body->clone_data = rlmi_clone_data;
2060         rlmi_part->body->print_body = rlmi_print_body;
2061
2062         cid = generate_content_id_hdr(pool, sub);
2063         pj_list_insert_before(&rlmi_part->hdr, cid);
2064
2065         return rlmi_part;
2066 }
2067
/* Forward declaration: build_body_part() below calls this function. */
2068 static pjsip_msg_body *generate_notify_body(pj_pool_t *pool, struct ast_sip_subscription *root,
2069                 unsigned int force_full_state);
2070
/*!
 * \brief Destroy a list of body parts
 *
 * Frees every struct body_part stored in the vector and then releases the
 * vector's own storage.
 *
 * \param parts The container of parts to destroy
 */
static void free_body_parts(struct body_part_list *parts)
{
	int idx = 0;

	while (idx < AST_VECTOR_SIZE(parts)) {
		struct body_part *entry = AST_VECTOR_GET(parts, idx);

		ast_free(entry);
		idx++;
	}

	AST_VECTOR_FREE(parts);
}
2087
2088 /*!
2089  * \brief Allocate and initialize a body part structure
2090  *
2091  * \param pool PJLIB allocation pool
2092  * \param sub Subscription representing a subscribed resource
2093  */
2094 static struct body_part *allocate_body_part(pj_pool_t *pool, const struct ast_sip_subscription *sub)
2095 {
2096         struct body_part *bp;
2097
2098         bp = ast_calloc(1, sizeof(*bp));
2099         if (!bp) {
2100                 return NULL;
2101         }
2102
2103         bp->cid = generate_content_id_hdr(pool, sub);
2104         bp->resource = sub->resource;
2105         bp->state = sub->subscription_state;
2106         bp->uri = sub->uri;
2107
2108         return bp;
2109 }
2110
2111 /*!
2112  * \brief Create a multipart body part for a subscribed resource
2113  *
2114  * \param pool PJLIB allocation pool
2115  * \param sub The subscription representing a subscribed resource
2116  * \param parts A vector of parts to append the created part to.
2117  * \param use_full_state Unused locally, but may be passed to other functions
2118  */
2119 static void build_body_part(pj_pool_t *pool, struct ast_sip_subscription *sub,
2120                 struct body_part_list *parts, unsigned int use_full_state)
2121 {
2122         struct body_part *bp;
2123         pjsip_msg_body *body;
2124
2125         bp = allocate_body_part(pool, sub);
2126         if (!bp) {
2127                 return;
2128         }
2129
2130         body = generate_notify_body(pool, sub, use_full_state);
2131         if (!body) {
2132                 /* Partial state was requested and the resource has not changed state */
2133                 ast_free(bp);
2134                 return;
2135         }
2136
2137         bp->part = pjsip_multipart_create_part(pool);
2138         bp->part->body = body;
2139         pj_list_insert_before(&bp->part->hdr, bp->cid);
2140
2141         AST_VECTOR_APPEND(parts, bp);
2142 }
2143
2144 /*!
2145  * \brief Create and initialize the PJSIP multipart body structure for a resource list subscription
2146  *
2147  * \param pool
2148  * \return The multipart message body
2149  */
2150 static pjsip_msg_body *create_multipart_body(pj_pool_t *pool)
2151 {
2152         pjsip_media_type media_type;
2153         pjsip_param *media_type_param;
2154         char boundary[6];
2155         pj_str_t pj_boundary;
2156
2157         pjsip_media_type_init2(&media_type, "multipart", "related");
2158
2159         media_type_param = pj_pool_alloc(pool, sizeof(*media_type_param));
2160         pj_list_init(media_type_param);
2161
2162         pj_strdup2(pool, &media_type_param->name, "type");
2163         pj_strdup2(pool, &media_type_param->value, "\"application/rlmi+xml\"");
2164
2165         pj_list_insert_before(&media_type.param, media_type_param);
2166
2167         pj_cstr(&pj_boundary, ast_generate_random_string(boundary, sizeof(boundary)));
2168         return pjsip_multipart_create(pool, &media_type, &pj_boundary);
2169 }
2170
2171 /*!
2172  * \brief Create a resource list body for NOTIFY requests
2173  *
2174  * Resource list bodies are multipart/related bodies. The first part of the multipart body
2175  * is an RLMI body that describes the rest of the parts to come. The other parts of the body
2176  * convey state of individual subscribed resources.
2177  *
2178  * \param pool PJLIB allocation pool
2179  * \param sub Subscription details from which to generate body
2180  * \param force_full_state If true, ignore resource list settings and send a full state notification
2181  * \return The generated multipart/related body
2182  */
2183 static pjsip_msg_body *generate_list_body(pj_pool_t *pool, struct ast_sip_subscription *sub,
2184                 unsigned int force_full_state)
2185 {
2186         int i;
2187         pjsip_multipart_part *rlmi_part;
2188         pjsip_msg_body *multipart;
2189         struct body_part_list body_parts;
2190         unsigned int use_full_state = force_full_state ? 1 : sub->full_state;
2191
2192         if (AST_VECTOR_INIT(&body_parts, AST_VECTOR_SIZE(&sub->children))) {
2193                 return NULL;
2194         }
2195
2196         for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
2197                 build_body_part(pool, AST_VECTOR_GET(&sub->children, i), &body_parts, use_full_state);
2198         }
2199
2200         /* This can happen if issuing partial state and no children of the list have changed state */
2201         if (AST_VECTOR_SIZE(&body_parts) == 0) {
2202                 return NULL;
2203         }
2204
2205         multipart = create_multipart_body(pool);
2206
2207         rlmi_part = build_rlmi_body(pool, sub, &body_parts, use_full_state);
2208         if (!rlmi_part) {
2209                 return NULL;
2210         }
2211         pjsip_multipart_add_part(pool, multipart, rlmi_part);
2212
2213         for (i = 0; i < AST_VECTOR_SIZE(&body_parts); ++i) {
2214                 pjsip_multipart_add_part(pool, multipart, AST_VECTOR_GET(&body_parts, i)->part);
2215         }
2216
2217         free_body_parts(&body_parts);
2218         return multipart;
2219 }
2220
2221 /*!
2222  * \brief Create the body for a NOTIFY request.
2223  *
2224  * \param pool The pool used for allocations
2225  * \param root The root of the subscription tree
2226  * \param force_full_state If true, ignore resource list settings and send a full state notification
2227  */
2228 static pjsip_msg_body *generate_notify_body(pj_pool_t *pool, struct ast_sip_subscription *root,
2229                 unsigned int force_full_state)
2230 {
2231         pjsip_msg_body *body;
2232
2233         if (AST_VECTOR_SIZE(&root->children) == 0) {
2234                 if (force_full_state || root->body_changed) {
2235                         /* Not a list. We've already generated the body and saved it on the subscription.
2236                          * Use that directly.
2237                          */
2238                         pj_str_t type;
2239                         pj_str_t subtype;
2240                         pj_str_t text;
2241
2242                         pj_cstr(&type, ast_sip_subscription_get_body_type(root));
2243                         pj_cstr(&subtype, ast_sip_subscription_get_body_subtype(root));
2244                         pj_cstr(&text, ast_str_buffer(root->body_text));
2245
2246                         body = pjsip_msg_body_create(pool, &type, &subtype, &text);
2247                         root->body_changed = 0;
2248                 } else {
2249                         body = NULL;
2250                 }
2251         } else {
2252                 body = generate_list_body(pool, root, force_full_state);
2253         }
2254
2255         return body;
2256 }
2257
2258 /*!
2259  * \brief Shortcut method to create a Require: eventlist header
2260  */
2261 static pjsip_require_hdr *create_require_eventlist(pj_pool_t *pool)
2262 {
2263         pjsip_require_hdr *require;
2264
2265         require = pjsip_require_hdr_create(pool);
2266         pj_strdup2(pool, &require->values[0], "eventlist");
2267         require->count = 1;
2268
2269         return require;
2270 }
2271
2272 /*!
2273  * \brief Send a NOTIFY request to a subscriber
2274  *
2275  * \pre sub_tree->dlg is locked
2276  *
2277  * \param sub_tree The subscription tree representing the subscription
2278  * \param force_full_state If true, ignore resource list settings and send full resource list state.
2279  * \retval 0 Success
2280  * \retval non-zero Failure
2281  */
2282 static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state)
2283 {
2284         pjsip_evsub *evsub = sub_tree->evsub;
2285         pjsip_tx_data *tdata;
2286
2287         if (ast_shutdown_final()
2288                 && sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED
2289                 && sub_tree->persistence) {
2290                 return 0;
2291         }
2292
2293         if (pjsip_evsub_notify(evsub, sub_tree->root->subscription_state,
2294                                 NULL, NULL, &tdata) != PJ_SUCCESS) {
2295                 return -1;
2296         }
2297
2298         tdata->msg->body = generate_notify_body(tdata->pool, sub_tree->root, force_full_state);
2299         if (!tdata->msg->body) {
2300                 pjsip_tx_data_dec_ref(tdata);
2301                 return -1;
2302         }
2303
2304         if (sub_tree->is_list) {
2305                 pjsip_require_hdr *require = create_require_eventlist(tdata->pool);
2306                 pjsip_msg_add_hdr(tdata->msg, (pjsip_hdr *) require);
2307         }
2308
2309         if (sip_subscription_send_request(sub_tree, tdata)) {
2310                 /* do not call pjsip_tx_data_dec_ref(tdata). The pjsip_dlg_send_request deletes the message on error */
2311                 return -1;
2312         }
2313
2314         sub_tree->send_scheduled_notify = 0;
2315
2316         return 0;
2317 }
2318
2319 static int serialized_send_notify(void *userdata)
2320 {
2321         struct sip_subscription_tree *sub_tree = userdata;
2322         pjsip_dialog *dlg = sub_tree->dlg;
2323
2324         pjsip_dlg_inc_lock(dlg);
2325
2326         /* It's possible that between when the notification was scheduled
2327          * and now a new SUBSCRIBE arrived requiring full state to be
2328          * sent out in an immediate NOTIFY. It's also possible that we're
2329          * already processing a terminate.  If that has happened, we need to
2330          * bail out here instead of sending the batched NOTIFY.
2331          */
2332
2333         if (sub_tree->state >= SIP_SUB_TREE_TERMINATE_IN_PROGRESS
2334                 || !sub_tree->send_scheduled_notify) {
2335                 pjsip_dlg_dec_lock(dlg);
2336                 ao2_cleanup(sub_tree);
2337                 return 0;
2338         }
2339
2340         if (sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED) {
2341                 sub_tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;
2342         }
2343
2344         send_notify(sub_tree, 0);
2345
2346         ast_test_suite_event_notify(
2347                 sub_tree->state == SIP_SUB_TREE_TERMINATED
2348                         ? "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_STATE_CHANGED",
2349                 "Resource: %s", sub_tree->root->resource);
2350
2351         sub_tree->notify_sched_id = -1;
2352         pjsip_dlg_dec_lock(dlg);
2353         ao2_cleanup(sub_tree);
2354         return 0;
2355 }
2356
2357 static int sched_cb(const void *data)
2358 {
2359         struct sip_subscription_tree *sub_tree = (struct sip_subscription_tree *) data;
2360
2361         /* We don't need to bump the refcount of sub_tree since we bumped it when scheduling this task */
2362         if (ast_sip_push_task(sub_tree->serializer, serialized_send_notify, sub_tree)) {
2363                 ao2_cleanup(sub_tree);
2364         }
2365
2366         return 0;
2367 }
2368
2369 static int schedule_notification(struct sip_subscription_tree *sub_tree)
2370 {
2371         /* There's already a notification scheduled */
2372         if (sub_tree->notify_sched_id > -1) {
2373                 return 0;
2374         }
2375
2376         sub_tree->send_scheduled_notify = 1;
2377         sub_tree->notify_sched_id = ast_sched_add(sched, sub_tree->notification_batch_interval, sched_cb, ao2_bump(sub_tree));
2378         if (sub_tree->notify_sched_id < 0) {
2379                 ao2_cleanup(sub_tree);
2380                 return -1;
2381         }
2382
2383         return 0;
2384 }
2385
2386 int ast_sip_subscription_notify(struct ast_sip_subscription *sub, struct ast_sip_body_data *notify_data,
2387                 int terminate)
2388 {
2389         int res;
2390         pjsip_dialog *dlg = sub->tree->dlg;
2391
2392         pjsip_dlg_inc_lock(dlg);
2393
2394         if (sub->tree->state != SIP_SUB_TREE_NORMAL) {
2395                 pjsip_dlg_dec_lock(dlg);
2396                 return 0;
2397         }
2398
2399         if (ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub),
2400                                 ast_sip_subscription_get_body_subtype(sub), notify_data, &sub->body_text)) {
2401                 pjsip_dlg_dec_lock(dlg);
2402                 return -1;
2403         }
2404
2405         sub->body_changed = 1;
2406         if (terminate) {
2407                 sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
2408                 sub->tree->state = SIP_SUB_TREE_TERMINATE_PENDING;
2409         }
2410
2411         if (sub->tree->notification_batch_interval) {
2412                 res = schedule_notification(sub->tree);
2413         } else {
2414                 /* See the note in pubsub_on_rx_refresh() for why sub->tree is refbumped here */
2415                 ao2_ref(sub->tree, +1);
2416                 if (terminate) {
2417                         sub->tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;
2418                 }
2419                 res = send_notify(sub->tree, 0);
2420                 ast_test_suite_event_notify(terminate ? "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_STATE_CHANGED",
2421                                 "Resource: %s",
2422                                 sub->tree->root->resource);
2423                 ao2_ref(sub->tree, -1);
2424         }
2425
2426         pjsip_dlg_dec_lock(dlg);
2427         return res;
2428 }
2429
2430 pjsip_sip_uri *ast_sip_subscription_get_sip_uri(struct ast_sip_subscription *sub)
2431 {
2432         return sub->uri;
2433 }
2434
2435 void ast_sip_subscription_get_local_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
2436 {
2437         pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, sub->uri, buf, size);
2438 }
2439
2440 void ast_sip_subscription_get_remote_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
2441 {
2442         pjsip_dialog *dlg;
2443
2444         dlg = sub->tree->dlg;
2445         ast_copy_pj_str(buf, &dlg->remote.info_str, size);
2446 }
2447
2448 const char *ast_sip_subscription_get_resource_name(struct ast_sip_subscription *sub)
2449 {
2450         return sub->resource;
2451 }
2452
2453 int ast_sip_subscription_is_terminated(const struct ast_sip_subscription *sub)
2454 {
2455         return sub->subscription_state == PJSIP_EVSUB_STATE_TERMINATED ? 1 : 0;
2456 }
2457
2458 static int sip_subscription_accept(struct sip_subscription_tree *sub_tree, pjsip_rx_data *rdata, int response)
2459 {
2460         pjsip_hdr res_hdr;
2461
2462         /* If this is a persistence recreation the subscription has already been accepted */
2463         if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) {
2464                 return 0;
2465         }
2466
2467         pj_list_init(&res_hdr);
2468         if (sub_tree->is_list) {
2469                 /* If subscribing to a list, our response has to have a Require: eventlist header in it */
2470                 pj_list_insert_before(&res_hdr, create_require_eventlist(rdata->tp_info.pool));
2471         }
2472
2473         return pjsip_evsub_accept(sub_tree->evsub, rdata, response, &res_hdr) == PJ_SUCCESS ? 0 : -1;
2474 }
2475
/*!
 * \brief Allocate a datastore; thin wrapper over ast_datastores_alloc_datastore()
 *
 * \param info Datastore info passed through unchanged
 * \param uid Datastore uid passed through unchanged
 * \return Whatever ast_datastores_alloc_datastore() returns
 */
struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
{
	struct ast_datastore *datastore = ast_datastores_alloc_datastore(info, uid);

	return datastore;
}
2480
2481 int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
2482 {
2483         return ast_datastores_add(subscription->datastores, datastore);
2484 }
2485
2486 struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
2487 {
2488         return ast_datastores_find(subscription->datastores, name);
2489 }
2490
2491 void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
2492 {
2493         ast_datastores_remove(subscription->datastores, name);
2494 }
2495
2496 struct ao2_container *ast_sip_subscription_get_datastores(const struct ast_sip_subscription *subscription)
2497 {
2498         return subscription->datastores;
2499 }
2500
2501 int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
2502 {
2503         return ast_datastores_add(publication->datastores, datastore);
2504 }
2505
2506 struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
2507 {
2508         return ast_datastores_find(publication->datastores, name);
2509 }
2510
2511 void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
2512 {
2513         ast_datastores_remove(publication->datastores, name);
2514 }
2515
2516 struct ao2_container *ast_sip_publication_get_datastores(const struct ast_sip_publication *publication)
2517 {
2518         return publication->datastores;
2519 }
2520
2521 AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
2522
2523 static int publication_hash_fn(const void *obj, const int flags)
2524 {
2525         const struct ast_sip_publication *publication = obj;
2526         const int *entity_tag = obj;
2527
2528         return flags & OBJ_KEY ? *entity_tag : publication->entity_tag;
2529 }
2530
2531 static int publication_cmp_fn(void *obj, void *arg, int flags)
2532 {
2533         const struct ast_sip_publication *publication1 = obj;
2534         const struct ast_sip_publication *publication2 = arg;
2535         const int *entity_tag = arg;
2536
2537         return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ?
2538                 CMP_MATCH | CMP_STOP : 0);
2539 }
2540
2541 static void publish_add_handler(struct ast_sip_publish_handler *handler)
2542 {
2543         AST_RWLIST_WRLOCK(&publish_handlers);
2544         AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
2545         AST_RWLIST_UNLOCK(&publish_handlers);
2546 }
2547
2548 int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
2549 {
2550         if (ast_strlen_zero(handler->event_name)) {
2551                 ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
2552                 return -1;
2553         }
2554
2555         if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS,
2556                 publication_hash_fn, publication_cmp_fn))) {
2557                 ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n",
2558                         handler->event_name);
2559                 return -1;
2560         }
2561
2562         publish_add_handler(handler);
2563
2564         ast_module_ref(ast_module_info->self);
2565
2566         return 0;
2567 }
2568
2569 void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
2570 {
2571         struct ast_sip_publish_handler *iter;
2572
2573         AST_RWLIST_WRLOCK(&publish_handlers);
2574         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
2575                 if (handler == iter) {
2576                         AST_RWLIST_REMOVE_CURRENT(next);
2577                         ao2_cleanup(handler->publications);
2578                         ast_module_unref(ast_module_info->self);
2579                         break;
2580                 }
2581         }
2582         AST_RWLIST_TRAVERSE_SAFE_END;
2583         AST_RWLIST_UNLOCK(&publish_handlers);
2584 }
2585
2586 AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
2587
2588 static void sub_add_handler(struct ast_sip_subscription_handler *handler)
2589 {
2590         AST_RWLIST_WRLOCK(&subscription_handlers);
2591         AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
2592         ast_module_ref(ast_module_info->self);
2593         AST_RWLIST_UNLOCK(&subscription_handlers);
2594 }
2595
2596 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name)
2597 {
2598         struct ast_sip_subscription_handler *iter;
2599
2600         AST_RWLIST_RDLOCK(&subscription_handlers);
2601         AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
2602                 if (!strcmp(iter->event_name, event_name)) {
2603                         break;
2604                 }
2605         }
2606         AST_RWLIST_UNLOCK(&subscription_handlers);
2607         return iter;
2608 }
2609
2610 int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler)
2611 {
2612         pj_str_t event;
2613         pj_str_t accept[AST_SIP_MAX_ACCEPT] = { {0, }, };
2614         struct ast_sip_subscription_handler *existing;
2615         int i = 0;
2616
2617         if (ast_strlen_zero(handler->event_name)) {
2618                 ast_log(LOG_ERROR, "No event package specified for subscription handler. Cannot register\n");
2619                 return -1;
2620         }
2621
2622         existing = find_sub_handler_for_event_name(handler->event_name);
2623         if (existing) {
2624                 ast_log(LOG_ERROR,
2625                         "Unable to register subscription handler for event %s.  A handler is already registered\n",
2626                         handler->event_name);
2627                 return -1;
2628         }
2629
2630         for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) {
2631                 pj_cstr(&accept[i], handler->accept[i]);
2632         }
2633
2634         pj_cstr(&event, handler->event_name);
2635
2636         pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
2637
2638         sub_add_handler(handler);
2639
2640         return 0;
2641 }
2642
2643 void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
2644 {
2645         struct ast_sip_subscription_handler *iter;
2646
2647         AST_RWLIST_WRLOCK(&subscription_handlers);
2648         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
2649                 if (handler == iter) {
2650                         AST_RWLIST_REMOVE_CURRENT(next);
2651                         ast_module_unref(ast_module_info->self);
2652                         break;
2653                 }
2654         }
2655         AST_RWLIST_TRAVERSE_SAFE_END;
2656         AST_RWLIST_UNLOCK(&subscription_handlers);
2657 }
2658
2659 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype_nolock(const char *type, const char *subtype)
2660 {
2661         struct ast_sip_pubsub_body_generator *gen;
2662
2663         AST_LIST_TRAVERSE(&body_generators, gen, list) {
2664                 if (!strcmp(gen->type, type)
2665                         && !strcmp(gen->subtype, subtype)) {
2666                         break;
2667                 }
2668         }
2669
2670         return gen;
2671 }
2672
2673 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype(const char *type, const char *subtype)
2674 {
2675         struct ast_sip_pubsub_body_generator *gen;
2676
2677         AST_RWLIST_RDLOCK(&body_generators);
2678         gen = find_body_generator_type_subtype_nolock(type, subtype);
2679         AST_RWLIST_UNLOCK(&body_generators);
2680         return gen;
2681 }
2682
/*!
 * \brief Find a body generator for an accept string of the form "type/subtype"
 *
 * The string is split at the first '/' on a stack copy, so \a accept itself
 * is not modified.
 *
 * \param accept The accept string to look up
 * \return The matching generator, or NULL if the type or subtype is empty or
 *         no generator matches
 */
static struct ast_sip_pubsub_body_generator *find_body_generator_accept(const char *accept)
{
	char *remainder = ast_strdupa(accept);
	char *type;

	/* After strsep() the remainder points just past the first '/', i.e. at the subtype */
	type = strsep(&remainder, "/");

	if (ast_strlen_zero(type)) {
		return NULL;
	}
	if (ast_strlen_zero(remainder)) {
		return NULL;
	}

	return find_body_generator_type_subtype(type, remainder);
}
2695
2696 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
2697                 size_t num_accept, const char *body_type)
2698 {
2699         int i;
2700         struct ast_sip_pubsub_body_generator *generator = NULL;
2701
2702         for (i = 0; i < num_accept; ++i) {
2703                 generator = find_body_generator_accept(accept[i]);
2704                 if (generator) {
2705                         ast_debug(3, "Body generator %p found for accept type %s\n", generator, accept[i]);
2706                         if (strcmp(generator->body_type, body_type)) {
2707                                 ast_log(LOG_WARNING, "Body generator '%s/%s'(%p) does not accept the type of data this event generates\n",
2708                                                 generator->type, generator->subtype, generator);
2709                                 generator = NULL;
2710                                 continue;
2711                         }
2712                         break;
2713                 } else {
2714                         ast_debug(3, "No body generator found for accept type %s\n", accept[i]);
2715                 }
2716         }
2717
2718         return generator;
2719 }
2720
2721 static int generate_initial_notify(struct ast_sip_subscription *sub)
2722 {
2723         void *notify_data;
2724         int res;
2725         struct ast_sip_body_data data = {
2726                 .body_type = sub->handler->body_type,
2727         };
2728
2729         if (AST_VECTOR_SIZE(&sub->children) > 0) {
2730                 int i;
2731
2732                 for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
2733                         if (generate_initial_notify(AST_VECTOR_GET(&sub->children, i))) {
2734                                 return -1;
2735                         }
2736                 }
2737
2738                 return 0;
2739         }
2740
2741         /* We notify subscription establishment only on the tree leaves. */
2742         if (sub->handler->notifier->subscription_established(sub)) {
2743                 return -1;
2744         }
2745
2746         notify_data = sub->handler->notifier->get_notify_data(sub);
2747         if (!notify_data) {
2748                 return -1;
2749         }
2750
2751         data.body_data = notify_data;
2752
2753         res = ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub),
2754                         ast_sip_subscription_get_body_subtype(sub), &data, &sub->body_text);
2755
2756         ao2_cleanup(notify_data);
2757
2758         return res;
2759 }
2760
2761 static int pubsub_on_refresh_timeout(void *userdata);
2762
2763 static int initial_notify_task(void * obj)
2764 {
2765         struct initial_notify_data *ind = obj;
2766
2767         if (generate_initial_notify(ind->sub_tree->root)) {
2768                 pjsip_evsub_terminate(ind->sub_tree->evsub, PJ_TRUE);
2769         } else {
2770                 send_notify(ind->sub_tree, 1);
2771                 ast_test_suite_event_notify("SUBSCRIPTION_ESTABLISHED",
2772                         "Resource: %s",
2773                         ind->sub_tree->root->resource);
2774         }
2775
2776         if (ind->expires > -1) {
2777                 char *name = ast_alloca(strlen("->/ ") +
2778                         strlen(ind->sub_tree->persistence->endpoint) +
2779                         strlen(ind->sub_tree->root->resource) +
2780                         strlen(ind->sub_tree->root->handler->event_name) +
2781                         ind->sub_tree->dlg->call_id->id.slen + 1);
2782
2783                 sprintf(name, "%s->%s/%s %.*s", ind->sub_tree->persistence->endpoint,
2784                         ind->sub_tree->root->resource, ind->sub_tree->root->handler->event_name,
2785                         (int)ind->sub_tree->dlg->call_id->id.slen, ind->sub_tree->dlg->call_id->id.ptr);
2786
2787                 ast_debug(3, "Scheduling timer: %s\n", name);
2788                 ind->sub_tree->expiration_task = ast_sip_schedule_task(ind->sub_tree->serializer,
2789                         ind->expires * 1000, pubsub_on_refresh_timeout, name,
2790                         ind->sub_tree, AST_SIP_SCHED_TASK_FIXED | AST_SIP_SCHED_TASK_DATA_AO2);
2791                 if (!ind->sub_tree->expiration_task) {
2792                         ast_log(LOG_ERROR, "Unable to create expiration timer of %d seconds for %s\n",
2793                                 ind->expires, name);
2794                 }
2795         }
2796
2797         ao2_ref(ind->sub_tree, -1);
2798         ast_free(ind);
2799
2800         return 0;
2801 }
2802
2803 static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
2804 {
2805         pjsip_expires_hdr *expires_header;
2806         struct ast_sip_subscription_handler *handler;
2807         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
2808         struct sip_subscription_tree *sub_tree;
2809         struct ast_sip_pubsub_body_generator *generator;
2810         char *resource;
2811         pjsip_uri *request_uri;
2812         pjsip_sip_uri *request_uri_sip;
2813         size_t resource_size;
2814         int resp;
2815         struct resource_tree tree;
2816         pj_status_t dlg_status;
2817
2818         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
2819         ast_assert(endpoint != NULL);
2820
2821         if (!endpoint->subscription.allow) {
2822                 ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint));
2823                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL);
2824                 return PJ_TRUE;
2825         }
2826
2827         request_uri = rdata->msg_info.msg->line.req.uri;
2828
2829         if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
2830                 char uri_str[PJSIP_MAX_URL_SIZE];
2831
2832                 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
2833                 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
2834                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
2835                 return PJ_TRUE;
2836         }
2837
2838         request_uri_sip = pjsip_uri_get_uri(request_uri);
2839         resource_size = pj_strlen(&request_uri_sip->user) + 1;
2840         resource = ast_alloca(resource_size);
2841         ast_copy_pj_str(resource, &request_uri_sip->user, resource_size);
2842
2843         /*
2844          * We may want to match without any user options getting
2845          * in the way.
2846          */
2847         AST_SIP_USER_OPTIONS_TRUNCATE_CHECK(resource);
2848
2849         expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next);
2850         if (expires_header) {
2851                 if (expires_header->ivalue == 0) {
2852                         ast_log(LOG_WARNING, "Subscription request from endpoint %s rejected. Expiration of 0 is invalid\n",
2853                                 ast_sorcery_object_get_id(endpoint));
2854                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
2855                                 return PJ_TRUE;
2856                 }
2857                 if (expires_header->ivalue < endpoint->subscription.minexpiry) {
2858                         ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %u\n",
2859                                 expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry);
2860                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL);
2861                         return PJ_TRUE;
2862                 }
2863         }
2864
2865         handler = subscription_get_handler_from_rdata(rdata);
2866         if (!handler) {
2867                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2868                 return PJ_TRUE;
2869         }
2870
2871         generator = subscription_get_generator_from_rdata(rdata, handler);
2872         if (!generator) {
2873                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2874                 return PJ_TRUE;
2875         }
2876
2877         memset(&tree, 0, sizeof(tree));
2878         resp = build_resource_tree(endpoint, handler, resource, &tree,
2879                 ast_sip_pubsub_has_eventlist_support(rdata));
2880         if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
2881                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
2882                 resource_tree_destroy(&tree);
2883                 return PJ_TRUE;
2884         }
2885
2886         sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator, &tree, &dlg_status);
2887         if (!sub_tree) {
2888                 if (dlg_status != PJ_EEXISTS) {
2889                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
2890                 }
2891         } else {
2892                 struct initial_notify_data *ind = ast_malloc(sizeof(*ind));
2893
2894                 if (!ind) {
2895                         pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
2896                         resource_tree_destroy(&tree);
2897                         return PJ_TRUE;
2898                 }
2899
2900                 ind->sub_tree = ao2_bump(sub_tree);
2901                 /* Since this is a normal subscribe, pjproject takes care of the timer */
2902                 ind->expires = -1;
2903
2904                 sub_tree->persistence = subscription_persistence_create(sub_tree);
2905                 subscription_persistence_update(sub_tree, rdata, SUBSCRIPTION_PERSISTENCE_CREATED);
2906                 sip_subscription_accept(sub_tree, rdata, resp);
2907                 if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ind)) {
2908                         pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
2909                         ao2_ref(sub_tree, -1);
2910                         ast_free(ind);
2911                 }
2912         }
2913
2914         resource_tree_destroy(&tree);
2915         return PJ_TRUE;
2916 }
2917
2918 static struct ast_sip_publish_handler *find_pub_handler(const char *event)
2919 {
2920         struct ast_sip_publish_handler *iter = NULL;
2921
2922         AST_RWLIST_RDLOCK(&publish_handlers);
2923         AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) {
2924                 if (strcmp(event, iter->event_name)) {
2925                         ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
2926                         continue;
2927                 }
2928                 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
2929                 break;
2930         }
2931         AST_RWLIST_UNLOCK(&publish_handlers);
2932
2933         return iter;
2934 }
2935
2936 static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata,
2937         pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id)
2938 {
2939         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
2940
2941         if (etag_hdr) {
2942                 char etag[pj_strlen(&etag_hdr->hvalue) + 1];
2943
2944                 ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag));
2945
2946                 if (sscanf(etag, "%30d", entity_id) != 1) {
2947                         return SIP_PUBLISH_UNKNOWN;
2948                 }
2949         }
2950
2951         *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
2952
2953         if (!(*expires)) {
2954                 return SIP_PUBLISH_REMOVE;
2955         } else if (!etag_hdr && rdata->msg_info.msg->body) {
2956                 return SIP_PUBLISH_INITIAL;
2957         } else if (etag_hdr && !rdata->msg_info.msg->body) {
2958                 return SIP_PUBLISH_REFRESH;
2959         } else if (etag_hdr && rdata->msg_info.msg->body) {
2960                 return SIP_PUBLISH_MODIFY;
2961         }
2962
2963         return SIP_PUBLISH_UNKNOWN;
2964 }
2965
2966 /*! \brief Internal destructor for publications */
2967 static void publication_destroy_fn(void *obj)
2968 {
2969         struct ast_sip_publication *publication = obj;
2970
2971         ast_debug(3, "Destroying SIP publication\n");
2972
2973         ao2_cleanup(publication->datastores);
2974         ao2_cleanup(publication->endpoint);
2975 }
2976
2977 static struct ast_sip_publication *sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
2978         const char *resource, const char *event_configuration_name)
2979 {
2980         struct ast_sip_publication *publication;
2981         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
2982         size_t resource_len = strlen(resource) + 1, event_configuration_name_len = strlen(event_configuration_name) + 1;
2983         char *dst;
2984
2985         ast_assert(endpoint != NULL);
2986
2987         if (!(publication = ao2_alloc(sizeof(*publication) + resource_len + event_configuration_name_len, publication_destroy_fn))) {
2988                 return NULL;
2989         }
2990
2991         if (!(publication->datastores = ast_datastores_alloc())) {
2992                 ao2_ref(publication, -1);
2993                 return NULL;
2994         }
2995
2996         publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
2997         ao2_ref(endpoint, +1);
2998         publication->endpoint = endpoint;
2999         publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
3000         publication->sched_id = -1;
3001         dst = publication->data;
3002         publication->resource = strcpy(dst, resource);
3003         dst += resource_len;
3004         publication->event_configuration_name = strcpy(dst, event_configuration_name);
3005
3006         return publication;
3007 }
3008
3009 static int sip_publication_respond(struct ast_sip_publication *pub, int status_code,
3010                 pjsip_rx_data *rdata)
3011 {
3012         pjsip_tx_data *tdata;
3013         pjsip_transaction *tsx;
3014
3015         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, &tdata) != PJ_SUCCESS) {
3016                 return -1;
3017         }
3018
3019         if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) {
3020                 char buf[30];
3021
3022                 snprintf(buf, sizeof(buf), "%d", pub->entity_tag);
3023                 ast_sip_add_header(tdata, "SIP-ETag", buf);
3024
3025                 snprintf(buf, sizeof(buf), "%d", pub->expires);
3026                 ast_sip_add_header(tdata, "Expires", buf);
3027         }
3028
3029         if (pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx) != PJ_SUCCESS) {
3030                 pjsip_tx_data_dec_ref(tdata);
3031                 return -1;
3032         }
3033
3034         pjsip_tsx_recv_msg(tsx, rdata);
3035
3036         if (pjsip_tsx_send_msg(tsx, tdata) != PJ_SUCCESS) {
3037                 pjsip_tx_data_dec_ref(tdata);
3038                 return -1;
3039         }
3040
3041         return 0;
3042 }
3043
3044 static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
3045         struct ast_sip_publish_handler *handler)
3046 {
3047         struct ast_sip_publication *publication;
3048         char *resource_name;
3049         size_t resource_size;
3050         RAII_VAR(struct ast_sip_publication_resource *, resource, NULL, ao2_cleanup);
3051         struct ast_variable *event_configuration_name = NULL;
3052         pjsip_uri *request_uri;
3053         pjsip_sip_uri *request_uri_sip;
3054         int resp;
3055
3056         request_uri = rdata->msg_info.msg->line.req.uri;
3057
3058         if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
3059                 char uri_str[PJSIP_MAX_URL_SIZE];
3060
3061                 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
3062                 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
3063                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
3064                 return NULL;
3065         }
3066
3067         request_uri_sip = pjsip_uri_get_uri(request_uri);
3068         resource_size = pj_strlen(&request_uri_sip->user) + 1;
3069         resource_name = ast_alloca(resource_size);
3070         ast_copy_pj_str(resource_name, &request_uri_sip->user, resource_size);
3071
3072         /*
3073          * We may want to match without any user options getting
3074          * in the way.
3075          */
3076         AST_SIP_USER_OPTIONS_TRUNCATE_CHECK(resource_name);
3077
3078         resource = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "inbound-publication", resource_name);
3079         if (!resource) {
3080                 ast_debug(1, "No 'inbound-publication' defined for resource '%s'\n", resource_name);
3081                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
3082                 return NULL;
3083         }
3084
3085         if (!ast_strlen_zero(resource->endpoint) && strcmp(resource->endpoint, ast_sorcery_object_get_id(endpoint))) {
3086                 ast_debug(1, "Resource %s has a defined endpoint '%s', but does not match endpoint '%s' that received the request\n",
3087                         resource_name, resource->endpoint, ast_sorcery_object_get_id(endpoint));
3088                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL);
3089                 return NULL;
3090         }
3091
3092         for (event_configuration_name = resource->events; event_configuration_name; event_configuration_name = event_configuration_name->next) {
3093                 if (!strcmp(event_configuration_name->name, handler->event_name)) {
3094                         break;
3095                 }
3096         }
3097
3098         if (!event_configuration_name) {
3099                 ast_debug(1, "Event '%s' is not configured for '%s'\n", handler->event_name, resource_name);
3100                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
3101                 return NULL;
3102         }
3103
3104         resp = handler->new_publication(endpoint, resource_name, event_configuration_name->value);
3105
3106         if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
3107                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
3108                 return NULL;
3109         }
3110
3111         publication = sip_create_publication(endpoint, rdata, S_OR(resource_name, ""), event_configuration_name->value);
3112
3113         if (!publication) {
3114                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
3115                 return NULL;
3116         }
3117
3118         publication->handler = handler;
3119         if (publication->handler->publication_state_change(publication, rdata->msg_info.msg->body,
3120                         AST_SIP_PUBLISH_STATE_INITIALIZED)) {
3121                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
3122                 ao2_cleanup(publication);
3123                 return NULL;
3124         }
3125
3126         sip_publication_respond(publication, resp, rdata);
3127
3128         return publication;
3129 }
3130
3131 static int publish_expire_callback(void *data)
3132 {
3133         RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup);
3134
3135         if (publication->handler->publish_expire) {
3136                 publication->handler->publish_expire(publication);
3137         }
3138
3139         return 0;
3140 }
3141
3142 static int publish_expire(const void *data)
3143 {
3144         struct ast_sip_publication *publication = (struct ast_sip_publication*)data;
3145
3146         ao2_unlink(publication->handler->publications, publication);
3147         publication->sched_id = -1;
3148
3149         if (ast_sip_push_task(NULL, publish_expire_callback, publication)) {
3150                 ao2_cleanup(publication);
3151         }
3152
3153         return 0;
3154 }
3155
3156 static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata)
3157 {
3158         pjsip_event_hdr *event_header;
3159         struct ast_sip_publish_handler *handler;
3160         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
3161         char event[32];
3162         static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 };
3163         pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL);
3164         enum sip_publish_type publish_type;
3165         RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup);
3166         int expires = 0, entity_id, response = 0;
3167
3168         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
3169         ast_assert(endpoint != NULL);
3170
3171         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
3172         if (!event_header) {
3173                 ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n");
3174                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
3175                 return PJ_TRUE;
3176         }
3177         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
3178
3179         handler = find_pub_handler(event);
3180         if (!handler) {
3181                 ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event);
3182                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
3183                 return PJ_TRUE;
3184         }
3185
3186         publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id);
3187
3188         /* If this is not an initial publish ensure that a publication is present */
3189         if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) {
3190                 if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) {
3191                         static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 };
3192
3193                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed,
3194                                 NULL, NULL);
3195                         return PJ_TRUE;
3196                 }
3197
3198                 /* Per the RFC every response has to have a new entity tag */
3199                 publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
3200
3201                 /* Update the expires here so that the created responses will contain the correct value */
3202                 publication->expires = expires;
3203         }
3204
3205         switch (publish_type) {
3206                 case SIP_PUBLISH_INITIAL:
3207                         publication = publish_request_initial(endpoint, rdata, handler);
3208                         break;
3209                 case SIP_PUBLISH_REFRESH:
3210                 case SIP_PUBLISH_MODIFY:
3211                         if (handler->publication_state_change(publication, rdata->msg_info.msg->body,
3212                                                 AST_SIP_PUBLISH_STATE_ACTIVE)) {
3213                                 /* If an error occurs we want to terminate the publication */
3214                                 expires = 0;
3215                         }
3216                         response = 200;
3217                         break;
3218                 case SIP_PUBLISH_REMOVE:
3219                         handler->publication_state_change(publication, rdata->msg_info.msg->body,
3220                                         AST_SIP_PUBLISH_STATE_TERMINATED);
3221                         response = 200;
3222                         break;
3223                 case SIP_PUBLISH_UNKNOWN:
3224                 default:
3225                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
3226                         break;
3227         }
3228
3229         if (publication) {