res/res_pjsip_pubsub: Fix typo in WARNING message
[asterisk/asterisk.git] / res / res_pjsip_pubsub.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 /*!
19  * \brief Opaque structure representing an RFC 3265 SIP subscription
20  */
21
22 /*** MODULEINFO
23         <depend>pjproject</depend>
24         <depend>res_pjsip</depend>
25         <support_level>core</support_level>
26  ***/
27
28 #include "asterisk.h"
29
30 #include <pjsip.h>
31 #include <pjsip_simple.h>
32 #include <pjlib.h>
33
34 #include "asterisk/res_pjsip_pubsub.h"
35 #include "asterisk/module.h"
36 #include "asterisk/linkedlists.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/datastore.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/taskprocessor.h"
41 #include "asterisk/sched.h"
42 #include "asterisk/res_pjsip.h"
43 #include "asterisk/callerid.h"
44 #include "asterisk/manager.h"
45 #include "asterisk/test.h"
46 #include "res_pjsip/include/res_pjsip_private.h"
47 #include "asterisk/res_pjsip_presence_xml.h"
48
49 /*** DOCUMENTATION
50         <manager name="PJSIPShowSubscriptionsInbound" language="en_US">
51                 <synopsis>
52                         Lists subscriptions.
53                 </synopsis>
54                 <syntax />
55                 <description>
56                         <para>
57                         Provides a listing of all inbound subscriptions.  An event <literal>InboundSubscriptionDetail</literal>
58                         is issued for each subscription object.  Once all detail events are completed an
59                         <literal>InboundSubscriptionDetailComplete</literal> event is issued.
60                         </para>
61                 </description>
62         </manager>
63         <manager name="PJSIPShowSubscriptionsOutbound" language="en_US">
64                 <synopsis>
65                         Lists subscriptions.
66                 </synopsis>
67                 <syntax />
68                 <description>
69                         <para>
70                         Provides a listing of all outbound subscriptions.  An event <literal>OutboundSubscriptionDetail</literal>
71                         is issued for each subscription object.  Once all detail events are completed an
72                         <literal>OutboundSubscriptionDetailComplete</literal> event is issued.
73                         </para>
74                 </description>
75         </manager>
76         <manager name="PJSIPShowResourceLists" language="en_US">
77                 <synopsis>
78                         Displays settings for configured resource lists.
79                 </synopsis>
80                 <syntax />
81                 <description>
82                         <para>
83                         Provides a listing of all resource lists.  An event <literal>ResourceListDetail</literal>
84                         is issued for each resource list object.  Once all detail events are completed a
85                         <literal>ResourceListDetailComplete</literal> event is issued.
86                         </para>
87                 </description>
88         </manager>
89
90         <configInfo name="res_pjsip_pubsub" language="en_US">
91                 <synopsis>Module that implements publish and subscribe support.</synopsis>
92                 <configFile name="pjsip.conf">
93                         <configObject name="subscription_persistence">
94                                 <synopsis>Persists SIP subscriptions so they survive restarts.</synopsis>
95                                 <configOption name="packet">
96                                         <synopsis>Entire SIP SUBSCRIBE packet that created the subscription</synopsis>
97                                 </configOption>
98                                 <configOption name="src_name">
99                                         <synopsis>The source address of the subscription</synopsis>
100                                 </configOption>
101                                 <configOption name="src_port">
102                                         <synopsis>The source port of the subscription</synopsis>
103                                 </configOption>
104                                 <configOption name="transport_key">
105                                         <synopsis>The type of transport the subscription was received on</synopsis>
106                                 </configOption>
107                                 <configOption name="local_name">
108                                         <synopsis>The local address the subscription was received on</synopsis>
109                                 </configOption>
110                                 <configOption name="local_port">
111                                         <synopsis>The local port the subscription was received on</synopsis>
112                                 </configOption>
113                                 <configOption name="cseq">
114                                         <synopsis>The sequence number of the next NOTIFY to be sent</synopsis>
115                                 </configOption>
116                                 <configOption name="tag">
117                                         <synopsis>The local tag of the dialog for the subscription</synopsis>
118                                 </configOption>
119                                 <configOption name="endpoint">
120                                         <synopsis>The name of the endpoint that subscribed</synopsis>
121                                 </configOption>
122                                 <configOption name="expires">
123                                         <synopsis>The time at which the subscription expires</synopsis>
124                                 </configOption>
125                         </configObject>
126                         <configObject name="resource_list">
127                                 <synopsis>Resource list configuration parameters.</synopsis>
128                                 <description>
129                                         <para>This configuration object allows for RFC 4662 resource list subscriptions
130                                         to be specified. This can be useful to decrease the amount of subscription traffic
131                                         that a server has to process.</para>
132                                         <note>
133                                                 <para>Current limitations limit the size of SIP NOTIFY requests that Asterisk sends
134                                                 to 64000 bytes. If your resource list notifications are larger than this maximum, you
135                                                 will need to make adjustments.</para>
136                                         </note>
137                                 </description>
138                                 <configOption name="type">
139                                         <synopsis>Must be of type 'resource_list'</synopsis>
140                                 </configOption>
141                                 <configOption name="event">
142                                         <synopsis>The SIP event package that the list resource belongs to.</synopsis>
143                                         <description><para>
144                                                 The SIP event package describes the types of resources that Asterisk reports
145                                                 the state of.
146                                         </para>
147                                                 <enumlist>
148                                                         <enum name="presence"><para>
149                                                                 Device state and presence reporting.
150                                                         </para></enum>
151                                                         <enum name="message-summary"><para>
152                                                                 Message-waiting indication (MWI) reporting.
153                                                         </para></enum>
154                                                 </enumlist>
155                                         </description>
156                                 </configOption>
157                                 <configOption name="list_item">
158                                         <synopsis>The name of a resource to report state on</synopsis>
159                                         <description>
160                                                 <para>In general Asterisk looks up list items in the following way:</para>
161                                                 <para>1. Check if the list item refers to another configured resource list.</para>
162                                                 <para>2. Pass the name of the resource off to event-package-specific handlers
163                                                    to find the specified resource.</para>
164                                                 <para>The second part means that the way the list item is specified depends
165                                                 on what type of list this is. For instance, if you have the <replaceable>event</replaceable>
166                                                 set to <literal>presence</literal>, then list items should be in the form of
167                                                 dialplan_extension@dialplan_context. For <literal>message-summary</literal> mailbox
168                                                 names should be listed.</para>
169                                         </description>
170                                 </configOption>
171                                 <configOption name="full_state" default="no">
172                                         <synopsis>Indicates if the entire list's state should be sent out.</synopsis>
173                                         <description>
174                                                 <para>If this option is enabled, and a resource changes state, then Asterisk will construct
175                                                 a notification that contains the state of all resources in the list. If the option is
176                                                 disabled, Asterisk will construct a notification that only contains the states of
177                                                 resources that have changed.</para>
178                                                 <note>
179                                                         <para>Even with this option disabled, there are certain situations where Asterisk is forced
180                                                         to send a notification with the states of all resources in the list. When a subscriber
181                                                         renews or terminates its subscription to the list, Asterisk MUST send a full state
182                                                         notification.</para>
183                                                 </note>
184                                         </description>
185                                 </configOption>
186                                 <configOption name="notification_batch_interval" default="0">
187                                         <synopsis>Time Asterisk should wait, in milliseconds, before sending notifications.</synopsis>
188                                         <description>
189                                                 <para>When a resource's state changes, it may be desired to wait a certain amount before Asterisk
190                                                 sends a notification to subscribers. This allows for other state changes to accumulate, so that
191                                                 Asterisk can communicate multiple state changes in a single notification instead of rapidly sending
192                                                 many notifications.</para>
193                                         </description>
194                                 </configOption>
195                         </configObject>
196                         <configObject name="inbound-publication">
197                                 <synopsis>The configuration for inbound publications</synopsis>
198                                 <configOption name="endpoint" default="">
199                                         <synopsis>Optional name of an endpoint that is only allowed to publish to this resource</synopsis>
200                                 </configOption>
201                                 <configOption name="type">
202                                         <synopsis>Must be of type 'inbound-publication'.</synopsis>
203                                 </configOption>
204                         </configObject>
205                 </configFile>
206         </configInfo>
207  ***/
208
209 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);
210
211 static struct pjsip_module pubsub_module = {
212         .name = { "PubSub Module", 13 },
213         .priority = PJSIP_MOD_PRIORITY_APPLICATION,
214         .on_rx_request = pubsub_on_rx_request,
215 };
216
217 #define MOD_DATA_PERSISTENCE "sub_persistence"
218 #define MOD_DATA_MSG "sub_msg"
219
220 static const pj_str_t str_event_name = { "Event", 5 };
221
222 /*! \brief Scheduler used for automatically expiring publications */
223 static struct ast_sched_context *sched;
224
225 /*! \brief Number of buckets for publications (on a per handler) */
226 #define PUBLICATIONS_BUCKETS 37
227
228 /*! \brief Default expiration time for PUBLISH if one is not specified */
229 #define DEFAULT_PUBLISH_EXPIRES 3600
230
231 /*! \brief Number of buckets for subscription datastore */
232 #define DATASTORE_BUCKETS 53
233
234 /*! \brief Default expiration for subscriptions */
235 #define DEFAULT_EXPIRES 3600
236
237 /*! \brief Defined method for PUBLISH */
238 const pjsip_method pjsip_publish_method =
239 {
240         PJSIP_OTHER_METHOD,
241         { "PUBLISH", 7 }
242 };
243
/*!
 * \brief Categories of PUBLISH request described by RFC 3903
 */
enum sip_publish_type {
        /*!
         * \brief Unknown
         *
         * \details
         * Not an RFC 3903 category. This value marks an incoming PUBLISH
         * that matches none of the categories below and is therefore
         * treated as invalid.
         */
        SIP_PUBLISH_UNKNOWN = 0,

        /*!
         * \brief Initial
         *
         * \details
         * The first PUBLISH sent for a piece of state. It carries a
         * non-zero Expires header and a body describing the sender's
         * current state. It is the only PUBLISH type that has no
         * Sip-If-Match header.
         */
        SIP_PUBLISH_INITIAL,

        /*!
         * \brief Refresh
         *
         * \details
         * Keeps previously published state from expiring. It carries a
         * non-zero Expires header and no body, because it does not
         * change the state.
         */
        SIP_PUBLISH_REFRESH,

        /*!
         * \brief Modify
         *
         * \details
         * Replaces previously published state. It carries a body with the
         * new state; an Expires header may or may not be present.
         */
        SIP_PUBLISH_MODIFY,

        /*!
         * \brief Remove
         *
         * \details
         * Withdraws published state from an ESC. It carries an Expires
         * header set to 0 and most likely no body.
         */
        SIP_PUBLISH_REMOVE,
};
298
299 /*!
300  * \brief A vector of strings commonly used throughout this module
301  */
302 AST_VECTOR(resources, const char *);
303
304 /*!
305  * \brief Resource list configuration item
306  */
307 struct resource_list {
308         SORCERY_OBJECT(details);
309         /*! SIP event package the list uses. */
310         char event[32];
311         /*! Strings representing resources in the list. */
312         struct resources items;
313         /*! Indicates if Asterisk sends full or partial state on notifications. */
314         unsigned int full_state;
315         /*! Time, in milliseconds Asterisk waits before sending a batched notification.*/
316         unsigned int notification_batch_interval;
317 };
318
/*!
 * \brief Counter from which ESCs derive new entity IDs
 */
static int esc_etag_counter;

/*!
 * \brief State kept for a single SIP publication
 */
struct ast_sip_publication {
        /*! \brief Datastores that handlers attach to the publication */
        struct ao2_container *datastores;
        /*! \brief Entity tag identifying the publication */
        int entity_tag;
        /*! \brief Publish handler responsible for this publication */
        struct ast_sip_publish_handler *handler;
        /*! \brief Endpoint this publication is communicating with */
        struct ast_sip_endpoint *endpoint;
        /*! \brief When the publication expires */
        int expires;
        /*! \brief Scheduler item that expires the publication */
        int sched_id;
        /*! \brief Resource the publication is made to */
        char *resource;
        /*! \brief Name of the event type configuration */
        char *event_configuration_name;
        /*! \brief Trailing storage holding the data for the fields above */
        char data[0];
};
347
348
349 /*!
350  * \brief Structure used for persisting an inbound subscription
351  */
352 struct subscription_persistence {
353         /*! Sorcery object details */
354         SORCERY_OBJECT(details);
355         /*! The name of the endpoint involved in the subscrption */
356         char *endpoint;
357         /*! SIP message that creates the subscription */
358         char packet[PJSIP_MAX_PKT_LEN];
359         /*! Source address of the message */
360         char src_name[PJ_INET6_ADDRSTRLEN];
361         /*! Source port of the message */
362         int src_port;
363         /*! Local transport key type */
364         char transport_key[32];
365         /*! Local transport address */
366         char local_name[PJ_INET6_ADDRSTRLEN];
367         /*! Local transport port */
368         int local_port;
369         /*! Next CSeq to use for message */
370         unsigned int cseq;
371         /*! Local tag of the dialog */
372         char *tag;
373         /*! When this subscription expires */
374         struct timeval expires;
375 };
376
377 /*!
378  * \brief A tree of SIP subscriptions
379  *
380  * Because of the ability to subscribe to resource lists, a SIP
381  * subscription can result in a tree of subscriptions being created.
382  * This structure represents the information relevant to the subscription
383  * as a whole, to include the underlying PJSIP structure for the
384  * subscription.
385  */
386 struct sip_subscription_tree {
387         /*! The endpoint with which the subscription is communicating */
388         struct ast_sip_endpoint *endpoint;
389         /*! Serializer on which to place operations for this subscription */
390         struct ast_taskprocessor *serializer;
391         /*! The role for this subscription */
392         enum ast_sip_subscription_role role;
393         /*! Persistence information */
394         struct subscription_persistence *persistence;
395         /*! The underlying PJSIP event subscription structure */
396         pjsip_evsub *evsub;
397         /*! The underlying PJSIP dialog */
398         pjsip_dialog *dlg;
399         /*! Interval to use for batching notifications */
400         unsigned int notification_batch_interval;
401         /*! Scheduler ID for batched notification */
402         int notify_sched_id;
403         /*! Indicator if scheduled batched notification should be sent */
404         unsigned int send_scheduled_notify;
405         /*! The root of the subscription tree */
406         struct ast_sip_subscription *root;
407         /*! Is this subscription to a list? */
408         int is_list;
409         /*! Next item in the list */
410         AST_LIST_ENTRY(sip_subscription_tree) next;
411 };
412
413 /*!
414  * \brief Structure representing a "virtual" SIP subscription.
415  *
416  * This structure serves a dual purpose. Structurally, it is
417  * the constructed tree of subscriptions based on the resources
418  * being subscribed to. API-wise, this serves as the handle that
419  * subscription handlers use in order to interact with the pubsub API.
420  */
421 struct ast_sip_subscription {
422         /*! Subscription datastores set up by handlers */
423         struct ao2_container *datastores;
424         /*! The handler for this subscription */
425         const struct ast_sip_subscription_handler *handler;
426         /*! Pointer to the base of the tree */
427         struct sip_subscription_tree *tree;
428         /*! Body generaator for NOTIFYs */
429         struct ast_sip_pubsub_body_generator *body_generator;
430         /*! Vector of child subscriptions */
431         AST_VECTOR(, struct ast_sip_subscription *) children;
432         /*! Saved NOTIFY body text for this subscription */
433         struct ast_str *body_text;
434         /*! Indicator that the body text has changed since the last notification */
435         int body_changed;
436         /*! The current state of the subscription */
437         pjsip_evsub_state subscription_state;
438         /*! For lists, the current version to place in the RLMI body */
439         unsigned int version;
440         /*! For lists, indicates if full state should always be communicated. */
441         unsigned int full_state;
442         /*! URI associated with the subscription */
443         pjsip_sip_uri *uri;
444         /*! Name of resource being subscribed to */
445         char resource[0];
446 };
447
448 /*!
449  * \brief Structure representing a publication resource
450  */
451 struct ast_sip_publication_resource {
452         /*! \brief Sorcery object details */
453         SORCERY_OBJECT(details);
454         /*! \brief Optional name of an endpoint that is only allowed to publish to this resource */
455         char *endpoint;
456         /*! \brief Mapping for event types to configuration */
457         struct ast_variable *events;
458 };
459
460 static const char *sip_subscription_roles_map[] = {
461         [AST_SIP_SUBSCRIBER] = "Subscriber",
462         [AST_SIP_NOTIFIER] = "Notifier"
463 };
464
/*! \brief File-scope list of subscription trees */
AST_RWLIST_HEAD_STATIC(subscriptions, sip_subscription_tree);

