res_pjsip_pubsub: Prevent sending NOTIFY on destroyed dialog.
[asterisk/asterisk.git] / res / res_pjsip_pubsub.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 /*!
19  * \brief Opaque structure representing an RFC 3265 SIP subscription
20  */
21
22 /*** MODULEINFO
23         <depend>pjproject</depend>
24         <depend>res_pjsip</depend>
25         <support_level>core</support_level>
26  ***/
27
28 #include "asterisk.h"
29
30 #include <pjsip.h>
31 #include <pjsip_simple.h>
32 #include <pjlib.h>
33
34 #include "asterisk/res_pjsip_pubsub.h"
35 #include "asterisk/module.h"
36 #include "asterisk/linkedlists.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/datastore.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/taskprocessor.h"
41 #include "asterisk/sched.h"
42 #include "asterisk/res_pjsip.h"
43 #include "asterisk/callerid.h"
44 #include "asterisk/manager.h"
45 #include "asterisk/test.h"
46 #include "res_pjsip/include/res_pjsip_private.h"
47 #include "asterisk/res_pjsip_presence_xml.h"
48
49 /*** DOCUMENTATION
50         <manager name="PJSIPShowSubscriptionsInbound" language="en_US">
51                 <synopsis>
52                         Lists subscriptions.
53                 </synopsis>
54                 <syntax />
55                 <description>
56                         <para>
57                         Provides a listing of all inbound subscriptions.  An event <literal>InboundSubscriptionDetail</literal>
58                         is issued for each subscription object.  Once all detail events are completed an
59                         <literal>InboundSubscriptionDetailComplete</literal> event is issued.
60                         </para>
61                 </description>
62         </manager>
63         <manager name="PJSIPShowSubscriptionsOutbound" language="en_US">
64                 <synopsis>
65                         Lists subscriptions.
66                 </synopsis>
67                 <syntax />
68                 <description>
69                         <para>
70                         Provides a listing of all outbound subscriptions.  An event <literal>OutboundSubscriptionDetail</literal>
71                         is issued for each subscription object.  Once all detail events are completed an
72                         <literal>OutboundSubscriptionDetailComplete</literal> event is issued.
73                         </para>
74                 </description>
75         </manager>
76         <manager name="PJSIPShowResourceLists" language="en_US">
77                 <synopsis>
78                         Displays settings for configured resource lists.
79                 </synopsis>
80                 <syntax />
81                 <description>
82                         <para>
83                         Provides a listing of all resource lists.  An event <literal>ResourceListDetail</literal>
84                         is issued for each resource list object.  Once all detail events are completed a
85                         <literal>ResourceListDetailComplete</literal> event is issued.
86                         </para>
87                 </description>
88         </manager>
89
90         <configInfo name="res_pjsip_pubsub" language="en_US">
91                 <synopsis>Module that implements publish and subscribe support.</synopsis>
92                 <configFile name="pjsip.conf">
93                         <configObject name="subscription_persistence">
94                                 <synopsis>Persists SIP subscriptions so they survive restarts.</synopsis>
95                                 <configOption name="packet">
96                                         <synopsis>Entire SIP SUBSCRIBE packet that created the subscription</synopsis>
97                                 </configOption>
98                                 <configOption name="src_name">
99                                         <synopsis>The source address of the subscription</synopsis>
100                                 </configOption>
101                                 <configOption name="src_port">
102                                         <synopsis>The source port of the subscription</synopsis>
103                                 </configOption>
104                                 <configOption name="transport_key">
105                                         <synopsis>The type of transport the subscription was received on</synopsis>
106                                 </configOption>
107                                 <configOption name="local_name">
108                                         <synopsis>The local address the subscription was received on</synopsis>
109                                 </configOption>
110                                 <configOption name="local_port">
111                                         <synopsis>The local port the subscription was received on</synopsis>
112                                 </configOption>
113                                 <configOption name="cseq">
114                                         <synopsis>The sequence number of the next NOTIFY to be sent</synopsis>
115                                 </configOption>
116                                 <configOption name="tag">
117                                         <synopsis>The local tag of the dialog for the subscription</synopsis>
118                                 </configOption>
119                                 <configOption name="endpoint">
120                                         <synopsis>The name of the endpoint that subscribed</synopsis>
121                                 </configOption>
122                                 <configOption name="expires">
123                                         <synopsis>The time at which the subscription expires</synopsis>
124                                 </configOption>
125                         </configObject>
126                         <configObject name="resource_list">
127                                 <synopsis>Resource list configuration parameters.</synopsis>
128                                 <description>
129                                         <para>This configuration object allows for RFC 4662 resource list subscriptions
130                                         to be specified. This can be useful to decrease the amount of subscription traffic
131                                         that a server has to process.</para>
132                                         <note>
133                                                 <para>Current limitations limit the size of SIP NOTIFY requests that Asterisk sends
134                                                 to 64000 bytes. If your resource list notifications are larger than this maximum, you
135                                                 will need to make adjustments.</para>
136                                         </note>
137                                 </description>
138                                 <configOption name="type">
139                                         <synopsis>Must be of type 'resource_list'</synopsis>
140                                 </configOption>
141                                 <configOption name="event">
142                                         <synopsis>The SIP event package that the list resource belongs to.</synopsis>
143                                         <description><para>
144                                                 The SIP event package describes the types of resources that Asterisk reports
145                                                 the state of.
146                                         </para>
147                                                 <enumlist>
148                                                         <enum name="presence"><para>
149                                                                 Device state and presence reporting.
150                                                         </para></enum>
151                                                         <enum name="dialog"><para>
152                                                                 This is identical to <replaceable>presence</replaceable>.
153                                                         </para></enum>
154                                                         <enum name="message-summary"><para>
155                                                                 Message-waiting indication (MWI) reporting.
156                                                         </para></enum>
157                                                 </enumlist>
158                                         </description>
159                                 </configOption>
160                                 <configOption name="list_item">
161                                         <synopsis>The name of a resource to report state on</synopsis>
162                                         <description>
163                                                 <para>In general Asterisk looks up list items in the following way:</para>
164                                                 <para>1. Check if the list item refers to another configured resource list.</para>
165                                                 <para>2. Pass the name of the resource off to event-package-specific handlers
166                                                    to find the specified resource.</para>
167                                                 <para>The second part means that the way the list item is specified depends
168                                                 on what type of list this is. For instance, if you have the <replaceable>event</replaceable>
169                                                 set to <literal>presence</literal>, then list items should be in the form of
170                                                 dialplan_extension@dialplan_context. For <literal>message-summary</literal> mailbox
171                                                 names should be listed.</para>
172                                         </description>
173                                 </configOption>
174                                 <configOption name="full_state" default="no">
175                                         <synopsis>Indicates if the entire list's state should be sent out.</synopsis>
176                                         <description>
177                                                 <para>If this option is enabled, and a resource changes state, then Asterisk will construct
178                                                 a notification that contains the state of all resources in the list. If the option is
179                                                 disabled, Asterisk will construct a notification that only contains the states of
180                                                 resources that have changed.</para>
181                                                 <note>
182                                                         <para>Even with this option disabled, there are certain situations where Asterisk is forced
183                                                         to send a notification with the states of all resources in the list. When a subscriber
184                                                         renews or terminates its subscription to the list, Asterisk MUST send a full state
185                                                         notification.</para>
186                                                 </note>
187                                         </description>
188                                 </configOption>
189                                 <configOption name="notification_batch_interval" default="0">
190                                         <synopsis>Time Asterisk should wait, in milliseconds, before sending notifications.</synopsis>
191                                         <description>
192                                                 <para>When a resource's state changes, it may be desired to wait a certain amount before Asterisk
193                                                 sends a notification to subscribers. This allows for other state changes to accumulate, so that
194                                                 Asterisk can communicate multiple state changes in a single notification instead of rapidly sending
195                                                 many notifications.</para>
196                                         </description>
197                                 </configOption>
198                         </configObject>
199                         <configObject name="inbound-publication">
200                                 <synopsis>The configuration for inbound publications</synopsis>
201                                 <configOption name="endpoint" default="">
202                                         <synopsis>Optional name of an endpoint that is only allowed to publish to this resource</synopsis>
203                                 </configOption>
204                                 <configOption name="type">
205                                         <synopsis>Must be of type 'inbound-publication'.</synopsis>
206                                 </configOption>
207                         </configObject>
208                 </configFile>
209         </configInfo>
210  ***/
211
/* Forward declaration: handler installed as the module's on_rx_request callback below. */
212 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);
213
/*!
 * \brief PJSIP module definition for this file
 *
 * Registers pubsub_on_rx_request as the incoming-request callback at
 * application priority. The second member of .name is the length of the
 * name string (13 characters).
 */
214 static struct pjsip_module pubsub_module = {
215         .name = { "PubSub Module", 13 },
216         .priority = PJSIP_MOD_PRIORITY_APPLICATION,
217         .on_rx_request = pubsub_on_rx_request,
218 };
219
/* NOTE(review): presumably string keys used to attach persistence/message data
 * to PJSIP objects; their users are not visible in this part of the file. */
