res_pjsip_pubsub: Correctly implement persisted subscriptions
[asterisk/asterisk.git] / res / res_pjsip_pubsub.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 /*!
19  * \brief Opaque structure representing an RFC 3265 SIP subscription
20  */
21
22 /*** MODULEINFO
23         <depend>pjproject</depend>
24         <depend>res_pjsip</depend>
25         <support_level>core</support_level>
26  ***/
27
28 #include "asterisk.h"
29
30 #include <pjsip.h>
31 #include <pjsip_simple.h>
32 #include <pjlib.h>
33
34 #include "asterisk/res_pjsip_pubsub.h"
35 #include "asterisk/module.h"
36 #include "asterisk/linkedlists.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/datastore.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/taskprocessor.h"
41 #include "asterisk/sched.h"
42 #include "asterisk/res_pjsip.h"
43 #include "asterisk/callerid.h"
44 #include "asterisk/manager.h"
45 #include "asterisk/cli.h"
46 #include "asterisk/test.h"
47 #include "res_pjsip/include/res_pjsip_private.h"
48 #include "asterisk/res_pjsip_presence_xml.h"
49
50 /*** DOCUMENTATION
51         <manager name="PJSIPShowSubscriptionsInbound" language="en_US">
52                 <synopsis>
53                         Lists subscriptions.
54                 </synopsis>
55                 <syntax />
56                 <description>
57                         <para>
58                         Provides a listing of all inbound subscriptions.  An event <literal>InboundSubscriptionDetail</literal>
59                         is issued for each subscription object.  Once all detail events are completed an
60                         <literal>InboundSubscriptionDetailComplete</literal> event is issued.
61                         </para>
62                 </description>
63         </manager>
64         <manager name="PJSIPShowSubscriptionsOutbound" language="en_US">
65                 <synopsis>
66                         Lists subscriptions.
67                 </synopsis>
68                 <syntax />
69                 <description>
70                         <para>
71                         Provides a listing of all outbound subscriptions.  An event <literal>OutboundSubscriptionDetail</literal>
72                         is issued for each subscription object.  Once all detail events are completed an
73                         <literal>OutboundSubscriptionDetailComplete</literal> event is issued.
74                         </para>
75                 </description>
76         </manager>
77         <manager name="PJSIPShowResourceLists" language="en_US">
78                 <synopsis>
79                         Displays settings for configured resource lists.
80                 </synopsis>
81                 <syntax />
82                 <description>
83                         <para>
84                         Provides a listing of all resource lists.  An event <literal>ResourceListDetail</literal>
85                         is issued for each resource list object.  Once all detail events are completed a
86                         <literal>ResourceListDetailComplete</literal> event is issued.
87                         </para>
88                 </description>
89         </manager>
90
91         <configInfo name="res_pjsip_pubsub" language="en_US">
92                 <synopsis>Module that implements publish and subscribe support.</synopsis>
93                 <configFile name="pjsip.conf">
94                         <configObject name="subscription_persistence">
95                                 <synopsis>Persists SIP subscriptions so they survive restarts.</synopsis>
96                                 <configOption name="packet">
97                                         <synopsis>Entire SIP SUBSCRIBE packet that created the subscription</synopsis>
98                                 </configOption>
99                                 <configOption name="src_name">
100                                         <synopsis>The source address of the subscription</synopsis>
101                                 </configOption>
102                                 <configOption name="src_port">
103                                         <synopsis>The source port of the subscription</synopsis>
104                                 </configOption>
105                                 <configOption name="transport_key">
106                                         <synopsis>The type of transport the subscription was received on</synopsis>
107                                 </configOption>
108                                 <configOption name="local_name">
109                                         <synopsis>The local address the subscription was received on</synopsis>
110                                 </configOption>
111                                 <configOption name="local_port">
112                                         <synopsis>The local port the subscription was received on</synopsis>
113                                 </configOption>
114                                 <configOption name="cseq">
115                                         <synopsis>The sequence number of the next NOTIFY to be sent</synopsis>
116                                 </configOption>
117                                 <configOption name="tag">
118                                         <synopsis>The local tag of the dialog for the subscription</synopsis>
119                                 </configOption>
120                                 <configOption name="endpoint">
121                                         <synopsis>The name of the endpoint that subscribed</synopsis>
122                                 </configOption>
123                                 <configOption name="expires">
124                                         <synopsis>The time at which the subscription expires</synopsis>
125                                 </configOption>
126                         </configObject>
127                         <configObject name="resource_list">
128                                 <synopsis>Resource list configuration parameters.</synopsis>
129                                 <description>
130                                         <para>This configuration object allows for RFC 4662 resource list subscriptions
131                                         to be specified. This can be useful to decrease the amount of subscription traffic
132                                         that a server has to process.</para>
133                                         <note>
134                                                 <para>Current limitations limit the size of SIP NOTIFY requests that Asterisk sends
135                                                 to 64000 bytes. If your resource list notifications are larger than this maximum, you
136                                                 will need to make adjustments.</para>
137                                         </note>
138                                 </description>
139                                 <configOption name="type">
140                                         <synopsis>Must be of type 'resource_list'</synopsis>
141                                 </configOption>
142                                 <configOption name="event">
143                                         <synopsis>The SIP event package that the list resource belongs to.</synopsis>
144                                         <description><para>
145                                                 The SIP event package describes the types of resources that Asterisk reports
146                                                 the state of.
147                                         </para>
148                                                 <enumlist>
149                                                         <enum name="presence"><para>
150                                                                 Device state and presence reporting.
151                                                         </para></enum>
152                                                         <enum name="dialog"><para>
153                                                                 This is identical to <replaceable>presence</replaceable>.
154                                                         </para></enum>
155                                                         <enum name="message-summary"><para>
156                                                                 Message-waiting indication (MWI) reporting.
157                                                         </para></enum>
158                                                 </enumlist>
159                                         </description>
160                                 </configOption>
161                                 <configOption name="list_item">
162                                         <synopsis>The name of a resource to report state on</synopsis>
163                                         <description>
164                                                 <para>In general Asterisk looks up list items in the following way:</para>
165                                                 <para>1. Check if the list item refers to another configured resource list.</para>
166                                                 <para>2. Pass the name of the resource off to event-package-specific handlers
167                                                    to find the specified resource.</para>
168                                                 <para>The second part means that the way the list item is specified depends
169                                                 on what type of list this is. For instance, if you have the <replaceable>event</replaceable>
170                                                 set to <literal>presence</literal>, then list items should be in the form of
171                                                 dialplan_extension@dialplan_context. For <literal>message-summary</literal> mailbox
172                                                 names should be listed.</para>
173                                         </description>
174                                 </configOption>
175                                 <configOption name="full_state" default="no">
176                                         <synopsis>Indicates if the entire list's state should be sent out.</synopsis>
177                                         <description>
178                                                 <para>If this option is enabled, and a resource changes state, then Asterisk will construct
179                                                 a notification that contains the state of all resources in the list. If the option is
180                                                 disabled, Asterisk will construct a notification that only contains the states of
181                                                 resources that have changed.</para>
182                                                 <note>
183                                                         <para>Even with this option disabled, there are certain situations where Asterisk is forced
184                                                         to send a notification with the states of all resources in the list. When a subscriber
185                                                         renews or terminates its subscription to the list, Asterisk MUST send a full state
186                                                         notification.</para>
187                                                 </note>
188                                         </description>
189                                 </configOption>
190                                 <configOption name="notification_batch_interval" default="0">
191                                         <synopsis>Time Asterisk should wait, in milliseconds, before sending notifications.</synopsis>
192                                         <description>
193                                                 <para>When a resource's state changes, it may be desired to wait a certain amount before Asterisk
194                                                 sends a notification to subscribers. This allows for other state changes to accumulate, so that
195                                                 Asterisk can communicate multiple state changes in a single notification instead of rapidly sending
196                                                 many notifications.</para>
197                                         </description>
198                                 </configOption>
199                         </configObject>
200                         <configObject name="inbound-publication">
201                                 <synopsis>The configuration for inbound publications</synopsis>
202                                 <configOption name="endpoint" default="">
203                                         <synopsis>Optional name of an endpoint that is only allowed to publish to this resource</synopsis>
204                                 </configOption>
205                                 <configOption name="type">
206                                         <synopsis>Must be of type 'inbound-publication'.</synopsis>
207                                 </configOption>
208                         </configObject>
209                 </configFile>
210         </configInfo>
211  ***/
212
/*! \brief Incoming request callback installed on pubsub_module below */
static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);

/*!
 * \brief PJSIP module description for publish/subscribe support
 *
 * Requests are delivered to pubsub_on_rx_request() and the module is
 * given application priority.
 */
static struct pjsip_module pubsub_module = {
        /* Name text followed by its length in characters */
        .name = { "PubSub Module", 13 },
        .priority = PJSIP_MOD_PRIORITY_APPLICATION,
        .on_rx_request = pubsub_on_rx_request,
};
220
/*!
 * \brief Key naming stored subscription persistence data
 *
 * NOTE(review): presumably a module-data key; its users are outside this section - confirm.
 */
#define MOD_DATA_PERSISTENCE "sub_persistence"
/*!
 * \brief Key naming a stored subscription message
 *
 * NOTE(review): presumably a module-data key; its users are outside this section - confirm.
 */
#define MOD_DATA_MSG "sub_msg"

/*! \brief Name of the SIP Event header, used when searching a message for that header */
static const pj_str_t str_event_name = { "Event", 5 };
225
/*! \brief Scheduler used for automatically expiring publications */
static struct ast_sched_context *sched;

/*! \brief Number of buckets for publications (per handler) */
#define PUBLICATIONS_BUCKETS 37

/*! \brief Default expiration time for PUBLISH if one is not specified */
#define DEFAULT_PUBLISH_EXPIRES 3600

/*! \brief Number of buckets for subscription datastore */
#define DATASTORE_BUCKETS 53

/*! \brief Default expiration for subscriptions (same value as the PUBLISH default) */
#define DEFAULT_EXPIRES 3600
240
/*!
 * \brief Defined method for PUBLISH
 *
 * Described as PJSIP_OTHER_METHOD together with the method name and the
 * length of that name. Not static, so it is visible to other translation units.
 */
const pjsip_method pjsip_publish_method =
{
        PJSIP_OTHER_METHOD,
        { "PUBLISH", 7 }
};
247
/*!
 * \brief The types of PUBLISH messages defined in RFC 3903
 *
 * SIP_PUBLISH_UNKNOWN is the first enumerator and therefore has the value 0.
 */
enum sip_publish_type {
        /*!
         * \brief Unknown
         *
         * \details
         * This actually is not defined in RFC 3903. We use this as a constant
         * to indicate that an incoming PUBLISH does not fit into any of the
         * other categories and is thus invalid.
         */
        SIP_PUBLISH_UNKNOWN,

        /*!
         * \brief Initial
         *
         * \details
         * The first PUBLISH sent. This will contain a non-zero Expires header
         * as well as a body that indicates the current state of the endpoint
         * that has sent the message. The initial PUBLISH is the only type
         * of PUBLISH to not contain a Sip-If-Match header in it.
         */
        SIP_PUBLISH_INITIAL,

        /*!
         * \brief Refresh
         *
         * \details
         * Used to keep a published state from expiring. This will contain a
         * non-zero Expires header but no body since its purpose is not to
         * update state.
         */
        SIP_PUBLISH_REFRESH,

        /*!
         * \brief Modify
         *
         * \details
         * Used to change state from its previous value. This will contain
         * a body updating the published state. May or may not contain an
         * Expires header.
         */
        SIP_PUBLISH_MODIFY,

        /*!
         * \brief Remove
         *
         * \details
         * Used to remove published state from an event state compositor (ESC).
         * This will contain an Expires header set to 0 and likely no body.
         */
        SIP_PUBLISH_REMOVE,
};
302
/*!
 * \brief A vector of strings commonly used throughout this module
 *
 * Declares struct resources, whose elements are const char pointers.
 */
AST_VECTOR(resources, const char *);
307
/*!
 * \brief Resource list configuration item
 *
 * The members mirror the options of the resource_list configuration
 * object described in this file's documentation block.
 */
struct resource_list {
        /*! Sorcery object details */
        SORCERY_OBJECT(details);
        /*! SIP event package the list uses. */
        char event[32];
        /*! Strings representing resources in the list. */
        struct resources items;
        /*! Indicates if Asterisk sends full or partial state on notifications. */
        unsigned int full_state;
        /*! Time, in milliseconds, Asterisk waits before sending a batched notification. */
        unsigned int notification_batch_interval;
};
322
/*!
 * \brief Counter used to create new entity IDs by ESCs (event state compositors).
 *
 * File-scope static, so it starts at zero.
 */
static int esc_etag_counter;
327
/*!
 * \brief Structure representing a SIP publication
 */
struct ast_sip_publication {
        /*! \brief Publication datastores set up by handlers */
        struct ao2_container *datastores;
        /*! \brief Entity tag for the publication */
        int entity_tag;
        /*! \brief Handler for this publication */
        struct ast_sip_publish_handler *handler;
        /*! \brief The endpoint with which the publication is communicating */
        struct ast_sip_endpoint *endpoint;
        /*! \brief Expiration time of the publication */
        int expires;
        /*! \brief Scheduled item for expiration of publication */
        int sched_id;
        /*! \brief The resource the publication is to */
        char *resource;
        /*! \brief The name of the event type configuration */
        char *event_configuration_name;
        /*!
         * \brief Data containing the above
         *
         * Trailing storage that must remain the last member.
         * NOTE(review): presumably holds the resource and
         * event_configuration_name strings; the allocation is outside
         * this section - confirm.
         */
        char data[0];
};
351
352
/*!
 * \brief Structure used for persisting an inbound subscription
 *
 * The endpoint and tag strings are heap allocated and released by
 * subscription_persistence_destroy().
 */
struct subscription_persistence {
        /*! Sorcery object details */
        SORCERY_OBJECT(details);
        /*! The name of the endpoint involved in the subscription */
        char *endpoint;
        /*! SIP message that creates the subscription */
        char packet[PJSIP_MAX_PKT_LEN];
        /*! Source address of the message */
        char src_name[PJ_INET6_ADDRSTRLEN];
        /*! Source port of the message */
        int src_port;
        /*! Local transport key type (the transport's type name) */
        char transport_key[32];
        /*! Local transport address */
        char local_name[PJ_INET6_ADDRSTRLEN];
        /*! Local transport port */
        int local_port;
        /*! Next CSeq to use for message (copied from the dialog's local CSeq) */
        unsigned int cseq;
        /*! Local tag of the dialog */
        char *tag;
        /*! When this subscription expires, as an absolute time */
        struct timeval expires;
};
380
/*!
 * \brief The state of the subscription tree
 *
 * Values start at 0 and increase in declaration order.
 */
enum sip_subscription_tree_state {
        /*! Normal operation */
        SIP_SUB_TREE_NORMAL = 0,
        /*! A terminate has been requested by Asterisk, the client, or pjproject */
        SIP_SUB_TREE_TERMINATE_PENDING,
        /*! The terminate is in progress */
        SIP_SUB_TREE_TERMINATE_IN_PROGRESS,
        /*! The terminate process has finished and the subscription tree is no longer valid */
        SIP_SUB_TREE_TERMINATED,
};
394
/*!
 * \brief Printable names for enum sip_subscription_tree_state
 *
 * Entries are listed in the same order as the enumerators, so they must be
 * kept in step when a state is added.
 * NOTE(review): presumably indexed by the state value; users are outside this section.
 */
static char *sub_tree_state_description[] = {
        "Normal",
        "TerminatePending",
        "TerminateInProgress",
        "Terminated"
};
401
/*!
 * \brief A tree of SIP subscriptions
 *
 * Because of the ability to subscribe to resource lists, a SIP
 * subscription can result in a tree of subscriptions being created.
 * This structure represents the information relevant to the subscription
 * as a whole, to include the underlying PJSIP structure for the
 * subscription.
 *
 * Trees are chained together through the \c next entry; the module-wide
 * \c subscriptions list is declared with this structure as its element type.
 */
struct sip_subscription_tree {
        /*! The endpoint with which the subscription is communicating */
        struct ast_sip_endpoint *endpoint;
        /*! Serializer on which to place operations for this subscription */
        struct ast_taskprocessor *serializer;
        /*! The role for this subscription */
        enum ast_sip_subscription_role role;
        /*! Persistence information (may be NULL; the persistence helpers check for that) */
        struct subscription_persistence *persistence;
        /*! The underlying PJSIP event subscription structure */
        pjsip_evsub *evsub;
        /*! The underlying PJSIP dialog */
        pjsip_dialog *dlg;
        /*! Interval to use for batching notifications */
        unsigned int notification_batch_interval;
        /*! Scheduler ID for batched notification */
        int notify_sched_id;
        /*! Indicator if scheduled batched notification should be sent */
        unsigned int send_scheduled_notify;
        /*! The root of the subscription tree */
        struct ast_sip_subscription *root;
        /*! Is this subscription to a list? */
        int is_list;
        /*! Next item in the list */
        AST_LIST_ENTRY(sip_subscription_tree) next;
        /*! Subscription tree state */
        enum sip_subscription_tree_state state;
        /*! On asterisk restart, this is the task data used
         * to restart the expiration timer if pjproject isn't
         * capable of restarting the timer.
         */
        struct ast_sip_sched_task *expiration_task;
};
444
/*!
 * \brief Structure representing a "virtual" SIP subscription.
 *
 * This structure serves a dual purpose. Structurally, it is
 * the constructed tree of subscriptions based on the resources
 * being subscribed to. API-wise, this serves as the handle that
 * subscription handlers use in order to interact with the pubsub API.
 */