/*! \brief File-scope list of body generators */
AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
/*! \brief File-scope list of body supplements */
AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
469
470 static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
471 static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
472                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
473 static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
474                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
475 static void pubsub_on_client_refresh(pjsip_evsub *sub);
476 static void pubsub_on_server_timeout(pjsip_evsub *sub);
477  
478 static pjsip_evsub_user pubsub_cb = {
479         .on_evsub_state = pubsub_on_evsub_state,
480         .on_rx_refresh = pubsub_on_rx_refresh,
481         .on_rx_notify = pubsub_on_rx_notify,
482         .on_client_refresh = pubsub_on_client_refresh,
483         .on_server_timeout = pubsub_on_server_timeout,
484 };
485
486 /*! \brief Destructor for publication resource */
487 static void publication_resource_destroy(void *obj)
488 {
489         struct ast_sip_publication_resource *resource = obj;
490
491         ast_free(resource->endpoint);
492         ast_variables_destroy(resource->events);
493 }
494
495 /*! \brief Allocator for publication resource */
496 static void *publication_resource_alloc(const char *name)
497 {
498         return ast_sorcery_generic_alloc(sizeof(struct ast_sip_publication_resource), publication_resource_destroy);
499 }
500
501 /*! \brief Destructor for subscription persistence */
502 static void subscription_persistence_destroy(void *obj)
503 {
504         struct subscription_persistence *persistence = obj;
505
506         ast_free(persistence->endpoint);
507         ast_free(persistence->tag);
508 }
509
510 /*! \brief Allocator for subscription persistence */
511 static void *subscription_persistence_alloc(const char *name)
512 {
513         return ast_sorcery_generic_alloc(sizeof(struct subscription_persistence), subscription_persistence_destroy);
514 }
515
516 /*! \brief Function which creates initial persistence information of a subscription in sorcery */
517 static struct subscription_persistence *subscription_persistence_create(struct sip_subscription_tree *sub_tree)
518 {
519         char tag[PJ_GUID_STRING_LENGTH + 1];
520
521         /* The id of this persistence object doesn't matter as we keep it on the subscription and don't need to
522          * look it up by id at all.
523          */
524         struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(),
525                 "subscription_persistence", NULL);
526
527         pjsip_dialog *dlg = sub_tree->dlg;
528
529         if (!persistence) {
530                 return NULL;
531         }
532
533         persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub_tree->endpoint));
534         ast_copy_pj_str(tag, &dlg->local.info->tag, sizeof(tag));
535         persistence->tag = ast_strdup(tag);
536
537         ast_sorcery_create(ast_sip_get_sorcery(), persistence);
538         return persistence;
539 }
540
541 /*! \brief Function which updates persistence information of a subscription in sorcery */
542 static void subscription_persistence_update(struct sip_subscription_tree *sub_tree,
543         pjsip_rx_data *rdata)
544 {
545         pjsip_dialog *dlg;
546
547         if (!sub_tree->persistence) {
548                 return;
549         }
550
551         dlg = sub_tree->dlg;
552         sub_tree->persistence->cseq = dlg->local.cseq;
553
554         if (rdata) {
555                 int expires;
556                 pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
557
558                 expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
559                 sub_tree->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
560
561                 ast_copy_string(sub_tree->persistence->packet, rdata->pkt_info.packet,
562                                 sizeof(sub_tree->persistence->packet));
563                 ast_copy_string(sub_tree->persistence->src_name, rdata->pkt_info.src_name,
564                                 sizeof(sub_tree->persistence->src_name));
565                 sub_tree->persistence->src_port = rdata->pkt_info.src_port;
566                 ast_copy_string(sub_tree->persistence->transport_key, rdata->tp_info.transport->type_name,
567                         sizeof(sub_tree->persistence->transport_key));
568                 ast_copy_pj_str(sub_tree->persistence->local_name, &rdata->tp_info.transport->local_name.host,
569                         sizeof(sub_tree->persistence->local_name));
570                 sub_tree->persistence->local_port = rdata->tp_info.transport->local_name.port;
571         }
572
573         ast_sorcery_update(ast_sip_get_sorcery(), sub_tree->persistence);
574 }
575
576 /*! \brief Function which removes persistence of a subscription from sorcery */
577 static void subscription_persistence_remove(struct sip_subscription_tree *sub_tree)
578 {
579         if (!sub_tree->persistence) {
580                 return;
581         }
582
583         ast_sorcery_delete(ast_sip_get_sorcery(), sub_tree->persistence);
584         ao2_ref(sub_tree->persistence, -1);
585 }
586
587
588 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name);
589 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
590                 size_t num_accept, const char *body_type);
591
592 /*! \brief Retrieve a handler using the Event header of an rdata message */
593 static struct ast_sip_subscription_handler *subscription_get_handler_from_rdata(pjsip_rx_data *rdata)
594 {
595         pjsip_event_hdr *event_header;
596         char event[32];
597         struct ast_sip_subscription_handler *handler;
598
599         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
600         if (!event_header) {
601                 ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
602                 return NULL;
603         }
604         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
605
606         handler = find_sub_handler_for_event_name(event);
607         if (!handler) {
608                 ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
609         }
610
611         return handler;
612 }
613
/*!
 * \brief Accept header values that are exceptions to the rule
 *
 * When a SUBSCRIBE arrives, a body generator is normally chosen by
 * matching one of the request's Accept header values. That works well
 * for a subscription to a single resource. Subscriptions to a list are
 * different: most Accept values are still fine, but a couple of them
 * are specific to resource lists and have to be ignored while looking
 * for the body generator of the individual resources in the
 * subscription.
 */
const char *accept_exceptions[] = {
        "multipart/related", "application/rlmi+xml",
};
630
631 /*!
632  * \brief Is the Accept header from the SUBSCRIBE in the list of exceptions?
633  *
634  * \retval 1 This Accept header value is an exception to the rule.
635  * \retval 0 This Accept header is not an exception to the rule.
636  */
637 static int exceptional_accept(const pj_str_t *accept)
638 {
639         int i;
640
641         for (i = 0; i < ARRAY_LEN(accept_exceptions); ++i) {
642                 if (!pj_strcmp2(accept, accept_exceptions[i])) {
643                         return 1;
644                 }
645         }
646
647         return 0;
648 }
649
650 /*! \brief Retrieve a body generator using the Accept header of an rdata message */
651 static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata,
652         const struct ast_sip_subscription_handler *handler)
653 {
654         pjsip_accept_hdr *accept_header = (pjsip_accept_hdr *) &rdata->msg_info.msg->hdr;
655         char accept[AST_SIP_MAX_ACCEPT][64];
656         size_t num_accept_headers = 0;
657
658         while ((accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, accept_header->next))) {
659                 int i;
660
661                 for (i = 0; i < accept_header->count; ++i) {
662                         if (!exceptional_accept(&accept_header->values[i])) {
663                                 ast_copy_pj_str(accept[num_accept_headers], &accept_header->values[i], sizeof(accept[num_accept_headers]));
664                                 ++num_accept_headers;
665                         }
666                 }
667         }
668
669         if (num_accept_headers == 0) {
670                 /* If a SUBSCRIBE contains no Accept headers, then we must assume that
671                  * the default accept type for the event package is to be used.
672                  */
673                 ast_copy_string(accept[0], handler->notifier->default_accept, sizeof(accept[0]));
674                 num_accept_headers = 1;
675         }
676
677         return find_body_generator(accept, num_accept_headers, handler->body_type);
678 }
679
680 /*! \brief Check if the rdata has a Supported header containing 'eventlist'
681  *
682  *  \retval 1 rdata has an eventlist containing supported header
683  *  \retval 0 rdata doesn't have an eventlist containing supported header
684  */
685 static int ast_sip_pubsub_has_eventlist_support(pjsip_rx_data *rdata)
686 {
687         pjsip_supported_hdr *supported_header = (pjsip_supported_hdr *) &rdata->msg_info.msg->hdr;
688
689         while ((supported_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_SUPPORTED, supported_header->next))) {
690                 int i;
691
692                 for (i = 0; i < supported_header->count; i++) {
693                         if (!pj_stricmp2(&supported_header->values[i], "eventlist")) {
694                                 return 1;
695                         }
696                 }
697         }
698
699         return 0;
700 }
701
702 struct resource_tree;
703
704 /*!
705  * \brief A node for a resource tree.
706  */
707 struct tree_node {
708         AST_VECTOR(, struct tree_node *) children;
709         unsigned int full_state;
710         char resource[0];
711 };
712
713 /*!
714  * \brief Helper function for retrieving a resource list for a given event.
715  *
716  * This will retrieve a resource list that corresponds to the resource and event provided.
717  *
718  * \param resource The name of the resource list to retrieve
719  * \param event The expected event name on the resource list
720  */
721 static struct resource_list *retrieve_resource_list(const char *resource, const char *event)
722 {
723         struct resource_list *list;
724
725         list = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "resource_list", resource);
726         if (!list) {
727                 return NULL;
728         }
729
730         if (strcmp(list->event, event)) {
731                 ast_log(LOG_WARNING, "Found resource list %s, but its event type (%s) does not match SUBSCRIBE's (%s)\n",
732                                 resource, list->event, event);
733                 ao2_cleanup(list);
734                 return NULL;
735         }
736
737         return list;
738 }
739
740 /*!
741  * \brief Allocate a tree node
742  *
743  * In addition to allocating and initializing the tree node, the node is also added
744  * to the vector of visited resources. See \ref build_resource_tree for more information
745  * on the visited resources.
746  *
747  * \param resource The name of the resource for this tree node.
748  * \param visited The vector of resources that have been visited.
749  * \param if allocating a list, indicate whether full state is requested in notifications.
750  * \retval NULL Allocation failure.
751  * \retval non-NULL The newly-allocated tree_node
752  */
753 static struct tree_node *tree_node_alloc(const char *resource, struct resources *visited, unsigned int full_state)
754 {
755         struct tree_node *node;
756
757         node = ast_calloc(1, sizeof(*node) + strlen(resource) + 1);
758         if (!node) {
759                 return NULL;
760         }
761
762         strcpy(node->resource, resource);
763         if (AST_VECTOR_INIT(&node->children, 4)) {
764                 ast_free(node);
765                 return NULL;
766         }
767         node->full_state = full_state;
768
769         if (visited) {
770                 AST_VECTOR_APPEND(visited, resource);
771         }
772         return node;
773 }
774
775 /*!
776  * \brief Destructor for a tree node
777  *
778  * This function calls recursively in order to destroy
779  * all nodes lower in the tree from the given node in
780  * addition to the node itself.
781  *
782  * \param node The node to destroy.
783  */
784 static void tree_node_destroy(struct tree_node *node)
785 {
786         int i;
787         if (!node) {
788                 return;
789         }
790
791         for (i = 0; i < AST_VECTOR_SIZE(&node->children); ++i) {
792                 tree_node_destroy(AST_VECTOR_GET(&node->children, i));
793         }
794         AST_VECTOR_FREE(&node->children);
795         ast_free(node);
796 }
797
/*!
 * \brief Determine if this resource has been visited already
 *
 * The visited resources are searched linearly for an exact,
 * case-sensitive match. See \ref build_resource_tree for more information.
 *
 * \param resource The resource currently being visited
 * \param visited The resources that have previously been visited
 *
 * \retval 1 The resource is among the visited resources.
 * \retval 0 The resource has not been visited yet.
 */