220 #define MOD_DATA_PERSISTENCE "sub_persistence"
221 #define MOD_DATA_MSG "sub_msg"
222
/*! \brief Name of the SIP Event header, used when searching a message's headers by name */
223 static const pj_str_t str_event_name = { "Event", 5 };
224
225 /*! \brief Scheduler used for automatically expiring publications */
226 static struct ast_sched_context *sched;
227
228 /*! \brief Number of buckets for publications (on a per handler) */
229 #define PUBLICATIONS_BUCKETS 37
230
231 /*! \brief Default expiration time for PUBLISH if one is not specified */
232 #define DEFAULT_PUBLISH_EXPIRES 3600
233
234 /*! \brief Number of buckets for subscription datastore */
235 #define DATASTORE_BUCKETS 53
236
237 /*! \brief Default expiration for subscriptions */
238 #define DEFAULT_EXPIRES 3600
239
240 /*! \brief Defined method for PUBLISH */
241 const pjsip_method pjsip_publish_method =
242 {
243         PJSIP_OTHER_METHOD,
244         { "PUBLISH", 7 }
245 };
246
247 /*!
248  * \brief The types of PUBLISH messages defined in RFC 3903
249  */
250 enum sip_publish_type {
251         /*!
252          * \brief Unknown
253          *
254          * \details
255          * This actually is not defined in RFC 3903. We use this as a constant
256          * to indicate that an incoming PUBLISH does not fit into any of the
257          * other categories and is thus invalid.
258          */
259         SIP_PUBLISH_UNKNOWN,
260
261         /*!
262          * \brief Initial
263          *
264          * \details
265          * The first PUBLISH sent. This will contain a non-zero Expires header
266          * as well as a body that indicates the current state of the endpoint
267          * that has sent the message. The initial PUBLISH is the only type
268          * of PUBLISH to not contain a SIP-If-Match header in it.
269          */
270         SIP_PUBLISH_INITIAL,
271
272         /*!
273          * \brief Refresh
274          *
275          * \details
276          * Used to keep a published state from expiring. This will contain a
277          * non-zero Expires header but no body since its purpose is not to
278          * update state.
279          */
280         SIP_PUBLISH_REFRESH,
281
282         /*!
283          * \brief Modify
284          *
285          * \details
286          * Used to change state from its previous value. This will contain
287          * a body updating the published state. May or may not contain an
288          * Expires header.
289          */
290         SIP_PUBLISH_MODIFY,
291
292         /*!
293          * \brief Remove
294          *
295          * \details
296          * Used to remove published state from an ESC. This will contain
297          * an Expires header set to 0 and likely no body.
298          */
299         SIP_PUBLISH_REMOVE,
300 };
301
302 /*!
303  * \brief A vector of strings commonly used throughout this module
304  */
305 AST_VECTOR(resources, const char *);
306
307 /*!
308  * \brief Resource list configuration item
309  */
310 struct resource_list {
        /*! Sorcery object details */
311         SORCERY_OBJECT(details);
312         /*! SIP event package the list uses (fixed 32-byte buffer). */
313         char event[32];
314         /*! Strings representing resources in the list. */
315         struct resources items;
316         /*! Indicates if Asterisk sends full or partial state on notifications. */
317         unsigned int full_state;
318         /*! Time, in milliseconds, that Asterisk waits before sending a batched notification. */
319         unsigned int notification_batch_interval;
320 };
321
322 /*!
323  * Used to create new entity IDs by ESCs.
324  */
325 static int esc_etag_counter;
326
327 /*!
328  * \brief Structure representing a SIP publication
329  */
330 struct ast_sip_publication {
331         /*! Publication datastores set up by handlers */
332         struct ao2_container *datastores;
333         /*! \brief Entity tag for the publication */
334         int entity_tag;
335         /*! \brief Handler for this publication */
336         struct ast_sip_publish_handler *handler;
337         /*! \brief The endpoint associated with the publication */
338         struct ast_sip_endpoint *endpoint;
339         /*! \brief Expiration time of the publication */
340         int expires;
341         /*! \brief Scheduled item for expiration of publication */
342         int sched_id;
343         /*! \brief The resource the publication is to */
344         char *resource;
345         /*! \brief The name of the event type configuration */
346         char *event_configuration_name;
347         /*! \brief Trailing storage; presumably backs the resource and
         * event_configuration_name strings above (allocation not visible here) */
348         char data[0];
349 };
350
351
352 /*!
353  * \brief Structure used for persisting an inbound subscription
354  */
355 struct subscription_persistence {
356         /*! Sorcery object details */
357         SORCERY_OBJECT(details);
358         /*! The name of the endpoint involved in the subscription (heap-allocated, freed by the destructor) */
359         char *endpoint;
360         /*! SIP message that creates the subscription */
361         char packet[PJSIP_MAX_PKT_LEN];
362         /*! Source address of the message */
363         char src_name[PJ_INET6_ADDRSTRLEN];
364         /*! Source port of the message */
365         int src_port;
366         /*! Local transport key type */
367         char transport_key[32];
368         /*! Local transport address */
369         char local_name[PJ_INET6_ADDRSTRLEN];
370         /*! Local transport port */
371         int local_port;
372         /*! Next CSeq to use for message */
373         unsigned int cseq;
374         /*! Local tag of the dialog (heap-allocated, freed by the destructor) */
375         char *tag;
376         /*! When this subscription expires */
377         struct timeval expires;
378 };
379
380 /*!
381  * \brief A tree of SIP subscriptions
382  *
383  * Because of the ability to subscribe to resource lists, a SIP
384  * subscription can result in a tree of subscriptions being created.
385  * This structure represents the information relevant to the subscription
386  * as a whole, to include the underlying PJSIP structure for the
387  * subscription.
388  */
389 struct sip_subscription_tree {
390         /*! The endpoint with which the subscription is communicating */
391         struct ast_sip_endpoint *endpoint;
392         /*! Serializer on which to place operations for this subscription */
393         struct ast_taskprocessor *serializer;
394         /*! The role for this subscription */
395         enum ast_sip_subscription_role role;
396         /*! Persistence information; NULL when the subscription is not persisted */
397         struct subscription_persistence *persistence;
398         /*! The underlying PJSIP event subscription structure */
399         pjsip_evsub *evsub;
400         /*! The underlying PJSIP dialog */
401         pjsip_dialog *dlg;
402         /*! Interval to use for batching notifications */
403         unsigned int notification_batch_interval;
404         /*! Scheduler ID for batched notification */
405         int notify_sched_id;
406         /*! Indicator if scheduled batched notification should be sent */
407         unsigned int send_scheduled_notify;
408         /*! The root of the subscription tree */
409         struct ast_sip_subscription *root;
410         /*! Is this subscription to a list? */
411         int is_list;
412         /*! Next item in the list */
413         AST_LIST_ENTRY(sip_subscription_tree) next;
414         /*! Indicates that a NOTIFY is currently being sent on the SIP subscription.
         * NOTE(review): the field name suggests "final NOTIFY"; confirm against its users. */
415         int last_notify;
416 };
417
418 /*!
419  * \brief Structure representing a "virtual" SIP subscription.
420  *
421  * This structure serves a dual purpose. Structurally, it is
422  * the constructed tree of subscriptions based on the resources
423  * being subscribed to. API-wise, this serves as the handle that
424  * subscription handlers use in order to interact with the pubsub API.
425  */
426 struct ast_sip_subscription {
427         /*! Subscription datastores set up by handlers */
428         struct ao2_container *datastores;
429         /*! The handler for this subscription */
430         const struct ast_sip_subscription_handler *handler;
431         /*! Pointer to the base of the tree */
432         struct sip_subscription_tree *tree;
433         /*! Body generator for NOTIFYs */
434         struct ast_sip_pubsub_body_generator *body_generator;
435         /*! Vector of child subscriptions */
436         AST_VECTOR(, struct ast_sip_subscription *) children;
437         /*! Saved NOTIFY body text for this subscription */
438         struct ast_str *body_text;
439         /*! Indicator that the body text has changed since the last notification */
440         int body_changed;
441         /*! The current state of the subscription */
442         pjsip_evsub_state subscription_state;
443         /*! For lists, the current version to place in the RLMI body */
444         unsigned int version;
445         /*! For lists, indicates if full state should always be communicated. */
446         unsigned int full_state;
447         /*! URI associated with the subscription */
448         pjsip_sip_uri *uri;
449         /*! Name of resource being subscribed to (trailing storage sized at allocation) */
450         char resource[0];
451 };
452
453 /*!
454  * \brief Structure representing a publication resource
455  */
456 struct ast_sip_publication_resource {
457         /*! \brief Sorcery object details */
458         SORCERY_OBJECT(details);
459         /*! \brief Optional name of an endpoint that is only allowed to publish to this resource (freed by the destructor) */
460         char *endpoint;
461         /*! \brief Mapping for event types to configuration, held as an ast_variable list (destroyed by the destructor) */
462         struct ast_variable *events;
463 };
464
/*! \brief Human-readable role names, indexed by the AST_SIP_SUBSCRIBER / AST_SIP_NOTIFIER role constants */
465 static const char *sip_subscription_roles_map[] = {
466         [AST_SIP_SUBSCRIBER] = "Subscriber",
467         [AST_SIP_NOTIFIER] = "Notifier"
468 };
469
/*! \brief File-scope read/write-locked list of subscription trees (linked through sip_subscription_tree.next) */
470 AST_RWLIST_HEAD_STATIC(subscriptions, sip_subscription_tree);
471
/*! \brief File-scope read/write-locked lists of body generators and body supplements */
472 AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
473 AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
474
475 static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
476 static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
477                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
478 static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
479                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
480 static void pubsub_on_client_refresh(pjsip_evsub *sub);
481 static void pubsub_on_server_timeout(pjsip_evsub *sub);
482  
/*! \brief PJSIP event-subscription callback table; each member points at the matching pubsub_on_* handler declared above */
483 static pjsip_evsub_user pubsub_cb = {
484         .on_evsub_state = pubsub_on_evsub_state,
485         .on_rx_refresh = pubsub_on_rx_refresh,
486         .on_rx_notify = pubsub_on_rx_notify,
487         .on_client_refresh = pubsub_on_client_refresh,
488         .on_server_timeout = pubsub_on_server_timeout,
489 };
490
491 /*! \brief Destructor for publication resource */
492 static void publication_resource_destroy(void *obj)
493 {
494         struct ast_sip_publication_resource *resource = obj;
495
496         ast_free(resource->endpoint);
497         ast_variables_destroy(resource->events);
498 }
499
500 /*! \brief Allocator for publication resource */
501 static void *publication_resource_alloc(const char *name)
502 {
503         return ast_sorcery_generic_alloc(sizeof(struct ast_sip_publication_resource), publication_resource_destroy);
504 }
505
506 /*! \brief Destructor for subscription persistence */
507 static void subscription_persistence_destroy(void *obj)
508 {
509         struct subscription_persistence *persistence = obj;
510
511         ast_free(persistence->endpoint);
512         ast_free(persistence->tag);
513 }
514
515 /*! \brief Allocator for subscription persistence */
516 static void *subscription_persistence_alloc(const char *name)
517 {
518         return ast_sorcery_generic_alloc(sizeof(struct subscription_persistence), subscription_persistence_destroy);
519 }
520
521 /*! \brief Function which creates initial persistence information of a subscription in sorcery */
522 static struct subscription_persistence *subscription_persistence_create(struct sip_subscription_tree *sub_tree)
523 {
524         char tag[PJ_GUID_STRING_LENGTH + 1];
525
526         /* The id of this persistence object doesn't matter as we keep it on the subscription and don't need to
527          * look it up by id at all.
528          */
529         struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(),
530                 "subscription_persistence", NULL);
531
532         pjsip_dialog *dlg = sub_tree->dlg;
533
534         if (!persistence) {
535                 return NULL;
536         }
537
538         persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub_tree->endpoint));
539         ast_copy_pj_str(tag, &dlg->local.info->tag, sizeof(tag));
540         persistence->tag = ast_strdup(tag);
541
542         ast_sorcery_create(ast_sip_get_sorcery(), persistence);
543         return persistence;
544 }
545
546 /*! \brief Function which updates persistence information of a subscription in sorcery */
547 static void subscription_persistence_update(struct sip_subscription_tree *sub_tree,
548         pjsip_rx_data *rdata)
549 {
550         pjsip_dialog *dlg;
551
552         if (!sub_tree->persistence) {
553                 return;
554         }
555
556         dlg = sub_tree->dlg;
557         sub_tree->persistence->cseq = dlg->local.cseq;
558
559         if (rdata) {
560                 int expires;
561                 pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
562
563                 expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
564                 sub_tree->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
565
566                 /* When receiving a packet on an streaming transport, it's possible to receive more than one SIP
567                  * message at a time into the rdata->pkt_info.packet buffer. However, the rdata->msg_info.msg_buf
568                  * will always point to the proper SIP message that is to be processed. When updating subscription
569                  * persistence that is pulled from persistent storage, though, the rdata->pkt_info.packet will
570                  * only ever have a single SIP message on it, and so we base persistence on that.
571                  */
572                 if (rdata->msg_info.msg_buf) {
573                         ast_copy_string(sub_tree->persistence->packet, rdata->msg_info.msg_buf,
574                                         MIN(sizeof(sub_tree->persistence->packet), rdata->msg_info.len));
575                 } else {
576                         ast_copy_string(sub_tree->persistence->packet, rdata->pkt_info.packet,
577                                         sizeof(sub_tree->persistence->packet));
578                 }
579                 ast_copy_string(sub_tree->persistence->src_name, rdata->pkt_info.src_name,
580                                 sizeof(sub_tree->persistence->src_name));
581                 sub_tree->persistence->src_port = rdata->pkt_info.src_port;
582                 ast_copy_string(sub_tree->persistence->transport_key, rdata->tp_info.transport->type_name,
583                         sizeof(sub_tree->persistence->transport_key));
584                 ast_copy_pj_str(sub_tree->persistence->local_name, &rdata->tp_info.transport->local_name.host,
585                         sizeof(sub_tree->persistence->local_name));
586                 sub_tree->persistence->local_port = rdata->tp_info.transport->local_name.port;
587         }
588
589         ast_sorcery_update(ast_sip_get_sorcery(), sub_tree->persistence);
590 }
591
592 /*! \brief Function which removes persistence of a subscription from sorcery */
593 static void subscription_persistence_remove(struct sip_subscription_tree *sub_tree)
594 {
595         if (!sub_tree->persistence) {
596                 return;
597         }
598
599         ast_sorcery_delete(ast_sip_get_sorcery(), sub_tree->persistence);
600         ao2_ref(sub_tree->persistence, -1);
601         sub_tree->persistence = NULL;
602 }
603
604
605 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name);
606 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
607                 size_t num_accept, const char *body_type);
608
609 /*! \brief Retrieve a handler using the Event header of an rdata message */
610 static struct ast_sip_subscription_handler *subscription_get_handler_from_rdata(pjsip_rx_data *rdata)
611 {
612         pjsip_event_hdr *event_header;
613         char event[32];
614         struct ast_sip_subscription_handler *handler;
615
616         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
617         if (!event_header) {
618                 ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
619                 return NULL;
620         }
621         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
622
623         handler = find_sub_handler_for_event_name(event);
624         if (!handler) {
625                 ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
626         }
627
628         return handler;
629 }
630
/*!
 * \brief Accept header values that are skipped when choosing a body generator
 *
 * A SUBSCRIBE normally has its Accept header values matched against the
 * available body generators. That is sufficient for a subscription to a
 * single resource. A subscription to a resource list, however, may carry
 * Accept values that describe the list container rather than the bodies of
 * the individual resources. Those values are enumerated here so that they
 * can be ignored while searching for a body generator for the individual
 * resources of the subscription.
 */
const char *accept_exceptions[] =  {
	"multipart/related",
	"application/rlmi+xml",
};
647
648 /*!
649  * \brief Is the Accept header from the SUBSCRIBE in the list of exceptions?
650  *
651  * \retval 1 This Accept header value is an exception to the rule.
652  * \retval 0 This Accept header is not an exception to the rule.
653  */
654 static int exceptional_accept(const pj_str_t *accept)
655 {
656         int i;
657
658         for (i = 0; i < ARRAY_LEN(accept_exceptions); ++i) {
659                 if (!pj_strcmp2(accept, accept_exceptions[i])) {
660                         return 1;
661                 }
662         }
663
664         return 0;
665 }
666
667 /*! \brief Retrieve a body generator using the Accept header of an rdata message */
668 static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata,
669         const struct ast_sip_subscription_handler *handler)
670 {
671         pjsip_accept_hdr *accept_header = (pjsip_accept_hdr *) &rdata->msg_info.msg->hdr;
672         char accept[AST_SIP_MAX_ACCEPT][64];
673         size_t num_accept_headers = 0;
674
675         while ((accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, accept_header->next))) {
676                 int i;
677
678                 for (i = 0; i < accept_header->count; ++i) {
679                         if (!exceptional_accept(&accept_header->values[i])) {
680                                 ast_copy_pj_str(accept[num_accept_headers], &accept_header->values[i], sizeof(accept[num_accept_headers]));
681                                 ++num_accept_headers;
682                         }
683                 }
684         }
685
686         if (num_accept_headers == 0) {
687                 /* If a SUBSCRIBE contains no Accept headers, then we must assume that
688                  * the default accept type for the event package is to be used.
689                  */
690                 ast_copy_string(accept[0], handler->notifier->default_accept, sizeof(accept[0]));
691                 num_accept_headers = 1;
692         }
693
694         return find_body_generator(accept, num_accept_headers, handler->body_type);
695 }
696
697 /*! \brief Check if the rdata has a Supported header containing 'eventlist'
698  *
699  *  \retval 1 rdata has an eventlist containing supported header
700  *  \retval 0 rdata doesn't have an eventlist containing supported header
701  */
702 static int ast_sip_pubsub_has_eventlist_support(pjsip_rx_data *rdata)
703 {
704         pjsip_supported_hdr *supported_header = (pjsip_supported_hdr *) &rdata->msg_info.msg->hdr;
705
706         while ((supported_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_SUPPORTED, supported_header->next))) {
707                 int i;
708
709                 for (i = 0; i < supported_header->count; i++) {
710                         if (!pj_stricmp2(&supported_header->values[i], "eventlist")) {
711                                 return 1;
712                         }
713                 }
714         }
715
716         return 0;
717 }
718
719 struct resource_tree;
720
721 /*!
722  * \brief A node for a resource tree.
723  */
724 struct tree_node {
725         AST_VECTOR(, struct tree_node *) children;
726         unsigned int full_state;
727         char resource[0];
728 };
729
730 /*!
731  * \brief Helper function for retrieving a resource list for a given event.
732  *
733  * This will retrieve a resource list that corresponds to the resource and event provided.
734  *
735  * \param resource The name of the resource list to retrieve
736  * \param event The expected event name on the resource list
737  */
738 static struct resource_list *retrieve_resource_list(const char *resource, const char *event)
739 {
740         struct resource_list *list;
741
742         list = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "resource_list", resource);
743         if (!list) {
744                 return NULL;
745         }
746
747         if (strcmp(list->event, event)) {
748                 ast_log(LOG_WARNING, "Found resource list %s, but its event type (%s) does not match SUBSCRIBE's (%s)\n",
749                                 resource, list->event, event);
750                 ao2_cleanup(list);
751                 return NULL;
752         }
753
754         return list;
755 }
756
757 /*!
758  * \brief Allocate a tree node
759  *
760  * In addition to allocating and initializing the tree node, the node is also added
761  * to the vector of visited resources. See \ref build_resource_tree for more information
762  * on the visited resources.
763  *
764  * \param resource The name of the resource for this tree node.
765  * \param visited The vector of resources that have been visited.
766  * \param if allocating a list, indicate whether full state is requested in notifications.
767  * \retval NULL Allocation failure.
768  * \retval non-NULL The newly-allocated tree_node
769  */
770 static struct tree_node *tree_node_alloc(const char *resource, struct resources *visited, unsigned int full_state)
771 {
772         struct tree_node *node;
773
774         node = ast_calloc(1, sizeof(*node) + strlen(resource) + 1);
775         if (!node) {
776                 return NULL;
777         }
778
779         strcpy(node->resource, resource);
780         if (AST_VECTOR_INIT(&node->children, 4)) {
781                 ast_free(node);
782                 return NULL;
783         }
784         node->full_state = full_state;
785
786         if (visited) {
787                 AST_VECTOR_APPEND(visited, resource);
788         }
789         return node;
790 }
791
792 /*!
793  * \brief Destructor for a tree node
794  *
795  * This function calls recursively in order to destroy
796  * all nodes lower in the tree from the given node in
797  * addition to the node itself.
798  *
799  * \param node The node to destroy.
800  */
801 static void tree_node_destroy(struct tree_node *node)
802 {
803         int i;
804         if (!node) {
805                 return;
806         }
807
808         for (i = 0; i < AST_VECTOR_SIZE(&node->children); ++i) {
809                 tree_node_destroy(AST_VECTOR_GET(&node->children, i));
810         }
811         AST_VECTOR_FREE(&node->children);
812         ast_free(node);
813 }
814
/*!
 * \brief Check whether a resource name is already in the visited vector
 *
 * Names are compared with strcmp(), i.e. case-sensitively.
 * See \ref build_resource_tree for more information
 *
 * \param resource The resource currently being visited
 * \param visited The resources that have previously been visited
 *
 * \retval 1 The resource has been visited before.
 * \retval 0 The resource has not been visited before.
 */