struct ast_sip_subscription {
        /*! Subscription datastores set up by handlers */
        struct ao2_container *datastores;
        /*! The handler for this subscription */
        const struct ast_sip_subscription_handler *handler;
        /*! Pointer to the base of the tree */
        struct sip_subscription_tree *tree;
        /*! Body generator for NOTIFYs */
        struct ast_sip_pubsub_body_generator *body_generator;
        /*! Vector of child subscriptions */
        AST_VECTOR(, struct ast_sip_subscription *) children;
        /*! Saved NOTIFY body text for this subscription */
        struct ast_str *body_text;
        /*! Indicator that the body text has changed since the last notification */
        int body_changed;
        /*! The current state of the subscription */
        pjsip_evsub_state subscription_state;
        /*! For lists, the current version to place in the RLMI body */
        unsigned int version;
        /*! For lists, indicates if full state should always be communicated. */
        unsigned int full_state;
        /*! URI associated with the subscription */
        pjsip_sip_uri *uri;
        /*! Name of resource being subscribed to (trailing storage; must remain the last member) */
        char resource[0];
};
479
/*!
 * \brief Structure representing a publication resource
 *
 * Both members are owned by the object and released in
 * publication_resource_destroy().
 */
struct ast_sip_publication_resource {
        /*! \brief Sorcery object details */
        SORCERY_OBJECT(details);
        /*! \brief Optional name of an endpoint that is only allowed to publish to this resource */
        char *endpoint;
        /*! \brief Mapping for event types to configuration */
        struct ast_variable *events;
};
491
/*! \brief Printable names for the subscription roles, indexed by the role enumerator */
static const char *sip_subscription_roles_map[] = {
        [AST_SIP_SUBSCRIBER] = "Subscriber",
        [AST_SIP_NOTIFIER] = "Notifier"
};
496
/*!
 * \brief Reason a subscription's persistence record is being updated
 *
 * Passed to subscription_persistence_update(), which only stores the SIP
 * packet for the CREATED and RECREATED types.
 */
enum sip_persistence_update_type {
        /*! Called from send request */
        SUBSCRIPTION_PERSISTENCE_SEND_REQUEST = 0,
        /*! Subscription created from initial client request */
        SUBSCRIPTION_PERSISTENCE_CREATED,
        /*! Subscription recreated by asterisk on startup */
        SUBSCRIPTION_PERSISTENCE_RECREATED,
        /*! Subscription refreshed by a client request */
        SUBSCRIPTION_PERSISTENCE_REFRESHED,
};
507
/*! \brief Read/write locked list of subscription trees */
AST_RWLIST_HEAD_STATIC(subscriptions, sip_subscription_tree);

/*! \brief Read/write locked list of NOTIFY body generators */
AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
/*! \brief Read/write locked list of body supplements */
AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
512
/* Event subscription callbacks; declared here so that pubsub_cb can refer to them. */
static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
                int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
                pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
static void pubsub_on_client_refresh(pjsip_evsub *sub);
static void pubsub_on_server_timeout(pjsip_evsub *sub);
 
/*! \brief Event subscription callback table wiring up the handlers declared above */
static pjsip_evsub_user pubsub_cb = {
        .on_evsub_state = pubsub_on_evsub_state,
        .on_rx_refresh = pubsub_on_rx_refresh,
        .on_rx_notify = pubsub_on_rx_notify,
        .on_client_refresh = pubsub_on_client_refresh,
        .on_server_timeout = pubsub_on_server_timeout,
};
528
529 /*! \brief Destructor for publication resource */
530 static void publication_resource_destroy(void *obj)
531 {
532         struct ast_sip_publication_resource *resource = obj;
533
534         ast_free(resource->endpoint);
535         ast_variables_destroy(resource->events);
536 }
537
538 /*! \brief Allocator for publication resource */
539 static void *publication_resource_alloc(const char *name)
540 {
541         return ast_sorcery_generic_alloc(sizeof(struct ast_sip_publication_resource), publication_resource_destroy);
542 }
543
544 /*! \brief Destructor for subscription persistence */
545 static void subscription_persistence_destroy(void *obj)
546 {
547         struct subscription_persistence *persistence = obj;
548
549         ast_free(persistence->endpoint);
550         ast_free(persistence->tag);
551 }
552
553 /*! \brief Allocator for subscription persistence */
554 static void *subscription_persistence_alloc(const char *name)
555 {
556         return ast_sorcery_generic_alloc(sizeof(struct subscription_persistence), subscription_persistence_destroy);
557 }
558
559 /*! \brief Function which creates initial persistence information of a subscription in sorcery */
560 static struct subscription_persistence *subscription_persistence_create(struct sip_subscription_tree *sub_tree)
561 {
562         char tag[PJ_GUID_STRING_LENGTH + 1];
563
564         /* The id of this persistence object doesn't matter as we keep it on the subscription and don't need to
565          * look it up by id at all.
566          */
567         struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(),
568                 "subscription_persistence", NULL);
569
570         pjsip_dialog *dlg = sub_tree->dlg;
571
572         if (!persistence) {
573                 return NULL;
574         }
575
576         persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub_tree->endpoint));
577         ast_copy_pj_str(tag, &dlg->local.info->tag, sizeof(tag));
578         persistence->tag = ast_strdup(tag);
579
580         ast_sorcery_create(ast_sip_get_sorcery(), persistence);
581         return persistence;
582 }
583
584 /*! \brief Function which updates persistence information of a subscription in sorcery */
585 static void subscription_persistence_update(struct sip_subscription_tree *sub_tree,
586         pjsip_rx_data *rdata, enum sip_persistence_update_type type)
587 {
588         pjsip_dialog *dlg;
589
590         if (!sub_tree->persistence) {
591                 return;
592         }
593
594         ast_debug(3, "Updating persistence for '%s->%s'\n",
595                 ast_sorcery_object_get_id(sub_tree->endpoint), sub_tree->root->resource);
596
597         dlg = sub_tree->dlg;
598         sub_tree->persistence->cseq = dlg->local.cseq;
599
600         if (rdata) {
601                 int expires;
602                 pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
603
604                 expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
605                 sub_tree->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
606
607                 /* When receiving a packet on an streaming transport, it's possible to receive more than one SIP
608                  * message at a time into the rdata->pkt_info.packet buffer. However, the rdata->msg_info.msg_buf
609                  * will always point to the proper SIP message that is to be processed. When updating subscription
610                  * persistence that is pulled from persistent storage, though, the rdata->pkt_info.packet will
611                  * only ever have a single SIP message on it, and so we base persistence on that.
612                  */
613                 if (type == SUBSCRIPTION_PERSISTENCE_CREATED
614                         || type == SUBSCRIPTION_PERSISTENCE_RECREATED) {
615                         if (rdata->msg_info.msg_buf) {
616                                 ast_copy_string(sub_tree->persistence->packet, rdata->msg_info.msg_buf,
617                                                 MIN(sizeof(sub_tree->persistence->packet), rdata->msg_info.len));
618                         } else {
619                                 ast_copy_string(sub_tree->persistence->packet, rdata->pkt_info.packet,
620                                                 sizeof(sub_tree->persistence->packet));
621                         }
622                 }
623                 ast_copy_string(sub_tree->persistence->src_name, rdata->pkt_info.src_name,
624                                 sizeof(sub_tree->persistence->src_name));
625                 sub_tree->persistence->src_port = rdata->pkt_info.src_port;
626                 ast_copy_string(sub_tree->persistence->transport_key, rdata->tp_info.transport->type_name,
627                         sizeof(sub_tree->persistence->transport_key));
628                 ast_copy_pj_str(sub_tree->persistence->local_name, &rdata->tp_info.transport->local_name.host,
629                         sizeof(sub_tree->persistence->local_name));
630                 sub_tree->persistence->local_port = rdata->tp_info.transport->local_name.port;
631         }
632
633         ast_sorcery_update(ast_sip_get_sorcery(), sub_tree->persistence);
634 }
635
636 /*! \brief Function which removes persistence of a subscription from sorcery */
637 static void subscription_persistence_remove(struct sip_subscription_tree *sub_tree)
638 {
639         if (!sub_tree->persistence) {
640                 return;
641         }
642
643         ast_sorcery_delete(ast_sip_get_sorcery(), sub_tree->persistence);
644         ao2_ref(sub_tree->persistence, -1);
645         sub_tree->persistence = NULL;
646 }
647
648
649 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name);
650 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
651                 size_t num_accept, const char *body_type);
652
653 /*! \brief Retrieve a handler using the Event header of an rdata message */
654 static struct ast_sip_subscription_handler *subscription_get_handler_from_rdata(pjsip_rx_data *rdata)
655 {
656         pjsip_event_hdr *event_header;
657         char event[32];
658         struct ast_sip_subscription_handler *handler;
659
660         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
661         if (!event_header) {
662                 ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
663                 return NULL;
664         }
665         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
666
667         handler = find_sub_handler_for_event_name(event);
668         if (!handler) {
669                 ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
670         }
671
672         return handler;
673 }
674
/*!
 * \brief Accept header values that are skipped when choosing a body generator
 *
 * When a SUBSCRIBE arrives, a body generator is normally chosen by matching
 * one of the request's Accept header values. That works well for a
 * subscription to a single resource. For a subscription to a resource list,
 * however, a couple of Accept values describe the list container itself
 * rather than the bodies of the individual resources, so they must be
 * ignored when looking for the generator used by the list's members.
 */
const char *accept_exceptions[] = {
        "multipart/related",
        "application/rlmi+xml",
};
691
692 /*!
693  * \brief Is the Accept header from the SUBSCRIBE in the list of exceptions?
694  *
695  * \retval 1 This Accept header value is an exception to the rule.
696  * \retval 0 This Accept header is not an exception to the rule.
697  */
698 static int exceptional_accept(const pj_str_t *accept)
699 {
700         int i;
701
702         for (i = 0; i < ARRAY_LEN(accept_exceptions); ++i) {
703                 if (!pj_strcmp2(accept, accept_exceptions[i])) {
704                         return 1;
705                 }
706         }
707
708         return 0;
709 }
710
711 /*! \brief Retrieve a body generator using the Accept header of an rdata message */
712 static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata,
713         const struct ast_sip_subscription_handler *handler)
714 {
715         pjsip_accept_hdr *accept_header = (pjsip_accept_hdr *) &rdata->msg_info.msg->hdr;
716         char accept[AST_SIP_MAX_ACCEPT][64];
717         size_t num_accept_headers = 0;
718
719         while ((accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, accept_header->next))) {
720                 int i;
721
722                 for (i = 0; i < accept_header->count; ++i) {
723                         if (!exceptional_accept(&accept_header->values[i])) {
724                                 ast_copy_pj_str(accept[num_accept_headers], &accept_header->values[i], sizeof(accept[num_accept_headers]));
725                                 ++num_accept_headers;
726                         }
727                 }
728         }
729
730         if (num_accept_headers == 0) {
731                 /* If a SUBSCRIBE contains no Accept headers, then we must assume that
732                  * the default accept type for the event package is to be used.
733                  */
734                 ast_copy_string(accept[0], handler->notifier->default_accept, sizeof(accept[0]));
735                 num_accept_headers = 1;
736         }
737
738         return find_body_generator(accept, num_accept_headers, handler->body_type);
739 }
740
741 /*! \brief Check if the rdata has a Supported header containing 'eventlist'
742  *
743  *  \retval 1 rdata has an eventlist containing supported header
744  *  \retval 0 rdata doesn't have an eventlist containing supported header
745  */
746 static int ast_sip_pubsub_has_eventlist_support(pjsip_rx_data *rdata)
747 {
748         pjsip_supported_hdr *supported_header = (pjsip_supported_hdr *) &rdata->msg_info.msg->hdr;
749
750         while ((supported_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_SUPPORTED, supported_header->next))) {
751                 int i;
752
753                 for (i = 0; i < supported_header->count; i++) {
754                         if (!pj_stricmp2(&supported_header->values[i], "eventlist")) {
755                                 return 1;
756                         }
757                 }
758         }
759
760         return 0;
761 }
762
763 struct resource_tree;
764
765 /*!
766  * \brief A node for a resource tree.
767  */
768 struct tree_node {
769         AST_VECTOR(, struct tree_node *) children;
770         unsigned int full_state;
771         char resource[0];
772 };
773
774 /*!
775  * \brief Helper function for retrieving a resource list for a given event.
776  *
777  * This will retrieve a resource list that corresponds to the resource and event provided.
778  *
779  * \param resource The name of the resource list to retrieve
780  * \param event The expected event name on the resource list
781  */
782 static struct resource_list *retrieve_resource_list(const char *resource, const char *event)
783 {
784         struct resource_list *list;
785
786         list = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "resource_list", resource);
787         if (!list) {
788                 return NULL;
789         }
790
791         if (strcmp(list->event, event)) {
792                 ast_log(LOG_WARNING, "Found resource list %s, but its event type (%s) does not match SUBSCRIBE's (%s)\n",
793                                 resource, list->event, event);
794                 ao2_cleanup(list);
795                 return NULL;
796         }
797
798         return list;
799 }
800
801 /*!
802  * \brief Allocate a tree node
803  *
804  * In addition to allocating and initializing the tree node, the node is also added
805  * to the vector of visited resources. See \ref build_resource_tree for more information
806  * on the visited resources.
807  *
808  * \param resource The name of the resource for this tree node.
809  * \param visited The vector of resources that have been visited.
810  * \param if allocating a list, indicate whether full state is requested in notifications.
811  * \retval NULL Allocation failure.
812  * \retval non-NULL The newly-allocated tree_node
813  */
814 static struct tree_node *tree_node_alloc(const char *resource, struct resources *visited, unsigned int full_state)
815 {
816         struct tree_node *node;
817
818         node = ast_calloc(1, sizeof(*node) + strlen(resource) + 1);
819         if (!node) {
820                 return NULL;
821         }
822
823         strcpy(node->resource, resource);
824         if (AST_VECTOR_INIT(&node->children, 4)) {
825                 ast_free(node);
826                 return NULL;
827         }
828         node->full_state = full_state;
829
830         if (visited) {
831                 AST_VECTOR_APPEND(visited, resource);
832         }
833         return node;
834 }
835
836 /*!
837  * \brief Destructor for a tree node
838  *
839  * This function calls recursively in order to destroy
840  * all nodes lower in the tree from the given node in
841  * addition to the node itself.
842  *
843  * \param node The node to destroy.
844  */
845 static void tree_node_destroy(struct tree_node *node)
846 {
847         int i;
848         if (!node) {
849                 return;
850         }
851
852         for (i = 0; i < AST_VECTOR_SIZE(&node->children); ++i) {
853                 tree_node_destroy(AST_VECTOR_GET(&node->children, i));
854         }
855         AST_VECTOR_FREE(&node->children);
856         ast_free(node);
857 }
858
/*!
 * \brief Determine if this resource has been visited already
 *
 * See \ref build_resource_tree for more information
 *
 * \param resource The resource currently being visited
 * \param visited The resources that have previously been visited
 *
 * \retval 1 A resource with the same name is already in \a visited.
 * \retval 0 The resource has not been visited yet.
 */
static int have_visited(const char *resource, struct resources *visited)
{
        int idx = 0;

        while (idx < AST_VECTOR_SIZE(visited)) {
                if (strcmp(resource, AST_VECTOR_GET(visited, idx)) == 0) {
                        return 1;
                }
                ++idx;
        }

        return 0;
}
879
880 /*!
881  * \brief Build child nodes for a given parent.
882  *
883  * This iterates through the items on a resource list and creates tree nodes for each one. The
884  * tree nodes created are children of the supplied parent node. If an item in the resource
885  * list is itself a list, then this function is called recursively to provide children for
886  * the the new node.
887  *
888  * If an item in a resource list is not a list, then the supplied subscription handler is
889  * called into as if a new SUBSCRIBE for the list item were presented. The handler's response
890  * is used to determine if the node can be added to the tree or not.
891  *
892  * If a parent node ends up having no child nodes added under it, then the parent node is
893  * pruned from the tree.
894  *
895  * \param endpoint The endpoint that sent the inbound SUBSCRIBE.
896  * \param handler The subscription handler for leaf nodes in the tree.
897  * \param list The configured resource list from which the child node is being built.
898  * \param parent The parent node for these children.
899  * \param visited The resources that have already been visited.
900  */
901 static void build_node_children(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
902                 struct resource_list *list, struct tree_node *parent, struct resources *visited)
903 {
904         int i;
905
906         for (i = 0; i < AST_VECTOR_SIZE(&list->items); ++i) {
907                 struct tree_node *current;
908                 struct resource_list *child_list;
909                 const char *resource = AST_VECTOR_GET(&list->items, i);
910
911                 if (have_visited(resource, visited)) {
912                         ast_debug(1, "Already visited resource %s. Avoiding duplicate resource or potential loop.\n", resource);
913                         continue;
914                 }
915
916                 child_list = retrieve_resource_list(resource, list->event);
917                 if (!child_list) {
918                         int resp = handler->notifier->new_subscribe(endpoint, resource);
919                         if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
920                                 current = tree_node_alloc(resource, visited, 0);
921                                 if (!current) {
922                                         ast_debug(1,
923                                                 "Subscription to leaf resource %s was successful, but encountered allocation error afterwards\n",
924                                                 resource);
925                                         continue;
926                                 }
927                                 ast_debug(2, "Subscription to leaf resource %s resulted in success. Adding to parent %s\n",
928                                                 resource, parent->resource);
929                                 AST_VECTOR_APPEND(&parent->children, current);
930                         } else {
931                                 ast_debug(2, "Subscription to leaf resource %s resulted in error response %d\n",
932                                                 resource, resp);
933                         }
934                 } else {
935                         ast_debug(2, "Resource %s (child of %s) is a list\n", resource, parent->resource);
936                         current = tree_node_alloc(resource, visited, child_list->full_state);
937                         if (!current) {
938                                 ast_debug(1, "Cannot build children of resource %s due to allocation failure\n", resource);
939                                 continue;
940                         }
941                         build_node_children(endpoint, handler, child_list, current, visited);
942                         if (AST_VECTOR_SIZE(&current->children) > 0) {
943                                 ast_debug(1, "List %s had no successful children.\n", resource);
944                                 AST_VECTOR_APPEND(&parent->children, current);
945                         } else {
946                                 ast_debug(2, "List %s had successful children. Adding to parent %s\n",
947                                                 resource, parent->resource);
948                                 tree_node_destroy(current);
949                         }
950                         ao2_cleanup(child_list);
951                 }
952         }
953 }
954
/*!
 * \brief A resource tree
 *
 * The resource named in an inbound SUBSCRIBE may be a resource list, and
 * the members of that list may in turn be lists themselves. A tree is the
 * natural structure for holding such nested resources.
 *
 * The tree is built when the SUBSCRIBE is received by checking, for each
 * individual resource, whether a subscription to it would succeed. A node
 * is created for every resource that can be subscribed to; resources whose
 * subscription would fail get no node.
 *
 * This tree can be seen as a bare-bones analog of the tree of ast_sip_subscriptions that
 * will end up being created to actually carry out the duties of a SIP SUBSCRIBE dialog.
 */