static int have_visited(const char *resource, struct resources *visited)
{
	int pos = 0;
	int found = 0;

	while (!found && pos < AST_VECTOR_SIZE(visited)) {
		found = (strcmp(resource, AST_VECTOR_GET(visited, pos)) == 0);
		++pos;
	}

	return found;
}
818
819 /*!
820  * \brief Build child nodes for a given parent.
821  *
822  * This iterates through the items on a resource list and creates tree nodes for each one. The
823  * tree nodes created are children of the supplied parent node. If an item in the resource
824  * list is itself a list, then this function is called recursively to provide children for
825  * the the new node.
826  *
827  * If an item in a resource list is not a list, then the supplied subscription handler is
828  * called into as if a new SUBSCRIBE for the list item were presented. The handler's response
829  * is used to determine if the node can be added to the tree or not.
830  *
831  * If a parent node ends up having no child nodes added under it, then the parent node is
832  * pruned from the tree.
833  *
834  * \param endpoint The endpoint that sent the inbound SUBSCRIBE.
835  * \param handler The subscription handler for leaf nodes in the tree.
836  * \param list The configured resource list from which the child node is being built.
837  * \param parent The parent node for these children.
838  * \param visited The resources that have already been visited.
839  */
840 static void build_node_children(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
841                 struct resource_list *list, struct tree_node *parent, struct resources *visited)
842 {
843         int i;
844
845         for (i = 0; i < AST_VECTOR_SIZE(&list->items); ++i) {
846                 struct tree_node *current;
847                 struct resource_list *child_list;
848                 const char *resource = AST_VECTOR_GET(&list->items, i);
849
850                 if (have_visited(resource, visited)) {
851                         ast_debug(1, "Already visited resource %s. Avoiding duplicate resource or potential loop.\n", resource);
852                         continue;
853                 }
854
855                 child_list = retrieve_resource_list(resource, list->event);
856                 if (!child_list) {
857                         int resp = handler->notifier->new_subscribe(endpoint, resource);
858                         if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
859                                 current = tree_node_alloc(resource, visited, 0);
860                                 if (!current) {
861                                         ast_debug(1, "Subscription to leaf resource %s was successful, but encountered"
862                                                         "allocation error afterwards\n", resource);
863                                         continue;
864                                 }
865                                 ast_debug(1, "Subscription to leaf resource %s resulted in success. Adding to parent %s\n",
866                                                 resource, parent->resource);
867                                 AST_VECTOR_APPEND(&parent->children, current);
868                         } else {
869                                 ast_debug(1, "Subscription to leaf resource %s resulted in error response %d\n",
870                                                 resource, resp);
871                         }
872                 } else {
873                         ast_debug(1, "Resource %s (child of %s) is a list\n", resource, parent->resource);
874                         current = tree_node_alloc(resource, visited, child_list->full_state);
875                         if (!current) {
876                                 ast_debug(1, "Cannot build children of resource %s due to allocation failure\n", resource);
877                                 continue;
878                         }
879                         build_node_children(endpoint, handler, child_list, current, visited);
880                         if (AST_VECTOR_SIZE(&current->children) > 0) {
881                                 ast_debug(1, "List %s had no successful children.\n", resource);
882                                 AST_VECTOR_APPEND(&parent->children, current);
883                         } else {
884                                 ast_debug(1, "List %s had successful children. Adding to parent %s\n",
885                                                 resource, parent->resource);
886                                 tree_node_destroy(current);
887                         }
888                         ao2_cleanup(child_list);
889                 }
890         }
891 }
892
893 /*!
894  * \brief A resource tree
895  *
896  * When an inbound SUBSCRIBE arrives, the resource being subscribed to may
897  * be a resource list. If this is the case, the resource list may contain resources
898  * that are themselves lists. The structure needed to hold the resources is
899  * a tree.
900  *
901  * Upon receipt of the SUBSCRIBE, the tree is built by determining if subscriptions
902  * to the individual resources in the tree would be successful or not. Any successful
903  * subscriptions result in a node in the tree being created. Any unsuccessful subscriptions
904  * result in no node being created.
905  *
906  * This tree can be seen as a bare-bones analog of the tree of ast_sip_subscriptions that
907  * will end up being created to actually carry out the duties of a SIP SUBSCRIBE dialog.
908  */
909 struct resource_tree {
910         struct tree_node *root; /*!< Top node of the tree; assigned by build_resource_tree() and released through resource_tree_destroy() */
911         unsigned int notification_batch_interval; /*!< Copied from the resource list by build_resource_tree() when the requested resource is a list */
912 };
913
914 /*!
915  * \brief Destroy a resource tree.
916  *
917  * This function makes no assumptions about how the tree itself was
918  * allocated and does not attempt to free the tree itself. Callers
919  * of this function are responsible for freeing the tree.
920  *
921  * \param tree The tree to destroy.
922  */
923 static void resource_tree_destroy(struct resource_tree *tree)
924 {
925         if (tree) {
926                 tree_node_destroy(tree->root);
927         }
928 }
929
930 /*!
931  * \brief Build a resource tree
932  *
933  * This function builds a resource tree based on the requested resource in a SUBSCRIBE request.
934  *
935  * This function also creates a container that has all resources that have been visited during
936  * creation of the tree, whether those resources resulted in a tree node being created or not.
937  * Keeping this container of visited resources allows for misconfigurations such as loops in
938  * the tree or duplicated resources to be detected.
939  *
940  * \param endpoint The endpoint that sent the SUBSCRIBE request.
941  * \param handler The subscription handler for leaf nodes in the tree.
942  * \param resource The resource requested in the SUBSCRIBE request.
943  * \param tree The tree that is to be built.
944  * \param has_eventlist_support
945  *
946  * \retval 200-299 Successfully subscribed to at least one resource.
947  * \retval 300-699 Failure to subscribe to requested resource.
948  */
949 static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
950                 const char *resource, struct resource_tree *tree, int has_eventlist_support)
951 {
952         struct resource_list *list;
953         struct resources visited;
954
955         if (!has_eventlist_support || !(list = retrieve_resource_list(resource, handler->event_name))) {
956                 ast_debug(1, "Subscription to resource %s is not to a list\n", resource);
957                 tree->root = tree_node_alloc(resource, NULL, 0);
958                 if (!tree->root) {
959                         return 500;
960                 }
961                 return handler->notifier->new_subscribe(endpoint, resource);
962         }
963
964         ast_debug(1, "Subscription to resource %s is a list\n", resource);
965         if (AST_VECTOR_INIT(&visited, AST_VECTOR_SIZE(&list->items))) {
966                 return 500;
967         }
968
969         tree->root = tree_node_alloc(resource, &visited, list->full_state);
970         if (!tree->root) {
971                 return 500;
972         }
973
974         tree->notification_batch_interval = list->notification_batch_interval;
975
976         build_node_children(endpoint, handler, list, tree->root, &visited);
977         AST_VECTOR_FREE(&visited);
978         ao2_cleanup(list);
979
980         if (AST_VECTOR_SIZE(&tree->root->children) > 0) {
981                 return 200;
982         } else {
983                 return 500;
984         }
985 }
986
987 static int datastore_hash(const void *obj, int flags)
988 {
989         const struct ast_datastore *datastore = obj;
990         const char *uid = flags & OBJ_KEY ? obj : datastore->uid;
991
992         ast_assert(uid != NULL);
993
994         return ast_str_hash(uid);
995 }
996
997 static int datastore_cmp(void *obj, void *arg, int flags)
998 {
999         const struct ast_datastore *datastore1 = obj;
1000         const struct ast_datastore *datastore2 = arg;
1001         const char *uid2 = flags & OBJ_KEY ? arg : datastore2->uid;
1002
1003         ast_assert(datastore1->uid != NULL);
1004         ast_assert(uid2 != NULL);
1005
1006         return strcmp(datastore1->uid, uid2) ? 0 : CMP_MATCH | CMP_STOP;
1007 }
1008
1009 static int subscription_remove_serializer(void *obj)
1010 {
1011         struct sip_subscription_tree *sub_tree = obj;
1012
1013         /* This is why we keep the dialog on the subscription. When the subscription
1014          * is destroyed, there is no guarantee that the underlying dialog is ready
1015          * to be destroyed. Furthermore, there's no guarantee in the opposite direction
1016          * either. The dialog could be destroyed before our subscription is. We fix
1017          * this problem by keeping a reference to the dialog until it is time to
1018          * destroy the subscription. We need to have the dialog available when the
1019          * subscription is destroyed so that we can guarantee that our attempt to
1020          * remove the serializer will be successful.
1021          */
1022         ast_sip_dialog_set_serializer(sub_tree->dlg, NULL);
1023         pjsip_dlg_dec_session(sub_tree->dlg, &pubsub_module);
1024
1025         return 0;
1026 }
1027
1028 static void add_subscription(struct sip_subscription_tree *obj)
1029 {
1030         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1031         AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
1032 }
1033
1034 static void remove_subscription(struct sip_subscription_tree *obj)
1035 {
1036         struct sip_subscription_tree *i;
1037         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1038         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
1039                 if (i == obj) {
1040                         AST_RWLIST_REMOVE_CURRENT(next);
1041                         if (i->root) {
1042                                 ast_debug(1, "Removing subscription to resource %s from list of subscriptions\n",
1043                                                 ast_sip_subscription_get_resource_name(i->root));
1044                         }
1045                         break;
1046                 }
1047         }
1048         AST_RWLIST_TRAVERSE_SAFE_END;
1049 }
1050
1051 static void subscription_destructor(void *obj)
1052 {
1053         struct ast_sip_subscription *sub = obj;
1054
1055         ast_debug(3, "Destroying SIP subscription to resource %s\n", sub->resource);
1056         ast_free(sub->body_text);
1057
1058         ao2_cleanup(sub->datastores);
1059 }
1060
1061 static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler,
1062                 const char *resource, struct sip_subscription_tree *tree)
1063 {
1064         struct ast_sip_subscription *sub;
1065         pjsip_sip_uri *contact_uri;
1066
1067         sub = ao2_alloc(sizeof(*sub) + strlen(resource) + 1, subscription_destructor);
1068         if (!sub) {
1069                 return NULL;
1070         }
1071         strcpy(sub->resource, resource); /* Safe */
1072
1073         sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
1074         if (!sub->datastores) {
1075                 ao2_ref(sub, -1);
1076                 return NULL;
1077         }
1078
1079         sub->body_text = ast_str_create(128);
1080         if (!sub->body_text) {
1081                 ao2_ref(sub, -1);
1082                 return NULL;
1083         }
1084
1085         sub->uri = pjsip_sip_uri_create(tree->dlg->pool, PJ_FALSE);
1086         contact_uri = pjsip_uri_get_uri(tree->dlg->local.contact->uri);
1087         pjsip_sip_uri_assign(tree->dlg->pool, sub->uri, contact_uri);
1088         pj_strdup2(tree->dlg->pool, &sub->uri->user, resource);
1089
1090         sub->handler = handler;
1091         sub->subscription_state = PJSIP_EVSUB_STATE_ACTIVE;
1092         sub->tree = tree;
1093
1094         return sub;
1095 }
1096
1097 /*!
1098  * \brief Create a tree of virtual subscriptions based on a resource tree node.
1099  *
1100  * \param handler The handler to supply to leaf subscriptions.
1101  * \param resource The requested resource for this subscription.
1102  * \param generator Body generator to use for leaf subscriptions.
1103  * \param tree The root of the subscription tree.
1104  * \param current The tree node that corresponds to the subscription being created.
1105  */
1106 static struct ast_sip_subscription *create_virtual_subscriptions(const struct ast_sip_subscription_handler *handler,
1107                 const char *resource, struct ast_sip_pubsub_body_generator *generator,
1108                 struct sip_subscription_tree *tree, struct tree_node *current)
1109 {
1110         int i;
1111         struct ast_sip_subscription *sub;
1112
1113         sub = allocate_subscription(handler, resource, tree);
1114         if (!sub) {
1115                 return NULL;
1116         }
1117
1118         sub->full_state = current->full_state;
1119         sub->body_generator = generator;
1120
1121         for (i = 0; i < AST_VECTOR_SIZE(&current->children); ++i) {
1122                 struct ast_sip_subscription *child;
1123                 struct tree_node *child_node = AST_VECTOR_GET(&current->children, i);
1124
1125                 child = create_virtual_subscriptions(handler, child_node->resource, generator,
1126                                 tree, child_node);
1127
1128                 if (!child) {
1129                         ast_debug(1, "Child subscription to resource %s could not be created\n",
1130                                         child_node->resource);
1131                         continue;
1132                 }
1133
1134                 if (AST_VECTOR_APPEND(&sub->children, child)) {
1135                         ast_debug(1, "Child subscription to resource %s could not be appended\n",
1136                                         child_node->resource);
1137                 }
1138         }
1139
1140         return sub;
1141 }
1142
1143 static void shutdown_subscriptions(struct ast_sip_subscription *sub)
1144 {
1145         int i;
1146
1147         if (!sub) {
1148                 return;
1149         }
1150
1151         if (AST_VECTOR_SIZE(&sub->children) > 0) {
1152                 for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
1153                         shutdown_subscriptions(AST_VECTOR_GET(&sub->children, i));
1154                         ao2_cleanup(AST_VECTOR_GET(&sub->children, i));
1155                 }
1156                 return;
1157         }
1158
1159         if (sub->handler->subscription_shutdown) {
1160                 sub->handler->subscription_shutdown(sub);
1161         }
1162 }
1163
1164 static void subscription_tree_destructor(void *obj)
1165 {
1166         struct sip_subscription_tree *sub_tree = obj;
1167
1168         remove_subscription(sub_tree);
1169
1170         subscription_persistence_remove(sub_tree);
1171         ao2_cleanup(sub_tree->endpoint);
1172
1173         if (sub_tree->dlg) {
1174                 ast_sip_push_task_synchronous(NULL, subscription_remove_serializer, sub_tree);
1175         }
1176
1177         shutdown_subscriptions(sub_tree->root);
1178         ao2_cleanup(sub_tree->root);
1179
1180         ast_taskprocessor_unreference(sub_tree->serializer);
1181         ast_module_unref(ast_module_info->self);
1182 }
1183
1184 static void subscription_setup_dialog(struct sip_subscription_tree *sub_tree, pjsip_dialog *dlg)
1185 {
1186         /* We keep a reference to the dialog until our subscription is destroyed. See
1187          * the subscription_destructor for more details
1188          */
1189         pjsip_dlg_inc_session(dlg, &pubsub_module);
1190         sub_tree->dlg = dlg;
1191         ast_sip_dialog_set_serializer(dlg, sub_tree->serializer);
1192         pjsip_evsub_set_mod_data(sub_tree->evsub, pubsub_module.id, sub_tree);
1193 }
1194
1195 static struct sip_subscription_tree *allocate_subscription_tree(struct ast_sip_endpoint *endpoint)
1196 {
1197         struct sip_subscription_tree *sub_tree;
1198
1199         sub_tree = ao2_alloc(sizeof *sub_tree, subscription_tree_destructor);
1200         if (!sub_tree) {
1201                 return NULL;
1202         }
1203
1204         ast_module_ref(ast_module_info->self);
1205
1206         sub_tree->serializer = ast_sip_create_serializer();
1207         if (!sub_tree->serializer) {
1208                 ao2_ref(sub_tree, -1);
1209                 return NULL;
1210         }
1211
1212         sub_tree->endpoint = ao2_bump(endpoint);
1213         sub_tree->notify_sched_id = -1;
1214
1215         add_subscription(sub_tree);
1216         return sub_tree;
1217 }
1218
1219 /*!
1220  * \brief Create a subscription tree based on a resource tree.
1221  *
1222  * Using the previously-determined valid resources in the provided resource tree,
1223  * a corresponding tree of ast_sip_subscriptions are created. The root of the
1224  * subscription tree is a real subscription, and the rest in the tree are
1225  * virtual subscriptions.
1226  *
1227  * \param handler The handler to use for leaf subscriptions
1228  * \param endpoint The endpoint that sent the SUBSCRIBE request
1229  * \param rdata The SUBSCRIBE content
1230  * \param resource The requested resource in the SUBSCRIBE request
1231  * \param generator The body generator to use in leaf subscriptions
1232  * \param tree The resource tree on which the subscription tree is based
1233  *
1234  * \retval NULL Could not create the subscription tree
1235  * \retval non-NULL The root of the created subscription tree
1236  */
1237
1238 static struct sip_subscription_tree *create_subscription_tree(const struct ast_sip_subscription_handler *handler,
1239                 struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
1240                 struct ast_sip_pubsub_body_generator *generator, struct resource_tree *tree)
1241 {
1242         struct sip_subscription_tree *sub_tree;
1243         pjsip_dialog *dlg;
1244         struct subscription_persistence *persistence;
1245
1246         sub_tree = allocate_subscription_tree(endpoint);
1247         if (!sub_tree) {
1248                 return NULL;
1249         }
1250         sub_tree->role = AST_SIP_NOTIFIER;
1251
1252         dlg = ast_sip_create_dialog_uas(endpoint, rdata);
1253         if (!dlg) {
1254                 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
1255                 ao2_ref(sub_tree, -1);
1256                 return NULL;
1257         }
1258
1259         persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
1260                         pubsub_module.id, MOD_DATA_PERSISTENCE);
1261         if (persistence) {
1262                 /* Update the created dialog with the persisted information */
1263                 pjsip_ua_unregister_dlg(pjsip_ua_instance(), dlg);
1264                 pj_strdup2(dlg->pool, &dlg->local.info->tag, persistence->tag);
1265                 dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag);
1266                 pjsip_ua_register_dlg(pjsip_ua_instance(), dlg);
1267                 dlg->local.cseq = persistence->cseq;
1268                 dlg->remote.cseq = persistence->cseq;
1269         }
1270
1271         pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &sub_tree->evsub);
1272         subscription_setup_dialog(sub_tree, dlg);
1273
1274         ast_sip_mod_data_set(dlg->pool, dlg->mod_data, pubsub_module.id, MOD_DATA_MSG,
1275                         pjsip_msg_clone(dlg->pool, rdata->msg_info.msg));
1276
1277         sub_tree->notification_batch_interval = tree->notification_batch_interval;
1278
1279         sub_tree->root = create_virtual_subscriptions(handler, resource, generator, sub_tree, tree->root);
1280         if (AST_VECTOR_SIZE(&sub_tree->root->children) > 0) {
1281                 sub_tree->is_list = 1;
1282         }
1283
1284         return sub_tree;
1285 }
1286
1287 /*! \brief Callback function to perform the actual recreation of a subscription */
1288 static int subscription_persistence_recreate(void *obj, void *arg, int flags)
1289 {
1290         struct subscription_persistence *persistence = obj;
1291         pj_pool_t *pool = arg;
1292         pjsip_rx_data rdata = { { 0, }, };
1293         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
1294         struct sip_subscription_tree *sub_tree;
1295         struct ast_sip_pubsub_body_generator *generator;
1296         int resp;
1297         char *resource;
1298         size_t resource_size;
1299         pjsip_sip_uri *request_uri;
1300         struct resource_tree tree;
1301         pjsip_expires_hdr *expires_header;
1302         struct ast_sip_subscription_handler *handler;
1303
1304         /* If this subscription has already expired remove it */
1305         if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
1306                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1307                 return 0;
1308         }
1309
1310         endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
1311         if (!endpoint) {
1312                 ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
1313                         persistence->endpoint);
1314                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1315                 return 0;
1316         }
1317
1318         pj_pool_reset(pool);
1319         rdata.tp_info.pool = pool;
1320
1321         if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
1322                 persistence->transport_key, persistence->local_name, persistence->local_port)) {
1323                 ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
1324                         persistence->endpoint);
1325                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1326                 return 0;
1327         }
1328
1329         request_uri = pjsip_uri_get_uri(rdata.msg_info.msg->line.req.uri);
1330         resource_size = pj_strlen(&request_uri->user) + 1;
1331         resource = alloca(resource_size);
1332         ast_copy_pj_str(resource, &request_uri->user, resource_size);
1333
1334         /* Update the expiration header with the new expiration */
1335         expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
1336         if (!expires_header) {
1337                 expires_header = pjsip_expires_hdr_create(pool, 0);
1338                 if (!expires_header) {
1339                         ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1340                         return 0;
1341                 }
1342                 pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
1343         }
1344         expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
1345
1346         handler = subscription_get_handler_from_rdata(&rdata);
1347         if (!handler || !handler->notifier) {
1348                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1349                 return 0;
1350         }
1351
1352         generator = subscription_get_generator_from_rdata(&rdata, handler);
1353         if (!generator) {
1354                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1355                 return 0;
1356         }
1357
1358         ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
1359                         pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
1360
1361         memset(&tree, 0, sizeof(tree));
1362         resp = build_resource_tree(endpoint, handler, resource, &tree,
1363                 ast_sip_pubsub_has_eventlist_support(&rdata));
1364         if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
1365                 sub_tree = create_subscription_tree(handler, endpoint, &rdata, resource, generator, &tree);
1366                 if (!sub_tree) {
1367                         ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1368                         ast_log(LOG_WARNING, "Failed to re-create subscription for %s\n", persistence->endpoint);
1369                         return 0;
1370                 }
1371                 sub_tree->persistence = ao2_bump(persistence);
1372                 subscription_persistence_update(sub_tree, &rdata);
1373         } else {
1374                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1375         }
1376         resource_tree_destroy(&tree);
1377
1378         return 0;
1379 }
1380
1381 /*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
1382 static int subscription_persistence_load(void *data)
1383 {
1384         struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
1385                 "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
1386         pj_pool_t *pool;
1387
1388         pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
1389                 PJSIP_POOL_RDATA_INC);
1390         if (!pool) {
1391                 ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
1392                 return 0;
1393         }
1394
1395         ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
1396
1397         pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
1398
1399         ao2_ref(persisted_subscriptions, -1);
1400         return 0;
1401 }
1402
1403 /*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
1404 static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
1405 {
1406         struct ast_json_payload *payload;
1407         const char *type;
1408
1409         if (stasis_message_type(message) != ast_manager_get_generic_type()) {
1410                 return;
1411         }
1412
1413         payload = stasis_message_data(message);
1414         type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
1415
1416         /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
1417          * recreate SIP subscriptions.
1418          */
1419         if (strcmp(type, "FullyBooted")) {
1420                 return;
1421         }
1422
1423         /* This has to be here so the subscription is recreated when the body generator is available */
1424         ast_sip_push_task(NULL, subscription_persistence_load, NULL);
1425
1426         /* Once the system is fully booted we don't care anymore */
1427         stasis_unsubscribe(sub);
1428 }
1429
1430 typedef int (*on_subscription_t)(struct sip_subscription_tree *sub, void *arg);
1431
1432 static int for_each_subscription(on_subscription_t on_subscription, void *arg)
1433 {
1434         int num = 0;
1435         struct sip_subscription_tree *i;
1436         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1437
1438         if (!on_subscription) {
1439                 return num;
1440         }
1441
1442         AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
1443                 if (on_subscription(i, arg)) {
1444                         break;
1445                 }
1446                 ++num;
1447         }
1448         return num;
1449 }
1450
1451 static void sip_subscription_to_ami(struct sip_subscription_tree *sub_tree,
1452                                     struct ast_str **buf)
1453 {
1454         char str[256];
1455         struct ast_sip_endpoint_id_configuration *id = &sub_tree->endpoint->id;
1456
1457         ast_str_append(buf, 0, "Role: %s\r\n",
1458                        sip_subscription_roles_map[sub_tree->role]);
1459         ast_str_append(buf, 0, "Endpoint: %s\r\n",
1460                        ast_sorcery_object_get_id(sub_tree->endpoint));
1461
1462         ast_copy_pj_str(str, &sub_tree->dlg->call_id->id, sizeof(str));
1463         ast_str_append(buf, 0, "Callid: %s\r\n", str);
1464
1465         ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(sub_tree->evsub));
1466
1467         ast_callerid_merge(str, sizeof(str),
1468                            S_COR(id->self.name.valid, id->self.name.str, NULL),
1469                            S_COR(id->self.number.valid, id->self.number.str, NULL),
1470                            "Unknown");
1471
1472         ast_str_append(buf, 0, "Callerid: %s\r\n", str);
1473
1474         /* XXX This needs to be done recursively for lists */
1475         if (sub_tree->root->handler->to_ami) {
1476                 sub_tree->root->handler->to_ami(sub_tree->root, buf);
1477         }
1478 }
1479
1480
1481 void *ast_sip_subscription_get_header(const struct ast_sip_subscription *sub, const char *header)
1482 {
1483         pjsip_dialog *dlg = sub->tree->dlg;
1484         pjsip_msg *msg = ast_sip_mod_data_get(dlg->mod_data, pubsub_module.id, MOD_DATA_MSG);
1485         pj_str_t name;
1486
1487         pj_cstr(&name, header);
1488
1489         return pjsip_msg_find_hdr_by_name(msg, &name, NULL);
1490 }
1491
1492 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
1493                 struct ast_sip_endpoint *endpoint, const char *resource)
1494 {
1495         struct ast_sip_subscription *sub;
1496         pjsip_dialog *dlg;
1497         struct ast_sip_contact *contact;
1498         pj_str_t event;
1499         pjsip_tx_data *tdata;
1500         pjsip_evsub *evsub;
1501         struct sip_subscription_tree *sub_tree = NULL;
1502
1503         sub_tree = allocate_subscription_tree(endpoint);
1504         if (!sub_tree) {
1505                 return NULL;
1506         }
1507
1508         sub = allocate_subscription(handler, resource, sub_tree);
1509         if (!sub) {
1510                 ao2_cleanup(sub_tree);
1511                 return NULL;
1512         }
1513
1514         contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors);
1515         if (!contact || ast_strlen_zero(contact->uri)) {
1516                 ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n",
1517                                 ast_sorcery_object_get_id(endpoint));
1518                 ao2_ref(sub_tree, -1);
1519                 ao2_cleanup(contact);
1520                 return NULL;
1521         }
1522
1523         dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL);
1524         ao2_cleanup(contact);
1525         if (!dlg) {
1526                 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
1527                 ao2_ref(sub_tree, -1);
1528                 return NULL;
1529         }
1530
1531         pj_cstr(&event, handler->event_name);
1532         pjsip_evsub_create_uac(dlg, &pubsub_cb, &event, 0, &sub_tree->evsub);
1533         subscription_setup_dialog(sub_tree, dlg);
1534
1535         evsub = sub_tree->evsub;
1536
1537         if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
1538                 pjsip_evsub_send_request(evsub, tdata);
1539         } else {
1540                 /* pjsip_evsub_terminate will result in pubsub_on_evsub_state,
1541                  * being called and terminating the subscription. Therefore, we don't
1542                  * need to decrease the reference count of sub here.
1543                  */
1544                 pjsip_evsub_terminate(evsub, PJ_TRUE);
1545                 ao2_ref(sub_tree, -1);
1546                 return NULL;
1547         }
1548
1549         return sub;
1550 }
1551
1552 struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
1553 {
1554         ast_assert(sub->tree->endpoint != NULL);
1555         return ao2_bump(sub->tree->endpoint);
1556 }
1557
1558 struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
1559 {
1560         ast_assert(sub->tree->serializer != NULL);
1561         return sub->tree->serializer;
1562 }
1563
1564 /*!
1565  * \brief Pre-allocate a buffer for the transmission
1566  *
1567  * Typically, we let PJSIP do this step for us when we send a request. PJSIP's buffer
1568  * allocation algorithm is to allocate a buffer of PJSIP_MAX_PKT_LEN bytes and attempt
1569  * to write the packet to the allocated buffer. If the buffer is too small to hold the
1570  * packet, then we get told the message is too long to be sent.
1571  *
1572  * When dealing with SIP NOTIFY, especially with RLS, it is possible to exceed
1573  * PJSIP_MAX_PKT_LEN. Rather than accepting the limitation imposed on us by default,
1574  * we instead take the strategy of pre-allocating the buffer, testing for ourselves
1575  * if the message will fit, and resizing the buffer as required.
1576  *
1577  * RFC 3261 says that a SIP UDP request can be up to 65535 bytes long. We're capping
1578  * it at 64000 for a couple of reasons:
1579  * 1) Allocating more than 64K at a time is hard to justify
1580  * 2) If the message goes through proxies, those proxies will want to add Via and
1581  *    Record-Route headers, making the message even larger. Giving some space for
1582  *    those headers is a nice thing to do.
1583  *
1584  * RFC 3261 does not place an upper limit on the size of TCP requests, but we are
1585  * going to impose the same 64K limit as a memory savings.
1586  *
1587  * \param tdata The tdata onto which to allocate a buffer
1588  * \retval 0 Success
1589  * \retval -1 The message is too large
1590  */
1591 static int allocate_tdata_buffer(pjsip_tx_data *tdata)
1592 {
1593         int buf_size;
1594         int size = -1;
1595         char *buf;
1596
1597         for (buf_size = PJSIP_MAX_PKT_LEN; size == -1 && buf_size < 64000; buf_size *= 2) {
1598                 buf = pj_pool_alloc(tdata->pool, buf_size);
1599                 size = pjsip_msg_print(tdata->msg, buf, buf_size);
1600         }
1601
1602         if (size == -1) {
1603                 return -1;
1604         }
1605
1606         tdata->buf.start = buf;
1607         tdata->buf.cur = tdata->buf.start;
1608         tdata->buf.end = tdata->buf.start + buf_size;
1609
1610         return 0;
1611 }
1612
1613 static int sip_subscription_send_request(struct sip_subscription_tree *sub_tree, pjsip_tx_data *tdata)
1614 {
1615 #ifdef TEST_FRAMEWORK
1616         struct ast_sip_endpoint *endpoint = sub_tree->endpoint;
1617 #endif
1618         int res;
1619
1620         if (allocate_tdata_buffer(tdata)) {
1621                 ast_log(LOG_ERROR, "SIP request %s is too large to send.\n", tdata->info);
1622                 return -1;
1623         }
1624
1625         res = pjsip_evsub_send_request(sub_tree->evsub, tdata) == PJ_SUCCESS ? 0 : -1;
1626         subscription_persistence_update(sub_tree, NULL);
1627
1628         ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
1629                 "StateText: %s\r\n"
1630                 "Endpoint: %s\r\n",
1631                 pjsip_evsub_get_state_name(sub_tree->evsub),
1632                 ast_sorcery_object_get_id(endpoint));
1633
1634         return res;
1635 }
1636
1637 /*!
1638  * \brief Add a resource XML element to an RLMI body
1639  *
1640  * Each resource element represents a subscribed resource in the list. This function currently
1641  * will unconditionally add an instance element to each created resource element. Instance
1642  * elements refer to later parts in the multipart body.
1643  *
1644  * \param pool PJLIB allocation pool
1645  * \param cid Content-ID header of the resource
1646  * \param resource_name Name of the resource
1647  * \param resource_uri URI of the resource
1648  * \param state State of the subscribed resource
1649  */
1650 static void add_rlmi_resource(pj_pool_t *pool, pj_xml_node *rlmi, const pjsip_generic_string_hdr *cid,
1651                 const char *resource_name, const pjsip_sip_uri *resource_uri, pjsip_evsub_state state)
1652 {
1653         static pj_str_t cid_name = { "cid", 3 };
1654         pj_xml_node *resource;
1655         pj_xml_node *name;
1656         pj_xml_node *instance;
1657         pj_xml_attr *cid_attr;
1658         char id[6];
1659         char uri[PJSIP_MAX_URL_SIZE];
1660
1661         /* This creates a string representing the Content-ID without the enclosing < > */
1662         const pj_str_t cid_stripped = {
1663                 .ptr = cid->hvalue.ptr + 1,
1664                 .slen = cid->hvalue.slen - 2,
1665         };
1666
1667         resource = ast_sip_presence_xml_create_node(pool, rlmi, "resource");
1668         name = ast_sip_presence_xml_create_node(pool, resource, "name");
1669         instance = ast_sip_presence_xml_create_node(pool, resource, "instance");
1670
1671         pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, resource_uri, uri, sizeof(uri));
1672         ast_sip_presence_xml_create_attr(pool, resource, "uri", uri);
1673
1674         pj_strdup2(pool, &name->content, resource_name);
1675
1676         ast_generate_random_string(id, sizeof(id));
1677
1678         ast_sip_presence_xml_create_attr(pool, instance, "id", id);
1679         ast_sip_presence_xml_create_attr(pool, instance, "state",
1680                         state == PJSIP_EVSUB_STATE_TERMINATED ? "terminated" : "active");
1681
1682         /* Use the PJLIB-util XML library directly here since we are using a
1683          * pj_str_t
1684          */
1685
1686         cid_attr = pj_xml_attr_new(pool, &cid_name, &cid_stripped);
1687         pj_xml_add_attr(instance, cid_attr);
1688 }
1689
/*!
 * \brief A multipart body part and meta-information
 *
 * When creating a multipart body part, the end result (the
 * pjsip_multipart_part) is hard to inspect without undoing
 * a lot of what was done to create it. Therefore, we use this
 * structure to store meta-information about the body part.
 *
 * The main consumer of this is the creator of the RLMI body
 * part of a multipart resource list body.
 *
 * Instances are heap-allocated by allocate_body_part() and released
 * by free_body_parts() (or directly with ast_free() on error paths).
 */