static int have_visited(const char *resource, struct resources *visited)
{
	int idx = 0;

	while (idx < AST_VECTOR_SIZE(visited)) {
		if (strcmp(resource, AST_VECTOR_GET(visited, idx)) == 0) {
			return 1;
		}
		++idx;
	}

	return 0;
}
835
836 /*!
837  * \brief Build child nodes for a given parent.
838  *
839  * This iterates through the items on a resource list and creates tree nodes for each one. The
840  * tree nodes created are children of the supplied parent node. If an item in the resource
841  * list is itself a list, then this function is called recursively to provide children for
842  * the the new node.
843  *
844  * If an item in a resource list is not a list, then the supplied subscription handler is
845  * called into as if a new SUBSCRIBE for the list item were presented. The handler's response
846  * is used to determine if the node can be added to the tree or not.
847  *
848  * If a parent node ends up having no child nodes added under it, then the parent node is
849  * pruned from the tree.
850  *
851  * \param endpoint The endpoint that sent the inbound SUBSCRIBE.
852  * \param handler The subscription handler for leaf nodes in the tree.
853  * \param list The configured resource list from which the child node is being built.
854  * \param parent The parent node for these children.
855  * \param visited The resources that have already been visited.
856  */
857 static void build_node_children(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
858                 struct resource_list *list, struct tree_node *parent, struct resources *visited)
859 {
860         int i;
861
862         for (i = 0; i < AST_VECTOR_SIZE(&list->items); ++i) {
863                 struct tree_node *current;
864                 struct resource_list *child_list;
865                 const char *resource = AST_VECTOR_GET(&list->items, i);
866
867                 if (have_visited(resource, visited)) {
868                         ast_debug(1, "Already visited resource %s. Avoiding duplicate resource or potential loop.\n", resource);
869                         continue;
870                 }
871
872                 child_list = retrieve_resource_list(resource, list->event);
873                 if (!child_list) {
874                         int resp = handler->notifier->new_subscribe(endpoint, resource);
875                         if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
876                                 current = tree_node_alloc(resource, visited, 0);
877                                 if (!current) {
878                                         ast_debug(1, "Subscription to leaf resource %s was successful, but encountered"
879                                                         "allocation error afterwards\n", resource);
880                                         continue;
881                                 }
882                                 ast_debug(1, "Subscription to leaf resource %s resulted in success. Adding to parent %s\n",
883                                                 resource, parent->resource);
884                                 AST_VECTOR_APPEND(&parent->children, current);
885                         } else {
886                                 ast_debug(1, "Subscription to leaf resource %s resulted in error response %d\n",
887                                                 resource, resp);
888                         }
889                 } else {
890                         ast_debug(1, "Resource %s (child of %s) is a list\n", resource, parent->resource);
891                         current = tree_node_alloc(resource, visited, child_list->full_state);
892                         if (!current) {
893                                 ast_debug(1, "Cannot build children of resource %s due to allocation failure\n", resource);
894                                 continue;
895                         }
896                         build_node_children(endpoint, handler, child_list, current, visited);
897                         if (AST_VECTOR_SIZE(&current->children) > 0) {
898                                 ast_debug(1, "List %s had no successful children.\n", resource);
899                                 AST_VECTOR_APPEND(&parent->children, current);
900                         } else {
901                                 ast_debug(1, "List %s had successful children. Adding to parent %s\n",
902                                                 resource, parent->resource);
903                                 tree_node_destroy(current);
904                         }
905                         ao2_cleanup(child_list);
906                 }
907         }
908 }
909
/*!
 * \brief A resource tree
 *
 * The resource named in an inbound SUBSCRIBE may be a resource list, and
 * the items of such a list may be lists themselves. A tree is therefore
 * the natural structure for holding the requested resources.
 *
 * The tree is built when the SUBSCRIBE is received. For every resource it
 * is determined whether a subscription to that resource would succeed; a
 * node is created for each resource where it would, and no node is created
 * where it would not.
 *
 * The tree is a bare-bones counterpart of the tree of ast_sip_subscriptions
 * that is subsequently created to carry out the duties of the SIP SUBSCRIBE
 * dialog.
 */
struct resource_tree {
	/*! Topmost node of the tree */
	struct tree_node *root;
	/*! Notification batch interval copied from the resource list when the root is a list */
	unsigned int notification_batch_interval;
};
930
931 /*!
932  * \brief Destroy a resource tree.
933  *
934  * This function makes no assumptions about how the tree itself was
935  * allocated and does not attempt to free the tree itself. Callers
936  * of this function are responsible for freeing the tree.
937  *
938  * \param tree The tree to destroy.
939  */
940 static void resource_tree_destroy(struct resource_tree *tree)
941 {
942         if (tree) {
943                 tree_node_destroy(tree->root);
944         }
945 }
946
947 /*!
948  * \brief Build a resource tree
949  *
950  * This function builds a resource tree based on the requested resource in a SUBSCRIBE request.
951  *
952  * This function also creates a container that has all resources that have been visited during
953  * creation of the tree, whether those resources resulted in a tree node being created or not.
954  * Keeping this container of visited resources allows for misconfigurations such as loops in
955  * the tree or duplicated resources to be detected.
956  *
957  * \param endpoint The endpoint that sent the SUBSCRIBE request.
958  * \param handler The subscription handler for leaf nodes in the tree.
959  * \param resource The resource requested in the SUBSCRIBE request.
960  * \param tree The tree that is to be built.
961  * \param has_eventlist_support
962  *
963  * \retval 200-299 Successfully subscribed to at least one resource.
964  * \retval 300-699 Failure to subscribe to requested resource.
965  */
966 static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
967                 const char *resource, struct resource_tree *tree, int has_eventlist_support)
968 {
969         RAII_VAR(struct resource_list *, list, NULL, ao2_cleanup);
970         struct resources visited;
971
972         if (!has_eventlist_support || !(list = retrieve_resource_list(resource, handler->event_name))) {
973                 ast_debug(1, "Subscription to resource %s is not to a list\n", resource);
974                 tree->root = tree_node_alloc(resource, NULL, 0);
975                 if (!tree->root) {
976                         return 500;
977                 }
978                 return handler->notifier->new_subscribe(endpoint, resource);
979         }
980
981         ast_debug(1, "Subscription to resource %s is a list\n", resource);
982         if (AST_VECTOR_INIT(&visited, AST_VECTOR_SIZE(&list->items))) {
983                 return 500;
984         }
985
986         tree->root = tree_node_alloc(resource, &visited, list->full_state);
987         if (!tree->root) {
988                 AST_VECTOR_FREE(&visited);
989                 return 500;
990         }
991
992         tree->notification_batch_interval = list->notification_batch_interval;
993
994         build_node_children(endpoint, handler, list, tree->root, &visited);
995         AST_VECTOR_FREE(&visited);
996
997         if (AST_VECTOR_SIZE(&tree->root->children) > 0) {
998                 return 200;
999         } else {
1000                 return 500;
1001         }
1002 }
1003
1004 static int datastore_hash(const void *obj, int flags)
1005 {
1006         const struct ast_datastore *datastore = obj;
1007         const char *uid = flags & OBJ_KEY ? obj : datastore->uid;
1008
1009         ast_assert(uid != NULL);
1010
1011         return ast_str_hash(uid);
1012 }
1013
1014 static int datastore_cmp(void *obj, void *arg, int flags)
1015 {
1016         const struct ast_datastore *datastore1 = obj;
1017         const struct ast_datastore *datastore2 = arg;
1018         const char *uid2 = flags & OBJ_KEY ? arg : datastore2->uid;
1019
1020         ast_assert(datastore1->uid != NULL);
1021         ast_assert(uid2 != NULL);
1022
1023         return strcmp(datastore1->uid, uid2) ? 0 : CMP_MATCH | CMP_STOP;
1024 }
1025
1026 static void add_subscription(struct sip_subscription_tree *obj)
1027 {
1028         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1029         AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
1030 }
1031
1032 static void remove_subscription(struct sip_subscription_tree *obj)
1033 {
1034         struct sip_subscription_tree *i;
1035         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1036         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
1037                 if (i == obj) {
1038                         AST_RWLIST_REMOVE_CURRENT(next);
1039                         if (i->root) {
1040                                 ast_debug(1, "Removing subscription to resource %s from list of subscriptions\n",
1041                                                 ast_sip_subscription_get_resource_name(i->root));
1042                         }
1043                         break;
1044                 }
1045         }
1046         AST_RWLIST_TRAVERSE_SAFE_END;
1047 }
1048
1049 static void destroy_subscription(struct ast_sip_subscription *sub)
1050 {
1051         ast_debug(3, "Destroying SIP subscription to resource %s\n", sub->resource);
1052         ast_free(sub->body_text);
1053
1054         AST_VECTOR_FREE(&sub->children);
1055         ao2_cleanup(sub->datastores);
1056         ast_free(sub);
1057 }
1058
1059 static void destroy_subscriptions(struct ast_sip_subscription *root)
1060 {
1061         int i;
1062
1063         if (!root) {
1064                 return;
1065         }
1066
1067         for (i = 0; i < AST_VECTOR_SIZE(&root->children); ++i) {
1068                 struct ast_sip_subscription *child;
1069
1070                 child = AST_VECTOR_GET(&root->children, i);
1071                 destroy_subscriptions(child);
1072         }
1073
1074         destroy_subscription(root);
1075 }
1076
1077 static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler,
1078                 const char *resource, struct sip_subscription_tree *tree)
1079 {
1080         struct ast_sip_subscription *sub;
1081         pjsip_sip_uri *contact_uri;
1082
1083         sub = ast_calloc(1, sizeof(*sub) + strlen(resource) + 1);
1084         if (!sub) {
1085                 return NULL;
1086         }
1087         strcpy(sub->resource, resource); /* Safe */
1088
1089         sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
1090         if (!sub->datastores) {
1091                 destroy_subscription(sub);
1092                 return NULL;
1093         }
1094
1095         sub->body_text = ast_str_create(128);
1096         if (!sub->body_text) {
1097                 destroy_subscription(sub);
1098                 return NULL;
1099         }
1100
1101         sub->uri = pjsip_sip_uri_create(tree->dlg->pool, PJ_FALSE);
1102         contact_uri = pjsip_uri_get_uri(tree->dlg->local.contact->uri);
1103         pjsip_sip_uri_assign(tree->dlg->pool, sub->uri, contact_uri);
1104         pj_strdup2(tree->dlg->pool, &sub->uri->user, resource);
1105
1106         sub->handler = handler;
1107         sub->subscription_state = PJSIP_EVSUB_STATE_ACTIVE;
1108         sub->tree = ao2_bump(tree);
1109
1110         return sub;
1111 }
1112
1113 /*!
1114  * \brief Create a tree of virtual subscriptions based on a resource tree node.
1115  *
1116  * \param handler The handler to supply to leaf subscriptions.
1117  * \param resource The requested resource for this subscription.
1118  * \param generator Body generator to use for leaf subscriptions.
1119  * \param tree The root of the subscription tree.
1120  * \param current The tree node that corresponds to the subscription being created.
1121  */
1122 static struct ast_sip_subscription *create_virtual_subscriptions(const struct ast_sip_subscription_handler *handler,
1123                 const char *resource, struct ast_sip_pubsub_body_generator *generator,
1124                 struct sip_subscription_tree *tree, struct tree_node *current)
1125 {
1126         int i;
1127         struct ast_sip_subscription *sub;
1128
1129         sub = allocate_subscription(handler, resource, tree);
1130         if (!sub) {
1131                 return NULL;
1132         }
1133
1134         sub->full_state = current->full_state;
1135         sub->body_generator = generator;
1136         AST_VECTOR_INIT(&sub->children, AST_VECTOR_SIZE(&current->children));
1137
1138         for (i = 0; i < AST_VECTOR_SIZE(&current->children); ++i) {
1139                 struct ast_sip_subscription *child;
1140                 struct tree_node *child_node = AST_VECTOR_GET(&current->children, i);
1141
1142                 child = create_virtual_subscriptions(handler, child_node->resource, generator,
1143                                 tree, child_node);
1144
1145                 if (!child) {
1146                         ast_debug(1, "Child subscription to resource %s could not be created\n",
1147                                         child_node->resource);
1148                         continue;
1149                 }
1150
1151                 if (AST_VECTOR_APPEND(&sub->children, child)) {
1152                         ast_debug(1, "Child subscription to resource %s could not be appended\n",
1153                                         child_node->resource);
1154                 }
1155         }
1156
1157         return sub;
1158 }
1159
1160 static void shutdown_subscriptions(struct ast_sip_subscription *sub)
1161 {
1162         int i;
1163
1164         if (!sub) {
1165                 return;
1166         }
1167
1168         if (AST_VECTOR_SIZE(&sub->children) > 0) {
1169                 for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
1170                         shutdown_subscriptions(AST_VECTOR_GET(&sub->children, i));
1171                 }
1172                 return;
1173         }
1174
1175         /* We notify subscription shutdown only on the tree leaves. */
1176         if (sub->handler->subscription_shutdown) {
1177                 sub->handler->subscription_shutdown(sub);
1178         }
1179 }
1180 static int subscription_unreference_dialog(void *obj)
1181 {
1182         struct sip_subscription_tree *sub_tree = obj;
1183
1184         /* This is why we keep the dialog on the subscription. When the subscription
1185          * is destroyed, there is no guarantee that the underlying dialog is ready
1186          * to be destroyed. Furthermore, there's no guarantee in the opposite direction
1187          * either. The dialog could be destroyed before our subscription is. We fix
1188          * this problem by keeping a reference to the dialog until it is time to
1189          * destroy the subscription. We need to have the dialog available when the
1190          * subscription is destroyed so that we can guarantee that our attempt to
1191          * remove the serializer will be successful.
1192          */
1193         pjsip_dlg_dec_session(sub_tree->dlg, &pubsub_module);
1194         sub_tree->dlg = NULL;
1195
1196         return 0;
1197 }
1198
1199 static void subscription_tree_destructor(void *obj)
1200 {
1201         struct sip_subscription_tree *sub_tree = obj;
1202
1203         ast_debug(3, "Destroying subscription tree %p\n", sub_tree);
1204
1205         remove_subscription(sub_tree);
1206
1207         ao2_cleanup(sub_tree->endpoint);
1208
1209         destroy_subscriptions(sub_tree->root);
1210
1211         ast_sip_push_task_synchronous(sub_tree->serializer, subscription_unreference_dialog, sub_tree);
1212         ast_taskprocessor_unreference(sub_tree->serializer);
1213         ast_module_unref(ast_module_info->self);
1214 }
1215
1216 void ast_sip_subscription_destroy(struct ast_sip_subscription *sub)
1217 {
1218         ast_debug(3, "Removing subscription %p reference to subscription tree %p\n", sub, sub->tree);
1219         ao2_cleanup(sub->tree);
1220 }
1221
1222 static void subscription_setup_dialog(struct sip_subscription_tree *sub_tree, pjsip_dialog *dlg)
1223 {
1224         sub_tree->dlg = dlg;
1225         ast_sip_dialog_set_serializer(dlg, sub_tree->serializer);
1226         ast_sip_dialog_set_endpoint(dlg, sub_tree->endpoint);
1227         pjsip_evsub_set_mod_data(sub_tree->evsub, pubsub_module.id, sub_tree);
1228         pjsip_dlg_inc_session(dlg, &pubsub_module);
1229 }
1230
1231 static struct sip_subscription_tree *allocate_subscription_tree(struct ast_sip_endpoint *endpoint)
1232 {
1233         struct sip_subscription_tree *sub_tree;
1234
1235         sub_tree = ao2_alloc(sizeof *sub_tree, subscription_tree_destructor);
1236         if (!sub_tree) {
1237                 return NULL;
1238         }
1239
1240         ast_module_ref(ast_module_info->self);
1241
1242         sub_tree->serializer = ast_sip_create_serializer();
1243         if (!sub_tree->serializer) {
1244                 ao2_ref(sub_tree, -1);
1245                 return NULL;
1246         }
1247
1248         sub_tree->endpoint = ao2_bump(endpoint);
1249         sub_tree->notify_sched_id = -1;
1250
1251         add_subscription(sub_tree);
1252         return sub_tree;
1253 }
1254
1255 /*!
1256  * \brief Create a subscription tree based on a resource tree.
1257  *
1258  * Using the previously-determined valid resources in the provided resource tree,
1259  * a corresponding tree of ast_sip_subscriptions are created. The root of the
1260  * subscription tree is a real subscription, and the rest in the tree are
1261  * virtual subscriptions.
1262  *
1263  * \param handler The handler to use for leaf subscriptions
1264  * \param endpoint The endpoint that sent the SUBSCRIBE request
1265  * \param rdata The SUBSCRIBE content
1266  * \param resource The requested resource in the SUBSCRIBE request
1267  * \param generator The body generator to use in leaf subscriptions
1268  * \param tree The resource tree on which the subscription tree is based
1269  * \param dlg_status[out] The result of attempting to create a dialog.
1270  *
1271  * \retval NULL Could not create the subscription tree
1272  * \retval non-NULL The root of the created subscription tree
1273  */
1274
1275 static struct sip_subscription_tree *create_subscription_tree(const struct ast_sip_subscription_handler *handler,
1276                 struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
1277                 struct ast_sip_pubsub_body_generator *generator, struct resource_tree *tree,
1278                 pj_status_t *dlg_status)
1279 {
1280         struct sip_subscription_tree *sub_tree;
1281         pjsip_dialog *dlg;
1282         struct subscription_persistence *persistence;
1283
1284         sub_tree = allocate_subscription_tree(endpoint);
1285         if (!sub_tree) {
1286                 *dlg_status = PJ_ENOMEM;
1287                 return NULL;
1288         }
1289         sub_tree->role = AST_SIP_NOTIFIER;
1290
1291         dlg = ast_sip_create_dialog_uas(endpoint, rdata, dlg_status);
1292         if (!dlg) {
1293                 if (*dlg_status != PJ_EEXISTS) {
1294                         ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
1295                 }
1296                 ao2_ref(sub_tree, -1);
1297                 return NULL;
1298         }
1299
1300         persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
1301                         pubsub_module.id, MOD_DATA_PERSISTENCE);
1302         if (persistence) {
1303                 /* Update the created dialog with the persisted information */
1304                 pjsip_ua_unregister_dlg(pjsip_ua_instance(), dlg);
1305                 pj_strdup2(dlg->pool, &dlg->local.info->tag, persistence->tag);
1306                 dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag);
1307                 pjsip_ua_register_dlg(pjsip_ua_instance(), dlg);
1308                 dlg->local.cseq = persistence->cseq;
1309                 dlg->remote.cseq = persistence->cseq;
1310         }
1311
1312         pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &sub_tree->evsub);
1313         subscription_setup_dialog(sub_tree, dlg);
1314
1315         ast_sip_mod_data_set(dlg->pool, dlg->mod_data, pubsub_module.id, MOD_DATA_MSG,
1316                         pjsip_msg_clone(dlg->pool, rdata->msg_info.msg));
1317
1318         sub_tree->notification_batch_interval = tree->notification_batch_interval;
1319
1320         sub_tree->root = create_virtual_subscriptions(handler, resource, generator, sub_tree, tree->root);
1321         if (AST_VECTOR_SIZE(&sub_tree->root->children) > 0) {
1322                 sub_tree->is_list = 1;
1323         }
1324
1325         return sub_tree;
1326 }
1327
/* Forward declarations: these static NOTIFY helpers are declared here ahead of their definitions. */
static int initial_notify_task(void *obj);
static int send_notify(struct sip_subscription_tree *sub_tree,
	unsigned int force_full_state);