struct resource_tree {
        /*! The node for the resource requested in the SUBSCRIBE */
        struct tree_node *root;
        /*! Notification batch interval copied from the requested resource list */
        unsigned int notification_batch_interval;
};
975
976 /*!
977  * \brief Destroy a resource tree.
978  *
979  * This function makes no assumptions about how the tree itself was
980  * allocated and does not attempt to free the tree itself. Callers
981  * of this function are responsible for freeing the tree.
982  *
983  * \param tree The tree to destroy.
984  */
985 static void resource_tree_destroy(struct resource_tree *tree)
986 {
987         if (tree) {
988                 tree_node_destroy(tree->root);
989         }
990 }
991
992 /*!
993  * \brief Build a resource tree
994  *
995  * This function builds a resource tree based on the requested resource in a SUBSCRIBE request.
996  *
997  * This function also creates a container that has all resources that have been visited during
998  * creation of the tree, whether those resources resulted in a tree node being created or not.
999  * Keeping this container of visited resources allows for misconfigurations such as loops in
1000  * the tree or duplicated resources to be detected.
1001  *
1002  * \param endpoint The endpoint that sent the SUBSCRIBE request.
1003  * \param handler The subscription handler for leaf nodes in the tree.
1004  * \param resource The resource requested in the SUBSCRIBE request.
1005  * \param tree The tree that is to be built.
1006  * \param has_eventlist_support
1007  *
1008  * \retval 200-299 Successfully subscribed to at least one resource.
1009  * \retval 300-699 Failure to subscribe to requested resource.
1010  */
1011 static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
1012                 const char *resource, struct resource_tree *tree, int has_eventlist_support)
1013 {
1014         RAII_VAR(struct resource_list *, list, NULL, ao2_cleanup);
1015         struct resources visited;
1016
1017         if (!has_eventlist_support || !(list = retrieve_resource_list(resource, handler->event_name))) {
1018                 ast_debug(2, "Subscription '%s->%s' is not to a list\n",
1019                         ast_sorcery_object_get_id(endpoint), resource);
1020                 tree->root = tree_node_alloc(resource, NULL, 0);
1021                 if (!tree->root) {
1022                         return 500;
1023                 }
1024                 return handler->notifier->new_subscribe(endpoint, resource);
1025         }
1026
1027         ast_debug(2, "Subscription '%s->%s' is a list\n",
1028                 ast_sorcery_object_get_id(endpoint), resource);
1029         if (AST_VECTOR_INIT(&visited, AST_VECTOR_SIZE(&list->items))) {
1030                 return 500;
1031         }
1032
1033         tree->root = tree_node_alloc(resource, &visited, list->full_state);
1034         if (!tree->root) {
1035                 AST_VECTOR_FREE(&visited);
1036                 return 500;
1037         }
1038
1039         tree->notification_batch_interval = list->notification_batch_interval;
1040
1041         build_node_children(endpoint, handler, list, tree->root, &visited);
1042         AST_VECTOR_FREE(&visited);
1043
1044         if (AST_VECTOR_SIZE(&tree->root->children) > 0) {
1045                 return 200;
1046         } else {
1047                 return 500;
1048         }
1049 }
1050
1051 static void add_subscription(struct sip_subscription_tree *obj)
1052 {
1053         AST_RWLIST_WRLOCK(&subscriptions);
1054         AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
1055         AST_RWLIST_UNLOCK(&subscriptions);
1056 }
1057
1058 static void remove_subscription(struct sip_subscription_tree *obj)
1059 {
1060         struct sip_subscription_tree *i;
1061
1062         AST_RWLIST_WRLOCK(&subscriptions);
1063         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
1064                 if (i == obj) {
1065                         AST_RWLIST_REMOVE_CURRENT(next);
1066                         if (i->root) {
1067                                 ast_debug(2, "Removing subscription '%s->%s' from list of subscriptions\n",
1068                                         ast_sorcery_object_get_id(i->endpoint), ast_sip_subscription_get_resource_name(i->root));
1069                         }
1070                         break;
1071                 }
1072         }
1073         AST_RWLIST_TRAVERSE_SAFE_END;
1074         AST_RWLIST_UNLOCK(&subscriptions);
1075 }
1076
1077 static void destroy_subscription(struct ast_sip_subscription *sub)
1078 {
1079         ast_debug(3, "Destroying SIP subscription from '%s->%s'\n",
1080                 ast_sorcery_object_get_id(sub->tree->endpoint), sub->resource);
1081         ast_free(sub->body_text);
1082
1083         AST_VECTOR_FREE(&sub->children);
1084         ao2_cleanup(sub->datastores);
1085         ast_free(sub);
1086 }
1087
1088 static void destroy_subscriptions(struct ast_sip_subscription *root)
1089 {
1090         int i;
1091
1092         if (!root) {
1093                 return;
1094         }
1095
1096         for (i = 0; i < AST_VECTOR_SIZE(&root->children); ++i) {
1097                 struct ast_sip_subscription *child;
1098
1099                 child = AST_VECTOR_GET(&root->children, i);
1100                 destroy_subscriptions(child);
1101         }
1102
1103         destroy_subscription(root);
1104 }
1105
1106 static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler,
1107                 const char *resource, struct sip_subscription_tree *tree)
1108 {
1109         struct ast_sip_subscription *sub;
1110         pjsip_sip_uri *contact_uri;
1111
1112         sub = ast_calloc(1, sizeof(*sub) + strlen(resource) + 1);
1113         if (!sub) {
1114                 return NULL;
1115         }
1116         strcpy(sub->resource, resource); /* Safe */
1117
1118         sub->datastores = ast_datastores_alloc();
1119         if (!sub->datastores) {
1120                 destroy_subscription(sub);
1121                 return NULL;
1122         }
1123
1124         sub->body_text = ast_str_create(128);
1125         if (!sub->body_text) {
1126                 destroy_subscription(sub);
1127                 return NULL;
1128         }
1129
1130         sub->uri = pjsip_sip_uri_create(tree->dlg->pool, PJ_FALSE);
1131         contact_uri = pjsip_uri_get_uri(tree->dlg->local.contact->uri);
1132         pjsip_sip_uri_assign(tree->dlg->pool, sub->uri, contact_uri);
1133         pj_strdup2(tree->dlg->pool, &sub->uri->user, resource);
1134
1135         sub->handler = handler;
1136         sub->subscription_state = PJSIP_EVSUB_STATE_ACTIVE;
1137         sub->tree = ao2_bump(tree);
1138
1139         return sub;
1140 }
1141
1142 /*!
1143  * \brief Create a tree of virtual subscriptions based on a resource tree node.
1144  *
1145  * \param handler The handler to supply to leaf subscriptions.
1146  * \param resource The requested resource for this subscription.
1147  * \param generator Body generator to use for leaf subscriptions.
1148  * \param tree The root of the subscription tree.
1149  * \param current The tree node that corresponds to the subscription being created.
1150  */
1151 static struct ast_sip_subscription *create_virtual_subscriptions(const struct ast_sip_subscription_handler *handler,
1152                 const char *resource, struct ast_sip_pubsub_body_generator *generator,
1153                 struct sip_subscription_tree *tree, struct tree_node *current)
1154 {
1155         int i;
1156         struct ast_sip_subscription *sub;
1157
1158         sub = allocate_subscription(handler, resource, tree);
1159         if (!sub) {
1160                 return NULL;
1161         }
1162
1163         sub->full_state = current->full_state;
1164         sub->body_generator = generator;
1165         AST_VECTOR_INIT(&sub->children, AST_VECTOR_SIZE(&current->children));
1166
1167         for (i = 0; i < AST_VECTOR_SIZE(&current->children); ++i) {
1168                 struct ast_sip_subscription *child;
1169                 struct tree_node *child_node = AST_VECTOR_GET(&current->children, i);
1170
1171                 child = create_virtual_subscriptions(handler, child_node->resource, generator,
1172                                 tree, child_node);
1173
1174                 if (!child) {
1175                         ast_debug(1, "Child subscription to resource %s could not be created\n",
1176                                         child_node->resource);
1177                         continue;
1178                 }
1179
1180                 if (AST_VECTOR_APPEND(&sub->children, child)) {
1181                         ast_debug(1, "Child subscription to resource %s could not be appended\n",
1182                                         child_node->resource);
1183                 }
1184         }
1185
1186         return sub;
1187 }
1188
1189 static void shutdown_subscriptions(struct ast_sip_subscription *sub)
1190 {
1191         int i;
1192
1193         if (!sub) {
1194                 return;
1195         }
1196
1197         if (AST_VECTOR_SIZE(&sub->children) > 0) {
1198                 for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
1199                         shutdown_subscriptions(AST_VECTOR_GET(&sub->children, i));
1200                 }
1201                 return;
1202         }
1203
1204         /* We notify subscription shutdown only on the tree leaves. */
1205         if (sub->handler->subscription_shutdown) {
1206                 sub->handler->subscription_shutdown(sub);
1207         }
1208 }
1209 static int subscription_unreference_dialog(void *obj)
1210 {
1211         struct sip_subscription_tree *sub_tree = obj;
1212
1213         /* This is why we keep the dialog on the subscription. When the subscription
1214          * is destroyed, there is no guarantee that the underlying dialog is ready
1215          * to be destroyed. Furthermore, there's no guarantee in the opposite direction
1216          * either. The dialog could be destroyed before our subscription is. We fix
1217          * this problem by keeping a reference to the dialog until it is time to
1218          * destroy the subscription. We need to have the dialog available when the
1219          * subscription is destroyed so that we can guarantee that our attempt to
1220          * remove the serializer will be successful.
1221          */
1222         pjsip_dlg_dec_session(sub_tree->dlg, &pubsub_module);
1223         sub_tree->dlg = NULL;
1224
1225         return 0;
1226 }
1227
1228 static void subscription_tree_destructor(void *obj)
1229 {
1230         struct sip_subscription_tree *sub_tree = obj;
1231
1232         ast_debug(3, "Destroying subscription tree %p '%s->%s'\n",
1233                 sub_tree,
1234                 sub_tree->endpoint ? ast_sorcery_object_get_id(sub_tree->endpoint) : "Unknown",
1235                 sub_tree->root ? sub_tree->root->resource : "Unknown");
1236
1237         ao2_cleanup(sub_tree->endpoint);
1238
1239         destroy_subscriptions(sub_tree->root);
1240
1241         if (sub_tree->dlg) {
1242                 ast_sip_push_task_synchronous(sub_tree->serializer, subscription_unreference_dialog, sub_tree);
1243         }
1244
1245         ast_taskprocessor_unreference(sub_tree->serializer);
1246         ast_module_unref(ast_module_info->self);
1247 }
1248
1249 void ast_sip_subscription_destroy(struct ast_sip_subscription *sub)
1250 {
1251         ast_debug(3, "Removing subscription %p '%s->%s' reference to subscription tree %p\n",
1252                 sub, ast_sorcery_object_get_id(sub->tree->endpoint), sub->resource, sub->tree);
1253         ao2_cleanup(sub->tree);
1254 }
1255
1256 static void subscription_setup_dialog(struct sip_subscription_tree *sub_tree, pjsip_dialog *dlg)
1257 {
1258         sub_tree->dlg = dlg;
1259         ast_sip_dialog_set_serializer(dlg, sub_tree->serializer);
1260         ast_sip_dialog_set_endpoint(dlg, sub_tree->endpoint);
1261         pjsip_evsub_set_mod_data(sub_tree->evsub, pubsub_module.id, sub_tree);
1262         pjsip_dlg_inc_session(dlg, &pubsub_module);
1263 }
1264
1265 static struct sip_subscription_tree *allocate_subscription_tree(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
1266 {
1267         struct sip_subscription_tree *sub_tree;
1268
1269         sub_tree = ao2_alloc(sizeof *sub_tree, subscription_tree_destructor);
1270         if (!sub_tree) {
1271                 return NULL;
1272         }
1273
1274         ast_module_ref(ast_module_info->self);
1275
1276         if (rdata) {
1277                 /*
1278                  * We must continue using the serializer that the original
1279                  * SUBSCRIBE came in on for the dialog.  There may be
1280                  * retransmissions already enqueued in the original
1281                  * serializer that can result in reentrancy and message
1282                  * sequencing problems.
1283                  */
1284                 sub_tree->serializer = ast_sip_get_distributor_serializer(rdata);
1285         } else {
1286                 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
1287
1288                 /* Create name with seq number appended. */
1289                 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/pubsub/%s",
1290                         ast_sorcery_object_get_id(endpoint));
1291
1292                 sub_tree->serializer = ast_sip_create_serializer(tps_name);
1293         }
1294         if (!sub_tree->serializer) {
1295                 ao2_ref(sub_tree, -1);
1296                 return NULL;
1297         }
1298
1299         sub_tree->endpoint = ao2_bump(endpoint);
1300         sub_tree->notify_sched_id = -1;
1301
1302         return sub_tree;
1303 }
1304
1305 /*!
1306  * \brief Create a subscription tree based on a resource tree.
1307  *
1308  * Using the previously-determined valid resources in the provided resource tree,
1309  * a corresponding tree of ast_sip_subscriptions are created. The root of the
1310  * subscription tree is a real subscription, and the rest in the tree are
1311  * virtual subscriptions.
1312  *
1313  * \param handler The handler to use for leaf subscriptions
1314  * \param endpoint The endpoint that sent the SUBSCRIBE request
1315  * \param rdata The SUBSCRIBE content
1316  * \param resource The requested resource in the SUBSCRIBE request
1317  * \param generator The body generator to use in leaf subscriptions
1318  * \param tree The resource tree on which the subscription tree is based
1319  * \param dlg_status[out] The result of attempting to create a dialog.
1320  *
1321  * \retval NULL Could not create the subscription tree
1322  * \retval non-NULL The root of the created subscription tree
1323  */
1324
1325 static struct sip_subscription_tree *create_subscription_tree(const struct ast_sip_subscription_handler *handler,
1326                 struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
1327                 struct ast_sip_pubsub_body_generator *generator, struct resource_tree *tree,
1328                 pj_status_t *dlg_status)
1329 {
1330         struct sip_subscription_tree *sub_tree;
1331         pjsip_dialog *dlg;
1332         struct subscription_persistence *persistence;
1333
1334         sub_tree = allocate_subscription_tree(endpoint, rdata);
1335         if (!sub_tree) {
1336                 *dlg_status = PJ_ENOMEM;
1337                 return NULL;
1338         }
1339         sub_tree->role = AST_SIP_NOTIFIER;
1340
1341         dlg = ast_sip_create_dialog_uas(endpoint, rdata, dlg_status);
1342         if (!dlg) {
1343                 if (*dlg_status != PJ_EEXISTS) {
1344                         ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
1345                 }
1346                 ao2_ref(sub_tree, -1);
1347                 return NULL;
1348         }
1349
1350         persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
1351                         pubsub_module.id, MOD_DATA_PERSISTENCE);
1352         if (persistence) {
1353                 /* Update the created dialog with the persisted information */
1354                 pjsip_ua_unregister_dlg(pjsip_ua_instance(), dlg);
1355                 pj_strdup2(dlg->pool, &dlg->local.info->tag, persistence->tag);
1356                 dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag);
1357                 pjsip_ua_register_dlg(pjsip_ua_instance(), dlg);
1358                 dlg->local.cseq = persistence->cseq;
1359         }
1360
1361         pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &sub_tree->evsub);
1362         subscription_setup_dialog(sub_tree, dlg);
1363
1364 #ifdef HAVE_PJSIP_EVSUB_GRP_LOCK
1365         pjsip_evsub_add_ref(sub_tree->evsub);
1366 #endif
1367
1368         ast_sip_mod_data_set(dlg->pool, dlg->mod_data, pubsub_module.id, MOD_DATA_MSG,
1369                         pjsip_msg_clone(dlg->pool, rdata->msg_info.msg));
1370
1371         sub_tree->notification_batch_interval = tree->notification_batch_interval;
1372
1373         sub_tree->root = create_virtual_subscriptions(handler, resource, generator, sub_tree, tree->root);
1374         if (AST_VECTOR_SIZE(&sub_tree->root->children) > 0) {
1375                 sub_tree->is_list = 1;
1376         }
1377
1378         add_subscription(sub_tree);
1379
1380         return sub_tree;
1381 }
1382
/*! Wrapper structure for initial_notify_task */
struct initial_notify_data {
        /*! The subscription tree the task operates on */
        struct sip_subscription_tree *sub_tree;
        /*! Expiration value handed to the task */
        int expires;
};
1388
/* Forward declarations for file-local functions that are defined elsewhere in this file. */
static int initial_notify_task(void *obj);
static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state);
1391
/*!
 * \brief Persistent subscription recreation continuation under distributor serializer data
 *
 * Argument bundle passed from subscription_persistence_recreate() to
 * sub_persistence_recreate().
 */