struct body_part {
	/*! Content-ID header for the body part (created by generate_content_id_hdr()) */
	pjsip_generic_string_hdr *cid;
	/*! Subscribed resource represented in the body part (copied pointer from the subscription's resource) */
	const char *resource;
	/*! URI for the subscribed body part (copied pointer from the subscription's uri) */
	pjsip_sip_uri *uri;
	/*! Subscription state of the resource represented in the body part */
	pjsip_evsub_state state;
	/*! The actual body part that will be present in the multipart body */
	pjsip_multipart_part *part;
};
1713
/*!
 * \brief Type declaration for container of body part structures
 *
 * Declares the vector type struct body_part_list, whose elements are
 * pointers to struct body_part.
 */
AST_VECTOR(body_part_list, struct body_part *);
1718
1719 /*!
1720  * \brief Create a Content-ID header
1721  *
1722  * Content-ID headers are required by RFC2387 for multipart/related
1723  * bodies. They serve as identifiers for each part of the multipart body.
1724  *
1725  * \param pool PJLIB allocation pool
1726  * \param sub Subscription to a resource
1727  */
1728 static pjsip_generic_string_hdr *generate_content_id_hdr(pj_pool_t *pool,
1729                 const struct ast_sip_subscription *sub)
1730 {
1731         static const pj_str_t cid_name = { "Content-ID", 10 };
1732         pjsip_generic_string_hdr *cid;
1733         char id[6];
1734         size_t alloc_size;
1735         pj_str_t cid_value;
1736
1737         /* '<' + '@' + '>' = 3. pj_str_t does not require a null-terminator */
1738         alloc_size = sizeof(id) + pj_strlen(&sub->uri->host) + 3;
1739         cid_value.ptr = pj_pool_alloc(pool, alloc_size);
1740         cid_value.slen = sprintf(cid_value.ptr, "<%s@%.*s>",
1741                         ast_generate_random_string(id, sizeof(id)),
1742                         (int) pj_strlen(&sub->uri->host), pj_strbuf(&sub->uri->host));
1743         cid = pjsip_generic_string_hdr_create(pool, &cid_name, &cid_value);
1744
1745         return cid;
1746 }
1747
1748 static int rlmi_print_body(struct pjsip_msg_body *msg_body, char *buf, pj_size_t size)
1749 {
1750         int num_printed;
1751         pj_xml_node *rlmi = msg_body->data;
1752
1753         num_printed = pj_xml_print(rlmi, buf, size, PJ_TRUE);
1754         if (num_printed == AST_PJSIP_XML_PROLOG_LEN) {
1755                 return -1;
1756         }
1757
1758         return num_printed;
1759 }
1760
1761 static void *rlmi_clone_data(pj_pool_t *pool, const void *data, unsigned len)
1762 {
1763         const pj_xml_node *rlmi = data;
1764
1765         return pj_xml_clone(pool, rlmi);
1766 }
1767
1768 /*!
1769  * \brief Create an RLMI body part for a multipart resource list body
1770  *
1771  * RLMI (Resource list meta information) is a special body type that lists
1772  * the subscribed resources and tells subscribers the number of subscribed
1773  * resources and what other body parts are in the multipart body. The
1774  * RLMI body also has a version number that a subscriber can use to ensure
1775  * that the locally-stored state corresponds to server state.
1776  *
1777  * \param pool The allocation pool
1778  * \param sub The subscription representing the subscribed resource list
1779  * \param body_parts A container of body parts that RLMI will refer to
1780  * \param full_state Indicates whether this is a full or partial state notification
1781  * \return The multipart part representing the RLMI body
1782  */
1783 static pjsip_multipart_part *build_rlmi_body(pj_pool_t *pool, struct ast_sip_subscription *sub,
1784                 struct body_part_list *body_parts, unsigned int full_state)
1785 {
1786         static const pj_str_t rlmi_type = { "application", 11 };
1787         static const pj_str_t rlmi_subtype = { "rlmi+xml", 8 };
1788         pj_xml_node *rlmi;
1789         pj_xml_node *name;
1790         pjsip_multipart_part *rlmi_part;
1791         char version_str[32];
1792         char uri[PJSIP_MAX_URL_SIZE];
1793         pjsip_generic_string_hdr *cid;
1794         int i;
1795
1796         rlmi = ast_sip_presence_xml_create_node(pool, NULL, "list");
1797         ast_sip_presence_xml_create_attr(pool, rlmi, "xmlns", "urn:ietf:params:xml:ns:rlmi");
1798
1799         ast_sip_subscription_get_local_uri(sub, uri, sizeof(uri));
1800         ast_sip_presence_xml_create_attr(pool, rlmi, "uri", uri);
1801
1802         snprintf(version_str, sizeof(version_str), "%u", sub->version++);
1803         ast_sip_presence_xml_create_attr(pool, rlmi, "version", version_str);
1804         ast_sip_presence_xml_create_attr(pool, rlmi, "fullState", full_state ? "true" : "false");
1805
1806         name = ast_sip_presence_xml_create_node(pool, rlmi, "name");
1807         pj_strdup2(pool, &name->content, ast_sip_subscription_get_resource_name(sub));
1808
1809         for (i = 0; i < AST_VECTOR_SIZE(body_parts); ++i) {
1810                 const struct body_part *part = AST_VECTOR_GET(body_parts, i);
1811
1812                 add_rlmi_resource(pool, rlmi, part->cid, part->resource, part->uri, part->state);
1813         }
1814
1815         rlmi_part = pjsip_multipart_create_part(pool);
1816
1817         rlmi_part->body = PJ_POOL_ZALLOC_T(pool, pjsip_msg_body);
1818         pj_strdup(pool, &rlmi_part->body->content_type.type, &rlmi_type);
1819         pj_strdup(pool, &rlmi_part->body->content_type.subtype, &rlmi_subtype);
1820         pj_list_init(&rlmi_part->body->content_type.param);
1821
1822         rlmi_part->body->data = pj_xml_clone(pool, rlmi);
1823         rlmi_part->body->clone_data = rlmi_clone_data;
1824         rlmi_part->body->print_body = rlmi_print_body;
1825
1826         cid = generate_content_id_hdr(pool, sub);
1827         pj_list_insert_before(&rlmi_part->hdr, cid);
1828
1829         return rlmi_part;
1830 }
1831
/* Forward declaration: build_body_part() calls generate_notify_body(), which in
 * turn reaches build_body_part() again through generate_list_body().
 */