1330
1331 /*! \brief Callback function to perform the actual recreation of a subscription */
1332 static int subscription_persistence_recreate(void *obj, void *arg, int flags)
1333 {
1334         struct subscription_persistence *persistence = obj;
1335         pj_pool_t *pool = arg;
1336         pjsip_rx_data rdata = { { 0, }, };
1337         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
1338         struct sip_subscription_tree *sub_tree;
1339         struct ast_sip_pubsub_body_generator *generator;
1340         int resp;
1341         char *resource;
1342         size_t resource_size;
1343         pjsip_sip_uri *request_uri;
1344         struct resource_tree tree;
1345         pjsip_expires_hdr *expires_header;
1346         struct ast_sip_subscription_handler *handler;
1347
1348         /* If this subscription has already expired remove it */
1349         if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
1350                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1351                 return 0;
1352         }
1353
1354         endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
1355         if (!endpoint) {
1356                 ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
1357                         persistence->endpoint);
1358                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1359                 return 0;
1360         }
1361
1362         pj_pool_reset(pool);
1363         rdata.tp_info.pool = pool;
1364
1365         if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
1366                 persistence->transport_key, persistence->local_name, persistence->local_port)) {
1367                 ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
1368                         persistence->endpoint);
1369                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1370                 return 0;
1371         }
1372
1373         if (rdata.msg_info.msg->type != PJSIP_REQUEST_MSG) {
1374                 ast_log(LOG_NOTICE, "Endpoint %s persisted a SIP response instead of a subscribe request. Unable to reload subscription.\n",
1375                                 ast_sorcery_object_get_id(endpoint));
1376                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1377                 return 0;
1378         }
1379
1380         request_uri = pjsip_uri_get_uri(rdata.msg_info.msg->line.req.uri);
1381         resource_size = pj_strlen(&request_uri->user) + 1;
1382         resource = ast_alloca(resource_size);
1383         ast_copy_pj_str(resource, &request_uri->user, resource_size);
1384
1385         /* Update the expiration header with the new expiration */
1386         expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
1387         if (!expires_header) {
1388                 expires_header = pjsip_expires_hdr_create(pool, 0);
1389                 if (!expires_header) {
1390                         ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1391                         return 0;
1392                 }
1393                 pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
1394         }
1395         expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
1396
1397         handler = subscription_get_handler_from_rdata(&rdata);
1398         if (!handler || !handler->notifier) {
1399                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1400                 return 0;
1401         }
1402
1403         generator = subscription_get_generator_from_rdata(&rdata, handler);
1404         if (!generator) {
1405                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1406                 return 0;
1407         }
1408
1409         ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
1410                         pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
1411
1412         memset(&tree, 0, sizeof(tree));
1413         resp = build_resource_tree(endpoint, handler, resource, &tree,
1414                 ast_sip_pubsub_has_eventlist_support(&rdata));
1415         if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
1416                 pj_status_t dlg_status;
1417
1418                 sub_tree = create_subscription_tree(handler, endpoint, &rdata, resource, generator, &tree, &dlg_status);
1419                 if (!sub_tree) {
1420                         ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1421                         ast_log(LOG_WARNING, "Failed to re-create subscription for %s\n", persistence->endpoint);
1422                         return 0;
1423                 }
1424                 sub_tree->persistence = ao2_bump(persistence);
1425                 subscription_persistence_update(sub_tree, &rdata);
1426                 if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ao2_bump(sub_tree))) {
1427                         pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
1428                         ao2_ref(sub_tree, -1);
1429                 }
1430         } else {
1431                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1432         }
1433         resource_tree_destroy(&tree);
1434
1435         return 0;
1436 }
1437
1438 /*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
1439 static int subscription_persistence_load(void *data)
1440 {
1441         struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
1442                 "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
1443         pj_pool_t *pool;
1444
1445         pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
1446                 PJSIP_POOL_RDATA_INC);
1447         if (!pool) {
1448                 ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
1449                 return 0;
1450         }
1451
1452         ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
1453
1454         pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
1455
1456         ao2_ref(persisted_subscriptions, -1);
1457         return 0;
1458 }
1459
1460 /*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
1461 static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
1462 {
1463         struct ast_json_payload *payload;
1464         const char *type;
1465
1466         if (stasis_message_type(message) != ast_manager_get_generic_type()) {
1467                 return;
1468         }
1469
1470         payload = stasis_message_data(message);
1471         type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
1472
1473         /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
1474          * recreate SIP subscriptions.
1475          */
1476         if (strcmp(type, "FullyBooted")) {
1477                 return;
1478         }
1479
1480         /* This has to be here so the subscription is recreated when the body generator is available */
1481         ast_sip_push_task(NULL, subscription_persistence_load, NULL);
1482
1483         /* Once the system is fully booted we don't care anymore */
1484         stasis_unsubscribe(sub);
1485 }
1486
1487 typedef int (*on_subscription_t)(struct sip_subscription_tree *sub, void *arg);
1488
1489 static int for_each_subscription(on_subscription_t on_subscription, void *arg)
1490 {
1491         int num = 0;
1492         struct sip_subscription_tree *i;
1493         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1494
1495         if (!on_subscription) {
1496                 return num;
1497         }
1498
1499         AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
1500                 if (on_subscription(i, arg)) {
1501                         break;
1502                 }
1503                 ++num;
1504         }
1505         return num;
1506 }
1507
1508 static void sip_subscription_to_ami(struct sip_subscription_tree *sub_tree,
1509                                     struct ast_str **buf)
1510 {
1511         char str[256];
1512         struct ast_sip_endpoint_id_configuration *id = &sub_tree->endpoint->id;
1513
1514         ast_str_append(buf, 0, "Role: %s\r\n",
1515                        sip_subscription_roles_map[sub_tree->role]);
1516         ast_str_append(buf, 0, "Endpoint: %s\r\n",
1517                        ast_sorcery_object_get_id(sub_tree->endpoint));
1518
1519         if (sub_tree->dlg) {
1520                 ast_copy_pj_str(str, &sub_tree->dlg->call_id->id, sizeof(str));
1521         } else {
1522                 ast_copy_string(str, "<unknown>", sizeof(str));
1523         }
1524         ast_str_append(buf, 0, "Callid: %s\r\n", str);
1525
1526         ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(sub_tree->evsub));
1527
1528         ast_callerid_merge(str, sizeof(str),
1529                            S_COR(id->self.name.valid, id->self.name.str, NULL),
1530                            S_COR(id->self.number.valid, id->self.number.str, NULL),
1531                            "Unknown");
1532
1533         ast_str_append(buf, 0, "Callerid: %s\r\n", str);
1534
1535         /* XXX This needs to be done recursively for lists */
1536         if (sub_tree->root->handler->to_ami) {
1537                 sub_tree->root->handler->to_ami(sub_tree->root, buf);
1538         }
1539 }
1540
1541
1542 void *ast_sip_subscription_get_header(const struct ast_sip_subscription *sub, const char *header)
1543 {
1544         pjsip_dialog *dlg;
1545         pjsip_msg *msg;
1546         pj_str_t name;
1547
1548         dlg = sub->tree->dlg;
1549         msg = ast_sip_mod_data_get(dlg->mod_data, pubsub_module.id, MOD_DATA_MSG);
1550         pj_cstr(&name, header);
1551
1552         return pjsip_msg_find_hdr_by_name(msg, &name, NULL);
1553 }
1554
1555 /* XXX This function is not used. */
1556 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
1557                 struct ast_sip_endpoint *endpoint, const char *resource)
1558 {
1559         struct ast_sip_subscription *sub;
1560         pjsip_dialog *dlg;
1561         struct ast_sip_contact *contact;
1562         pj_str_t event;
1563         pjsip_tx_data *tdata;
1564         pjsip_evsub *evsub;
1565         struct sip_subscription_tree *sub_tree = NULL;
1566
1567         sub_tree = allocate_subscription_tree(endpoint);
1568         if (!sub_tree) {
1569                 return NULL;
1570         }
1571
1572         sub = allocate_subscription(handler, resource, sub_tree);
1573         if (!sub) {
1574                 ao2_cleanup(sub_tree);
1575                 return NULL;
1576         }
1577
1578         contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors);
1579         if (!contact || ast_strlen_zero(contact->uri)) {
1580                 ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n",
1581                                 ast_sorcery_object_get_id(endpoint));
1582                 ao2_ref(sub_tree, -1);
1583                 ao2_cleanup(contact);
1584                 return NULL;
1585         }
1586
1587         dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL);
1588         ao2_cleanup(contact);
1589         if (!dlg) {
1590                 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
1591                 ao2_ref(sub_tree, -1);
1592                 return NULL;
1593         }
1594
1595         pj_cstr(&event, handler->event_name);
1596         pjsip_evsub_create_uac(dlg, &pubsub_cb, &event, 0, &sub_tree->evsub);
1597         subscription_setup_dialog(sub_tree, dlg);
1598
1599         evsub = sub_tree->evsub;
1600
1601         if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
1602                 pjsip_evsub_send_request(evsub, tdata);
1603         } else {
1604                 /* pjsip_evsub_terminate will result in pubsub_on_evsub_state,
1605                  * being called and terminating the subscription. Therefore, we don't
1606                  * need to decrease the reference count of sub here.
1607                  */
1608                 pjsip_evsub_terminate(evsub, PJ_TRUE);
1609                 ao2_ref(sub_tree, -1);
1610                 return NULL;
1611         }
1612
1613         return sub;
1614 }
1615
1616 struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
1617 {
1618         ast_assert(sub->tree->endpoint != NULL);
1619         return ao2_bump(sub->tree->endpoint);
1620 }
1621
1622 struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
1623 {
1624         ast_assert(sub->tree->serializer != NULL);
1625         return sub->tree->serializer;
1626 }
1627
1628 /*!
1629  * \brief Pre-allocate a buffer for the transmission
1630  *
1631  * Typically, we let PJSIP do this step for us when we send a request. PJSIP's buffer
1632  * allocation algorithm is to allocate a buffer of PJSIP_MAX_PKT_LEN bytes and attempt
1633  * to write the packet to the allocated buffer. If the buffer is too small to hold the
1634  * packet, then we get told the message is too long to be sent.
1635  *
1636  * When dealing with SIP NOTIFY, especially with RLS, it is possible to exceed
1637  * PJSIP_MAX_PKT_LEN. Rather than accepting the limitation imposed on us by default,
1638  * we instead take the strategy of pre-allocating the buffer, testing for ourselves
1639  * if the message will fit, and resizing the buffer as required.
1640  *
1641  * RFC 3261 says that a SIP UDP request can be up to 65535 bytes long. We're capping
1642  * it at 64000 for a couple of reasons:
1643  * 1) Allocating more than 64K at a time is hard to justify
1644  * 2) If the message goes through proxies, those proxies will want to add Via and
1645  *    Record-Route headers, making the message even larger. Giving some space for
1646  *    those headers is a nice thing to do.
1647  *
1648  * RFC 3261 does not place an upper limit on the size of TCP requests, but we are
1649  * going to impose the same 64K limit as a memory savings.
1650  *
1651  * \param tdata The tdata onto which to allocate a buffer
1652  * \retval 0 Success
1653  * \retval -1 The message is too large
1654  */
1655 static int allocate_tdata_buffer(pjsip_tx_data *tdata)
1656 {
1657         int buf_size;
1658         int size = -1;
1659         char *buf;
1660
1661         for (buf_size = PJSIP_MAX_PKT_LEN; size == -1 && buf_size < 64000; buf_size *= 2) {
1662                 buf = pj_pool_alloc(tdata->pool, buf_size);
1663                 size = pjsip_msg_print(tdata->msg, buf, buf_size);
1664         }
1665
1666         if (size == -1) {
1667                 return -1;
1668         }
1669
1670         tdata->buf.start = buf;
1671         tdata->buf.cur = tdata->buf.start;
1672         tdata->buf.end = tdata->buf.start + buf_size;
1673
1674         return 0;
1675 }
1676
1677 static int sip_subscription_send_request(struct sip_subscription_tree *sub_tree, pjsip_tx_data *tdata)
1678 {
1679 #ifdef TEST_FRAMEWORK
1680         struct ast_sip_endpoint *endpoint = sub_tree->endpoint;
1681 #endif
1682         pjsip_evsub *evsub = sub_tree->evsub;
1683         int res;
1684
1685         if (allocate_tdata_buffer(tdata)) {
1686                 ast_log(LOG_ERROR, "SIP request %s is too large to send.\n", tdata->info);
1687                 return -1;
1688         }
1689
1690         res = pjsip_evsub_send_request(evsub, tdata) == PJ_SUCCESS ? 0 : -1;
1691         subscription_persistence_update(sub_tree, NULL);
1692
1693         ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
1694                 "StateText: %s\r\n"
1695                 "Endpoint: %s\r\n",
1696                 pjsip_evsub_get_state_name(evsub),
1697                 ast_sorcery_object_get_id(endpoint));
1698
1699         return res;
1700 }
1701
1702 /*!
1703  * \brief Add a resource XML element to an RLMI body
1704  *
1705  * Each resource element represents a subscribed resource in the list. This function currently
1706  * will unconditionally add an instance element to each created resource element. Instance
1707  * elements refer to later parts in the multipart body.
1708  *
1709  * \param pool PJLIB allocation pool
1710  * \param cid Content-ID header of the resource
1711  * \param resource_name Name of the resource
1712  * \param resource_uri URI of the resource
1713  * \param state State of the subscribed resource
1714  */
1715 static void add_rlmi_resource(pj_pool_t *pool, pj_xml_node *rlmi, const pjsip_generic_string_hdr *cid,
1716                 const char *resource_name, const pjsip_sip_uri *resource_uri, pjsip_evsub_state state)
1717 {
1718         static pj_str_t cid_name = { "cid", 3 };
1719         pj_xml_node *resource;
1720         pj_xml_node *name;
1721         pj_xml_node *instance;
1722         pj_xml_attr *cid_attr;
1723         char id[6];
1724         char uri[PJSIP_MAX_URL_SIZE];
1725
1726         /* This creates a string representing the Content-ID without the enclosing < > */
1727         const pj_str_t cid_stripped = {
1728                 .ptr = cid->hvalue.ptr + 1,
1729                 .slen = cid->hvalue.slen - 2,
1730         };
1731
1732         resource = ast_sip_presence_xml_create_node(pool, rlmi, "resource");
1733         name = ast_sip_presence_xml_create_node(pool, resource, "name");
1734         instance = ast_sip_presence_xml_create_node(pool, resource, "instance");
1735
1736         pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, resource_uri, uri, sizeof(uri));
1737         ast_sip_presence_xml_create_attr(pool, resource, "uri", uri);
1738
1739         pj_strdup2(pool, &name->content, resource_name);
1740
1741         ast_generate_random_string(id, sizeof(id));
1742
1743         ast_sip_presence_xml_create_attr(pool, instance, "id", id);
1744         ast_sip_presence_xml_create_attr(pool, instance, "state",
1745                         state == PJSIP_EVSUB_STATE_TERMINATED ? "terminated" : "active");
1746
1747         /* Use the PJLIB-util XML library directly here since we are using a
1748          * pj_str_t
1749          */
1750
1751         cid_attr = pj_xml_attr_new(pool, &cid_name, &cid_stripped);
1752         pj_xml_add_attr(instance, cid_attr);
1753 }
1754
/*!
 * \brief A multipart body part and meta-information
 *
 * When creating a multipart body part, the end result (the
 * pjsip_multipart_part) is hard to inspect without undoing
 * a lot of what was done to create it. Therefore, we use this
 * structure to store meta-information about the body part.
 *
 * The main consumer of this is the creator of the RLMI body
 * part of a multipart resource list body.
 *
 * The structure itself is heap allocated (see allocate_body_part()
 * and free_body_parts()); the cid, resource and uri members are
 * filled in from the subscription and its allocation pool and are
 * not freed along with the structure.
 */