struct persistence_recreate_data {
	/*! Persisted subscription record being recreated */
	struct subscription_persistence *persistence;
	/*! rdata rebuilt from the persisted packet */
	pjsip_rx_data *rdata;
};
1397
1398 /*!
1399  * \internal
1400  * \brief subscription_persistence_recreate continuation under distributor serializer.
1401  * \since 13.10.0
1402  *
1403  * \retval 0 on success.
1404  * \retval -1 on error.
1405  */
1406 static int sub_persistence_recreate(void *obj)
1407 {
1408         struct persistence_recreate_data *recreate_data = obj;
1409         struct subscription_persistence *persistence = recreate_data->persistence;
1410         pjsip_rx_data *rdata = recreate_data->rdata;
1411         struct ast_sip_endpoint *endpoint;
1412         struct sip_subscription_tree *sub_tree;
1413         struct ast_sip_pubsub_body_generator *generator;
1414         struct ast_sip_subscription_handler *handler;
1415         char *resource;
1416         pjsip_sip_uri *request_uri;
1417         size_t resource_size;
1418         int resp;
1419         struct resource_tree tree;
1420         pjsip_expires_hdr *expires_header;
1421
1422         request_uri = pjsip_uri_get_uri(rdata->msg_info.msg->line.req.uri);
1423         resource_size = pj_strlen(&request_uri->user) + 1;
1424         resource = ast_alloca(resource_size);
1425         ast_copy_pj_str(resource, &request_uri->user, resource_size);
1426
1427         /*
1428          * We may want to match without any user options getting
1429          * in the way.
1430          */
1431         AST_SIP_USER_OPTIONS_TRUNCATE_CHECK(resource);
1432
1433         handler = subscription_get_handler_from_rdata(rdata);
1434         if (!handler || !handler->notifier) {
1435                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not get subscription handler.\n",
1436                         persistence->endpoint);
1437                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1438                 return 0;
1439         }
1440
1441         generator = subscription_get_generator_from_rdata(rdata, handler);
1442         if (!generator) {
1443                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Body generator not available.\n",
1444                         persistence->endpoint);
1445                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1446                 return 0;
1447         }
1448
1449         ast_sip_mod_data_set(rdata->tp_info.pool, rdata->endpt_info.mod_data,
1450                 pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
1451
1452         /* Getting the endpoint may take some time that can affect the expiration. */
1453         endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint",
1454                 persistence->endpoint);
1455         if (!endpoint) {
1456                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: The endpoint was not found\n",
1457                         persistence->endpoint);
1458                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1459                 ao2_ref(endpoint, -1);
1460                 return 0;
1461         }
1462
1463         /* Update the expiration header with the new expiration */
1464         expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES,
1465                 rdata->msg_info.msg->hdr.next);
1466         if (!expires_header) {
1467                 expires_header = pjsip_expires_hdr_create(rdata->tp_info.pool, 0);
1468                 if (!expires_header) {
1469                         ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not update expires header.\n",
1470                                 persistence->endpoint);
1471                         ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1472                         ao2_ref(endpoint, -1);
1473                         return 0;
1474                 }
1475                 pjsip_msg_add_hdr(rdata->msg_info.msg, (pjsip_hdr *) expires_header);
1476         }
1477
1478         expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
1479         if (expires_header->ivalue <= 0) {
1480                 /* The subscription expired since we started recreating the subscription. */
1481                 ast_debug(3, "Expired subscription retrived from persistent store '%s' %s\n",
1482                         persistence->endpoint, persistence->tag);
1483                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1484                 ao2_ref(endpoint, -1);
1485                 return 0;
1486         }
1487
1488         memset(&tree, 0, sizeof(tree));
1489         resp = build_resource_tree(endpoint, handler, resource, &tree,
1490                 ast_sip_pubsub_has_eventlist_support(rdata));
1491         if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
1492                 pj_status_t dlg_status;
1493
1494                 sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator,
1495                         &tree, &dlg_status);
1496                 if (!sub_tree) {
1497                         if (dlg_status != PJ_EEXISTS) {
1498                                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not create subscription tree.\n",
1499                                         persistence->endpoint);
1500                                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1501                         }
1502                 } else {
1503                         struct initial_notify_data *ind = ast_malloc(sizeof(*ind));
1504
1505                         if (!ind) {
1506                                 pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
1507                                 goto error;
1508                         }
1509
1510                         ind->sub_tree = ao2_bump(sub_tree);
1511                         ind->expires = expires_header->ivalue;
1512
1513                         sub_tree->persistence = ao2_bump(persistence);
1514                         subscription_persistence_update(sub_tree, rdata, SUBSCRIPTION_PERSISTENCE_RECREATED);
1515                         if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ind)) {
1516                                 /* Could not send initial subscribe NOTIFY */
1517                                 pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
1518                                 ao2_ref(sub_tree, -1);
1519                                 ast_free(ind);
1520                         }
1521                 }
1522         } else {
1523                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1524         }
1525
1526 error:
1527         resource_tree_destroy(&tree);
1528         ao2_ref(endpoint, -1);
1529
1530         return 0;
1531 }
1532
1533 /*! \brief Callback function to perform the actual recreation of a subscription */
1534 static int subscription_persistence_recreate(void *obj, void *arg, int flags)
1535 {
1536         struct subscription_persistence *persistence = obj;
1537         pj_pool_t *pool = arg;
1538         struct ast_taskprocessor *serializer;
1539         pjsip_rx_data rdata;
1540         struct persistence_recreate_data recreate_data;
1541
1542         /* If this subscription has already expired remove it */
1543         if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
1544                 ast_debug(3, "Expired subscription retrived from persistent store '%s' %s\n",
1545                         persistence->endpoint, persistence->tag);
1546                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1547                 return 0;
1548         }
1549
1550         memset(&rdata, 0, sizeof(rdata));
1551         pj_pool_reset(pool);
1552         rdata.tp_info.pool = pool;
1553
1554         if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
1555                 persistence->transport_key, persistence->local_name, persistence->local_port)) {
1556                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: The message could not be parsed\n",
1557                         persistence->endpoint);
1558                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1559                 return 0;
1560         }
1561
1562         if (rdata.msg_info.msg->type != PJSIP_REQUEST_MSG) {
1563                 ast_log(LOG_NOTICE, "Failed recreating '%s' subscription: Stored a SIP response instead of a request.\n",
1564                         persistence->endpoint);
1565                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1566                 return 0;
1567         }
1568
1569         /* Continue the remainder in the distributor serializer */
1570         serializer = ast_sip_get_distributor_serializer(&rdata);
1571         if (!serializer) {
1572                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not get distributor serializer.\n",
1573                         persistence->endpoint);
1574                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1575                 return 0;
1576         }
1577         recreate_data.persistence = persistence;
1578         recreate_data.rdata = &rdata;
1579         if (ast_sip_push_task_synchronous(serializer, sub_persistence_recreate, &recreate_data)) {
1580                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not continue under distributor serializer.\n",
1581                         persistence->endpoint);
1582                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1583         }
1584         ast_taskprocessor_unreference(serializer);
1585
1586         return 0;
1587 }
1588
1589 /*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
1590 static int subscription_persistence_load(void *data)
1591 {
1592         struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
1593                 "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
1594         pj_pool_t *pool;
1595
1596         pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
1597                 PJSIP_POOL_RDATA_INC);
1598         if (!pool) {
1599                 ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
1600                 return 0;
1601         }
1602
1603         ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
1604
1605         pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
1606
1607         ao2_ref(persisted_subscriptions, -1);
1608         return 0;
1609 }
1610
1611 /*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
1612 static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
1613 {
1614         struct ast_json_payload *payload;
1615         const char *type;
1616
1617         if (stasis_message_type(message) != ast_manager_get_generic_type()) {
1618                 return;
1619         }
1620
1621         payload = stasis_message_data(message);
1622         type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
1623
1624         /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
1625          * recreate SIP subscriptions.
1626          */
1627         if (strcmp(type, "FullyBooted")) {
1628                 return;
1629         }
1630
1631         /* This has to be here so the subscription is recreated when the body generator is available */
1632         ast_sip_push_task(NULL, subscription_persistence_load, NULL);
1633
1634         /* Once the system is fully booted we don't care anymore */
1635         stasis_unsubscribe(sub);
1636 }
1637
1638 typedef int (*on_subscription_t)(struct sip_subscription_tree *sub, void *arg);
1639
1640 static int for_each_subscription(on_subscription_t on_subscription, void *arg)
1641 {
1642         int num = 0;
1643         struct sip_subscription_tree *i;
1644
1645         if (!on_subscription) {
1646                 return num;
1647         }
1648
1649         AST_RWLIST_RDLOCK(&subscriptions);
1650         AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
1651                 if (on_subscription(i, arg)) {
1652                         break;
1653                 }
1654                 ++num;
1655         }
1656         AST_RWLIST_UNLOCK(&subscriptions);
1657         return num;
1658 }
1659
1660 static void sip_subscription_to_ami(struct sip_subscription_tree *sub_tree,
1661                                     struct ast_str **buf)
1662 {
1663         char str[256];
1664         struct ast_sip_endpoint_id_configuration *id = &sub_tree->endpoint->id;
1665
1666         ast_str_append(buf, 0, "Role: %s\r\n",
1667                        sip_subscription_roles_map[sub_tree->role]);
1668         ast_str_append(buf, 0, "Endpoint: %s\r\n",
1669                        ast_sorcery_object_get_id(sub_tree->endpoint));
1670
1671         if (sub_tree->dlg) {
1672                 ast_copy_pj_str(str, &sub_tree->dlg->call_id->id, sizeof(str));
1673         } else {
1674                 ast_copy_string(str, "<unknown>", sizeof(str));
1675         }
1676         ast_str_append(buf, 0, "Callid: %s\r\n", str);
1677
1678         ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(sub_tree->evsub));
1679
1680         ast_callerid_merge(str, sizeof(str),
1681                            S_COR(id->self.name.valid, id->self.name.str, NULL),
1682                            S_COR(id->self.number.valid, id->self.number.str, NULL),
1683                            "Unknown");
1684
1685         ast_str_append(buf, 0, "Callerid: %s\r\n", str);
1686
1687         /* XXX This needs to be done recursively for lists */
1688         if (sub_tree->root->handler->to_ami) {
1689                 sub_tree->root->handler->to_ami(sub_tree->root, buf);
1690         }
1691 }
1692
1693
1694 void *ast_sip_subscription_get_header(const struct ast_sip_subscription *sub, const char *header)
1695 {
1696         pjsip_dialog *dlg;
1697         pjsip_msg *msg;
1698         pj_str_t name;
1699
1700         dlg = sub->tree->dlg;
1701         msg = ast_sip_mod_data_get(dlg->mod_data, pubsub_module.id, MOD_DATA_MSG);
1702         pj_cstr(&name, header);
1703
1704         return pjsip_msg_find_hdr_by_name(msg, &name, NULL);
1705 }
1706
1707 /*!
1708  * \internal
1709  * \brief Wrapper for pjsip_evsub_send_request
1710  *
1711  * This function (re)sets the transport before sending to catch cases
1712  * where the transport might have changed.
1713  *
1714  * If pjproject gives us the ability to resend, we'll only reset the transport
1715  * if PJSIP_ETPNOTAVAIL is returned from send.
1716  *
1717  * \returns pj_status_t
1718  */
1719 static pj_status_t internal_pjsip_evsub_send_request(struct sip_subscription_tree *sub_tree, pjsip_tx_data *tdata)
1720 {
1721         pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, };
1722
1723         ast_sip_set_tpselector_from_transport_name(sub_tree->endpoint->transport, &selector);
1724         pjsip_dlg_set_transport(sub_tree->dlg, &selector);
1725
1726         return pjsip_evsub_send_request(sub_tree->evsub, tdata);
1727 }
1728
1729 /* XXX This function is not used. */
1730 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
1731                 struct ast_sip_endpoint *endpoint, const char *resource)
1732 {
1733         struct ast_sip_subscription *sub;
1734         pjsip_dialog *dlg;
1735         struct ast_sip_contact *contact;
1736         pj_str_t event;
1737         pjsip_tx_data *tdata;
1738         pjsip_evsub *evsub;
1739         struct sip_subscription_tree *sub_tree = NULL;
1740
1741         sub_tree = allocate_subscription_tree(endpoint, NULL);
1742         if (!sub_tree) {
1743                 return NULL;
1744         }
1745
1746         sub = allocate_subscription(handler, resource, sub_tree);
1747         if (!sub) {
1748                 ao2_cleanup(sub_tree);
1749                 return NULL;
1750         }
1751
1752         contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors);
1753         if (!contact || ast_strlen_zero(contact->uri)) {
1754                 ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n",
1755                                 ast_sorcery_object_get_id(endpoint));
1756                 ao2_ref(sub_tree, -1);
1757                 ao2_cleanup(contact);
1758                 return NULL;
1759         }
1760
1761         dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL);
1762         ao2_cleanup(contact);
1763         if (!dlg) {
1764                 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
1765                 ao2_ref(sub_tree, -1);
1766                 return NULL;
1767         }
1768
1769         pj_cstr(&event, handler->event_name);
1770         pjsip_evsub_create_uac(dlg, &pubsub_cb, &event, 0, &sub_tree->evsub);
1771         subscription_setup_dialog(sub_tree, dlg);
1772
1773         evsub = sub_tree->evsub;
1774
1775         if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
1776                 internal_pjsip_evsub_send_request(sub_tree, tdata);
1777         } else {
1778                 /* pjsip_evsub_terminate will result in pubsub_on_evsub_state,
1779                  * being called and terminating the subscription. Therefore, we don't
1780                  * need to decrease the reference count of sub here.
1781                  */
1782                 pjsip_evsub_terminate(evsub, PJ_TRUE);
1783                 ao2_ref(sub_tree, -1);
1784                 return NULL;
1785         }
1786
1787         add_subscription(sub_tree);
1788
1789         return sub;
1790 }
1791
1792 pjsip_dialog *ast_sip_subscription_get_dialog(struct ast_sip_subscription *sub)
1793 {
1794         ast_assert(sub->tree->dlg != NULL);
1795         return sub->tree->dlg;
1796 }
1797
1798 struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
1799 {
1800         ast_assert(sub->tree->endpoint != NULL);
1801         return ao2_bump(sub->tree->endpoint);
1802 }
1803
1804 struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
1805 {
1806         ast_assert(sub->tree->serializer != NULL);
1807         return sub->tree->serializer;
1808 }
1809
1810 /*!
1811  * \brief Pre-allocate a buffer for the transmission
1812  *
1813  * Typically, we let PJSIP do this step for us when we send a request. PJSIP's buffer
1814  * allocation algorithm is to allocate a buffer of PJSIP_MAX_PKT_LEN bytes and attempt
1815  * to write the packet to the allocated buffer. If the buffer is too small to hold the
1816  * packet, then we get told the message is too long to be sent.
1817  *
1818  * When dealing with SIP NOTIFY, especially with RLS, it is possible to exceed
1819  * PJSIP_MAX_PKT_LEN. Rather than accepting the limitation imposed on us by default,
1820  * we instead take the strategy of pre-allocating the buffer, testing for ourselves
1821  * if the message will fit, and resizing the buffer as required.
1822  *
1823  * RFC 3261 says that a SIP UDP request can be up to 65535 bytes long. We're capping
1824  * it at 64000 for a couple of reasons:
1825  * 1) Allocating more than 64K at a time is hard to justify
1826  * 2) If the message goes through proxies, those proxies will want to add Via and
1827  *    Record-Route headers, making the message even larger. Giving some space for
1828  *    those headers is a nice thing to do.
1829  *
1830  * RFC 3261 does not place an upper limit on the size of TCP requests, but we are
1831  * going to impose the same 64K limit as a memory savings.
1832  *
1833  * \param tdata The tdata onto which to allocate a buffer
1834  * \retval 0 Success
1835  * \retval -1 The message is too large
1836  */
1837 static int allocate_tdata_buffer(pjsip_tx_data *tdata)
1838 {
1839         int buf_size;
1840         int size = -1;
1841         char *buf;
1842
1843         for (buf_size = PJSIP_MAX_PKT_LEN; size == -1 && buf_size < 64000; buf_size *= 2) {
1844                 buf = pj_pool_alloc(tdata->pool, buf_size);
1845                 size = pjsip_msg_print(tdata->msg, buf, buf_size);
1846         }
1847
1848         if (size == -1) {
1849                 return -1;
1850         }
1851
1852         tdata->buf.start = buf;
1853         tdata->buf.cur = tdata->buf.start;
1854         tdata->buf.end = tdata->buf.start + buf_size;
1855
1856         return 0;
1857 }
1858
1859 static int sip_subscription_send_request(struct sip_subscription_tree *sub_tree, pjsip_tx_data *tdata)
1860 {
1861 #ifdef TEST_FRAMEWORK
1862         struct ast_sip_endpoint *endpoint = sub_tree->endpoint;
1863         pjsip_evsub *evsub = sub_tree->evsub;
1864 #endif
1865         int res;
1866
1867         if (allocate_tdata_buffer(tdata)) {
1868                 ast_log(LOG_ERROR, "SIP request %s is too large to send.\n", tdata->info);
1869                 pjsip_tx_data_dec_ref(tdata);
1870                 return -1;
1871         }
1872
1873         res = internal_pjsip_evsub_send_request(sub_tree, tdata);
1874
1875         subscription_persistence_update(sub_tree, NULL, SUBSCRIPTION_PERSISTENCE_SEND_REQUEST);
1876
1877         ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
1878                 "StateText: %s\r\n"
1879                 "Endpoint: %s\r\n",
1880                 pjsip_evsub_get_state_name(evsub),
1881                 ast_sorcery_object_get_id(endpoint));
1882
1883         return (res == PJ_SUCCESS ? 0 : -1);
1884 }
1885
1886 /*!
1887  * \brief Add a resource XML element to an RLMI body
1888  *
1889  * Each resource element represents a subscribed resource in the list. This function currently
1890  * will unconditionally add an instance element to each created resource element. Instance
1891  * elements refer to later parts in the multipart body.
1892  *
1893  * \param pool PJLIB allocation pool
1894  * \param cid Content-ID header of the resource
1895  * \param resource_name Name of the resource
1896  * \param resource_uri URI of the resource
1897  * \param state State of the subscribed resource
1898  */
1899 static void add_rlmi_resource(pj_pool_t *pool, pj_xml_node *rlmi, const pjsip_generic_string_hdr *cid,
1900                 const char *resource_name, const pjsip_sip_uri *resource_uri, pjsip_evsub_state state)
1901 {
1902         static pj_str_t cid_name = { "cid", 3 };
1903         pj_xml_node *resource;
1904         pj_xml_node *name;
1905         pj_xml_node *instance;
1906         pj_xml_attr *cid_attr;
1907         char id[6];
1908         char uri[PJSIP_MAX_URL_SIZE];
1909
1910         /* This creates a string representing the Content-ID without the enclosing < > */
1911         const pj_str_t cid_stripped = {
1912                 .ptr = cid->hvalue.ptr + 1,
1913                 .slen = cid->hvalue.slen - 2,
1914         };
1915
1916         resource = ast_sip_presence_xml_create_node(pool, rlmi, "resource");
1917         name = ast_sip_presence_xml_create_node(pool, resource, "name");
1918         instance = ast_sip_presence_xml_create_node(pool, resource, "instance");
1919
1920         pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, resource_uri, uri, sizeof(uri));
1921         ast_sip_presence_xml_create_attr(pool, resource, "uri", uri);
1922
1923         pj_strdup2(pool, &name->content, resource_name);
1924
1925         ast_generate_random_string(id, sizeof(id));
1926
1927         ast_sip_presence_xml_create_attr(pool, instance, "id", id);
1928         ast_sip_presence_xml_create_attr(pool, instance, "state",
1929                         state == PJSIP_EVSUB_STATE_TERMINATED ? "terminated" : "active");
1930
1931         /* Use the PJLIB-util XML library directly here since we are using a
1932          * pj_str_t
1933          */
1934
1935         cid_attr = pj_xml_attr_new(pool, &cid_name, &cid_stripped);
1936         pj_xml_add_attr(instance, cid_attr);
1937 }
1938
/*!
 * \brief A multipart body part and meta-information
 *
 * When creating a multipart body part, the end result (the
 * pjsip_multipart_part) is hard to inspect without undoing
 * a lot of what was done to create it. Therefore, we use this
 * structure to store meta-information about the body part.
 *
 * The main consumer of this is the creator of the RLMI body
 * part of a multipart resource list body.
 *
 * The structure itself is heap allocated by allocate_body_part() and
 * released by free_body_parts(); the resource and uri members are
 * copied as pointers from the subscription rather than duplicated.
 */