static pjsip_msg_body *generate_notify_body(pj_pool_t *pool, struct ast_sip_subscription *root,
		unsigned int force_full_state);
1834
1835 /*!
1836  * \brief Destroy a list of body parts
1837  *
1838  * \param parts The container of parts to destroy
1839  */
1840 static void free_body_parts(struct body_part_list *parts)
1841 {
1842         int i;
1843
1844         for (i = 0; i < AST_VECTOR_SIZE(parts); ++i) {
1845                 struct body_part *part = AST_VECTOR_GET(parts, i);
1846                 ast_free(part);
1847         }
1848
1849         AST_VECTOR_FREE(parts);
1850 }
1851
1852 /*!
1853  * \brief Allocate and initialize a body part structure
1854  *
1855  * \param pool PJLIB allocation pool
1856  * \param sub Subscription representing a subscribed resource
1857  */
1858 static struct body_part *allocate_body_part(pj_pool_t *pool, const struct ast_sip_subscription *sub)
1859 {
1860         struct body_part *bp;
1861
1862         bp = ast_calloc(1, sizeof(*bp));
1863         if (!bp) {
1864                 return NULL;
1865         }
1866
1867         bp->cid = generate_content_id_hdr(pool, sub);
1868         bp->resource = sub->resource;
1869         bp->state = sub->subscription_state;
1870         bp->uri = sub->uri;
1871
1872         return bp;
1873 }
1874
1875 /*!
1876  * \brief Create a multipart body part for a subscribed resource
1877  *
1878  * \param pool PJLIB allocation pool
1879  * \param sub The subscription representing a subscribed resource
1880  * \param parts A vector of parts to append the created part to.
1881  * \param use_full_state Unused locally, but may be passed to other functions
1882  */
1883 static void build_body_part(pj_pool_t *pool, struct ast_sip_subscription *sub,
1884                 struct body_part_list *parts, unsigned int use_full_state)
1885 {
1886         struct body_part *bp;
1887         pjsip_msg_body *body;
1888
1889         bp = allocate_body_part(pool, sub);
1890         if (!bp) {
1891                 return;
1892         }
1893
1894         body = generate_notify_body(pool, sub, use_full_state);
1895         if (!body) {
1896                 /* Partial state was requested and the resource has not changed state */
1897                 ast_free(bp);
1898                 return;
1899         }
1900
1901         bp->part = pjsip_multipart_create_part(pool);
1902         bp->part->body = body;
1903         pj_list_insert_before(&bp->part->hdr, bp->cid);
1904
1905         AST_VECTOR_APPEND(parts, bp);
1906 }
1907
1908 /*!
1909  * \brief Create and initialize the PJSIP multipart body structure for a resource list subscription
1910  *
1911  * \param pool
1912  * \return The multipart message body
1913  */
1914 static pjsip_msg_body *create_multipart_body(pj_pool_t *pool)
1915 {
1916         pjsip_media_type media_type;
1917         pjsip_param *media_type_param;
1918         char boundary[6];
1919         pj_str_t pj_boundary;
1920
1921         pjsip_media_type_init2(&media_type, "multipart", "related");
1922
1923         media_type_param = pj_pool_alloc(pool, sizeof(*media_type_param));
1924         pj_list_init(media_type_param);
1925
1926         pj_strdup2(pool, &media_type_param->name, "type");
1927         pj_strdup2(pool, &media_type_param->value, "\"application/rlmi+xml\"");
1928
1929         pj_list_insert_before(&media_type.param, media_type_param);
1930
1931         pj_cstr(&pj_boundary, ast_generate_random_string(boundary, sizeof(boundary)));
1932         return pjsip_multipart_create(pool, &media_type, &pj_boundary);
1933 }
1934
1935 /*!
1936  * \brief Create a resource list body for NOTIFY requests
1937  *
1938  * Resource list bodies are multipart/related bodies. The first part of the multipart body
1939  * is an RLMI body that describes the rest of the parts to come. The other parts of the body
1940  * convey state of individual subscribed resources.
1941  *
1942  * \param pool PJLIB allocation pool
1943  * \param sub Subscription details from which to generate body
1944  * \param force_full_state If true, ignore resource list settings and send a full state notification
1945  * \return The generated multipart/related body
1946  */
1947 static pjsip_msg_body *generate_list_body(pj_pool_t *pool, struct ast_sip_subscription *sub,
1948                 unsigned int force_full_state)
1949 {
1950         int i;
1951         pjsip_multipart_part *rlmi_part;
1952         pjsip_msg_body *multipart;
1953         struct body_part_list body_parts;
1954         unsigned int use_full_state = force_full_state ? 1 : sub->full_state;
1955
1956         if (AST_VECTOR_INIT(&body_parts, AST_VECTOR_SIZE(&sub->children))) {
1957                 return NULL;
1958         }
1959
1960         for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
1961                 build_body_part(pool, AST_VECTOR_GET(&sub->children, i), &body_parts, use_full_state);
1962         }
1963
1964         /* This can happen if issuing partial state and no children of the list have changed state */
1965         if (AST_VECTOR_SIZE(&body_parts) == 0) {
1966                 return NULL;
1967         }
1968
1969         multipart = create_multipart_body(pool);
1970
1971         rlmi_part = build_rlmi_body(pool, sub, &body_parts, use_full_state);
1972         if (!rlmi_part) {
1973                 return NULL;
1974         }
1975         pjsip_multipart_add_part(pool, multipart, rlmi_part);
1976
1977         for (i = 0; i < AST_VECTOR_SIZE(&body_parts); ++i) {
1978                 pjsip_multipart_add_part(pool, multipart, AST_VECTOR_GET(&body_parts, i)->part);
1979         }
1980
1981         free_body_parts(&body_parts);
1982         return multipart;
1983 }
1984
1985 /*!
1986  * \brief Create the body for a NOTIFY request.
1987  *
1988  * \param pool The pool used for allocations
1989  * \param root The root of the subscription tree
1990  * \param force_full_state If true, ignore resource list settings and send a full state notification
1991  */
1992 static pjsip_msg_body *generate_notify_body(pj_pool_t *pool, struct ast_sip_subscription *root,
1993                 unsigned int force_full_state)
1994 {
1995         pjsip_msg_body *body;
1996
1997         if (AST_VECTOR_SIZE(&root->children) == 0) {
1998                 if (force_full_state || root->body_changed) {
1999                         /* Not a list. We've already generated the body and saved it on the subscription.
2000                          * Use that directly.
2001                          */
2002                         pj_str_t type;
2003                         pj_str_t subtype;
2004                         pj_str_t text;
2005
2006                         pj_cstr(&type, ast_sip_subscription_get_body_type(root));
2007                         pj_cstr(&subtype, ast_sip_subscription_get_body_subtype(root));
2008                         pj_cstr(&text, ast_str_buffer(root->body_text));
2009
2010                         body = pjsip_msg_body_create(pool, &type, &subtype, &text);
2011                         root->body_changed = 0;
2012                 } else {
2013                         body = NULL;
2014                 }
2015         } else {
2016                 body = generate_list_body(pool, root, force_full_state);
2017         }
2018
2019         return body;
2020 }
2021
2022 /*!
2023  * \brief Shortcut method to create a Require: eventlist header
2024  */
2025 static pjsip_require_hdr *create_require_eventlist(pj_pool_t *pool)
2026 {
2027         pjsip_require_hdr *require;
2028
2029         require = pjsip_require_hdr_create(pool);
2030         pj_strdup2(pool, &require->values[0], "eventlist");
2031         require->count = 1;
2032
2033         return require;
2034 }
2035
2036 /*!
2037  * \brief Send a NOTIFY request to a subscriber
2038  *
2039  * \param sub_tree The subscription tree representing the subscription
2040  * \param force_full_state If true, ignore resource list settings and send full resource list state.
2041  * \retval 0 Success
2042  * \retval non-zero Failure
2043  */
2044 static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state)
2045 {
2046         pjsip_evsub *evsub = sub_tree->evsub;
2047         pjsip_tx_data *tdata;
2048
2049         if (pjsip_evsub_notify(evsub, sub_tree->root->subscription_state,
2050                                 NULL, NULL, &tdata) != PJ_SUCCESS) {
2051                 return -1;
2052         }
2053
2054         tdata->msg->body = generate_notify_body(tdata->pool, sub_tree->root, force_full_state);
2055         if (!tdata->msg->body) {
2056                 pjsip_tx_data_dec_ref(tdata);
2057                 return -1;
2058         }
2059
2060         if (sub_tree->is_list) {
2061                 pjsip_require_hdr *require = create_require_eventlist(tdata->pool);
2062                 pjsip_msg_add_hdr(tdata->msg, (pjsip_hdr *) require);
2063         }
2064
2065         if (sip_subscription_send_request(sub_tree, tdata)) {
2066                 return -1;
2067         }
2068
2069         sub_tree->send_scheduled_notify = 0;
2070
2071         return 0;
2072 }
2073
2074 static int serialized_send_notify(void *userdata)
2075 {
2076         struct sip_subscription_tree *sub_tree = userdata;
2077
2078         /* It's possible that between when the notification was scheduled
2079          * and now, that a new SUBSCRIBE arrived, requiring full state to be
2080          * sent out in an immediate NOTIFY. If that has happened, we need to
2081          * bail out here instead of sending the batched NOTIFY.
2082          */
2083         if (!sub_tree->send_scheduled_notify) {
2084                 ao2_cleanup(sub_tree);
2085                 return 0;
2086         }
2087
2088         send_notify(sub_tree, 0);
2089         ast_test_suite_event_notify("SUBSCRIPTION_STATE_CHANGED",
2090                         "Resource: %s",
2091                         sub_tree->root->resource);
2092         sub_tree->notify_sched_id = -1;
2093         ao2_cleanup(sub_tree);
2094         return 0;
2095 }
2096
2097 static int sched_cb(const void *data)
2098 {
2099         struct sip_subscription_tree *sub_tree = (struct sip_subscription_tree *) data;
2100
2101         /* We don't need to bump the refcount of sub_tree since we bumped it when scheduling this task */
2102         ast_sip_push_task(sub_tree->serializer, serialized_send_notify, sub_tree);
2103         return 0;
2104 }
2105
2106 static int schedule_notification(struct sip_subscription_tree *sub_tree)
2107 {
2108         /* There's already a notification scheduled */
2109         if (sub_tree->notify_sched_id > -1) {
2110                 return 0;
2111         }
2112
2113         sub_tree->notify_sched_id = ast_sched_add(sched, sub_tree->notification_batch_interval, sched_cb, ao2_bump(sub_tree));
2114         if (sub_tree->notify_sched_id < 0) {
2115                 return -1;
2116         }
2117
2118         sub_tree->send_scheduled_notify = 1;
2119         return 0;
2120 }
2121
2122 int ast_sip_subscription_notify(struct ast_sip_subscription *sub, struct ast_sip_body_data *notify_data,
2123                 int terminate)
2124 {
2125         if (ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub),
2126                                 ast_sip_subscription_get_body_subtype(sub), notify_data, &sub->body_text)) {
2127                 return -1;
2128         }
2129
2130         sub->body_changed = 1;
2131         if (terminate) {
2132                 sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
2133         }
2134
2135         if (sub->tree->notification_batch_interval) {
2136                 return schedule_notification(sub->tree);
2137         } else {
2138                 int res;
2139                 /* See the note in pubsub_on_rx_refresh() for why sub->tree is refbumped here */
2140                 ao2_ref(sub->tree, +1);
2141                 res = send_notify(sub->tree, 0);
2142                 ast_test_suite_event_notify(terminate ? "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_STATE_CHANGED",
2143                                 "Resource: %s",
2144                                 sub->tree->root->resource);
2145                 ao2_ref(sub->tree, -1);
2146
2147                 return res;
2148         }
2149 }
2150
2151 void ast_sip_subscription_get_local_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
2152 {
2153         pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, sub->uri, buf, size);
2154 }
2155
2156 void ast_sip_subscription_get_remote_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
2157 {
2158         pjsip_dialog *dlg = sub->tree->dlg;
2159         ast_copy_pj_str(buf, &dlg->remote.info_str, size);
2160 }
2161
2162 const char *ast_sip_subscription_get_resource_name(struct ast_sip_subscription *sub)
2163 {
2164         return sub->resource;
2165 }
2166
2167 static int sip_subscription_accept(struct sip_subscription_tree *sub_tree, pjsip_rx_data *rdata, int response)
2168 {
2169         pjsip_hdr res_hdr;
2170
2171         /* If this is a persistence recreation the subscription has already been accepted */
2172         if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) {
2173                 return 0;
2174         }
2175
2176         pj_list_init(&res_hdr);
2177         if (sub_tree->is_list) {
2178                 /* If subscribing to a list, our response has to have a Require: eventlist header in it */
2179                 pj_list_insert_before(&res_hdr, create_require_eventlist(rdata->tp_info.pool));
2180         }
2181
2182         return pjsip_evsub_accept(sub_tree->evsub, rdata, response, &res_hdr) == PJ_SUCCESS ? 0 : -1;
2183 }
2184
2185 static void subscription_datastore_destroy(void *obj)
2186 {
2187         struct ast_datastore *datastore = obj;
2188
2189         /* Using the destroy function (if present) destroy the data */
2190         if (datastore->info->destroy != NULL && datastore->data != NULL) {
2191                 datastore->info->destroy(datastore->data);
2192                 datastore->data = NULL;
2193         }
2194
2195         ast_free((void *) datastore->uid);
2196         datastore->uid = NULL;
2197 }
2198
2199 struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
2200 {
2201         RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
2202         char uuid_buf[AST_UUID_STR_LEN];
2203         const char *uid_ptr = uid;
2204
2205         if (!info) {
2206                 return NULL;
2207         }
2208
2209         datastore = ao2_alloc(sizeof(*datastore), subscription_datastore_destroy);
2210         if (!datastore) {
2211                 return NULL;
2212         }
2213
2214         datastore->info = info;
2215         if (ast_strlen_zero(uid)) {
2216                 /* They didn't provide an ID so we'll provide one ourself */
2217                 uid_ptr = ast_uuid_generate_str(uuid_buf, sizeof(uuid_buf));
2218         }
2219
2220         datastore->uid = ast_strdup(uid_ptr);
2221         if (!datastore->uid) {
2222                 return NULL;
2223         }
2224
2225         ao2_ref(datastore, +1);
2226         return datastore;
2227 }
2228
2229 int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
2230 {
2231         ast_assert(datastore != NULL);
2232         ast_assert(datastore->info != NULL);
2233         ast_assert(!ast_strlen_zero(datastore->uid));
2234
2235         if (!ao2_link(subscription->datastores, datastore)) {
2236                 return -1;
2237         }
2238         return 0;
2239 }
2240
2241 struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
2242 {
2243         return ao2_find(subscription->datastores, name, OBJ_KEY);
2244 }
2245
2246 void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
2247 {
2248         ao2_find(subscription->datastores, name, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
2249 }
2250
2251 int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
2252 {
2253         ast_assert(datastore != NULL);
2254         ast_assert(datastore->info != NULL);
2255         ast_assert(!ast_strlen_zero(datastore->uid));
2256
2257         if (!ao2_link(publication->datastores, datastore)) {
2258                 return -1;
2259         }
2260         return 0;
2261 }
2262
2263 struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
2264 {
2265         return ao2_find(publication->datastores, name, OBJ_KEY);
2266 }
2267
2268 void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
2269 {
2270         ao2_callback(publication->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
2271 }
2272
/*! \brief Read/write-locked list of registered publish handlers (appended to by publish_add_handler()) */
AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
2274
2275 static int publication_hash_fn(const void *obj, const int flags)
2276 {
2277         const struct ast_sip_publication *publication = obj;
2278         const int *entity_tag = obj;
2279
2280         return flags & OBJ_KEY ? *entity_tag : publication->entity_tag;
2281 }
2282
2283 static int publication_cmp_fn(void *obj, void *arg, int flags)
2284 {
2285         const struct ast_sip_publication *publication1 = obj;
2286         const struct ast_sip_publication *publication2 = arg;
2287         const int *entity_tag = arg;
2288
2289         return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ?
2290                 CMP_MATCH | CMP_STOP : 0);
2291 }
2292
2293 static void publish_add_handler(struct ast_sip_publish_handler *handler)
2294 {
2295         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2296         AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
2297 }
2298
2299 int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
2300 {
2301         if (ast_strlen_zero(handler->event_name)) {
2302                 ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
2303                 return -1;
2304         }
2305
2306         if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS,
2307                 publication_hash_fn, publication_cmp_fn))) {
2308                 ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n",
2309                         handler->event_name);
2310                 return -1;
2311         }
2312
2313         publish_add_handler(handler);
2314
2315         ast_module_ref(ast_module_info->self);
2316
2317         return 0;
2318 }
2319
2320 void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
2321 {
2322         struct ast_sip_publish_handler *iter;
2323         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2324         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
2325                 if (handler == iter) {
2326                         AST_RWLIST_REMOVE_CURRENT(next);
2327                         ao2_cleanup(handler->publications);
2328                         ast_module_unref(ast_module_info->self);
2329                         break;
2330                 }
2331         }
2332         AST_RWLIST_TRAVERSE_SAFE_END;
2333 }
2334
/*! \brief Read/write-locked list of registered subscription handlers (appended to by sub_add_handler()) */
AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
2336
2337 static void sub_add_handler(struct ast_sip_subscription_handler *handler)
2338 {
2339         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2340         AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
2341         ast_module_ref(ast_module_info->self);
2342 }
2343
2344 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name)
2345 {
2346         struct ast_sip_subscription_handler *iter;
2347         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
2348
2349         AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
2350                 if (!strcmp(iter->event_name, event_name)) {
2351                         break;
2352                 }
2353         }
2354         return iter;
2355 }
2356
2357 int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler)
2358 {
2359         pj_str_t event;
2360         pj_str_t accept[AST_SIP_MAX_ACCEPT] = { {0, }, };
2361         struct ast_sip_subscription_handler *existing;
2362         int i = 0;
2363
2364         if (ast_strlen_zero(handler->event_name)) {
2365                 ast_log(LOG_ERROR, "No event package specified for subscription handler. Cannot register\n");
2366                 return -1;
2367         }
2368
2369         existing = find_sub_handler_for_event_name(handler->event_name);
2370         if (existing) {
2371                 ast_log(LOG_ERROR, "Unable to register subscription handler for event %s."
2372                                 "A handler is already registered\n", handler->event_name);
2373                 return -1;
2374         }
2375
2376         for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) {
2377                 pj_cstr(&accept[i], handler->accept[i]);
2378         }
2379
2380         pj_cstr(&event, handler->event_name);
2381
2382         pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
2383
2384         sub_add_handler(handler);
2385
2386         return 0;
2387 }
2388
2389 void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
2390 {
2391         struct ast_sip_subscription_handler *iter;
2392         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2393         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
2394                 if (handler == iter) {
2395                         AST_RWLIST_REMOVE_CURRENT(next);
2396                         ast_module_unref(ast_module_info->self);
2397                         break;
2398                 }
2399         }
2400         AST_RWLIST_TRAVERSE_SAFE_END;
2401 }
2402
2403 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype(const char *content_type,
2404                 const char *content_subtype)
2405 {
2406         struct ast_sip_pubsub_body_generator *iter;
2407         SCOPED_LOCK(lock, &body_generators, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
2408
2409         AST_LIST_TRAVERSE(&body_generators, iter, list) {
2410                 if (!strcmp(iter->type, content_type) &&
2411                                 !strcmp(iter->subtype, content_subtype)) {
2412                         break;
2413                 }
2414         };
2415
2416         return iter;
2417 }
2418
/*!
 * \brief Find a registered body generator for a "type/subtype" string.
 *
 * \param accept A content type of the form "type/subtype"
 *
 * \return The matching generator, or NULL if either part is empty or no
 *         generator is registered for the pair
 */
static struct ast_sip_pubsub_body_generator *find_body_generator_accept(const char *accept)
{
	/* Work on a stack copy since strsep() modifies the string it splits */
	char *subtype = ast_strdupa(accept);
	char *type = strsep(&subtype, "/");

	if (ast_strlen_zero(type) || ast_strlen_zero(subtype)) {
		return NULL;
	}

	return find_body_generator_type_subtype(type, subtype);
}
2431
2432 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
2433                 size_t num_accept, const char *body_type)
2434 {
2435         int i;
2436         struct ast_sip_pubsub_body_generator *generator = NULL;
2437
2438         for (i = 0; i < num_accept; ++i) {
2439                 generator = find_body_generator_accept(accept[i]);
2440                 if (generator) {
2441                         ast_debug(3, "Body generator %p found for accept type %s\n", generator, accept[i]);
2442                         if (strcmp(generator->body_type, body_type)) {
2443                                 ast_log(LOG_WARNING, "Body generator '%s/%s'(%p) does not accept the type of data this event generates\n",
2444                                                 generator->type, generator->subtype, generator);
2445                                 generator = NULL;
2446                                 continue;
2447                         }
2448                         break;
2449                 } else {
2450                         ast_debug(3, "No body generator found for accept type %s\n", accept[i]);
2451                 }
2452         }
2453
2454         return generator;
2455 }
2456
2457 static int generate_initial_notify(struct ast_sip_subscription *sub)
2458 {
2459         void *notify_data;
2460         int res;
2461         struct ast_sip_body_data data = {
2462                 .body_type = sub->handler->body_type,
2463         };
2464
2465         if (AST_VECTOR_SIZE(&sub->children) > 0) {
2466                 int i;
2467
2468                 for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
2469                         if (generate_initial_notify(AST_VECTOR_GET(&sub->children, i))) {
2470                                 return -1;
2471                         }
2472                 }
2473
2474                 return 0;
2475         }
2476
2477         if (sub->handler->notifier->subscription_established(sub)) {
2478                 return -1;
2479         }
2480
2481         notify_data = sub->handler->notifier->get_notify_data(sub);
2482         if (!notify_data) {
2483                 return -1;
2484         }
2485
2486         data.body_data = notify_data;
2487
2488         res = ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub),
2489                         ast_sip_subscription_get_body_subtype(sub), &data, &sub->body_text);
2490
2491         ao2_cleanup(notify_data);
2492
2493         return res;
2494 }
2495
2496 static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
2497 {
2498         pjsip_expires_hdr *expires_header;
2499         struct ast_sip_subscription_handler *handler;
2500         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
2501         struct sip_subscription_tree *sub_tree;
2502         struct ast_sip_pubsub_body_generator *generator;
2503         char *resource;
2504         pjsip_uri *request_uri;
2505         pjsip_sip_uri *request_uri_sip;
2506         size_t resource_size;
2507         int resp;
2508         struct resource_tree tree;
2509
2510         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
2511         ast_assert(endpoint != NULL);
2512
2513         if (!endpoint->subscription.allow) {
2514                 ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint));
2515                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL);
2516                 return PJ_TRUE;
2517         }
2518
2519         request_uri = rdata->msg_info.msg->line.req.uri;
2520
2521         if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
2522                 char uri_str[PJSIP_MAX_URL_SIZE];
2523
2524                 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
2525                 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
2526                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
2527                 return PJ_TRUE;
2528         }
2529
2530         request_uri_sip = pjsip_uri_get_uri(request_uri);
2531         resource_size = pj_strlen(&request_uri_sip->user) + 1;
2532         resource = alloca(resource_size);
2533         ast_copy_pj_str(resource, &request_uri_sip->user, resource_size);
2534
2535         expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next);
2536
2537         if (expires_header) {
2538                 if (expires_header->ivalue == 0) {
2539                         ast_log(LOG_WARNING, "Subscription request from endpoint %s rejected. Expiration of 0 is invalid\n",
2540                                 ast_sorcery_object_get_id(endpoint));
2541                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
2542                                 return PJ_TRUE;
2543                 }
2544                 if (expires_header->ivalue < endpoint->subscription.minexpiry) {
2545                         ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %u\n",
2546                                 expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry);
2547                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL);
2548                         return PJ_TRUE;
2549                 }
2550         }
2551
2552         handler = subscription_get_handler_from_rdata(rdata);
2553         if (!handler) {
2554                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2555                 return PJ_TRUE;
2556         }
2557
2558         generator = subscription_get_generator_from_rdata(rdata, handler);
2559         if (!generator) {
2560                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2561                 return PJ_TRUE;
2562         }
2563
2564         memset(&tree, 0, sizeof(tree));
2565         resp = build_resource_tree(endpoint, handler, resource, &tree,
2566                 ast_sip_pubsub_has_eventlist_support(rdata));
2567         if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
2568                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
2569                 resource_tree_destroy(&tree);
2570                 return PJ_TRUE;
2571         }
2572
2573         sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator, &tree);
2574         if (!sub_tree) {
2575                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
2576         } else {
2577                 sub_tree->persistence = subscription_persistence_create(sub_tree);
2578                 subscription_persistence_update(sub_tree, rdata);
2579                 sip_subscription_accept(sub_tree, rdata, resp);
2580                 if (generate_initial_notify(sub_tree->root)) {
2581                         pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
2582                 }
2583                 send_notify(sub_tree, 1);
2584                 ast_test_suite_event_notify("SUBSCRIPTION_ESTABLISHED",
2585                                 "Resource: %s",
2586                                 sub_tree->root->resource);
2587         }
2588
2589         resource_tree_destroy(&tree);
2590         return PJ_TRUE;
2591 }
2592
2593 static struct ast_sip_publish_handler *find_pub_handler(const char *event)
2594 {
2595         struct ast_sip_publish_handler *iter = NULL;
2596         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
2597
2598         AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) {
2599                 if (strcmp(event, iter->event_name)) {
2600                         ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
2601                         continue;
2602                 }
2603                 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
2604                 break;
2605         }
2606
2607         return iter;
2608 }
2609
2610 static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata,
2611         pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id)
2612 {
2613         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
2614
2615         if (etag_hdr) {
2616                 char etag[pj_strlen(&etag_hdr->hvalue) + 1];
2617
2618                 ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag));
2619
2620                 if (sscanf(etag, "%30d", entity_id) != 1) {
2621                         return SIP_PUBLISH_UNKNOWN;
2622                 }
2623         }
2624
2625         *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
2626
2627         if (!(*expires)) {
2628                 return SIP_PUBLISH_REMOVE;
2629         } else if (!etag_hdr && rdata->msg_info.msg->body) {
2630                 return SIP_PUBLISH_INITIAL;
2631         } else if (etag_hdr && !rdata->msg_info.msg->body) {
2632                 return SIP_PUBLISH_REFRESH;
2633         } else if (etag_hdr && rdata->msg_info.msg->body) {
2634                 return SIP_PUBLISH_MODIFY;
2635         }
2636
2637         return SIP_PUBLISH_UNKNOWN;
2638 }
2639
2640 /*! \brief Internal destructor for publications */
2641 static void publication_destroy_fn(void *obj)
2642 {
2643         struct ast_sip_publication *publication = obj;
2644
2645         ast_debug(3, "Destroying SIP publication\n");
2646
2647         ao2_cleanup(publication->datastores);
2648         ao2_cleanup(publication->endpoint);
2649 }
2650
2651 static struct ast_sip_publication *sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
2652         const char *resource, const char *event_configuration_name)
2653 {
2654         struct ast_sip_publication *publication;
2655         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
2656         size_t resource_len = strlen(resource) + 1, event_configuration_name_len = strlen(event_configuration_name) + 1;
2657         char *dst;
2658
2659         ast_assert(endpoint != NULL);
2660
2661         if (!(publication = ao2_alloc(sizeof(*publication) + resource_len + event_configuration_name_len, publication_destroy_fn))) {
2662                 return NULL;
2663         }
2664
2665         if (!(publication->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp))) {
2666                 ao2_ref(publication, -1);
2667                 return NULL;
2668         }
2669
2670         publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
2671         ao2_ref(endpoint, +1);
2672         publication->endpoint = endpoint;
2673         publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
2674         publication->sched_id = -1;
2675         dst = publication->data;
2676         publication->resource = strcpy(dst, resource);
2677         dst += resource_len;
2678         publication->event_configuration_name = strcpy(dst, event_configuration_name);
2679
2680         return publication;
2681 }
2682
2683 static int sip_publication_respond(struct ast_sip_publication *pub, int status_code,
2684                 pjsip_rx_data *rdata)
2685 {
2686         pj_status_t status;
2687         pjsip_tx_data *tdata;
2688         pjsip_transaction *tsx;
2689
2690         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, &tdata) != PJ_SUCCESS) {
2691                 return -1;
2692         }
2693
2694         if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) {
2695                 RAII_VAR(char *, entity_tag, NULL, ast_free_ptr);
2696                 RAII_VAR(char *, expires, NULL, ast_free_ptr);
2697
2698                 if ((ast_asprintf(&entity_tag, "%d", pub->entity_tag) < 0) ||
2699                         (ast_asprintf(&expires, "%d", pub->expires) < 0)) {
2700                         pjsip_tx_data_dec_ref(tdata);
2701                         return -1;
2702                 }
2703
2704                 ast_sip_add_header(tdata, "SIP-ETag", entity_tag);
2705                 ast_sip_add_header(tdata, "Expires", expires);
2706         }
2707
2708         if ((status = pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx)) != PJ_SUCCESS) {
2709                 return -1;
2710         }
2711
2712         pjsip_tsx_recv_msg(tsx, rdata);
2713
2714         if (pjsip_tsx_send_msg(tsx, tdata) != PJ_SUCCESS) {
2715                 return -1;
2716         }
2717
2718         return 0;
2719 }
2720
2721 static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
2722         struct ast_sip_publish_handler *handler)
2723 {
2724         struct ast_sip_publication *publication;
2725         char *resource_name;
2726         size_t resource_size;
2727         RAII_VAR(struct ast_sip_publication_resource *, resource, NULL, ao2_cleanup);
2728         struct ast_variable *event_configuration_name = NULL;
2729         pjsip_uri *request_uri;
2730         pjsip_sip_uri *request_uri_sip;
2731         int resp;
2732
2733         request_uri = rdata->msg_info.msg->line.req.uri;
2734
2735         if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
2736                 char uri_str[PJSIP_MAX_URL_SIZE];
2737
2738                 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
2739                 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
2740                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
2741                 return NULL;
2742         }
2743
2744         request_uri_sip = pjsip_uri_get_uri(request_uri);
2745         resource_size = pj_strlen(&request_uri_sip->user) + 1;
2746         resource_name = alloca(resource_size);
2747         ast_copy_pj_str(resource_name, &request_uri_sip->user, resource_size);
2748
2749         resource = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "inbound-publication", resource_name);
2750         if (!resource) {
2751                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
2752                 return NULL;
2753         }
2754
2755         if (!ast_strlen_zero(resource->endpoint) && strcmp(resource->endpoint, ast_sorcery_object_get_id(endpoint))) {
2756                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL);
2757                 return NULL;
2758         }
2759
2760         for (event_configuration_name = resource->events; event_configuration_name; event_configuration_name = event_configuration_name->next) {
2761                 if (!strcmp(event_configuration_name->name, handler->event_name)) {
2762                         break;
2763                 }
2764         }
2765
2766         if (!event_configuration_name) {
2767                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
2768                 return NULL;
2769         }
2770
2771         resp = handler->new_publication(endpoint, resource_name, event_configuration_name->value);
2772
2773         if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
2774                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
2775                 return NULL;
2776         }
2777
2778         publication = sip_create_publication(endpoint, rdata, S_OR(resource_name, ""), event_configuration_name->value);
2779
2780         if (!publication) {
2781                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
2782                 return NULL;
2783         }
2784
2785         publication->handler = handler;
2786         if (publication->handler->publication_state_change(publication, rdata->msg_info.msg->body,
2787                         AST_SIP_PUBLISH_STATE_INITIALIZED)) {
2788                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
2789                 ao2_cleanup(publication);
2790                 return NULL;
2791         }
2792
2793         sip_publication_respond(publication, resp, rdata);
2794
2795         return publication;
2796 }
2797
2798 static int publish_expire_callback(void *data)
2799 {
2800         RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup);
2801
2802         if (publication->handler->publish_expire) {
2803                 publication->handler->publish_expire(publication);
2804         }
2805
2806         return 0;
2807 }
2808
2809 static int publish_expire(const void *data)
2810 {
2811         struct ast_sip_publication *publication = (struct ast_sip_publication*)data;
2812
2813         ao2_unlink(publication->handler->publications, publication);
2814         publication->sched_id = -1;
2815
2816         if (ast_sip_push_task(NULL, publish_expire_callback, publication)) {
2817                 ao2_cleanup(publication);
2818         }
2819
2820         return 0;
2821 }
2822
2823 static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata)
2824 {
2825         pjsip_event_hdr *event_header;
2826         struct ast_sip_publish_handler *handler;
2827         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
2828         char event[32];
2829         static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 };
2830         pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL);
2831         enum sip_publish_type publish_type;
2832         RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup);
2833         int expires = 0, entity_id, response = 0;
2834
2835         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
2836         ast_assert(endpoint != NULL);
2837
2838         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
2839         if (!event_header) {
2840                 ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n");
2841                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2842                 return PJ_TRUE;
2843         }
2844         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
2845
2846         handler = find_pub_handler(event);
2847         if (!handler) {
2848                 ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event);
2849                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2850                 return PJ_TRUE;
2851         }
2852
2853         publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id);
2854
2855         /* If this is not an initial publish ensure that a publication is present */
2856         if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) {
2857                 if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) {
2858                         static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 };
2859
2860                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed,
2861                                 NULL, NULL);
2862                         return PJ_TRUE;
2863                 }
2864
2865                 /* Per the RFC every response has to have a new entity tag */
2866                 publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
2867
2868                 /* Update the expires here so that the created responses will contain the correct value */
2869                 publication->expires = expires;
2870         }
2871
2872         switch (publish_type) {
2873                 case SIP_PUBLISH_INITIAL:
2874                         publication = publish_request_initial(endpoint, rdata, handler);
2875                         break;
2876                 case SIP_PUBLISH_REFRESH:
2877                 case SIP_PUBLISH_MODIFY:
2878                         if (handler->publication_state_change(publication, rdata->msg_info.msg->body,
2879                                                 AST_SIP_PUBLISH_STATE_ACTIVE)) {
2880                                 /* If an error occurs we want to terminate the publication */
2881                                 expires = 0;
2882                         }
2883                         response = 200;
2884                         break;
2885                 case SIP_PUBLISH_REMOVE:
2886                         handler->publication_state_change(publication, rdata->msg_info.msg->body,
2887                                         AST_SIP_PUBLISH_STATE_TERMINATED);
2888                         response = 200;
2889                         break;
2890                 case SIP_PUBLISH_UNKNOWN:
2891                 default:
2892                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
2893                         break;
2894         }
2895
2896         if (publication) {
2897                 if (expires) {
2898                         ao2_link(handler->publications, publication);
2899
2900                         AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication,
2901                                                 ao2_ref(publication, -1), ao2_ref(publication, -1), ao2_ref(publication, +1));
2902                 } else {
2903                         AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1));
2904                 }
2905         }
2906
2907         if (response) {
2908                 sip_publication_respond(publication, response, rdata);
2909         }
2910
2911         return PJ_TRUE;
2912 }
2913
2914 struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub)
2915 {
2916         return pub->endpoint;
2917 }
2918
2919 const char *ast_sip_publication_get_resource(const struct ast_sip_publication *pub)
2920 {
2921         return pub->resource;
2922 }
2923
2924 const char *ast_sip_publication_get_event_configuration(const struct ast_sip_publication *pub)
2925 {
2926         return pub->event_configuration_name;
2927 }
2928
2929 int ast_sip_pubsub_register_body_generator(struct ast_sip_pubsub_body_generator *generator)
2930 {
2931         struct ast_sip_pubsub_body_generator *existing;
2932         pj_str_t accept;
2933         pj_size_t accept_len;
2934
2935         existing = find_body_generator_type_subtype(generator->type, generator->subtype);
2936         if (existing) {
2937                 ast_log(LOG_WARNING, "Cannot register body generator of %s/%s."
2938                                 "One is already registered.\n", generator->type, generator->subtype);
2939                 return -1;
2940         }
2941
2942         AST_RWLIST_WRLOCK(&body_generators);
2943         AST_LIST_INSERT_HEAD(&body_generators, generator, list);
2944         AST_RWLIST_UNLOCK(&body_generators);
2945
2946         /* Lengths of type and subtype plus space for a slash. pj_str_t is not
2947          * null-terminated, so there is no need to allocate for the extra null
2948          * byte
2949          */
2950         accept_len = strlen(generator->type) + strlen(generator->subtype) + 1;
2951
2952         accept.ptr = alloca(accept_len);
2953         accept.slen = accept_len;
2954         /* Safe use of sprintf */
2955         sprintf(accept.ptr, "%s/%s", generator->type, generator->subtype);
2956         pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module,
2957                         PJSIP_H_ACCEPT, NULL, 1, &accept);
2958
2959         return 0;
2960 }
2961
2962 void ast_sip_pubsub_unregister_body_generator(struct ast_sip_pubsub_body_generator *generator)
2963 {
2964         struct ast_sip_pubsub_body_generator *iter;
2965         SCOPED_LOCK(lock, &body_generators, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2966
2967         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_generators, iter, list) {
2968                 if (iter == generator) {
2969                         AST_LIST_REMOVE_CURRENT(list);
2970                         break;
2971                 }
2972         }
2973         AST_RWLIST_TRAVERSE_SAFE_END;
2974 }
2975
2976 int ast_sip_pubsub_register_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
2977 {
2978         AST_RWLIST_WRLOCK(&body_supplements);
2979         AST_RWLIST_INSERT_TAIL(&body_supplements, supplement, list);
2980         AST_RWLIST_UNLOCK(&body_supplements);
2981
2982         return 0;
2983 }
2984
2985 void ast_sip_pubsub_unregister_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
2986 {
2987         struct ast_sip_pubsub_body_supplement *iter;
2988         SCOPED_LOCK(lock, &body_supplements, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2989
2990         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_supplements, iter, list) {
2991                 if (iter == supplement) {
2992                         AST_LIST_REMOVE_CURRENT(list);
2993                         break;
2994                 }
2995         }
2996         AST_RWLIST_TRAVERSE_SAFE_END;
2997 }
2998
2999 const char *ast_sip_subscription_get_body_type(struct ast_sip_subscription *sub)
3000 {
3001         return sub->body_generator->type;
3002 }
3003
3004 const char *ast_sip_subscription_get_body_subtype(struct ast_sip_subscription *sub)
3005 {
3006         return sub->body_generator->subtype;
3007 }
3008
3009 int ast_sip_pubsub_generate_body_content(const char *type, const char *subtype,
3010                 struct ast_sip_body_data *data, struct ast_str **str)
3011 {
3012         struct ast_sip_pubsub_body_supplement *supplement;
3013         struct ast_sip_pubsub_body_generator *generator;
3014         int res = 0;
3015         void *body;
3016
3017         generator = find_body_generator_type_subtype(type, subtype);
3018         if (!generator) {
3019                 ast_log(LOG_WARNING, "Unable to find a body generator for %s/%s\n",
3020                                 type, subtype);
3021                 return -1;
3022         }
3023
3024         if (strcmp(data->body_type, generator->body_type)) {
3025                 ast_log(LOG_WARNING, "Body generator does not accept the type of data provided\n");
3026                 return -1;
3027         }
3028
3029         body = generator->allocate_body(data->body_data);
3030         if (!body) {
3031                 ast_log(LOG_WARNING, "Unable to allocate a NOTIFY body of type %s/%s\n",
3032                                 type, subtype);
3033                 return -1;
3034         }
3035
3036         if (generator->generate_body_content(body, data->body_data)) {
3037                 res = -1;
3038                 goto end;
3039         }
3040
3041         AST_RWLIST_RDLOCK(&body_supplements);
3042         AST_RWLIST_TRAVERSE(&body_supplements, supplement, list) {
3043                 if (!strcmp(generator->type, supplement->type) &&
3044                                 !strcmp(generator->subtype, supplement->subtype)) {
3045                         res = supplement->supplement_body(body, data->body_data);
3046                         if (res) {
3047                                 break;
3048                         }
3049                 }
3050         }
3051         AST_RWLIST_UNLOCK(&body_supplements);
3052
3053         if (!res) {
3054                 generator->to_string(body, str);
3055         }
3056
3057 end:
3058         if (generator->destroy_body) {
3059                 generator->destroy_body(body);
3060         }
3061
3062         return res;
3063 }
3064
3065 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata)
3066 {
3067         if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) {
3068                 return pubsub_on_rx_subscribe_request(rdata);
3069         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) {
3070                 return pubsub_on_rx_publish_request(rdata);
3071         }
3072
3073         return PJ_FALSE;
3074 }
3075
3076 static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
3077 {
3078         struct sip_subscription_tree *sub_tree;
3079
3080         if (pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) {
3081                 return;
3082         }
3083
3084         sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
3085         if (!sub_tree) {
3086                 return;
3087         }
3088
3089         ao2_cleanup(sub_tree);
3090
3091         pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL);
3092 }
3093
3094 static void set_state_terminated(struct ast_sip_subscription *sub)
3095 {
3096         int i;
3097
3098         sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
3099         for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
3100                 set_state_terminated(AST_VECTOR_GET(&sub->children, i));
3101         }
3102 }
3103
3104 static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
3105                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
3106 {
3107         struct sip_subscription_tree *sub_tree;
3108
3109         sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
3110         if (!sub_tree) {
3111                 return;
3112         }
3113
3114         /* If sending a NOTIFY to terminate a subscription, then pubsub_on_evsub_state()
3115          * will be called when we send the NOTIFY, and that will result in dropping the
3116          * refcount of sub_tree by one, and possibly destroying the sub_tree. We need to
3117          * hold a reference to the sub_tree until this function returns so that we don't
3118          * try to read from or write to freed memory by accident
3119          */
3120         ao2_ref(sub_tree, +1);
3121
3122         if (pjsip_evsub_get_state(evsub) == PJSIP_EVSUB_STATE_TERMINATED) {
3123                 set_state_terminated(sub_tree->root);
3124         }
3125
3126         if (send_notify(sub_tree, 1)) {
3127                 *p_st_code = 500;
3128         }
3129
3130         ast_test_suite_event_notify(sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED ?
3131                         "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_REFRESHED",
3132                         "Resource: %s", sub_tree->root->resource);
3133
3134         if (sub_tree->is_list) {
3135                 pj_list_insert_before(res_hdr, create_require_eventlist(rdata->tp_info.pool));
3136         }
3137
3138         ao2_ref(sub_tree, -1);
3139 }
3140
3141 static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code,
3142                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
3143 {
3144         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
3145
3146         if (!sub) {
3147                 return;
3148         }
3149
3150         sub->handler->subscriber->state_change(sub, rdata->msg_info.msg->body,
3151                         pjsip_evsub_get_state(evsub));
3152 }
3153
3154 static int serialized_pubsub_on_client_refresh(void *userdata)
3155 {
3156         struct sip_subscription_tree *sub_tree = userdata;
3157         pjsip_tx_data *tdata;
3158
3159         if (pjsip_evsub_initiate(sub_tree->evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
3160                 pjsip_evsub_send_request(sub_tree->evsub, tdata);
3161         } else {
3162                 pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
3163                 return 0;
3164         }
3165         ao2_cleanup(sub_tree);
3166         return 0;
3167 }
3168
3169 static void pubsub_on_client_refresh(pjsip_evsub *evsub)
3170 {
3171         struct sip_subscription_tree *sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
3172
3173         ao2_ref(sub_tree, +1);
3174         ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_client_refresh, sub_tree);
3175 }
3176
3177 static int serialized_pubsub_on_server_timeout(void *userdata)
3178 {
3179         struct sip_subscription_tree *sub_tree = userdata;
3180
3181         set_state_terminated(sub_tree->root);
3182         send_notify(sub_tree, 1);
3183         ast_test_suite_event_notify("SUBSCRIPTION_TERMINATED",
3184                         "Resource: %s",
3185                         sub_tree->root->resource);
3186
3187         ao2_cleanup(sub_tree);
3188         return 0;
3189 }
3190
3191 static void pubsub_on_server_timeout(pjsip_evsub *evsub)
3192 {
3193         struct sip_subscription_tree *sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
3194
3195         if (!sub_tree) {
3196                 /* if a subscription has been terminated and the subscription
3197                    timeout/expires is less than the time it takes for all pending
3198                    transactions to end then the subscription timer will not have
3199                    been canceled yet and sub will be null, so do nothing since
3200                    the subscription has already been terminated. */
3201                 return;
3202         }
3203
3204         ao2_ref(sub_tree, +1);
3205         ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_server_timeout, sub_tree);
3206 }
3207
3208 static int ami_subscription_detail(struct sip_subscription_tree *sub_tree,
3209                                    struct ast_sip_ami *ami,
3210                                    const char *event)
3211 {
3212         RAII_VAR(struct ast_str *, buf,
3213                  ast_sip_create_ami_event(event, ami), ast_free);
3214
3215         if (!buf) {
3216                 return -1;
3217         }
3218
3219         sip_subscription_to_ami(sub_tree, &buf);
3220         astman_append(ami->s, "%s\r\n", ast_str_buffer(buf));
3221         return 0;
3222 }
3223
3224 static int ami_subscription_detail_inbound(struct sip_subscription_tree *sub_tree, void *arg)
3225 {
3226         return sub_tree->role == AST_SIP_NOTIFIER ? ami_subscription_detail(
3227                 sub_tree, arg, "InboundSubscriptionDetail") : 0;
3228 }
3229
3230 static int ami_subscription_detail_outbound(struct sip_subscription_tree *sub_tree, void *arg)
3231 {
3232         return sub_tree->role == AST_SIP_SUBSCRIBER ? ami_subscription_detail(
3233                 sub_tree, arg, "OutboundSubscriptionDetail") : 0;
3234 }
3235
3236 static int ami_show_subscriptions_inbound(struct mansession *s, const struct message *m)
3237 {
3238         struct ast_sip_ami ami = { .s = s, .m = m, .action_id = astman_get_header(m, "ActionID"), };
3239         int num;
3240
3241         astman_send_listack(s, m, "Following are Events for "
3242                             "each inbound Subscription", "start");
3243
3244         num = for_each_subscription(ami_subscription_detail_inbound, &ami);
3245
3246         astman_append(s, "Event: InboundSubscriptionDetailComplete\r\n");
3247         if (!ast_strlen_zero(ami.action_id)) {
3248                 astman_append(s, "ActionID: %s\r\n", ami.action_id);
3249         }
3250         astman_append(s, "EventList: Complete\r\n"
3251                       "ListItems: %d\r\n\r\n", num);
3252         return 0;
3253 }
3254
3255 static int ami_show_subscriptions_outbound(struct mansession *s, const struct message *m)
3256 {
3257         struct ast_sip_ami ami = { .s = s, .m = m, .action_id = astman_get_header(m, "ActionID"), };
3258         int num;
3259
3260         astman_send_listack(s, m, "Following are Events for "
3261                             &qu