struct body_part {
	/*! Content-ID header for the body part */
	pjsip_generic_string_hdr *cid;
	/*! Subscribed resource represented in the body part (copied by pointer from the subscription) */
	const char *resource;
	/*! URI for the subscribed body part (copied by pointer from the subscription) */
	pjsip_sip_uri *uri;
	/*! Subscription state of the resource represented in the body part */
	pjsip_evsub_state state;
	/*! The actual body part that will be present in the multipart body */
	pjsip_multipart_part *part;
};
1778
/*!
 * \brief Type declaration for container of body part structures
 *
 * The vector holds pointers to individually allocated struct body_part
 * entries; free_body_parts() releases both the entries and the vector.
 */
AST_VECTOR(body_part_list, struct body_part *);
1783
1784 /*!
1785  * \brief Create a Content-ID header
1786  *
1787  * Content-ID headers are required by RFC2387 for multipart/related
1788  * bodies. They serve as identifiers for each part of the multipart body.
1789  *
1790  * \param pool PJLIB allocation pool
1791  * \param sub Subscription to a resource
1792  */
1793 static pjsip_generic_string_hdr *generate_content_id_hdr(pj_pool_t *pool,
1794                 const struct ast_sip_subscription *sub)
1795 {
1796         static const pj_str_t cid_name = { "Content-ID", 10 };
1797         pjsip_generic_string_hdr *cid;
1798         char id[6];
1799         size_t alloc_size;
1800         pj_str_t cid_value;
1801
1802         /* '<' + '@' + '>' = 3. pj_str_t does not require a null-terminator */
1803         alloc_size = sizeof(id) + pj_strlen(&sub->uri->host) + 3;
1804         cid_value.ptr = pj_pool_alloc(pool, alloc_size);
1805         cid_value.slen = sprintf(cid_value.ptr, "<%s@%.*s>",
1806                         ast_generate_random_string(id, sizeof(id)),
1807                         (int) pj_strlen(&sub->uri->host), pj_strbuf(&sub->uri->host));
1808         cid = pjsip_generic_string_hdr_create(pool, &cid_name, &cid_value);
1809
1810         return cid;
1811 }
1812
1813 static int rlmi_print_body(struct pjsip_msg_body *msg_body, char *buf, pj_size_t size)
1814 {
1815         int num_printed;
1816         pj_xml_node *rlmi = msg_body->data;
1817
1818         num_printed = pj_xml_print(rlmi, buf, size, PJ_TRUE);
1819         if (num_printed <= AST_PJSIP_XML_PROLOG_LEN) {
1820                 return -1;
1821         }
1822
1823         return num_printed;
1824 }
1825
1826 static void *rlmi_clone_data(pj_pool_t *pool, const void *data, unsigned len)
1827 {
1828         const pj_xml_node *rlmi = data;
1829
1830         return pj_xml_clone(pool, rlmi);
1831 }
1832
1833 /*!
1834  * \brief Create an RLMI body part for a multipart resource list body
1835  *
1836  * RLMI (Resource list meta information) is a special body type that lists
1837  * the subscribed resources and tells subscribers the number of subscribed
1838  * resources and what other body parts are in the multipart body. The
1839  * RLMI body also has a version number that a subscriber can use to ensure
1840  * that the locally-stored state corresponds to server state.
1841  *
1842  * \param pool The allocation pool
1843  * \param sub The subscription representing the subscribed resource list
1844  * \param body_parts A container of body parts that RLMI will refer to
1845  * \param full_state Indicates whether this is a full or partial state notification
1846  * \return The multipart part representing the RLMI body
1847  */
1848 static pjsip_multipart_part *build_rlmi_body(pj_pool_t *pool, struct ast_sip_subscription *sub,
1849                 struct body_part_list *body_parts, unsigned int full_state)
1850 {
1851         static const pj_str_t rlmi_type = { "application", 11 };
1852         static const pj_str_t rlmi_subtype = { "rlmi+xml", 8 };
1853         pj_xml_node *rlmi;
1854         pj_xml_node *name;
1855         pjsip_multipart_part *rlmi_part;
1856         char version_str[32];
1857         char uri[PJSIP_MAX_URL_SIZE];
1858         pjsip_generic_string_hdr *cid;
1859         int i;
1860
1861         rlmi = ast_sip_presence_xml_create_node(pool, NULL, "list");
1862         ast_sip_presence_xml_create_attr(pool, rlmi, "xmlns", "urn:ietf:params:xml:ns:rlmi");
1863
1864         ast_sip_subscription_get_local_uri(sub, uri, sizeof(uri));
1865         ast_sip_presence_xml_create_attr(pool, rlmi, "uri", uri);
1866
1867         snprintf(version_str, sizeof(version_str), "%u", sub->version++);
1868         ast_sip_presence_xml_create_attr(pool, rlmi, "version", version_str);
1869         ast_sip_presence_xml_create_attr(pool, rlmi, "fullState", full_state ? "true" : "false");
1870
1871         name = ast_sip_presence_xml_create_node(pool, rlmi, "name");
1872         pj_strdup2(pool, &name->content, ast_sip_subscription_get_resource_name(sub));
1873
1874         for (i = 0; i < AST_VECTOR_SIZE(body_parts); ++i) {
1875                 const struct body_part *part = AST_VECTOR_GET(body_parts, i);
1876
1877                 add_rlmi_resource(pool, rlmi, part->cid, part->resource, part->uri, part->state);
1878         }
1879
1880         rlmi_part = pjsip_multipart_create_part(pool);
1881
1882         rlmi_part->body = PJ_POOL_ZALLOC_T(pool, pjsip_msg_body);
1883         pj_strdup(pool, &rlmi_part->body->content_type.type, &rlmi_type);
1884         pj_strdup(pool, &rlmi_part->body->content_type.subtype, &rlmi_subtype);
1885         pj_list_init(&rlmi_part->body->content_type.param);
1886
1887         rlmi_part->body->data = pj_xml_clone(pool, rlmi);
1888         rlmi_part->body->clone_data = rlmi_clone_data;
1889         rlmi_part->body->print_body = rlmi_print_body;
1890
1891         cid = generate_content_id_hdr(pool, sub);
1892         pj_list_insert_before(&rlmi_part->hdr, cid);
1893
1894         return rlmi_part;
1895 }
1896
1897 static pjsip_msg_body *generate_notify_body(pj_pool_t *pool, struct ast_sip_subscription *root,
1898                 unsigned int force_full_state);
1899
1900 /*!
1901  * \brief Destroy a list of body parts
1902  *
1903  * \param parts The container of parts to destroy
1904  */
1905 static void free_body_parts(struct body_part_list *parts)
1906 {
1907         int i;
1908
1909         for (i = 0; i < AST_VECTOR_SIZE(parts); ++i) {
1910                 struct body_part *part = AST_VECTOR_GET(parts, i);
1911                 ast_free(part);
1912         }
1913
1914         AST_VECTOR_FREE(parts);
1915 }
1916
1917 /*!
1918  * \brief Allocate and initialize a body part structure
1919  *
1920  * \param pool PJLIB allocation pool
1921  * \param sub Subscription representing a subscribed resource
1922  */
1923 static struct body_part *allocate_body_part(pj_pool_t *pool, const struct ast_sip_subscription *sub)
1924 {
1925         struct body_part *bp;
1926
1927         bp = ast_calloc(1, sizeof(*bp));
1928         if (!bp) {
1929                 return NULL;
1930         }
1931
1932         bp->cid = generate_content_id_hdr(pool, sub);
1933         bp->resource = sub->resource;
1934         bp->state = sub->subscription_state;
1935         bp->uri = sub->uri;
1936
1937         return bp;
1938 }
1939
1940 /*!
1941  * \brief Create a multipart body part for a subscribed resource
1942  *
1943  * \param pool PJLIB allocation pool
1944  * \param sub The subscription representing a subscribed resource
1945  * \param parts A vector of parts to append the created part to.
1946  * \param use_full_state Unused locally, but may be passed to other functions
1947  */
1948 static void build_body_part(pj_pool_t *pool, struct ast_sip_subscription *sub,
1949                 struct body_part_list *parts, unsigned int use_full_state)
1950 {
1951         struct body_part *bp;
1952         pjsip_msg_body *body;
1953
1954         bp = allocate_body_part(pool, sub);
1955         if (!bp) {
1956                 return;
1957         }
1958
1959         body = generate_notify_body(pool, sub, use_full_state);
1960         if (!body) {
1961                 /* Partial state was requested and the resource has not changed state */
1962                 ast_free(bp);
1963                 return;
1964         }
1965
1966         bp->part = pjsip_multipart_create_part(pool);
1967         bp->part->body = body;
1968         pj_list_insert_before(&bp->part->hdr, bp->cid);
1969
1970         AST_VECTOR_APPEND(parts, bp);
1971 }
1972
1973 /*!
1974  * \brief Create and initialize the PJSIP multipart body structure for a resource list subscription
1975  *
1976  * \param pool
1977  * \return The multipart message body
1978  */
1979 static pjsip_msg_body *create_multipart_body(pj_pool_t *pool)
1980 {
1981         pjsip_media_type media_type;
1982         pjsip_param *media_type_param;
1983         char boundary[6];
1984         pj_str_t pj_boundary;
1985
1986         pjsip_media_type_init2(&media_type, "multipart", "related");
1987
1988         media_type_param = pj_pool_alloc(pool, sizeof(*media_type_param));
1989         pj_list_init(media_type_param);
1990
1991         pj_strdup2(pool, &media_type_param->name, "type");
1992         pj_strdup2(pool, &media_type_param->value, "\"application/rlmi+xml\"");
1993
1994         pj_list_insert_before(&media_type.param, media_type_param);
1995
1996         pj_cstr(&pj_boundary, ast_generate_random_string(boundary, sizeof(boundary)));
1997         return pjsip_multipart_create(pool, &media_type, &pj_boundary);
1998 }
1999
2000 /*!
2001  * \brief Create a resource list body for NOTIFY requests
2002  *
2003  * Resource list bodies are multipart/related bodies. The first part of the multipart body
2004  * is an RLMI body that describes the rest of the parts to come. The other parts of the body
2005  * convey state of individual subscribed resources.
2006  *
2007  * \param pool PJLIB allocation pool
2008  * \param sub Subscription details from which to generate body
2009  * \param force_full_state If true, ignore resource list settings and send a full state notification
2010  * \return The generated multipart/related body
2011  */
2012 static pjsip_msg_body *generate_list_body(pj_pool_t *pool, struct ast_sip_subscription *sub,
2013                 unsigned int force_full_state)
2014 {
2015         int i;
2016         pjsip_multipart_part *rlmi_part;
2017         pjsip_msg_body *multipart;
2018         struct body_part_list body_parts;
2019         unsigned int use_full_state = force_full_state ? 1 : sub->full_state;
2020
2021         if (AST_VECTOR_INIT(&body_parts, AST_VECTOR_SIZE(&sub->children))) {
2022                 return NULL;
2023         }
2024
2025         for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
2026                 build_body_part(pool, AST_VECTOR_GET(&sub->children, i), &body_parts, use_full_state);
2027         }
2028
2029         /* This can happen if issuing partial state and no children of the list have changed state */
2030         if (AST_VECTOR_SIZE(&body_parts) == 0) {
2031                 return NULL;
2032         }
2033
2034         multipart = create_multipart_body(pool);
2035
2036         rlmi_part = build_rlmi_body(pool, sub, &body_parts, use_full_state);
2037         if (!rlmi_part) {
2038                 return NULL;
2039         }
2040         pjsip_multipart_add_part(pool, multipart, rlmi_part);
2041
2042         for (i = 0; i < AST_VECTOR_SIZE(&body_parts); ++i) {
2043                 pjsip_multipart_add_part(pool, multipart, AST_VECTOR_GET(&body_parts, i)->part);
2044         }
2045
2046         free_body_parts(&body_parts);
2047         return multipart;
2048 }
2049
2050 /*!
2051  * \brief Create the body for a NOTIFY request.
2052  *
2053  * \param pool The pool used for allocations
2054  * \param root The root of the subscription tree
2055  * \param force_full_state If true, ignore resource list settings and send a full state notification
2056  */
2057 static pjsip_msg_body *generate_notify_body(pj_pool_t *pool, struct ast_sip_subscription *root,
2058                 unsigned int force_full_state)
2059 {
2060         pjsip_msg_body *body;
2061
2062         if (AST_VECTOR_SIZE(&root->children) == 0) {
2063                 if (force_full_state || root->body_changed) {
2064                         /* Not a list. We've already generated the body and saved it on the subscription.
2065                          * Use that directly.
2066                          */
2067                         pj_str_t type;
2068                         pj_str_t subtype;
2069                         pj_str_t text;
2070
2071                         pj_cstr(&type, ast_sip_subscription_get_body_type(root));
2072                         pj_cstr(&subtype, ast_sip_subscription_get_body_subtype(root));
2073                         pj_cstr(&text, ast_str_buffer(root->body_text));
2074
2075                         body = pjsip_msg_body_create(pool, &type, &subtype, &text);
2076                         root->body_changed = 0;
2077                 } else {
2078                         body = NULL;
2079                 }
2080         } else {
2081                 body = generate_list_body(pool, root, force_full_state);
2082         }
2083
2084         return body;
2085 }
2086
2087 /*!
2088  * \brief Shortcut method to create a Require: eventlist header
2089  */
2090 static pjsip_require_hdr *create_require_eventlist(pj_pool_t *pool)
2091 {
2092         pjsip_require_hdr *require;
2093
2094         require = pjsip_require_hdr_create(pool);
2095         pj_strdup2(pool, &require->values[0], "eventlist");
2096         require->count = 1;
2097
2098         return require;
2099 }
2100
2101 /*!
2102  * \brief Send a NOTIFY request to a subscriber
2103  *
2104  * \pre sub_tree->dlg is locked
2105  *
2106  * \param sub_tree The subscription tree representing the subscription
2107  * \param force_full_state If true, ignore resource list settings and send full resource list state.
2108  * \retval 0 Success
2109  * \retval non-zero Failure
2110  */
2111 static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state)
2112 {
2113         pjsip_evsub *evsub = sub_tree->evsub;
2114         pjsip_tx_data *tdata;
2115
2116         if (ast_shutdown_final()
2117                 && sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED
2118                 && sub_tree->persistence) {
2119                 return 0;
2120         }
2121
2122         if (pjsip_evsub_notify(evsub, sub_tree->root->subscription_state,
2123                                 NULL, NULL, &tdata) != PJ_SUCCESS) {
2124                 return -1;
2125         }
2126
2127         tdata->msg->body = generate_notify_body(tdata->pool, sub_tree->root, force_full_state);
2128         if (!tdata->msg->body) {
2129                 pjsip_tx_data_dec_ref(tdata);
2130                 return -1;
2131         }
2132
2133         if (sub_tree->is_list) {
2134                 pjsip_require_hdr *require = create_require_eventlist(tdata->pool);
2135                 pjsip_msg_add_hdr(tdata->msg, (pjsip_hdr *) require);
2136         }
2137
2138         if (sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED) {
2139                 sub_tree->last_notify = 1;
2140         }
2141         if (sip_subscription_send_request(sub_tree, tdata)) {
2142                 return -1;
2143         }
2144
2145         sub_tree->send_scheduled_notify = 0;
2146
2147         return 0;
2148 }
2149
2150 static int serialized_send_notify(void *userdata)
2151 {
2152         struct sip_subscription_tree *sub_tree = userdata;
2153         pjsip_dialog *dlg = sub_tree->dlg;
2154
2155         pjsip_dlg_inc_lock(dlg);
2156         /* It's possible that between when the notification was scheduled
2157          * and now, that a new SUBSCRIBE arrived, requiring full state to be
2158          * sent out in an immediate NOTIFY. If that has happened, we need to
2159          * bail out here instead of sending the batched NOTIFY.
2160          */
2161         if (!sub_tree->send_scheduled_notify) {
2162                 pjsip_dlg_dec_lock(dlg);
2163                 ao2_cleanup(sub_tree);
2164                 return 0;
2165         }
2166
2167         send_notify(sub_tree, 0);
2168         ast_test_suite_event_notify("SUBSCRIPTION_STATE_CHANGED",
2169                         "Resource: %s",
2170                         sub_tree->root->resource);
2171         sub_tree->notify_sched_id = -1;
2172         pjsip_dlg_dec_lock(dlg);
2173         ao2_cleanup(sub_tree);
2174         return 0;
2175 }
2176
2177 static int sched_cb(const void *data)
2178 {
2179         struct sip_subscription_tree *sub_tree = (struct sip_subscription_tree *) data;
2180
2181         /* We don't need to bump the refcount of sub_tree since we bumped it when scheduling this task */
2182         ast_sip_push_task(sub_tree->serializer, serialized_send_notify, sub_tree);
2183         return 0;
2184 }
2185
2186 static int schedule_notification(struct sip_subscription_tree *sub_tree)
2187 {
2188         /* There's already a notification scheduled */
2189         if (sub_tree->notify_sched_id > -1) {
2190                 return 0;
2191         }
2192
2193         sub_tree->notify_sched_id = ast_sched_add(sched, sub_tree->notification_batch_interval, sched_cb, ao2_bump(sub_tree));
2194         if (sub_tree->notify_sched_id < 0) {
2195                 return -1;
2196         }
2197
2198         sub_tree->send_scheduled_notify = 1;
2199         return 0;
2200 }
2201
2202 int ast_sip_subscription_notify(struct ast_sip_subscription *sub, struct ast_sip_body_data *notify_data,
2203                 int terminate)
2204 {
2205         int res;
2206         pjsip_dialog *dlg = sub->tree->dlg;
2207
2208         pjsip_dlg_inc_lock(dlg);
2209
2210         if (!sub->tree->evsub) {
2211                 pjsip_dlg_dec_lock(dlg);
2212                 return 0;
2213         }
2214
2215         if (ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub),
2216                                 ast_sip_subscription_get_body_subtype(sub), notify_data, &sub->body_text)) {
2217                 pjsip_dlg_dec_lock(dlg);
2218                 return -1;
2219         }
2220
2221         sub->body_changed = 1;
2222         if (terminate) {
2223                 sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
2224         }
2225
2226         if (sub->tree->notification_batch_interval) {
2227                 res = schedule_notification(sub->tree);
2228         } else {
2229                 /* See the note in pubsub_on_rx_refresh() for why sub->tree is refbumped here */
2230                 ao2_ref(sub->tree, +1);
2231                 res = send_notify(sub->tree, 0);
2232                 ast_test_suite_event_notify(terminate ? "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_STATE_CHANGED",
2233                                 "Resource: %s",
2234                                 sub->tree->root->resource);
2235                 ao2_ref(sub->tree, -1);
2236         }
2237
2238         pjsip_dlg_dec_lock(dlg);
2239         return res;
2240 }
2241
2242 void ast_sip_subscription_get_local_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
2243 {
2244         pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, sub->uri, buf, size);
2245 }
2246
2247 void ast_sip_subscription_get_remote_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
2248 {
2249         pjsip_dialog *dlg;
2250
2251         dlg = sub->tree->dlg;
2252         ast_copy_pj_str(buf, &dlg->remote.info_str, size);
2253 }
2254
2255 const char *ast_sip_subscription_get_resource_name(struct ast_sip_subscription *sub)
2256 {
2257         return sub->resource;
2258 }
2259
2260 int ast_sip_subscription_is_terminated(const struct ast_sip_subscription *sub)
2261 {
2262         return sub->subscription_state == PJSIP_EVSUB_STATE_TERMINATED ? 1 : 0;
2263 }
2264
2265 static int sip_subscription_accept(struct sip_subscription_tree *sub_tree, pjsip_rx_data *rdata, int response)
2266 {
2267         pjsip_hdr res_hdr;
2268
2269         /* If this is a persistence recreation the subscription has already been accepted */
2270         if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) {
2271                 return 0;
2272         }
2273
2274         pj_list_init(&res_hdr);
2275         if (sub_tree->is_list) {
2276                 /* If subscribing to a list, our response has to have a Require: eventlist header in it */
2277                 pj_list_insert_before(&res_hdr, create_require_eventlist(rdata->tp_info.pool));
2278         }
2279
2280         return pjsip_evsub_accept(sub_tree->evsub, rdata, response, &res_hdr) == PJ_SUCCESS ? 0 : -1;
2281 }
2282
2283 static void subscription_datastore_destroy(void *obj)
2284 {
2285         struct ast_datastore *datastore = obj;
2286
2287         /* Using the destroy function (if present) destroy the data */
2288         if (datastore->info->destroy != NULL && datastore->data != NULL) {
2289                 datastore->info->destroy(datastore->data);
2290                 datastore->data = NULL;
2291         }
2292
2293         ast_free((void *) datastore->uid);
2294         datastore->uid = NULL;
2295 }
2296
2297 struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
2298 {
2299         RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
2300         char uuid_buf[AST_UUID_STR_LEN];
2301         const char *uid_ptr = uid;
2302
2303         if (!info) {
2304                 return NULL;
2305         }
2306
2307         datastore = ao2_alloc(sizeof(*datastore), subscription_datastore_destroy);
2308         if (!datastore) {
2309                 return NULL;
2310         }
2311
2312         datastore->info = info;
2313         if (ast_strlen_zero(uid)) {
2314                 /* They didn't provide an ID so we'll provide one ourself */
2315                 uid_ptr = ast_uuid_generate_str(uuid_buf, sizeof(uuid_buf));
2316         }
2317
2318         datastore->uid = ast_strdup(uid_ptr);
2319         if (!datastore->uid) {
2320                 return NULL;
2321         }
2322
2323         ao2_ref(datastore, +1);
2324         return datastore;
2325 }
2326
2327 int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
2328 {
2329         ast_assert(datastore != NULL);
2330         ast_assert(datastore->info != NULL);
2331         ast_assert(!ast_strlen_zero(datastore->uid));
2332
2333         if (!ao2_link(subscription->datastores, datastore)) {
2334                 return -1;
2335         }
2336         return 0;
2337 }
2338
2339 struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
2340 {
2341         return ao2_find(subscription->datastores, name, OBJ_KEY);
2342 }
2343
2344 void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
2345 {
2346         ao2_find(subscription->datastores, name, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
2347 }
2348
2349 int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
2350 {
2351         ast_assert(datastore != NULL);
2352         ast_assert(datastore->info != NULL);
2353         ast_assert(!ast_strlen_zero(datastore->uid));
2354
2355         if (!ao2_link(publication->datastores, datastore)) {
2356                 return -1;
2357         }
2358         return 0;
2359 }
2360
2361 struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
2362 {
2363         return ao2_find(publication->datastores, name, OBJ_KEY);
2364 }
2365
2366 void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
2367 {
2368         ao2_callback(publication->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
2369 }
2370
/*! \brief File-scope read/write-locked list of registered PUBLISH handlers */
AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
2372
2373 static int publication_hash_fn(const void *obj, const int flags)
2374 {
2375         const struct ast_sip_publication *publication = obj;
2376         const int *entity_tag = obj;
2377
2378         return flags & OBJ_KEY ? *entity_tag : publication->entity_tag;
2379 }
2380
2381 static int publication_cmp_fn(void *obj, void *arg, int flags)
2382 {
2383         const struct ast_sip_publication *publication1 = obj;
2384         const struct ast_sip_publication *publication2 = arg;
2385         const int *entity_tag = arg;
2386
2387         return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ?
2388                 CMP_MATCH | CMP_STOP : 0);
2389 }
2390
2391 static void publish_add_handler(struct ast_sip_publish_handler *handler)
2392 {
2393         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2394         AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
2395 }
2396
2397 int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
2398 {
2399         if (ast_strlen_zero(handler->event_name)) {
2400                 ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
2401                 return -1;
2402         }
2403
2404         if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS,
2405                 publication_hash_fn, publication_cmp_fn))) {
2406                 ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n",
2407                         handler->event_name);
2408                 return -1;
2409         }
2410
2411         publish_add_handler(handler);
2412
2413         ast_module_ref(ast_module_info->self);
2414
2415         return 0;
2416 }
2417
2418 void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
2419 {
2420         struct ast_sip_publish_handler *iter;
2421         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2422         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
2423                 if (handler == iter) {
2424                         AST_RWLIST_REMOVE_CURRENT(next);
2425                         ao2_cleanup(handler->publications);
2426                         ast_module_unref(ast_module_info->self);
2427                         break;
2428                 }
2429         }
2430         AST_RWLIST_TRAVERSE_SAFE_END;
2431 }
2432
/*! \brief File-scope read/write-locked list of registered subscription handlers */
AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
2434
2435 static void sub_add_handler(struct ast_sip_subscription_handler *handler)
2436 {
2437         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2438         AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
2439         ast_module_ref(ast_module_info->self);
2440 }
2441
2442 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name)
2443 {
2444         struct ast_sip_subscription_handler *iter;
2445         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
2446
2447         AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
2448                 if (!strcmp(iter->event_name, event_name)) {
2449                         break;
2450                 }
2451         }
2452         return iter;
2453 }
2454
2455 int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler)
2456 {
2457         pj_str_t event;
2458         pj_str_t accept[AST_SIP_MAX_ACCEPT] = { {0, }, };
2459         struct ast_sip_subscription_handler *existing;
2460         int i = 0;
2461
2462         if (ast_strlen_zero(handler->event_name)) {
2463                 ast_log(LOG_ERROR, "No event package specified for subscription handler. Cannot register\n");
2464                 return -1;
2465         }
2466
2467         existing = find_sub_handler_for_event_name(handler->event_name);
2468         if (existing) {
2469                 ast_log(LOG_ERROR, "Unable to register subscription handler for event %s."
2470                                 "A handler is already registered\n", handler->event_name);
2471                 return -1;
2472         }
2473
2474         for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) {
2475                 pj_cstr(&accept[i], handler->accept[i]);
2476         }
2477
2478         pj_cstr(&event, handler->event_name);
2479
2480         pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
2481
2482         sub_add_handler(handler);
2483
2484         return 0;
2485 }
2486
2487 void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
2488 {
2489         struct ast_sip_subscription_handler *iter;
2490         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2491         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
2492                 if (handler == iter) {
2493                         AST_RWLIST_REMOVE_CURRENT(next);
2494                         ast_module_unref(ast_module_info->self);
2495                         break;
2496                 }
2497         }
2498         AST_RWLIST_TRAVERSE_SAFE_END;
2499 }
2500
2501 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype(const char *content_type,
2502                 const char *content_subtype)
2503 {
2504         struct ast_sip_pubsub_body_generator *iter;
2505         SCOPED_LOCK(lock, &body_generators, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
2506
2507         AST_LIST_TRAVERSE(&body_generators, iter, list) {
2508                 if (!strcmp(iter->type, content_type) &&
2509                                 !strcmp(iter->subtype, content_subtype)) {
2510                         break;
2511                 }
2512         };
2513
2514         return iter;
2515 }
2516
/*!
 * \brief Find a body generator for a "type/subtype" accept string
 *
 * \param accept The accept value to split at its first '/'
 * \return The matching generator, or NULL if either half is empty or no generator matches
 */
static struct ast_sip_pubsub_body_generator *find_body_generator_accept(const char *accept)
{
	char *type = ast_strdupa(accept);
	char *subtype = NULL;
	char *slash = strchr(type, '/');

	if (slash) {
		/* Split the stack copy in place: "type\0subtype" */
		*slash = '\0';
		subtype = slash + 1;
	}

	if (ast_strlen_zero(type) || ast_strlen_zero(subtype)) {
		return NULL;
	}

	return find_body_generator_type_subtype(type, subtype);
}
2529
2530 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
2531                 size_t num_accept, const char *body_type)
2532 {
2533         int i;
2534         struct ast_sip_pubsub_body_generator *generator = NULL;
2535
2536         for (i = 0; i < num_accept; ++i) {
2537                 generator = find_body_generator_accept(accept[i]);
2538                 if (generator) {
2539                         ast_debug(3, "Body generator %p found for accept type %s\n", generator, accept[i]);
2540                         if (strcmp(generator->body_type, body_type)) {
2541                                 ast_log(LOG_WARNING, "Body generator '%s/%s'(%p) does not accept the type of data this event generates\n",
2542                                                 generator->type, generator->subtype, generator);
2543                                 generator = NULL;
2544                                 continue;
2545                         }
2546                         break;
2547                 } else {
2548                         ast_debug(3, "No body generator found for accept type %s\n", accept[i]);
2549                 }
2550         }
2551
2552         return generator;
2553 }
2554
2555 static int generate_initial_notify(struct ast_sip_subscription *sub)
2556 {
2557         void *notify_data;
2558         int res;
2559         struct ast_sip_body_data data = {
2560                 .body_type = sub->handler->body_type,
2561         };
2562
2563         if (AST_VECTOR_SIZE(&sub->children) > 0) {
2564                 int i;
2565
2566                 for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
2567                         if (generate_initial_notify(AST_VECTOR_GET(&sub->children, i))) {
2568                                 return -1;
2569                         }
2570                 }
2571
2572                 return 0;
2573         }
2574
2575         /* We notify subscription establishment only on the tree leaves. */
2576         if (sub->handler->notifier->subscription_established(sub)) {
2577                 return -1;
2578         }
2579
2580         notify_data = sub->handler->notifier->get_notify_data(sub);
2581         if (!notify_data) {
2582                 return -1;
2583         }
2584
2585         data.body_data = notify_data;
2586
2587         res = ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub),
2588                         ast_sip_subscription_get_body_subtype(sub), &data, &sub->body_text);
2589
2590         ao2_cleanup(notify_data);
2591
2592         return res;
2593 }
2594
2595 static int initial_notify_task(void * obj)
2596 {
2597         struct sip_subscription_tree *sub_tree;
2598
2599         sub_tree = obj;
2600         if (generate_initial_notify(sub_tree->root)) {
2601                 pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
2602         } else {
2603                 send_notify(sub_tree, 1);
2604                 ast_test_suite_event_notify("SUBSCRIPTION_ESTABLISHED",
2605                         "Resource: %s",
2606                         sub_tree->root->resource);
2607         }
2608
2609         ao2_ref(sub_tree, -1);
2610         return 0;
2611 }
2612
2613 static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
2614 {
2615         pjsip_expires_hdr *expires_header;
2616         struct ast_sip_subscription_handler *handler;
2617         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
2618         struct sip_subscription_tree *sub_tree;
2619         struct ast_sip_pubsub_body_generator *generator;
2620         char *resource;
2621         pjsip_uri *request_uri;
2622         pjsip_sip_uri *request_uri_sip;
2623         size_t resource_size;
2624         int resp;
2625         struct resource_tree tree;
2626         pj_status_t dlg_status;
2627
2628         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
2629         ast_assert(endpoint != NULL);
2630
2631         if (!endpoint->subscription.allow) {
2632                 ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint));
2633                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL);
2634                 return PJ_TRUE;
2635         }
2636
2637         request_uri = rdata->msg_info.msg->line.req.uri;
2638
2639         if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
2640                 char uri_str[PJSIP_MAX_URL_SIZE];
2641
2642                 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
2643                 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
2644                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
2645                 return PJ_TRUE;
2646         }
2647
2648         request_uri_sip = pjsip_uri_get_uri(request_uri);
2649         resource_size = pj_strlen(&request_uri_sip->user) + 1;
2650         resource = ast_alloca(resource_size);
2651         ast_copy_pj_str(resource, &request_uri_sip->user, resource_size);
2652
2653         expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next);
2654
2655         if (expires_header) {
2656                 if (expires_header->ivalue == 0) {
2657                         ast_log(LOG_WARNING, "Subscription request from endpoint %s rejected. Expiration of 0 is invalid\n",
2658                                 ast_sorcery_object_get_id(endpoint));
2659                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
2660                                 return PJ_TRUE;
2661                 }
2662                 if (expires_header->ivalue < endpoint->subscription.minexpiry) {
2663                         ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %u\n",
2664                                 expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry);
2665                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL);
2666                         return PJ_TRUE;
2667                 }
2668         }
2669
2670         handler = subscription_get_handler_from_rdata(rdata);
2671         if (!handler) {
2672                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2673                 return PJ_TRUE;
2674         }
2675
2676         generator = subscription_get_generator_from_rdata(rdata, handler);
2677         if (!generator) {
2678                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2679                 return PJ_TRUE;
2680         }
2681
2682         memset(&tree, 0, sizeof(tree));
2683         resp = build_resource_tree(endpoint, handler, resource, &tree,
2684                 ast_sip_pubsub_has_eventlist_support(rdata));
2685         if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
2686                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
2687                 resource_tree_destroy(&tree);
2688                 return PJ_TRUE;
2689         }
2690
2691         sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator, &tree, &dlg_status);
2692         if (!sub_tree) {
2693                 if (dlg_status != PJ_EEXISTS) {
2694                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
2695                 }
2696         } else {
2697                 sub_tree->persistence = subscription_persistence_create(sub_tree);
2698                 subscription_persistence_update(sub_tree, rdata);
2699                 sip_subscription_accept(sub_tree, rdata, resp);
2700                 if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ao2_bump(sub_tree))) {
2701                         pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
2702                         ao2_ref(sub_tree, -1);
2703                 }
2704         }
2705
2706         resource_tree_destroy(&tree);
2707         return PJ_TRUE;
2708 }
2709
2710 static struct ast_sip_publish_handler *find_pub_handler(const char *event)
2711 {
2712         struct ast_sip_publish_handler *iter = NULL;
2713         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
2714
2715         AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) {
2716                 if (strcmp(event, iter->event_name)) {
2717                         ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
2718                         continue;
2719                 }
2720                 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
2721                 break;
2722         }
2723
2724         return iter;
2725 }
2726
2727 static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata,
2728         pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id)
2729 {
2730         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
2731
2732         if (etag_hdr) {
2733                 char etag[pj_strlen(&etag_hdr->hvalue) + 1];
2734
2735                 ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag));
2736
2737                 if (sscanf(etag, "%30d", entity_id) != 1) {
2738                         return SIP_PUBLISH_UNKNOWN;
2739                 }
2740         }
2741
2742         *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
2743
2744         if (!(*expires)) {
2745                 return SIP_PUBLISH_REMOVE;
2746         } else if (!etag_hdr && rdata->msg_info.msg->body) {
2747                 return SIP_PUBLISH_INITIAL;
2748         } else if (etag_hdr && !rdata->msg_info.msg->body) {
2749                 return SIP_PUBLISH_REFRESH;
2750         } else if (etag_hdr && rdata->msg_info.msg->body) {
2751                 return SIP_PUBLISH_MODIFY;
2752         }
2753
2754         return SIP_PUBLISH_UNKNOWN;
2755 }
2756
2757 /*! \brief Internal destructor for publications */
2758 static void publication_destroy_fn(void *obj)
2759 {
2760         struct ast_sip_publication *publication = obj;
2761
2762         ast_debug(3, "Destroying SIP publication\n");
2763
2764         ao2_cleanup(publication->datastores);
2765         ao2_cleanup(publication->endpoint);
2766 }
2767
2768 static struct ast_sip_publication *sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
2769         const char *resource, const char *event_configuration_name)
2770 {
2771         struct ast_sip_publication *publication;
2772         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
2773         size_t resource_len = strlen(resource) + 1, event_configuration_name_len = strlen(event_configuration_name) + 1;
2774         char *dst;
2775
2776         ast_assert(endpoint != NULL);
2777
2778         if (!(publication = ao2_alloc(sizeof(*publication) + resource_len + event_configuration_name_len, publication_destroy_fn))) {
2779                 return NULL;
2780         }
2781
2782         if (!(publication->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp))) {
2783                 ao2_ref(publication, -1);
2784                 return NULL;
2785         }
2786
2787         publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
2788         ao2_ref(endpoint, +1);
2789         publication->endpoint = endpoint;
2790         publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
2791         publication->sched_id = -1;
2792         dst = publication->data;
2793         publication->resource = strcpy(dst, resource);
2794         dst += resource_len;
2795         publication->event_configuration_name = strcpy(dst, event_configuration_name);
2796
2797         return publication;
2798 }
2799
2800 static int sip_publication_respond(struct ast_sip_publication *pub, int status_code,
2801                 pjsip_rx_data *rdata)
2802 {
2803         pj_status_t status;
2804         pjsip_tx_data *tdata;
2805         pjsip_transaction *tsx;
2806
2807         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, &tdata) != PJ_SUCCESS) {
2808                 return -1;
2809         }
2810
2811         if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) {
2812                 RAII_VAR(char *, entity_tag, NULL, ast_free_ptr);
2813                 RAII_VAR(char *, expires, NULL, ast_free_ptr);
2814
2815                 if ((ast_asprintf(&entity_tag, "%d", pub->entity_tag) < 0) ||
2816                         (ast_asprintf(&expires, "%d", pub->expires) < 0)) {
2817                         pjsip_tx_data_dec_ref(tdata);
2818                         return -1;
2819                 }
2820
2821                 ast_sip_add_header(tdata, "SIP-ETag", entity_tag);
2822                 ast_sip_add_header(tdata, "Expires", expires);
2823         }
2824
2825         if ((status = pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx)) != PJ_SUCCESS) {
2826                 return -1;
2827         }
2828
2829         pjsip_tsx_recv_msg(tsx, rdata);
2830
2831         if (pjsip_tsx_send_msg(tsx, tdata) != PJ_SUCCESS) {
2832                 return -1;
2833         }
2834
2835         return 0;
2836 }
2837
2838 static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
2839         struct ast_sip_publish_handler *handler)
2840 {
2841         struct ast_sip_publication *publication;
2842         char *resource_name;
2843         size_t resource_size;
2844         RAII_VAR(struct ast_sip_publication_resource *, resource, NULL, ao2_cleanup);
2845         struct ast_variable *event_configuration_name = NULL;
2846         pjsip_uri *request_uri;
2847         pjsip_sip_uri *request_uri_sip;
2848         int resp;
2849
2850         request_uri = rdata->msg_info.msg->line.req.uri;
2851
2852         if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
2853                 char uri_str[PJSIP_MAX_URL_SIZE];
2854
2855                 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
2856                 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
2857                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
2858                 return NULL;
2859         }
2860
2861         request_uri_sip = pjsip_uri_get_uri(request_uri);
2862         resource_size = pj_strlen(&request_uri_sip->user) + 1;
2863         resource_name = ast_alloca(resource_size);
2864         ast_copy_pj_str(resource_name, &request_uri_sip->user, resource_size);
2865
2866         resource = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "inbound-publication", resource_name);
2867         if (!resource) {
2868                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
2869                 return NULL;
2870         }
2871
2872         if (!ast_strlen_zero(resource->endpoint) && strcmp(resource->endpoint, ast_sorcery_object_get_id(endpoint))) {
2873                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL);
2874                 return NULL;
2875         }
2876
2877         for (event_configuration_name = resource->events; event_configuration_name; event_configuration_name = event_configuration_name->next) {
2878                 if (!strcmp(event_configuration_name->name, handler->event_name)) {
2879                         break;
2880                 }
2881         }
2882
2883         if (!event_configuration_name) {
2884                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
2885                 return NULL;
2886         }
2887
2888         resp = handler->new_publication(endpoint, resource_name, event_configuration_name->value);
2889
2890         if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
2891                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
2892                 return NULL;
2893         }
2894
2895         publication = sip_create_publication(endpoint, rdata, S_OR(resource_name, ""), event_configuration_name->value);
2896
2897         if (!publication) {
2898                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
2899                 return NULL;
2900         }
2901
2902         publication->handler = handler;
2903         if (publication->handler->publication_state_change(publication, rdata->msg_info.msg->body,
2904                         AST_SIP_PUBLISH_STATE_INITIALIZED)) {
2905                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
2906                 ao2_cleanup(publication);
2907                 return NULL;
2908         }
2909
2910         sip_publication_respond(publication, resp, rdata);
2911
2912         return publication;
2913 }
2914
2915 static int publish_expire_callback(void *data)
2916 {
2917         RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup);
2918
2919         if (publication->handler->publish_expire) {
2920                 publication->handler->publish_expire(publication);
2921         }
2922
2923         return 0;
2924 }
2925
2926 static int publish_expire(const void *data)
2927 {
2928         struct ast_sip_publication *publication = (struct ast_sip_publication*)data;
2929
2930         ao2_unlink(publication->handler->publications, publication);
2931         publication->sched_id = -1;
2932
2933         if (ast_sip_push_task(NULL, publish_expire_callback, publication)) {
2934                 ao2_cleanup(publication);
2935         }
2936
2937         return 0;
2938 }
2939
2940 static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata)
2941 {
2942         pjsip_event_hdr *event_header;
2943         struct ast_sip_publish_handler *handler;
2944         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
2945         char event[32];
2946         static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 };
2947         pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL);
2948         enum sip_publish_type publish_type;
2949         RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup);
2950         int expires = 0, entity_id, response = 0;
2951
2952         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
2953         ast_assert(endpoint != NULL);
2954
2955         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
2956         if (!event_header) {
2957                 ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n");
2958                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2959                 return PJ_TRUE;
2960         }
2961         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
2962
2963         handler = find_pub_handler(event);
2964         if (!handler) {
2965                 ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event);
2966                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2967                 return PJ_TRUE;
2968         }
2969
2970         publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id);
2971
2972         /* If this is not an initial publish ensure that a publication is present */
2973         if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) {
2974                 if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) {
2975                         static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 };
2976
2977                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed,
2978                                 NULL, NULL);
2979                         return PJ_TRUE;
2980                 }
2981
2982                 /* Per the RFC every response has to have a new entity tag */
2983                 publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
2984
2985                 /* Update the expires here so that the created responses will contain the correct value */
2986                 publication->expires = expires;
2987         }
2988
2989         switch (publish_type) {
2990                 case SIP_PUBLISH_INITIAL:
2991                         publication = publish_request_initial(endpoint, rdata, handler);
2992                         break;
2993                 case SIP_PUBLISH_REFRESH:
2994                 case SIP_PUBLISH_MODIFY:
2995                         if (handler->publication_state_change(publication, rdata->msg_info.msg->body,
2996                                                 AST_SIP_PUBLISH_STATE_ACTIVE)) {
2997                                 /* If an error occurs we want to terminate the publication */
2998                                 expires = 0;
2999                         }
3000                         response = 200;
3001                         break;
3002                 case SIP_PUBLISH_REMOVE:
3003                         handler->publication_state_change(publication, rdata->msg_info.msg->body,
3004                                         AST_SIP_PUBLISH_STATE_TERMINATED);
3005                         response = 200;
3006                         break;
3007                 case SIP_PUBLISH_UNKNOWN:
3008                 default:
3009                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
3010                         break;
3011         }
3012
3013         if (publication) {
3014                 if (expires) {
3015                         ao2_link(handler->publications, publication);
3016
3017                         AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication,
3018                                                 ao2_ref(publication, -1), ao2_ref(publication, -1), ao2_ref(publication, +1));
3019                 } else {
3020                         AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1));
3021                 }
3022         }
3023
3024         if (response) {
3025                 sip_publication_respond(publication, response, rdata);
3026         }
3027
3028         return PJ_TRUE;
3029 }
3030
3031 struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub)
3032 {
3033         return pub->endpoint;
3034 }
3035
3036 const char *ast_sip_publication_get_resource(const struct ast_sip_publication *pub)
3037 {
3038         return pub->resource;
3039 }
3040
3041 const char *ast_sip_publication_get_event_configuration(const struct ast_sip_publication *pub)
3042 {
3043         return pub->event_configuration_name;
3044 }
3045
3046 int ast_sip_pubsub_register_body_generator(struct ast_sip_pubsub_body_generator *generator)
3047 {
3048         struct ast_sip_pubsub_body_generator *existing;
3049         pj_str_t accept;
3050         pj_size_t accept_len;
3051
3052         existing = find_body_generator_type_subtype(generator->type, generator->subtype);
3053         if (existing) {
3054                 ast_log(LOG_WARNING, "Cannot register body generator of %s/%s."
3055                                 "One is already registered.\n", generator->type, generator->subtype);
3056                 return -1;
3057         }
3058
3059         AST_RWLIST_WRLOCK(&body_generators);
3060         AST_LIST_INSERT_HEAD(&body_generators, generator, list);
3061         AST_RWLIST_UNLOCK(&body_generators);
3062
3063         /* Lengths of type and subtype plus a slash. */
3064         accept_len = strlen(generator->type) + strlen(generator->subtype) + 1;
3065
3066         /* Add room for null terminator that sprintf() will set. */
3067         pj_strset(&accept, ast_alloca(accept_len + 1), accept_len);
3068         sprintf((char *) pj_strbuf(&accept), "%s/%s", generator->type, generator->subtype);/* Safe */
3069
3070         pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module,
3071                         PJSIP_H_ACCEPT, NULL, 1, &accept);
3072
3073         return 0;
3074 }
3075
3076 void ast_sip_pubsub_unregister_body_generator(struct ast_sip_pubsub_body_generator *generator)
3077 {
3078         struct ast_sip_pubsub_body_generator *iter;
3079         SCOPED_LOCK(lock, &body_generators, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
3080
3081         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_generators, iter, list) {
3082                 if (iter == generator) {
3083                         AST_LIST_REMOVE_CURRENT(list);
3084                         break;
3085                 }
3086         }
3087         AST_RWLIST_TRAVERSE_SAFE_END;
3088 }
3089
3090 int ast_sip_pubsub_register_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
3091 {
3092         AST_RWLIST_WRLOCK(&body_supplements);
3093         AST_RWLIST_INSERT_TAIL(&body_supplements, supplement, list);
3094         AST_RWLIST_UNLOCK(&body_supplements);
3095
3096         return 0;
3097 }
3098
3099 void ast_sip_pubsub_unregister_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
3100 {
3101         struct ast_sip_pubsub_body_supplement *iter;
3102         SCOPED_LOCK(lock, &body_supplements, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
3103
3104         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_supplements, iter, list) {
3105                 if (iter == supplement) {
3106                         AST_LIST_REMOVE_CURRENT(list);
3107                         break;
3108                 }
3109         }
3110         AST_RWLIST_TRAVERSE_SAFE_END;
3111 }
3112
3113 const char *ast_sip_subscription_get_body_type(struct ast_sip_subscription *sub)
3114 {
3115         return sub->body_generator->type;
3116 }
3117
3118 const char *ast_sip_subscription_get_body_subtype(struct ast_sip_subscription *sub)
3119 {
3120         return sub->body_generator->subtype;
3121 }
3122
3123 int ast_sip_pubsub_generate_body_content(const char *type, const char *subtype,
3124                 struct ast_sip_body_data *data, struct ast_str **str)
3125 {
3126         struct ast_sip_pubsub_body_supplement *supplement;
3127         struct ast_sip_pubsub_body_generator *generator;
3128         int res = 0;
3129         void *body;
3130
3131         generator = find_body_generator_type_subtype(type, subtype);
3132         if (!generator) {
3133                 ast_log(LOG_WARNING, "Unable to find a body generator for %s/%s\n",
3134                                 type, subtype);
3135                 return -1;
3136         }
3137
3138         if (strcmp(data->body_type, generator->body_type)) {
3139                 ast_log(LOG_WARNING, "Body generator does not accept the type of data provided\n");
3140                 return -1;
3141         }
3142
3143         body = generator->allocate_body(data->body_data);
3144         if (!body) {
3145                 ast_log(LOG_WARNING, "Unable to allocate a NOTIFY body of type %s/%s\n",
3146                                 type, subtype);
3147                 return -1;
3148         }
3149
3150         if (generator->generate_body_content(body, data->body_data)) {
3151                 res = -1;
3152                 goto end;
3153         }
3154
3155         AST_RWLIST_RDLOCK(&body_supplements);
3156         AST_RWLIST_TRAVERSE(&body_supplements, supplement, list) {
3157                 if (!strcmp(generator->type, supplement->type) &&
3158                                 !strcmp(generator->subtype, supplement->subtype)) {
3159                         res = supplement->supplement_body(body, data->body_data);
3160                         if (res) {
3161                                 break;
3162                         }
3163                 }
3164         }
3165         AST_RWLIST_UNLOCK(&body_supplements);
3166
3167         if (!res) {
3168                 generator->to_string(body, str);
3169         }
3170
3171 end:
3172         if (generator->destroy_body) {
3173                 generator->destroy_body(body);
3174         }
3175
3176         return res;
3177 }
3178
3179 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata)
3180 {
3181         if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) {
3182                 return pubsub_on_rx_subscribe_request(rdata);
3183         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) {
3184                 return pubsub_on_rx_publish_request(rdata);
3185         }
3186
3187         return PJ_FALSE;
3188 }
3189
3190 static void set_state_terminated(struct ast_sip_subscription *sub)
3191 {
3192         int i;
3193
3194         sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
3195         for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
3196                 set_state_terminated(AST_VECTOR_GET(&sub->children, i));
3197         }
3198 }
3199
3200 /* XXX This function and serialized_pubsub_on_rx_refresh are nearly identical */
3201 static int serialized_pubsub_on_server_timeout(void *userdata)
3202 {
3203         struct sip_subscription_tree *sub_tree = userdata;
3204         pjsip_dialog *dlg = sub_tree->dlg;
3205
3206         pjsip_dlg_inc_lock(dlg);
3207         if (!sub_tree->evsub) {
3208                 pjsip_dlg_dec_lock(dlg);
3209                 return 0;
3210         }
3211         set_state_terminated(sub_tree->root);
3212         send_notify(sub_tree, 1);
3213         ast_test_suite_event_notify("SUBSCRIPTION_TERMINATED",
3214                         "Resource: %s",
3215                         sub_tree->root->resource);
3216
3217         pjsip_dlg_dec_lock(dlg);
3218         ao2_cleanup(sub_tree);
3219         return 0;
3220 }
3221
3222 /*!
3223  * \brief PJSIP callback when underlying SIP subscription changes state
3224  *
3225  * This callback is a bit of a mess, because it's not always called when
3226  * you might expect it to be, and it can be called multiple times for the
3227  * same state.
3228  *
3229  * For instance, this function is not called at all when an incoming SUBSCRIBE
3230  * arrives to refresh a subscription. That makes sense in a way, since the
3231  * subscription state has not made a change; it was active and remains active.
3232  *
3233  * However, if an incoming SUBSCRIBE arrives to end a subscription, then this
3234  * will be called into once upon receiving the SUBSCRIBE (after the call to
3235  * pubsub_on_rx_refresh) and again when sending a NOTIFY to end the subscription.
3236  * In both cases, the apparent state of the subscription is "terminated".
3237  *
3238  * However, the double-terminated state changes don't happen in all cases. For
3239  * instance, if a subscription expires, then the only time this callback is
3240  * called is when we send the NOTIFY to end the subscription.
3241  *
3242  * As far as state changes are concerned, we only ever care about transitions
3243  * to the "terminated" state. The action we take here is dependent on the
3244  * conditions behind why the state change to "terminated" occurred. If the
3245  * state change has occurred because we are sending a NOTIFY to end the
3246  * subscription, we consider this to be the final hurrah of the subscription
3247  * and take measures to start shutting things down. If the state change to
3248  * terminated occurs for a different reason (e.g. transaction timeout,
3249  * incoming SUBSCRIBE to end the subscription), then we push a task to
3250  * send out a NOTIFY. When that NOTIFY is sent, this callback will be