struct body_part {
	/*! Content-ID header for the body part */
	pjsip_generic_string_hdr *cid;
	/*! Subscribed resource represented in the body part */
	const char *resource;
	/*! URI for the subscribed body part */
	pjsip_sip_uri *uri;
	/*! Subscription state of the resource represented in the body part */
	pjsip_evsub_state state;
	/*! The actual body part that will be present in the multipart body */
	pjsip_multipart_part *part;
};
1962
/*!
 * \brief Type declaration for container of body part structures
 *
 * The vector stores pointers; each element is an individually allocated
 * struct body_part that free_body_parts() releases.
 */
AST_VECTOR(body_part_list, struct body_part *);
1967
1968 /*!
1969  * \brief Create a Content-ID header
1970  *
1971  * Content-ID headers are required by RFC2387 for multipart/related
1972  * bodies. They serve as identifiers for each part of the multipart body.
1973  *
1974  * \param pool PJLIB allocation pool
1975  * \param sub Subscription to a resource
1976  */
1977 static pjsip_generic_string_hdr *generate_content_id_hdr(pj_pool_t *pool,
1978                 const struct ast_sip_subscription *sub)
1979 {
1980         static const pj_str_t cid_name = { "Content-ID", 10 };
1981         pjsip_generic_string_hdr *cid;
1982         char id[6];
1983         size_t alloc_size;
1984         pj_str_t cid_value;
1985
1986         /* '<' + '@' + '>' = 3. pj_str_t does not require a null-terminator */
1987         alloc_size = sizeof(id) + pj_strlen(&sub->uri->host) + 3;
1988         cid_value.ptr = pj_pool_alloc(pool, alloc_size);
1989         cid_value.slen = sprintf(cid_value.ptr, "<%s@%.*s>",
1990                         ast_generate_random_string(id, sizeof(id)),
1991                         (int) pj_strlen(&sub->uri->host), pj_strbuf(&sub->uri->host));
1992         cid = pjsip_generic_string_hdr_create(pool, &cid_name, &cid_value);
1993
1994         return cid;
1995 }
1996
1997 static int rlmi_print_body(struct pjsip_msg_body *msg_body, char *buf, pj_size_t size)
1998 {
1999         int num_printed;
2000         pj_xml_node *rlmi = msg_body->data;
2001
2002         num_printed = pj_xml_print(rlmi, buf, size, PJ_TRUE);
2003         if (num_printed <= AST_PJSIP_XML_PROLOG_LEN) {
2004                 return -1;
2005         }
2006
2007         return num_printed;
2008 }
2009
2010 static void *rlmi_clone_data(pj_pool_t *pool, const void *data, unsigned len)
2011 {
2012         const pj_xml_node *rlmi = data;
2013
2014         return pj_xml_clone(pool, rlmi);
2015 }
2016
2017 /*!
2018  * \brief Create an RLMI body part for a multipart resource list body
2019  *
2020  * RLMI (Resource list meta information) is a special body type that lists
2021  * the subscribed resources and tells subscribers the number of subscribed
2022  * resources and what other body parts are in the multipart body. The
2023  * RLMI body also has a version number that a subscriber can use to ensure
2024  * that the locally-stored state corresponds to server state.
2025  *
2026  * \param pool The allocation pool
2027  * \param sub The subscription representing the subscribed resource list
2028  * \param body_parts A container of body parts that RLMI will refer to
2029  * \param full_state Indicates whether this is a full or partial state notification
2030  * \return The multipart part representing the RLMI body
2031  */
2032 static pjsip_multipart_part *build_rlmi_body(pj_pool_t *pool, struct ast_sip_subscription *sub,
2033                 struct body_part_list *body_parts, unsigned int full_state)
2034 {
2035         static const pj_str_t rlmi_type = { "application", 11 };
2036         static const pj_str_t rlmi_subtype = { "rlmi+xml", 8 };
2037         pj_xml_node *rlmi;
2038         pj_xml_node *name;
2039         pjsip_multipart_part *rlmi_part;
2040         char version_str[32];
2041         char uri[PJSIP_MAX_URL_SIZE];
2042         pjsip_generic_string_hdr *cid;
2043         int i;
2044
2045         rlmi = ast_sip_presence_xml_create_node(pool, NULL, "list");
2046         ast_sip_presence_xml_create_attr(pool, rlmi, "xmlns", "urn:ietf:params:xml:ns:rlmi");
2047
2048         ast_sip_subscription_get_local_uri(sub, uri, sizeof(uri));
2049         ast_sip_presence_xml_create_attr(pool, rlmi, "uri", uri);
2050
2051         snprintf(version_str, sizeof(version_str), "%u", sub->version++);
2052         ast_sip_presence_xml_create_attr(pool, rlmi, "version", version_str);
2053         ast_sip_presence_xml_create_attr(pool, rlmi, "fullState", full_state ? "true" : "false");
2054
2055         name = ast_sip_presence_xml_create_node(pool, rlmi, "name");
2056         pj_strdup2(pool, &name->content, ast_sip_subscription_get_resource_name(sub));
2057
2058         for (i = 0; i < AST_VECTOR_SIZE(body_parts); ++i) {
2059                 const struct body_part *part = AST_VECTOR_GET(body_parts, i);
2060
2061                 add_rlmi_resource(pool, rlmi, part->cid, part->resource, part->uri, part->state);
2062         }
2063
2064         rlmi_part = pjsip_multipart_create_part(pool);
2065
2066         rlmi_part->body = PJ_POOL_ZALLOC_T(pool, pjsip_msg_body);
2067         pj_strdup(pool, &rlmi_part->body->content_type.type, &rlmi_type);
2068         pj_strdup(pool, &rlmi_part->body->content_type.subtype, &rlmi_subtype);
2069         pj_list_init(&rlmi_part->body->content_type.param);
2070
2071         rlmi_part->body->data = pj_xml_clone(pool, rlmi);
2072         rlmi_part->body->clone_data = rlmi_clone_data;
2073         rlmi_part->body->print_body = rlmi_print_body;
2074
2075         cid = generate_content_id_hdr(pool, sub);
2076         pj_list_insert_before(&rlmi_part->hdr, cid);
2077
2078         return rlmi_part;
2079 }
2080
/* Forward declaration: build_body_part() below calls this before its definition. */
static pjsip_msg_body *generate_notify_body(pj_pool_t *pool, struct ast_sip_subscription *root,
		unsigned int force_full_state);
2083
/*!
 * \brief Destroy a list of body parts
 *
 * Frees every body_part structure stored in the vector and then the
 * vector's own storage.
 *
 * \param parts The container of parts to destroy
 */
static void free_body_parts(struct body_part_list *parts)
{
	int idx = 0;

	while (idx < AST_VECTOR_SIZE(parts)) {
		struct body_part *entry = AST_VECTOR_GET(parts, idx);

		ast_free(entry);
		idx++;
	}

	AST_VECTOR_FREE(parts);
}
2100
2101 /*!
2102  * \brief Allocate and initialize a body part structure
2103  *
2104  * \param pool PJLIB allocation pool
2105  * \param sub Subscription representing a subscribed resource
2106  */
2107 static struct body_part *allocate_body_part(pj_pool_t *pool, const struct ast_sip_subscription *sub)
2108 {
2109         struct body_part *bp;
2110
2111         bp = ast_calloc(1, sizeof(*bp));
2112         if (!bp) {
2113                 return NULL;
2114         }
2115
2116         bp->cid = generate_content_id_hdr(pool, sub);
2117         bp->resource = sub->resource;
2118         bp->state = sub->subscription_state;
2119         bp->uri = sub->uri;
2120
2121         return bp;
2122 }
2123
2124 /*!
2125  * \brief Create a multipart body part for a subscribed resource
2126  *
2127  * \param pool PJLIB allocation pool
2128  * \param sub The subscription representing a subscribed resource
2129  * \param parts A vector of parts to append the created part to.
2130  * \param use_full_state Unused locally, but may be passed to other functions
2131  */
2132 static void build_body_part(pj_pool_t *pool, struct ast_sip_subscription *sub,
2133                 struct body_part_list *parts, unsigned int use_full_state)
2134 {
2135         struct body_part *bp;
2136         pjsip_msg_body *body;
2137
2138         bp = allocate_body_part(pool, sub);
2139         if (!bp) {
2140                 return;
2141         }
2142
2143         body = generate_notify_body(pool, sub, use_full_state);
2144         if (!body) {
2145                 /* Partial state was requested and the resource has not changed state */
2146                 ast_free(bp);
2147                 return;
2148         }
2149
2150         bp->part = pjsip_multipart_create_part(pool);
2151         bp->part->body = body;
2152         pj_list_insert_before(&bp->part->hdr, bp->cid);
2153
2154         AST_VECTOR_APPEND(parts, bp);
2155 }
2156
2157 /*!
2158  * \brief Create and initialize the PJSIP multipart body structure for a resource list subscription
2159  *
2160  * \param pool
2161  * \return The multipart message body
2162  */
2163 static pjsip_msg_body *create_multipart_body(pj_pool_t *pool)
2164 {
2165         pjsip_media_type media_type;
2166         pjsip_param *media_type_param;
2167         char boundary[6];
2168         pj_str_t pj_boundary;
2169
2170         pjsip_media_type_init2(&media_type, "multipart", "related");
2171
2172         media_type_param = pj_pool_alloc(pool, sizeof(*media_type_param));
2173         pj_list_init(media_type_param);
2174
2175         pj_strdup2(pool, &media_type_param->name, "type");
2176         pj_strdup2(pool, &media_type_param->value, "\"application/rlmi+xml\"");
2177
2178         pj_list_insert_before(&media_type.param, media_type_param);
2179
2180         pj_cstr(&pj_boundary, ast_generate_random_string(boundary, sizeof(boundary)));
2181         return pjsip_multipart_create(pool, &media_type, &pj_boundary);
2182 }
2183
2184 /*!
2185  * \brief Create a resource list body for NOTIFY requests
2186  *
2187  * Resource list bodies are multipart/related bodies. The first part of the multipart body
2188  * is an RLMI body that describes the rest of the parts to come. The other parts of the body
2189  * convey state of individual subscribed resources.
2190  *
2191  * \param pool PJLIB allocation pool
2192  * \param sub Subscription details from which to generate body
2193  * \param force_full_state If true, ignore resource list settings and send a full state notification
2194  * \return The generated multipart/related body
2195  */
2196 static pjsip_msg_body *generate_list_body(pj_pool_t *pool, struct ast_sip_subscription *sub,
2197                 unsigned int force_full_state)
2198 {
2199         int i;
2200         pjsip_multipart_part *rlmi_part;
2201         pjsip_msg_body *multipart;
2202         struct body_part_list body_parts;
2203         unsigned int use_full_state = force_full_state ? 1 : sub->full_state;
2204
2205         if (AST_VECTOR_INIT(&body_parts, AST_VECTOR_SIZE(&sub->children))) {
2206                 return NULL;
2207         }
2208
2209         for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
2210                 build_body_part(pool, AST_VECTOR_GET(&sub->children, i), &body_parts, use_full_state);
2211         }
2212
2213         /* This can happen if issuing partial state and no children of the list have changed state */
2214         if (AST_VECTOR_SIZE(&body_parts) == 0) {
2215                 return NULL;
2216         }
2217
2218         multipart = create_multipart_body(pool);
2219
2220         rlmi_part = build_rlmi_body(pool, sub, &body_parts, use_full_state);
2221         if (!rlmi_part) {
2222                 return NULL;
2223         }
2224         pjsip_multipart_add_part(pool, multipart, rlmi_part);
2225
2226         for (i = 0; i < AST_VECTOR_SIZE(&body_parts); ++i) {
2227                 pjsip_multipart_add_part(pool, multipart, AST_VECTOR_GET(&body_parts, i)->part);
2228         }
2229
2230         free_body_parts(&body_parts);
2231         return multipart;
2232 }
2233
2234 /*!
2235  * \brief Create the body for a NOTIFY request.
2236  *
2237  * \param pool The pool used for allocations
2238  * \param root The root of the subscription tree
2239  * \param force_full_state If true, ignore resource list settings and send a full state notification
2240  */
2241 static pjsip_msg_body *generate_notify_body(pj_pool_t *pool, struct ast_sip_subscription *root,
2242                 unsigned int force_full_state)
2243 {
2244         pjsip_msg_body *body;
2245
2246         if (AST_VECTOR_SIZE(&root->children) == 0) {
2247                 if (force_full_state || root->body_changed) {
2248                         /* Not a list. We've already generated the body and saved it on the subscription.
2249                          * Use that directly.
2250                          */
2251                         pj_str_t type;
2252                         pj_str_t subtype;
2253                         pj_str_t text;
2254
2255                         pj_cstr(&type, ast_sip_subscription_get_body_type(root));
2256                         pj_cstr(&subtype, ast_sip_subscription_get_body_subtype(root));
2257                         pj_cstr(&text, ast_str_buffer(root->body_text));
2258
2259                         body = pjsip_msg_body_create(pool, &type, &subtype, &text);
2260                         root->body_changed = 0;
2261                 } else {
2262                         body = NULL;
2263                 }
2264         } else {
2265                 body = generate_list_body(pool, root, force_full_state);
2266         }
2267
2268         return body;
2269 }
2270
2271 /*!
2272  * \brief Shortcut method to create a Require: eventlist header
2273  */
2274 static pjsip_require_hdr *create_require_eventlist(pj_pool_t *pool)
2275 {
2276         pjsip_require_hdr *require;
2277
2278         require = pjsip_require_hdr_create(pool);
2279         pj_strdup2(pool, &require->values[0], "eventlist");
2280         require->count = 1;
2281
2282         return require;
2283 }
2284
2285 /*!
2286  * \brief Send a NOTIFY request to a subscriber
2287  *
2288  * \pre sub_tree->dlg is locked
2289  *
2290  * \param sub_tree The subscription tree representing the subscription
2291  * \param force_full_state If true, ignore resource list settings and send full resource list state.
2292  * \retval 0 Success
2293  * \retval non-zero Failure
2294  */
2295 static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state)
2296 {
2297         pjsip_evsub *evsub = sub_tree->evsub;
2298         pjsip_tx_data *tdata;
2299
2300         if (ast_shutdown_final()
2301                 && sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED
2302                 && sub_tree->persistence) {
2303                 return 0;
2304         }
2305
2306         if (pjsip_evsub_notify(evsub, sub_tree->root->subscription_state,
2307                                 NULL, NULL, &tdata) != PJ_SUCCESS) {
2308                 return -1;
2309         }
2310
2311         tdata->msg->body = generate_notify_body(tdata->pool, sub_tree->root, force_full_state);
2312         if (!tdata->msg->body) {
2313                 pjsip_tx_data_dec_ref(tdata);
2314                 return -1;
2315         }
2316
2317         if (sub_tree->is_list) {
2318                 pjsip_require_hdr *require = create_require_eventlist(tdata->pool);
2319                 pjsip_msg_add_hdr(tdata->msg, (pjsip_hdr *) require);
2320         }
2321
2322         if (sip_subscription_send_request(sub_tree, tdata)) {
2323                 /* do not call pjsip_tx_data_dec_ref(tdata). The pjsip_dlg_send_request deletes the message on error */
2324                 return -1;
2325         }
2326
2327         sub_tree->send_scheduled_notify = 0;
2328
2329         return 0;
2330 }
2331
2332 static int serialized_send_notify(void *userdata)
2333 {
2334         struct sip_subscription_tree *sub_tree = userdata;
2335         pjsip_dialog *dlg = sub_tree->dlg;
2336
2337         pjsip_dlg_inc_lock(dlg);
2338
2339         /* It's possible that between when the notification was scheduled
2340          * and now a new SUBSCRIBE arrived requiring full state to be
2341          * sent out in an immediate NOTIFY. It's also possible that we're
2342          * already processing a terminate.  If that has happened, we need to
2343          * bail out here instead of sending the batched NOTIFY.
2344          */
2345
2346         if (sub_tree->state >= SIP_SUB_TREE_TERMINATE_IN_PROGRESS
2347                 || !sub_tree->send_scheduled_notify) {
2348                 pjsip_dlg_dec_lock(dlg);
2349                 ao2_cleanup(sub_tree);
2350                 return 0;
2351         }
2352
2353         if (sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED) {
2354                 sub_tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;
2355         }
2356
2357         send_notify(sub_tree, 0);
2358
2359         ast_test_suite_event_notify(
2360                 sub_tree->state == SIP_SUB_TREE_TERMINATED
2361                         ? "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_STATE_CHANGED",
2362                 "Resource: %s", sub_tree->root->resource);
2363
2364         sub_tree->notify_sched_id = -1;
2365         pjsip_dlg_dec_lock(dlg);
2366         ao2_cleanup(sub_tree);
2367         return 0;
2368 }
2369
2370 static int sched_cb(const void *data)
2371 {
2372         struct sip_subscription_tree *sub_tree = (struct sip_subscription_tree *) data;
2373
2374         /* We don't need to bump the refcount of sub_tree since we bumped it when scheduling this task */
2375         if (ast_sip_push_task(sub_tree->serializer, serialized_send_notify, sub_tree)) {
2376                 ao2_cleanup(sub_tree);
2377         }
2378
2379         return 0;
2380 }
2381
2382 static int schedule_notification(struct sip_subscription_tree *sub_tree)
2383 {
2384         /* There's already a notification scheduled */
2385         if (sub_tree->notify_sched_id > -1) {
2386                 return 0;
2387         }
2388
2389         sub_tree->send_scheduled_notify = 1;
2390         sub_tree->notify_sched_id = ast_sched_add(sched, sub_tree->notification_batch_interval, sched_cb, ao2_bump(sub_tree));
2391         if (sub_tree->notify_sched_id < 0) {
2392                 ao2_cleanup(sub_tree);
2393                 return -1;
2394         }
2395
2396         return 0;
2397 }
2398
2399 int ast_sip_subscription_notify(struct ast_sip_subscription *sub, struct ast_sip_body_data *notify_data,
2400                 int terminate)
2401 {
2402         int res;
2403         pjsip_dialog *dlg = sub->tree->dlg;
2404
2405         pjsip_dlg_inc_lock(dlg);
2406
2407         if (sub->tree->state != SIP_SUB_TREE_NORMAL) {
2408                 pjsip_dlg_dec_lock(dlg);
2409                 return 0;
2410         }
2411
2412         if (ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub),
2413                                 ast_sip_subscription_get_body_subtype(sub), notify_data, &sub->body_text)) {
2414                 pjsip_dlg_dec_lock(dlg);
2415                 return -1;
2416         }
2417
2418         sub->body_changed = 1;
2419         if (terminate) {
2420                 sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
2421                 sub->tree->state = SIP_SUB_TREE_TERMINATE_PENDING;
2422         }
2423
2424         if (sub->tree->notification_batch_interval) {
2425                 res = schedule_notification(sub->tree);
2426         } else {
2427                 /* See the note in pubsub_on_rx_refresh() for why sub->tree is refbumped here */
2428                 ao2_ref(sub->tree, +1);
2429                 if (terminate) {
2430                         sub->tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;
2431                 }
2432                 res = send_notify(sub->tree, 0);
2433                 ast_test_suite_event_notify(terminate ? "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_STATE_CHANGED",
2434                                 "Resource: %s",
2435                                 sub->tree->root->resource);
2436                 ao2_ref(sub->tree, -1);
2437         }
2438
2439         pjsip_dlg_dec_lock(dlg);
2440         return res;
2441 }
2442
2443 pjsip_sip_uri *ast_sip_subscription_get_sip_uri(struct ast_sip_subscription *sub)
2444 {
2445         return sub->uri;
2446 }
2447
2448 void ast_sip_subscription_get_local_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
2449 {
2450         pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, sub->uri, buf, size);
2451 }
2452
2453 void ast_sip_subscription_get_remote_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
2454 {
2455         pjsip_dialog *dlg;
2456
2457         dlg = sub->tree->dlg;
2458         ast_copy_pj_str(buf, &dlg->remote.info_str, size);
2459 }
2460
2461 const char *ast_sip_subscription_get_resource_name(struct ast_sip_subscription *sub)
2462 {
2463         return sub->resource;
2464 }
2465
2466 int ast_sip_subscription_is_terminated(const struct ast_sip_subscription *sub)
2467 {
2468         return sub->subscription_state == PJSIP_EVSUB_STATE_TERMINATED ? 1 : 0;
2469 }
2470
2471 static int sip_subscription_accept(struct sip_subscription_tree *sub_tree, pjsip_rx_data *rdata, int response)
2472 {
2473         pjsip_hdr res_hdr;
2474
2475         /* If this is a persistence recreation the subscription has already been accepted */
2476         if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) {
2477                 return 0;
2478         }
2479
2480         pj_list_init(&res_hdr);
2481         if (sub_tree->is_list) {
2482                 /* If subscribing to a list, our response has to have a Require: eventlist header in it */
2483                 pj_list_insert_before(&res_hdr, create_require_eventlist(rdata->tp_info.pool));
2484         }
2485
2486         return pjsip_evsub_accept(sub_tree->evsub, rdata, response, &res_hdr) == PJ_SUCCESS ? 0 : -1;
2487 }
2488
/*!
 * \brief Allocate a datastore for use with a subscription
 *
 * Forwards to ast_datastores_alloc_datastore().
 *
 * \param info Datastore info passed through unchanged
 * \param uid Identifier passed through unchanged
 * \return Whatever ast_datastores_alloc_datastore() returns
 */
struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
{
        struct ast_datastore *datastore = ast_datastores_alloc_datastore(info, uid);

        return datastore;
}
2493
2494 int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
2495 {
2496         return ast_datastores_add(subscription->datastores, datastore);
2497 }
2498
2499 struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
2500 {
2501         return ast_datastores_find(subscription->datastores, name);
2502 }
2503
2504 void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
2505 {
2506         ast_datastores_remove(subscription->datastores, name);
2507 }
2508
2509 struct ao2_container *ast_sip_subscription_get_datastores(const struct ast_sip_subscription *subscription)
2510 {
2511         return subscription->datastores;
2512 }
2513
2514 int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
2515 {
2516         return ast_datastores_add(publication->datastores, datastore);
2517 }
2518
2519 struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
2520 {
2521         return ast_datastores_find(publication->datastores, name);
2522 }
2523
2524 void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
2525 {
2526         ast_datastores_remove(publication->datastores, name);
2527 }
2528
2529 struct ao2_container *ast_sip_publication_get_datastores(const struct ast_sip_publication *publication)
2530 {
2531         return publication->datastores;
2532 }
2533
/*! \brief Registered PUBLISH handlers; entries are inserted and removed under the list's write lock */
AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
2535
2536 static int publication_hash_fn(const void *obj, const int flags)
2537 {
2538         const struct ast_sip_publication *publication = obj;
2539         const int *entity_tag = obj;
2540
2541         return flags & OBJ_KEY ? *entity_tag : publication->entity_tag;
2542 }
2543
2544 static int publication_cmp_fn(void *obj, void *arg, int flags)
2545 {
2546         const struct ast_sip_publication *publication1 = obj;
2547         const struct ast_sip_publication *publication2 = arg;
2548         const int *entity_tag = arg;
2549
2550         return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ?
2551                 CMP_MATCH | CMP_STOP : 0);
2552 }
2553
2554 static void publish_add_handler(struct ast_sip_publish_handler *handler)
2555 {
2556         AST_RWLIST_WRLOCK(&publish_handlers);
2557         AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
2558         AST_RWLIST_UNLOCK(&publish_handlers);
2559 }
2560
2561 int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
2562 {
2563         if (ast_strlen_zero(handler->event_name)) {
2564                 ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
2565                 return -1;
2566         }
2567
2568         if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS,
2569                 publication_hash_fn, publication_cmp_fn))) {
2570                 ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n",
2571                         handler->event_name);
2572                 return -1;
2573         }
2574
2575         publish_add_handler(handler);
2576
2577         ast_module_ref(ast_module_info->self);
2578
2579         return 0;
2580 }
2581
2582 void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
2583 {
2584         struct ast_sip_publish_handler *iter;
2585
2586         AST_RWLIST_WRLOCK(&publish_handlers);
2587         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
2588                 if (handler == iter) {
2589                         AST_RWLIST_REMOVE_CURRENT(next);
2590                         ao2_cleanup(handler->publications);
2591                         ast_module_unref(ast_module_info->self);
2592                         break;
2593                 }
2594         }
2595         AST_RWLIST_TRAVERSE_SAFE_END;
2596         AST_RWLIST_UNLOCK(&publish_handlers);
2597 }
2598
/*! \brief Registered subscription handlers; entries are inserted and removed under the list's write lock */
AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
2600
2601 static void sub_add_handler(struct ast_sip_subscription_handler *handler)
2602 {
2603         AST_RWLIST_WRLOCK(&subscription_handlers);
2604         AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
2605         ast_module_ref(ast_module_info->self);
2606         AST_RWLIST_UNLOCK(&subscription_handlers);
2607 }
2608
2609 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name)
2610 {
2611         struct ast_sip_subscription_handler *iter;
2612
2613         AST_RWLIST_RDLOCK(&subscription_handlers);
2614         AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
2615                 if (!strcmp(iter->event_name, event_name)) {
2616                         break;
2617                 }
2618         }
2619         AST_RWLIST_UNLOCK(&subscription_handlers);
2620         return iter;
2621 }
2622
2623 int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler)
2624 {
2625         pj_str_t event;
2626         pj_str_t accept[AST_SIP_MAX_ACCEPT] = { {0, }, };
2627         struct ast_sip_subscription_handler *existing;
2628         int i = 0;
2629
2630         if (ast_strlen_zero(handler->event_name)) {
2631                 ast_log(LOG_ERROR, "No event package specified for subscription handler. Cannot register\n");
2632                 return -1;
2633         }
2634
2635         existing = find_sub_handler_for_event_name(handler->event_name);
2636         if (existing) {
2637                 ast_log(LOG_ERROR,
2638                         "Unable to register subscription handler for event %s.  A handler is already registered\n",
2639                         handler->event_name);
2640                 return -1;
2641         }
2642
2643         for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) {
2644                 pj_cstr(&accept[i], handler->accept[i]);
2645         }
2646
2647         pj_cstr(&event, handler->event_name);
2648
2649         pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
2650
2651         sub_add_handler(handler);
2652
2653         return 0;
2654 }
2655
2656 void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
2657 {
2658         struct ast_sip_subscription_handler *iter;
2659
2660         AST_RWLIST_WRLOCK(&subscription_handlers);
2661         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
2662                 if (handler == iter) {
2663                         AST_RWLIST_REMOVE_CURRENT(next);
2664                         ast_module_unref(ast_module_info->self);
2665                         break;
2666                 }
2667         }
2668         AST_RWLIST_TRAVERSE_SAFE_END;
2669         AST_RWLIST_UNLOCK(&subscription_handlers);
2670 }
2671
2672 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype_nolock(const char *type, const char *subtype)
2673 {
2674         struct ast_sip_pubsub_body_generator *gen;
2675
2676         AST_LIST_TRAVERSE(&body_generators, gen, list) {
2677                 if (!strcmp(gen->type, type)
2678                         && !strcmp(gen->subtype, subtype)) {
2679                         break;
2680                 }
2681         }
2682
2683         return gen;
2684 }
2685
2686 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype(const char *type, const char *subtype)
2687 {
2688         struct ast_sip_pubsub_body_generator *gen;
2689
2690         AST_RWLIST_RDLOCK(&body_generators);
2691         gen = find_body_generator_type_subtype_nolock(type, subtype);
2692         AST_RWLIST_UNLOCK(&body_generators);
2693         return gen;
2694 }
2695
/*!
 * \brief Find a body generator for a "type/subtype" string
 *
 * The string is split at the first '/' on a stack copy, leaving \a accept untouched.
 *
 * \param accept The media type, in the form "type/subtype"
 * \return The matching generator, or NULL if either half is empty or nothing matches
 */
static struct ast_sip_pubsub_body_generator *find_body_generator_accept(const char *accept)
{
        char *subtype = ast_strdupa(accept);
        char *type = strsep(&subtype, "/");

        if (ast_strlen_zero(type) || ast_strlen_zero(subtype)) {
                return NULL;
        }

        return find_body_generator_type_subtype(type, subtype);
}
2708
2709 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
2710                 size_t num_accept, const char *body_type)
2711 {
2712         int i;
2713         struct ast_sip_pubsub_body_generator *generator = NULL;
2714
2715         for (i = 0; i < num_accept; ++i) {
2716                 generator = find_body_generator_accept(accept[i]);
2717                 if (generator) {
2718                         ast_debug(3, "Body generator %p found for accept type %s\n", generator, accept[i]);
2719                         if (strcmp(generator->body_type, body_type)) {
2720                                 ast_log(LOG_WARNING, "Body generator '%s/%s'(%p) does not accept the type of data this event generates\n",
2721                                                 generator->type, generator->subtype, generator);
2722                                 generator = NULL;
2723                                 continue;
2724                         }
2725                         break;
2726                 } else {
2727                         ast_debug(3, "No body generator found for accept type %s\n", accept[i]);
2728                 }
2729         }
2730
2731         return generator;
2732 }
2733
2734 static int generate_initial_notify(struct ast_sip_subscription *sub)
2735 {
2736         void *notify_data;
2737         int res;
2738         struct ast_sip_body_data data = {
2739                 .body_type = sub->handler->body_type,
2740         };
2741
2742         if (AST_VECTOR_SIZE(&sub->children) > 0) {
2743                 int i;
2744
2745                 for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
2746                         if (generate_initial_notify(AST_VECTOR_GET(&sub->children, i))) {
2747                                 return -1;
2748                         }
2749                 }
2750
2751                 return 0;
2752         }
2753
2754         /* We notify subscription establishment only on the tree leaves. */
2755         if (sub->handler->notifier->subscription_established(sub)) {
2756                 return -1;
2757         }
2758
2759         notify_data = sub->handler->notifier->get_notify_data(sub);
2760         if (!notify_data) {
2761                 return -1;
2762         }
2763
2764         data.body_data = notify_data;
2765
2766         res = ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub),
2767                         ast_sip_subscription_get_body_subtype(sub), &data, &sub->body_text);
2768
2769         ao2_cleanup(notify_data);
2770
2771         return res;
2772 }
2773
2774 static int pubsub_on_refresh_timeout(void *userdata);
2775
2776 static int initial_notify_task(void * obj)
2777 {
2778         struct initial_notify_data *ind = obj;
2779
2780         if (generate_initial_notify(ind->sub_tree->root)) {
2781                 pjsip_evsub_terminate(ind->sub_tree->evsub, PJ_TRUE);
2782         } else {
2783                 send_notify(ind->sub_tree, 1);
2784                 ast_test_suite_event_notify("SUBSCRIPTION_ESTABLISHED",
2785                         "Resource: %s",
2786                         ind->sub_tree->root->resource);
2787         }
2788
2789         if (ind->expires > -1) {
2790                 char *name = ast_alloca(strlen("->/ ") +
2791                         strlen(ind->sub_tree->persistence->endpoint) +
2792                         strlen(ind->sub_tree->root->resource) +
2793                         strlen(ind->sub_tree->root->handler->event_name) +
2794                         ind->sub_tree->dlg->call_id->id.slen + 1);
2795
2796                 sprintf(name, "%s->%s/%s %.*s", ind->sub_tree->persistence->endpoint,
2797                         ind->sub_tree->root->resource, ind->sub_tree->root->handler->event_name,
2798                         (int)ind->sub_tree->dlg->call_id->id.slen, ind->sub_tree->dlg->call_id->id.ptr);
2799
2800                 ast_debug(3, "Scheduling timer: %s\n", name);
2801                 ind->sub_tree->expiration_task = ast_sip_schedule_task(ind->sub_tree->serializer,
2802                         ind->expires * 1000, pubsub_on_refresh_timeout, name,
2803                         ind->sub_tree, AST_SIP_SCHED_TASK_FIXED | AST_SIP_SCHED_TASK_DATA_AO2);
2804                 if (!ind->sub_tree->expiration_task) {
2805                         ast_log(LOG_ERROR, "Unable to create expiration timer of %d seconds for %s\n",
2806                                 ind->expires, name);
2807                 }
2808         }
2809
2810         ao2_ref(ind->sub_tree, -1);
2811         ast_free(ind);
2812
2813         return 0;
2814 }
2815
2816 static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
2817 {
2818         pjsip_expires_hdr *expires_header;
2819         struct ast_sip_subscription_handler *handler;
2820         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
2821         struct sip_subscription_tree *sub_tree;
2822         struct ast_sip_pubsub_body_generator *generator;
2823         char *resource;
2824         pjsip_uri *request_uri;
2825         pjsip_sip_uri *request_uri_sip;
2826         size_t resource_size;
2827         int resp;
2828         struct resource_tree tree;
2829         pj_status_t dlg_status;
2830
2831         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
2832         ast_assert(endpoint != NULL);
2833
2834         if (!endpoint->subscription.allow) {
2835                 ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint));
2836                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL);
2837                 return PJ_TRUE;
2838         }
2839
2840         request_uri = rdata->msg_info.msg->line.req.uri;
2841
2842         if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
2843                 char uri_str[PJSIP_MAX_URL_SIZE];
2844
2845                 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
2846                 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
2847                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
2848                 return PJ_TRUE;
2849         }
2850
2851         request_uri_sip = pjsip_uri_get_uri(request_uri);
2852         resource_size = pj_strlen(&request_uri_sip->user) + 1;
2853         resource = ast_alloca(resource_size);
2854         ast_copy_pj_str(resource, &request_uri_sip->user, resource_size);
2855
2856         /*
2857          * We may want to match without any user options getting
2858          * in the way.
2859          */
2860         AST_SIP_USER_OPTIONS_TRUNCATE_CHECK(resource);
2861
2862         expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next);
2863         if (expires_header) {
2864                 if (expires_header->ivalue == 0) {
2865                         ast_log(LOG_WARNING, "Subscription request from endpoint %s rejected. Expiration of 0 is invalid\n",
2866                                 ast_sorcery_object_get_id(endpoint));
2867                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
2868                                 return PJ_TRUE;
2869                 }
2870                 if (expires_header->ivalue < endpoint->subscription.minexpiry) {
2871                         ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %u\n",
2872                                 expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry);
2873                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL);
2874                         return PJ_TRUE;
2875                 }
2876         }
2877
2878         handler = subscription_get_handler_from_rdata(rdata);
2879         if (!handler) {
2880                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2881                 return PJ_TRUE;
2882         }
2883
2884         generator = subscription_get_generator_from_rdata(rdata, handler);
2885         if (!generator) {
2886                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2887                 return PJ_TRUE;
2888         }
2889
2890         memset(&tree, 0, sizeof(tree));
2891         resp = build_resource_tree(endpoint, handler, resource, &tree,
2892                 ast_sip_pubsub_has_eventlist_support(rdata));
2893         if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
2894                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
2895                 resource_tree_destroy(&tree);
2896                 return PJ_TRUE;
2897         }
2898
2899         sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator, &tree, &dlg_status);
2900         if (!sub_tree) {
2901                 if (dlg_status != PJ_EEXISTS) {
2902                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
2903                 }
2904         } else {
2905                 struct initial_notify_data *ind = ast_malloc(sizeof(*ind));
2906
2907                 if (!ind) {
2908                         pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
2909                         resource_tree_destroy(&tree);
2910                         return PJ_TRUE;
2911                 }
2912
2913                 ind->sub_tree = ao2_bump(sub_tree);
2914                 /* Since this is a normal subscribe, pjproject takes care of the timer */
2915                 ind->expires = -1;
2916
2917                 sub_tree->persistence = subscription_persistence_create(sub_tree);
2918                 subscription_persistence_update(sub_tree, rdata, SUBSCRIPTION_PERSISTENCE_CREATED);
2919                 sip_subscription_accept(sub_tree, rdata, resp);
2920                 if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ind)) {
2921                         pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
2922                         ao2_ref(sub_tree, -1);
2923                         ast_free(ind);
2924                 }
2925         }
2926
2927         resource_tree_destroy(&tree);
2928         return PJ_TRUE;
2929 }
2930
2931 static struct ast_sip_publish_handler *find_pub_handler(const char *event)
2932 {
2933         struct ast_sip_publish_handler *iter = NULL;
2934
2935         AST_RWLIST_RDLOCK(&publish_handlers);
2936         AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) {
2937                 if (strcmp(event, iter->event_name)) {
2938                         ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
2939                         continue;
2940                 }
2941                 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
2942                 break;
2943         }
2944         AST_RWLIST_UNLOCK(&publish_handlers);
2945
2946         return iter;
2947 }
2948
2949 static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata,
2950         pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id)
2951 {
2952         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
2953
2954         if (etag_hdr) {
2955                 char etag[pj_strlen(&etag_hdr->hvalue) + 1];
2956
2957                 ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag));
2958
2959                 if (sscanf(etag, "%30d", entity_id) != 1) {
2960                         return SIP_PUBLISH_UNKNOWN;
2961                 }
2962         }
2963
2964         *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
2965
2966         if (!(*expires)) {
2967                 return SIP_PUBLISH_REMOVE;
2968         } else if (!etag_hdr && rdata->msg_info.msg->body) {
2969                 return SIP_PUBLISH_INITIAL;
2970         } else if (etag_hdr && !rdata->msg_info.msg->body) {
2971                 return SIP_PUBLISH_REFRESH;
2972         } else if (etag_hdr && rdata->msg_info.msg->body) {
2973                 return SIP_PUBLISH_MODIFY;
2974         }
2975
2976         return SIP_PUBLISH_UNKNOWN;
2977 }
2978
2979 /*! \brief Internal destructor for publications */
2980 static void publication_destroy_fn(void *obj)
2981 {
2982         struct ast_sip_publication *publication = obj;
2983
2984         ast_debug(3, "Destroying SIP publication\n");
2985
2986         ao2_cleanup(publication->datastores);
2987         ao2_cleanup(publication->endpoint);
2988 }
2989
2990 static struct ast_sip_publication *sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
2991         const char *resource, const char *event_configuration_name)
2992 {
2993         struct ast_sip_publication *publication;
2994         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
2995         size_t resource_len = strlen(resource) + 1, event_configuration_name_len = strlen(event_configuration_name) + 1;
2996         char *dst;
2997
2998         ast_assert(endpoint != NULL);
2999
3000         if (!(publication = ao2_alloc(sizeof(*publication) + resource_len + event_configuration_name_len, publication_destroy_fn))) {
3001                 return NULL;
3002         }
3003
3004         if (!(publication->datastores = ast_datastores_alloc())) {
3005                 ao2_ref(publication, -1);
3006                 return NULL;
3007         }
3008
3009         publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
3010         ao2_ref(endpoint, +1);
3011         publication->endpoint = endpoint;
3012         publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
3013         publication->sched_id = -1;
3014         dst = publication->data;
3015         publication->resource = strcpy(dst, resource);
3016         dst += resource_len;
3017         publication->event_configuration_name = strcpy(dst, event_configuration_name);
3018
3019         return publication;
3020 }
3021
3022 static int sip_publication_respond(struct ast_sip_publication *pub, int status_code,
3023                 pjsip_rx_data *rdata)
3024 {
3025         pjsip_tx_data *tdata;
3026         pjsip_transaction *tsx;
3027
3028         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, &tdata) != PJ_SUCCESS) {
3029                 return -1;
3030         }
3031
3032         if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) {
3033                 char buf[30];
3034
3035                 snprintf(buf, sizeof(buf), "%d", pub->entity_tag);
3036                 ast_sip_add_header(tdata, "SIP-ETag", buf);
3037
3038                 snprintf(buf, sizeof(buf), "%d", pub->expires);
3039                 ast_sip_add_header(tdata, "Expires", buf);
3040         }
3041
3042         if (pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx) != PJ_SUCCESS) {
3043                 pjsip_tx_data_dec_ref(tdata);
3044                 return -1;
3045         }
3046
3047         pjsip_tsx_recv_msg(tsx, rdata);
3048
3049         if (pjsip_tsx_send_msg(tsx, tdata) != PJ_SUCCESS) {
3050                 pjsip_tx_data_dec_ref(tdata);
3051                 return -1;
3052         }
3053
3054         return 0;
3055 }
3056
3057 static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
3058         struct ast_sip_publish_handler *handler)
3059 {
3060         struct ast_sip_publication *publication;
3061         char *resource_name;
3062         size_t resource_size;
3063         RAII_VAR(struct ast_sip_publication_resource *, resource, NULL, ao2_cleanup);
3064         struct ast_variable *event_configuration_name = NULL;
3065         pjsip_uri *request_uri;
3066         pjsip_sip_uri *request_uri_sip;
3067         int resp;
3068
3069         request_uri = rdata->msg_info.msg->line.req.uri;
3070
3071         if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
3072                 char uri_str[PJSIP_MAX_URL_SIZE];
3073
3074                 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
3075                 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
3076                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
3077                 return NULL;
3078         }
3079
3080         request_uri_sip = pjsip_uri_get_uri(request_uri);
3081         resource_size = pj_strlen(&request_uri_sip->user) + 1;
3082         resource_name = ast_alloca(resource_size);
3083         ast_copy_pj_str(resource_name, &request_uri_sip->user, resource_size);
3084
3085         /*
3086          * We may want to match without any user options getting
3087          * in the way.
3088          */
3089         AST_SIP_USER_OPTIONS_TRUNCATE_CHECK(resource_name);
3090
3091         resource = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "inbound-publication", resource_name);
3092         if (!resource) {
3093                 ast_debug(1, "No 'inbound-publication' defined for resource '%s'\n", resource_name);
3094                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
3095                 return NULL;
3096         }
3097
3098         if (!ast_strlen_zero(resource->endpoint) && strcmp(resource->endpoint, ast_sorcery_object_get_id(endpoint))) {
3099                 ast_debug(1, "Resource %s has a defined endpoint '%s', but does not match endpoint '%s' that received the request\n",
3100                         resource_name, resource->endpoint, ast_sorcery_object_get_id(endpoint));
3101                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL);
3102                 return NULL;
3103         }
3104
3105         for (event_configuration_name = resource->events; event_configuration_name; event_configuration_name = event_configuration_name->next) {
3106                 if (!strcmp(event_configuration_name->name, handler->event_name)) {
3107                         break;
3108                 }
3109         }
3110
3111         if (!event_configuration_name) {
3112                 ast_debug(1, "Event '%s' is not configured for '%s'\n", handler->event_name, resource_name);
3113                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
3114                 return NULL;
3115         }
3116
3117         resp = handler->new_publication(endpoint, resource_name, event_configuration_name->value);
3118
3119         if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
3120                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
3121                 return NULL;
3122         }
3123
3124         publication = sip_create_publication(endpoint, rdata, S_OR(resource_name, ""), event_configuration_name->value);
3125
3126         if (!publication) {
3127                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
3128                 return NULL;
3129         }
3130
3131         publication->handler = handler;
3132         if (publication->handler->publication_state_change(publication, rdata->msg_info.msg->body,
3133                         AST_SIP_PUBLISH_STATE_INITIALIZED)) {
3134                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
3135                 ao2_cleanup(publication);
3136                 return NULL;
3137         }
3138
3139         sip_publication_respond(publication, resp, rdata);
3140
3141         return publication;
3142 }
3143
3144 static int publish_expire_callback(void *data)
3145 {
3146         RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup);
3147
3148         if (publication->handler->publish_expire) {
3149                 publication->handler->publish_expire(publication);
3150         }
3151
3152         return 0;
3153 }
3154
3155 static int publish_expire(const void *data)
3156 {
3157         struct ast_sip_publication *publication = (struct ast_sip_publication*)data;
3158
3159         ao2_unlink(publication->handler->publications, publication);
3160         publication->sched_id = -1;
3161
3162         if (ast_sip_push_task(NULL, publish_expire_callback, publication)) {
3163                 ao2_cleanup(publication);
3164         }
3165
3166         return 0;
3167 }
3168
3169 static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata)
3170 {
3171         pjsip_event_hdr *event_header;
3172         struct ast_sip_publish_handler *handler;
3173         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
3174         char event[32];
3175         static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 };
3176         pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL);
3177         enum sip_publish_type publish_type;
3178         RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup);
3179         int expires = 0, entity_id, response = 0;
3180
3181         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
3182         ast_assert(endpoint != NULL);
3183
3184         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
3185         if (!event_header) {
3186                 ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n");
3187                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
3188                 return PJ_TRUE;
3189         }
3190         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
3191
3192         handler = find_pub_handler(event);
3193         if (!handler) {
3194                 ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event);
3195                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
3196                 return PJ_TRUE;
3197         }
3198
3199         publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id);
3200
3201         /* If this is not an initial publish ensure that a publication is present */
3202         if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) {
3203                 if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) {
3204                         static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 };
3205
3206                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed,
3207                                 NULL, NULL);
3208                         return PJ_TRUE;
3209                 }
3210
3211                 /* Per the RFC every response has to have a new entity tag */
3212                 publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
3213
3214                 /* Update the expires here so that the created responses will contain the correct value */
3215                 publication->expires = expires;
3216         }
3217
3218         switch (publish_type) {
3219                 case SIP_PUBLISH_INITIAL:
3220                         publication = publish_request_initial(endpoint, rdata, handler);
3221                         break;
3222                 case SIP_PUBLISH_REFRESH:
3223                 case SIP_PUBLISH_MODIFY:
3224                         if (handler->publication_state_change(publication, rdata->msg_info.msg->body,
3225                                                 AST_SIP_PUBLISH_STATE_ACTIVE)) {
3226                                 /* If an error occurs we want to terminate the publication */
3227                                 expires = 0;
3228                         }
3229                         response = 200;
3230                         break;
3231                 case SIP_PUBLISH_REMOVE:
3232                         handler->publication_state_change(publication, rdata->msg_info.msg->body,
3233                                         AST_SIP_PUBLISH_STATE_TERMINATED);
3234                         response = 200;
3235                         break;
3236                 case SIP_PUBLISH_UNKNOWN:
3237                 default:
3238                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
3239                         break;
3240         }
3241
3242         if (publication) {
3243                 if (expires) {
3244                         ao2_link(handler->publications, publication);
3245
3246                         AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication,
3247                                                 ao2_ref(publication, -1), ao2_ref(publication, -1), ao2_ref(publication, +1));
3248                 } else {
3249                         AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1));
3250                 }
3251         }
3252
3253         if (response) {
3254                 sip_publication_respond(publication, response, rdata);
3255         }
3256
3257         return PJ_TRUE;
3258 }
3259
3260 struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub)
3261 {
3262         return pub->endpoint;
3263 }
3264
3265 const char *ast_sip_publication_get_resource(const struct ast_sip_publication *pub)
3266 {
3267         return pub->resource;
3268 }
3269
3270 const char *ast_sip_publication_get_event_configuration(const struct ast_sip_publication *pub)
3271 {
3272         return pub->event_configuration_name;
3273 }
3274
/*!
 * \brief Check whether a body generator exists for a content type.
 *
 * \param type Content type, e.g. the part before the slash.
 * \param subtype Content subtype, e.g. the part after the slash.
 *
 * \retval 1 A body generator for type/subtype was found.
 * \retval 0 No such body generator was found.
 */
int ast_sip_pubsub_is_body_generator_registered(const char *type, const char *subtype)
{
	return find_body_generator_type_subtype(type, subtype) != NULL;
}
3279
3280 int ast_sip_pubsub_register_body_generator(struct ast_sip_pubsub_body_generator *generator)
3281 {
3282         struct ast_sip_pubsub_body_generator *existing;
3283         pj_str_t accept;
3284         pj_size_t accept_len;
3285
3286         AST_RWLIST_WRLOCK(&body_generators);
3287         existing = find_body_generator_type_subtype_nolock(generator->type, generator->subtype);
3288         if (existing) {
3289                 AST_RWLIST_UNLOCK(&body_generators);
3290                 ast_log(LOG_WARNING, "A body generator for %s/%s is already registered.\n",
3291                         generator->type, generator->subtype);
3292                 return -1;
3293         }
3294         AST_LIST_INSERT_HEAD(&body_generators, generator, list);
3295         AST_RWLIST_UNLOCK(&body_generators);
3296
3297         /* Lengths of type and subtype plus a slash. */
3298         accept_len = strlen(generator->type) + strlen(generator->subtype) + 1;
3299
3300         /* Add room for null terminator that sprintf() will set. */
3301         pj_strset(&accept, ast_alloca(accept_len + 1), accept_len);
3302         sprintf((char *) pj_strbuf(&accept), "%s/%s", generator->type, generator->subtype);/* Safe */
3303
3304         pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module,
3305                         PJSIP_H_ACCEPT, NULL, 1, &accept);
3306
3307         return 0;
3308 }
3309
3310 void ast_sip_pubsub_unregister_body_generator(struct ast_sip_pubsub_body_generator *generator)
3311 {
3312         struct ast_sip_pubsub_body_generator *iter;
3313
3314         AST_RWLIST_WRLOCK(&body_generators);
3315         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_generators, iter, list) {
3316                 if (iter == generator) {
3317                         AST_LIST_REMOVE_CURRENT(list);
3318                         break;
3319                 }
3320         }
3321         AST_RWLIST_TRAVERSE_SAFE_END;
3322         AST_RWLIST_UNLOCK(&body_generators);
3323 }
3324
3325 int ast_sip_pubsub_register_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
3326 {
3327         AST_RWLIST_WRLOCK(&body_supplements);
3328         AST_RWLIST_INSERT_TAIL(&body_supplements, supplement, list);
3329         AST_RWLIST_UNLOCK(&body_supplements);
3330
3331         return 0;
3332 }
3333
3334 void ast_sip_pubsub_unregister_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
3335 {
3336         struct ast_sip_pubsub_body_supplement *iter;
3337
3338         AST_RWLIST_WRLOCK(&body_supplements);
3339         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_supplements, iter, list) {
3340                 if (iter == supplement) {
3341                         AST_LIST_REMOVE_CURRENT(list);
3342                         break;
3343                 }
3344         }
3345         AST_RWLIST_TRAVERSE_SAFE_END;
3346         AST_RWLIST_UNLOCK(&body_supplements);
3347 }
3348
3349 const char *ast_sip_subscription_get_body_type(struct ast_sip_subscription *sub)
3350 {
3351         return sub->body_generator->type;
3352 }
3353
3354 const char *ast_sip_subscription_get_body_subtype(struct ast_sip_subscription *sub)
3355 {
3356         return sub->body_generator->subtype;
3357 }
3358
3359 int ast_sip_pubsub_generate_body_content(const char *type, const char *subtype,
3360                 struct ast_sip_body_data *data, struct ast_str **str)
3361 {
3362         struct ast_sip_pubsub_body_supplement *supplement;
3363         struct ast_sip_pubsub_body_generator *generator;
3364         int res = 0;
3365         void *body;
3366
3367         generator = find_body_generator_type_subtype(type, subtype);
3368         if (!generator) {
3369                 ast_log(LOG_WARNING, "Unable to find a body generator for %s/%s\n",
3370                                 type, subtype);
3371                 return -1;
3372         }
3373
3374         if (strcmp(data->body_type, generator->body_type)) {
3375                 ast_log(LOG_WARNING, "%s/%s body generator does not accept the type of data provided\n",
3376                         type, subtype);
3377                 return -1;
3378         }
3379
3380         body = generator->allocate_body(data->body_data);
3381         if (!body) {
3382                 ast_log(LOG_WARNING, "%s/%s body generator could not to allocate a body\n",
3383                         type, subtype);
3384                 return -1;
3385         }
3386
3387         if (generator->generate_body_content(body, data->body_data)) {
3388                 res = -1;
3389                 goto end;
3390         }
3391
3392         AST_RWLIST_RDLOCK(&body_supplements);
3393         AST_RWLIST_TRAVERSE(&body_supplements, supplement, list) {
3394                 if (!strcmp(generator->type, supplement->type) &&
3395                                 !strcmp(generator->subtype, supplement->subtype)) {
3396                         res = supplement->supplement_body(body, data->body_data);
3397                         if (res) {
3398                                 break;
3399                         }
3400                 }
3401         }
3402         AST_RWLIST_UNLOCK(&body_supplements);
3403
3404         if (!res) {
3405                 generator->to_string(body, str);
3406         }
3407
3408 end:
3409         if (generator->destroy_body) {
3410                 generator->destroy_body(body);
3411         }
3412
3413         return res;
3414 }
3415
3416 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata)
3417 {
3418         if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) {
3419                 return pubsub_on_rx_subscribe_request(rdata);
3420         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) {
3421                 return pubsub_on_rx_publish_request(rdata);
3422         }
3423
3424         return PJ_FALSE;
3425 }
3426
3427 static void set_state_terminated(struct ast_sip_subscription *sub)
3428 {
3429         int i;
3430
3431         sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
3432         for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
3433                 set_state_terminated(AST_VECTOR_GET(&sub->children, i));
3434         }
3435 }
3436
3437 /*!
3438  * \brief Callback sequence for subscription terminate:
3439  *
3440  * * Client initiated:
3441  *     pjproject receives SUBSCRIBE on the subscription's serializer thread
3442  *         calls pubsub_on_rx_refresh with dialog locked
3443  *             pubsub_on_rx_refresh sets TERMINATE_PENDING
3444  *             pushes serialized_pubsub_on_refresh_timeout
3445  *             returns to pjproject
3446  *         pjproject calls pubsub_on_evsub_state
3447  *             pubsub_evsub_set_state checks state == TERMINATE_IN_PROGRESS (no)
3448  *             ignore and return
3449  *         pjproject unlocks dialog
3450  *     serialized_pubsub_on_refresh_timeout starts (1)
3451  *       locks dialog
3452  *       checks state == TERMINATE_PENDING
3453  *       sets TERMINATE_IN_PROGRESS
3454  *       calls send_notify (2)
3455  *           send_notify ultimately calls pjsip_evsub_send_request
3456  *               pjsip_evsub_send_request calls evsub's set_state
3457  *                   set_state calls pubsub_evsub_set_state
3458  *                       pubsub_on_evsub_state checks state == TERMINATE_IN_PROGRESS
3459  *                       removes the subscriptions
3460  *                       cleans up references to evsub
3461  *                       sets state = TERMINATED
3462  *       serialized_pubsub_on_refresh_timeout unlocks dialog
3463  *
3464  * * Subscription timer expires:
3465  *     pjproject timer expires
3466  *         locks dialog
3467  *         calls pubsub_on_server_timeout
3468  *             pubsub_on_server_timeout checks state == NORMAL
3469  *             sets TERMINATE_PENDING
3470  *             pushes serialized_pubsub_on_refresh_timeout
3471  *             returns to pjproject
3472  *         pjproject unlocks dialog
3473  *     serialized_pubsub_on_refresh_timeout starts
3474  *         See (1) Above
3475  *
3476  * * Transmission failure sending NOTIFY or error response from client
3477  *     pjproject transaction timer expires or non OK response
3478  *         pjproject locks dialog
3479  *         calls pubsub_on_evsub_state with event TSX_STATE
3480  *             pubsub_on_evsub_state checks event == TSX_STATE
3481  *             removes the subscriptions
3482  *             cleans up references to evsub
3483  *             sets state = TERMINATED
3484  *         pjproject unlocks dialog
3485  *
3486  * * ast_sip_subscription_notify is called
3487  *       checks state == NORMAL
3488  *       if not batched...
3489  *           sets TERMINATE_IN_PROGRESS (if terminate is requested)
3490  *           calls send_notify
3491  *               See (2) Above
3492  *       if batched...
3493  *           sets TERMINATE_PENDING
3494  *           schedules task
3495  *       scheduler runs sched_task
3496  *           sched_task pushes serialized_send_notify
3497  *       serialized_send_notify starts
3498  *           checks state <= TERMINATE_PENDING
3499  *           if state == TERMINATE_PENDING set state = TERMINATE_IN_PROGRESS
3500  *           call send_notify
3501  *               See (2) Above
3502  *
3503  */
3504
3505 /*!
3506  * \brief PJSIP callback when underlying SIP subscription changes state
3507  *
3508  * Although this function is called for every state change, we only care
3509  * about the TERMINATED state, and only when we're actually processing the final
3510  * notify (SIP_SUB_TREE_TERMINATE_IN_PROGRESS) OR when a transmission failure
3511  * occurs (PJSIP_EVENT_TSX_STATE).  In this case, we do all the subscription tree
3512  * cleanup tasks and decrement the evsub reference.
3513  */
3514 static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
3515 {
3516         struct sip_subscription_tree *sub_tree =
3517                 pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
3518
3519         ast_debug(3, "evsub %p state %s event %s sub_tree %p sub_tree state %s\n", evsub,
3520                 pjsip_evsub_get_state_name(evsub), pjsip_event_str(event->type), sub_tree,
3521                 (sub_tree ? sub_tree_state_description[sub_tree->state] : "UNKNOWN"));
3522
3523         if (!sub_tree || pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) {
3524                 return;
3525         }
3526
3527         /* It's easier to write this as what we WANT to process, then negate it. */
3528         if (!(sub_tree->state == SIP_SUB_TREE_TERMINATE_IN_PROGRESS
3529                 || (event->type == PJSIP_EVENT_TSX_STATE && sub_tree->state == SIP_SUB_TREE_NORMAL)
3530                 )) {
3531                 ast_debug(3, "Do nothing.\n");
3532                 return;
3533         }
3534
3535         if (sub_tree->expiration_task) {
3536                 char task_name[256];
3537
3538                 ast_sip_sched_task_get_name(sub_tree->expiration_task, task_name, sizeof(task_name));
3539                 ast_debug(3, "Cancelling timer: %s\n", task_name);
3540                 ast_sip_sched_task_cancel(sub_tree->expiration_task);
3541                 ao2_cleanup(sub_tree->expiration_task);
3542                 sub_tree->expiration_task = NULL;
3543         }
3544
3545         remove_subscription(sub_tree);
3546
3547         pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL);
3548
3549 #ifdef HAVE_PJSIP_EVSUB_GRP_LOCK
3550         pjsip_evsub_dec_ref(sub_tree->evsub);
3551 #endif
3552
3553         sub_tree->evsub = NULL;
3554
3555         ast_sip_dialog_set_serializer(sub_tree->dlg, NULL);
3556         ast_sip_dialog_set_endpoint(sub_tree->dlg, NULL);
3557
3558         subscription_persistence_remove(sub_tree);
3559         shutdown_subscriptions(sub_tree->root);
3560
3561         sub_tree->state = SIP_SUB_TREE_TERMINATED;
3562         /* Remove evsub's reference to the sub_tree */
3563         ao2_ref(sub_tree, -1);
3564 }
3565
3566 static int pubsub_on_refresh_timeout(void *userdata)
3567 {
3568         struct sip_subscription_tree *sub_tree = userdata;
3569         pjsip_dialog *dlg = sub_tree->dlg;
3570
3571         ast_debug(3, "sub_tree %p sub_tree state %s\n", sub_tree,
3572                 (sub_tree ? sub_tree_state_description[sub_tree->state] : "UNKNOWN"));
3573
3574         pjsip_dlg_inc_lock(dlg);
3575         if (sub_tree->state >= SIP_SUB_TREE_TERMINATE_IN_PROGRESS) {
3576                 pjsip_dlg_dec_lock(dlg);
3577                 return 0;
3578         }
3579
3580         if (sub_tree->state == SIP_SUB_TREE_TERMINATE_PENDING) {
3581                 sub_tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;
3582                 set_state_terminated(sub_tree->root);
3583         }
3584
3585         send_notify(sub_tree, 1);
3586
3587         ast_test_suite_event_notify(sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED ?
3588                                 "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_REFRESHED",
3589                                 "Resource: %s", sub_tree->root->resource);
3590
3591         pjsip_dlg_dec_lock(dlg);
3592
3593         return 0;
3594 }
3595
3596 static int serialized_pubsub_on_refresh_timeout(void *userdata)
3597 {
3598         struct sip_subscription_tree *sub_tree = userdata;
3599
3600         ast_debug(3, "sub_tree %p sub_tree state %s\n", sub_tree,
3601                 (sub_tree ? sub_tree_state_description[sub_tree->state] : "UNKNOWN"));
3602
3603         pubsub_on_refresh_timeout(userdata);
3604         ao2_cleanup(sub_tree);
3605
3606         return 0;
3607 }
3608
3609 /*!
3610  * \brief Called whenever an in-dialog SUBSCRIBE is received
3611  *
3612  * This includes both SUBSCRIBE requests that actually refresh the subscription
3613  * as well as SUBSCRIBE requests that end the subscription.
3614  *
3615  * In either case we push serialized_pubsub_on_refresh_timeout to send an
3616  * appropriate NOTIFY request.
3617  */
3618 static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
3619                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
3620 {
3621         struct sip_subscription_tree *sub_tree;
3622
3623         sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
3624         ast_debug(3, "evsub %p sub_tree %p sub_tree state %s\n", evsub, sub_tree,
3625                 (sub_tree ? sub_tree_state_description[sub_tree->state] : "UNKNOWN"));
3626
3627         if (!sub_tree || sub_tree->state != SIP_SUB_TREE_NORMAL) {
3628                 return;
3629         }
3630
3631         if (sub_tree->expiration_task) {
3632                 char task_name[256];
3633
3634                 ast_sip_sched_task_get_name(sub_tree->expiration_task, task_name, sizeof(task_name));
3635                 ast_debug(3, "Cancelling timer: %s\n", task_name);
3636                 ast_sip_sched_task_cancel(sub_tree->expiration_task);
3637                 ao2_cleanup(sub_tree->expiration_task);
3638                 sub_tree->expiration_task = NULL;
3639         }
3640
3641         /* PJSIP will set the evsub's state to terminated before calling into this function
3642          * if the Expires value of the incoming SUBSCRIBE is 0.
3643          */
3644
3645         if (pjsip_evsub_get_state(sub_tree->evsub) == PJSIP_EVSUB_STATE_TERMINATED) {
3646                 sub_tree->state = SIP_SUB_TREE_TERMINATE_PENDING;
3647         }
3648
3649         subscription_persistence_update(sub_tree, rdata, SUBSCRIPTION_PERSISTENCE_REFRESHED);
3650
3651         if (ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_refresh_timeout, ao2_bump(sub_tree))) {
3652                 /* If we can't push the NOTIFY refreshing task...we'll just go with it. */
3653                 ast_log(LOG_ERROR, "Failed to push task to send NOTIFY.\n");
3654                 sub_tree->state = SIP_SUB_TREE_NORMAL;
3655                 ao2_ref(sub_tree, -1);
3656         }
3657
3658         if (sub_tree->is_list) {
3659                 pj_list_insert_before(res_hdr, create_require_eventlist(rdata->tp_info.pool));
3660         }
3661 }
3662
3663 static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code,
3664                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
3665 {
3666         struct ast_sip_subscription *sub;
3667
3668         if (!(sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id))) {
3669                 return;
3670         }
3671
3672         sub->handler->subscriber->state_change(sub, rdata->msg_info.msg->body,
3673                         pjsip_evsub_get_state(evsub));
3674 }
3675
3676 static int serialized_pubsub_on_client_refresh(void *userdata)
3677 {
3678         struct sip_subscription_tree *sub_tree = userdata;
3679         pjsip_tx_data *tdata;
3680
3681         if (!sub_tree->evsub) {
3682                 ao2_cleanup(sub_tree);
3683                 return 0;
3684         }
3685
3686         if (pjsip_evsub_initiate(sub_tree->evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
3687                 pjsip_evsub_send_request(sub_tree->evsub, tdata);
3688         } else {
3689                 pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
3690         }
3691
3692         ao2_cleanup(sub_tree);
3693         return 0;
3694 }
3695
3696 static void pubsub_on_client_refresh(pjsip_evsub *evsub)
3697 {
3698         struct sip_subscription_tree *sub_tree;
3699
3700         if (!(sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id))) {
3701                 return;
3702         }
3703
3704         if (ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_client_refresh, ao2_bump(sub_tree))) {
3705                 ao2_cleanup(sub_tree);
3706         }
3707 }
3708
3709 static void pubsub_on_server_timeout(pjsip_evsub *evsub)
3710 {
3711         struct sip_subscription_tree *sub_tree;
3712
3713         /* PJSIP does not terminate the server timeout timer when a SUBSCRIBE
3714          * with Expires: 0 arrives to end a subscription, nor does it terminate
3715          * this timer when we send a NOTIFY request in response to receiving such
3716          * a SUBSCRIBE. PJSIP does not stop the server timeout timer until the
3717          * NOTIFY transaction has finished (either through receiving a response
3718          * or through a transaction timeout).
3719          *
3720          * Therefore, it is possible that we can be told that a server timeout
3721          * occurred after we already thought that the subscription had been
3722          * terminated. In such a case, we will have already removed the sub_tree
3723          * from the evsub's mod_data array.
3724          */
3725
3726         sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
3727         if (!sub_tree || sub_tree->state != SIP_SUB_TREE_NORMAL) {
3728         return;
3729         }
3730
3731         sub_tree->state = SIP_SUB_TREE_TERMINATE_PENDING;
3732         if (ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_refresh_timeout, ao2_bump(sub_tree))) {
3733                 sub_tree->state = SIP_SUB_TREE_NORMAL;
3734                 ao2_cleanup(sub_tree);
3735         }
3736 }
3737
3738 static int ami_subscription_detail(struct sip_subscription_tree *sub_tree,
3739                                    struct ast_sip_ami *ami,
3740                                    const char *event)
3741 {
3742         struct ast_str *buf;
3743
3744         buf = ast_sip_create_ami_event(event, ami);
3745         if (!buf) {
3746                 return -1;
3747         }
3748
3749         sip_subscription_to_ami(sub_tree, &buf);
3750         astman_append(ami->s, "%s\r\n", ast_str_buffer(buf));
3751         ast_free(buf);
3752