res_pjsip_pubsub.c: Add some notification comments.
[asterisk/asterisk.git] / res / res_pjsip_pubsub.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 /*!
19  * \brief Opaque structure representing an RFC 3265 SIP subscription
20  */
21
22 /*** MODULEINFO
23         <depend>pjproject</depend>
24         <depend>res_pjsip</depend>
25         <support_level>core</support_level>
26  ***/
27
28 #include "asterisk.h"
29
30 #include <pjsip.h>
31 #include <pjsip_simple.h>
32 #include <pjlib.h>
33
34 #include "asterisk/res_pjsip_pubsub.h"
35 #include "asterisk/module.h"
36 #include "asterisk/linkedlists.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/datastore.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/taskprocessor.h"
41 #include "asterisk/sched.h"
42 #include "asterisk/res_pjsip.h"
43 #include "asterisk/callerid.h"
44 #include "asterisk/manager.h"
45 #include "asterisk/test.h"
46 #include "res_pjsip/include/res_pjsip_private.h"
47 #include "asterisk/res_pjsip_presence_xml.h"
48
49 /*** DOCUMENTATION
50         <manager name="PJSIPShowSubscriptionsInbound" language="en_US">
51                 <synopsis>
52                         Lists subscriptions.
53                 </synopsis>
54                 <syntax />
55                 <description>
56                         <para>
57                         Provides a listing of all inbound subscriptions.  An event <literal>InboundSubscriptionDetail</literal>
58                         is issued for each subscription object.  Once all detail events are completed an
59                         <literal>InboundSubscriptionDetailComplete</literal> event is issued.
60                         </para>
61                 </description>
62         </manager>
63         <manager name="PJSIPShowSubscriptionsOutbound" language="en_US">
64                 <synopsis>
65                         Lists subscriptions.
66                 </synopsis>
67                 <syntax />
68                 <description>
69                         <para>
70                         Provides a listing of all outbound subscriptions.  An event <literal>OutboundSubscriptionDetail</literal>
71                         is issued for each subscription object.  Once all detail events are completed an
72                         <literal>OutboundSubscriptionDetailComplete</literal> event is issued.
73                         </para>
74                 </description>
75         </manager>
76         <manager name="PJSIPShowResourceLists" language="en_US">
77                 <synopsis>
78                         Displays settings for configured resource lists.
79                 </synopsis>
80                 <syntax />
81                 <description>
82                         <para>
83                         Provides a listing of all resource lists.  An event <literal>ResourceListDetail</literal>
84                         is issued for each resource list object.  Once all detail events are completed a
85                         <literal>ResourceListDetailComplete</literal> event is issued.
86                         </para>
87                 </description>
88         </manager>
89
90         <configInfo name="res_pjsip_pubsub" language="en_US">
91                 <synopsis>Module that implements publish and subscribe support.</synopsis>
92                 <configFile name="pjsip.conf">
93                         <configObject name="subscription_persistence">
94                                 <synopsis>Persists SIP subscriptions so they survive restarts.</synopsis>
95                                 <configOption name="packet">
96                                         <synopsis>Entire SIP SUBSCRIBE packet that created the subscription</synopsis>
97                                 </configOption>
98                                 <configOption name="src_name">
99                                         <synopsis>The source address of the subscription</synopsis>
100                                 </configOption>
101                                 <configOption name="src_port">
102                                         <synopsis>The source port of the subscription</synopsis>
103                                 </configOption>
104                                 <configOption name="transport_key">
105                                         <synopsis>The type of transport the subscription was received on</synopsis>
106                                 </configOption>
107                                 <configOption name="local_name">
108                                         <synopsis>The local address the subscription was received on</synopsis>
109                                 </configOption>
110                                 <configOption name="local_port">
111                                         <synopsis>The local port the subscription was received on</synopsis>
112                                 </configOption>
113                                 <configOption name="cseq">
114                                         <synopsis>The sequence number of the next NOTIFY to be sent</synopsis>
115                                 </configOption>
116                                 <configOption name="tag">
117                                         <synopsis>The local tag of the dialog for the subscription</synopsis>
118                                 </configOption>
119                                 <configOption name="endpoint">
120                                         <synopsis>The name of the endpoint that subscribed</synopsis>
121                                 </configOption>
122                                 <configOption name="expires">
123                                         <synopsis>The time at which the subscription expires</synopsis>
124                                 </configOption>
125                         </configObject>
126                         <configObject name="resource_list">
127                                 <synopsis>Resource list configuration parameters.</synopsis>
128                                 <description>
129                                         <para>This configuration object allows for RFC 4662 resource list subscriptions
130                                         to be specified. This can be useful to decrease the amount of subscription traffic
131                                         that a server has to process.</para>
132                                         <note>
133                                                 <para>Current limitations limit the size of SIP NOTIFY requests that Asterisk sends
134                                                 to 64000 bytes. If your resource list notifications are larger than this maximum, you
135                                                 will need to make adjustments.</para>
136                                         </note>
137                                 </description>
138                                 <configOption name="type">
139                                         <synopsis>Must be of type 'resource_list'</synopsis>
140                                 </configOption>
141                                 <configOption name="event">
142                                         <synopsis>The SIP event package that the list resource belongs to.</synopsis>
143                                         <description><para>
144                                                 The SIP event package describes the types of resources that Asterisk reports
145                                                 the state of.
146                                         </para>
147                                                 <enumlist>
148                                                         <enum name="presence"><para>
149                                                                 Device state and presence reporting.
150                                                         </para></enum>
151                                                         <enum name="dialog"><para>
152                                                                 This is identical to <replaceable>presence</replaceable>.
153                                                         </para></enum>
154                                                         <enum name="message-summary"><para>
155                                                                 Message-waiting indication (MWI) reporting.
156                                                         </para></enum>
157                                                 </enumlist>
158                                         </description>
159                                 </configOption>
160                                 <configOption name="list_item">
161                                         <synopsis>The name of a resource to report state on</synopsis>
162                                         <description>
163                                                 <para>In general Asterisk looks up list items in the following way:</para>
164                                                 <para>1. Check if the list item refers to another configured resource list.</para>
165                                                 <para>2. Pass the name of the resource off to event-package-specific handlers
166                                                    to find the specified resource.</para>
167                                                 <para>The second part means that the way the list item is specified depends
168                                                 on what type of list this is. For instance, if you have the <replaceable>event</replaceable>
169                                                 set to <literal>presence</literal>, then list items should be in the form of
170                                                 dialplan_extension@dialplan_context. For <literal>message-summary</literal> mailbox
171                                                 names should be listed.</para>
172                                         </description>
173                                 </configOption>
174                                 <configOption name="full_state" default="no">
175                                         <synopsis>Indicates if the entire list's state should be sent out.</synopsis>
176                                         <description>
177                                                 <para>If this option is enabled, and a resource changes state, then Asterisk will construct
178                                                 a notification that contains the state of all resources in the list. If the option is
179                                                 disabled, Asterisk will construct a notification that only contains the states of
180                                                 resources that have changed.</para>
181                                                 <note>
182                                                         <para>Even with this option disabled, there are certain situations where Asterisk is forced
183                                                         to send a notification with the states of all resources in the list. When a subscriber
184                                                         renews or terminates its subscription to the list, Asterisk MUST send a full state
185                                                         notification.</para>
186                                                 </note>
187                                         </description>
188                                 </configOption>
189                                 <configOption name="notification_batch_interval" default="0">
190                                         <synopsis>Time Asterisk should wait, in milliseconds, before sending notifications.</synopsis>
191                                         <description>
192                                                 <para>When a resource's state changes, it may be desired to wait a certain amount before Asterisk
193                                                 sends a notification to subscribers. This allows for other state changes to accumulate, so that
194                                                 Asterisk can communicate multiple state changes in a single notification instead of rapidly sending
195                                                 many notifications.</para>
196                                         </description>
197                                 </configOption>
198                         </configObject>
199                         <configObject name="inbound-publication">
200                                 <synopsis>The configuration for inbound publications</synopsis>
201                                 <configOption name="endpoint" default="">
202                                         <synopsis>Optional name of an endpoint that is only allowed to publish to this resource</synopsis>
203                                 </configOption>
204                                 <configOption name="type">
205                                         <synopsis>Must be of type 'inbound-publication'.</synopsis>
206                                 </configOption>
207                         </configObject>
208                 </configFile>
209         </configInfo>
210  ***/
211
212 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);
213
214 static struct pjsip_module pubsub_module = {
215         .name = { "PubSub Module", 13 },
216         .priority = PJSIP_MOD_PRIORITY_APPLICATION,
217         .on_rx_request = pubsub_on_rx_request,
218 };
219
220 #define MOD_DATA_PERSISTENCE "sub_persistence"
221 #define MOD_DATA_MSG "sub_msg"
222
223 static const pj_str_t str_event_name = { "Event", 5 };
224
225 /*! \brief Scheduler used for automatically expiring publications */
226 static struct ast_sched_context *sched;
227
228 /*! \brief Number of buckets for publications (on a per handler) */
229 #define PUBLICATIONS_BUCKETS 37
230
231 /*! \brief Default expiration time for PUBLISH if one is not specified */
232 #define DEFAULT_PUBLISH_EXPIRES 3600
233
234 /*! \brief Number of buckets for subscription datastore */
235 #define DATASTORE_BUCKETS 53
236
237 /*! \brief Default expiration for subscriptions */
238 #define DEFAULT_EXPIRES 3600
239
240 /*! \brief Defined method for PUBLISH */
241 const pjsip_method pjsip_publish_method =
242 {
243         PJSIP_OTHER_METHOD,
244         { "PUBLISH", 7 }
245 };
246
/*!
 * \brief Classification of incoming PUBLISH requests (see RFC 3903)
 */
enum sip_publish_type {
	/*!
	 * \brief Unknown
	 *
	 * \details
	 * Not part of RFC 3903. Marks an incoming PUBLISH that matches none
	 * of the categories below and is therefore treated as invalid.
	 */
	SIP_PUBLISH_UNKNOWN,

	/*!
	 * \brief Initial
	 *
	 * \details
	 * The first PUBLISH for a piece of state. It carries a non-zero
	 * Expires header and a body describing the sender's current state.
	 * It is the only PUBLISH type without a Sip-If-Match header.
	 */
	SIP_PUBLISH_INITIAL,

	/*!
	 * \brief Refresh
	 *
	 * \details
	 * Keeps previously published state from expiring. It carries a
	 * non-zero Expires header and no body, because it does not change
	 * the state.
	 */
	SIP_PUBLISH_REFRESH,

	/*!
	 * \brief Modify
	 *
	 * \details
	 * Replaces the previously published state. It carries a body with
	 * the new state; an Expires header is optional.
	 */
	SIP_PUBLISH_MODIFY,

	/*!
	 * \brief Remove
	 *
	 * \details
	 * Withdraws published state from an ESC. It carries an Expires
	 * header of 0 and most likely no body.
	 */
	SIP_PUBLISH_REMOVE,
};
301
302 /*!
303  * \brief A vector of strings commonly used throughout this module
304  */
305 AST_VECTOR(resources, const char *);
306
307 /*!
308  * \brief Resource list configuration item
309  */
310 struct resource_list {
311         SORCERY_OBJECT(details);
312         /*! SIP event package the list uses. */
313         char event[32];
314         /*! Strings representing resources in the list. */
315         struct resources items;
316         /*! Indicates if Asterisk sends full or partial state on notifications. */
317         unsigned int full_state;
318         /*! Time, in milliseconds Asterisk waits before sending a batched notification.*/
319         unsigned int notification_batch_interval;
320 };
321
/*!
 * \brief Counter from which ESCs derive new entity IDs
 */
static int esc_etag_counter;
326
/*!
 * \brief State kept for a single SIP publication
 */
struct ast_sip_publication {
	/*! \brief Datastores attached to the publication by handlers */
	struct ao2_container *datastores;
	/*! \brief Entity tag identifying the publication */
	int entity_tag;
	/*! \brief Handler responsible for the publication */
	struct ast_sip_publish_handler *handler;
	/*! \brief Endpoint the publication is communicating with */
	struct ast_sip_endpoint *endpoint;
	/*! \brief Expiration time of the publication */
	int expires;
	/*! \brief Scheduler item that expires the publication */
	int sched_id;
	/*! \brief Resource the publication is made to */
	char *resource;
	/*! \brief Name of the event type configuration */
	char *event_configuration_name;
	/*! \brief Trailing storage holding the data for the fields above */
	char data[0];
};
350
351
352 /*!
353  * \brief Structure used for persisting an inbound subscription
354  */
355 struct subscription_persistence {
356         /*! Sorcery object details */
357         SORCERY_OBJECT(details);
358         /*! The name of the endpoint involved in the subscrption */
359         char *endpoint;
360         /*! SIP message that creates the subscription */
361         char packet[PJSIP_MAX_PKT_LEN];
362         /*! Source address of the message */
363         char src_name[PJ_INET6_ADDRSTRLEN];
364         /*! Source port of the message */
365         int src_port;
366         /*! Local transport key type */
367         char transport_key[32];
368         /*! Local transport address */
369         char local_name[PJ_INET6_ADDRSTRLEN];
370         /*! Local transport port */
371         int local_port;
372         /*! Next CSeq to use for message */
373         unsigned int cseq;
374         /*! Local tag of the dialog */
375         char *tag;
376         /*! When this subscription expires */
377         struct timeval expires;
378 };
379
380 /*!
381  * \brief A tree of SIP subscriptions
382  *
383  * Because of the ability to subscribe to resource lists, a SIP
384  * subscription can result in a tree of subscriptions being created.
385  * This structure represents the information relevant to the subscription
386  * as a whole, to include the underlying PJSIP structure for the
387  * subscription.
388  */
389 struct sip_subscription_tree {
390         /*! The endpoint with which the subscription is communicating */
391         struct ast_sip_endpoint *endpoint;
392         /*! Serializer on which to place operations for this subscription */
393         struct ast_taskprocessor *serializer;
394         /*! The role for this subscription */
395         enum ast_sip_subscription_role role;
396         /*! Persistence information */
397         struct subscription_persistence *persistence;
398         /*! The underlying PJSIP event subscription structure */
399         pjsip_evsub *evsub;
400         /*! The underlying PJSIP dialog */
401         pjsip_dialog *dlg;
402         /*! Interval to use for batching notifications */
403         unsigned int notification_batch_interval;
404         /*! Scheduler ID for batched notification */
405         int notify_sched_id;
406         /*! Indicator if scheduled batched notification should be sent */
407         unsigned int send_scheduled_notify;
408         /*! The root of the subscription tree */
409         struct ast_sip_subscription *root;
410         /*! Is this subscription to a list? */
411         int is_list;
412         /*! Next item in the list */
413         AST_LIST_ENTRY(sip_subscription_tree) next;
414 };
415
416 /*!
417  * \brief Structure representing a "virtual" SIP subscription.
418  *
419  * This structure serves a dual purpose. Structurally, it is
420  * the constructed tree of subscriptions based on the resources
421  * being subscribed to. API-wise, this serves as the handle that
422  * subscription handlers use in order to interact with the pubsub API.
423  */
424 struct ast_sip_subscription {
425         /*! Subscription datastores set up by handlers */
426         struct ao2_container *datastores;
427         /*! The handler for this subscription */
428         const struct ast_sip_subscription_handler *handler;
429         /*! Pointer to the base of the tree */
430         struct sip_subscription_tree *tree;
431         /*! Body generaator for NOTIFYs */
432         struct ast_sip_pubsub_body_generator *body_generator;
433         /*! Vector of child subscriptions */
434         AST_VECTOR(, struct ast_sip_subscription *) children;
435         /*! Saved NOTIFY body text for this subscription */
436         struct ast_str *body_text;
437         /*! Indicator that the body text has changed since the last notification */
438         int body_changed;
439         /*! The current state of the subscription */
440         pjsip_evsub_state subscription_state;
441         /*! For lists, the current version to place in the RLMI body */
442         unsigned int version;
443         /*! For lists, indicates if full state should always be communicated. */
444         unsigned int full_state;
445         /*! URI associated with the subscription */
446         pjsip_sip_uri *uri;
447         /*! Name of resource being subscribed to */
448         char resource[0];
449 };
450
451 /*!
452  * \brief Structure representing a publication resource
453  */
454 struct ast_sip_publication_resource {
455         /*! \brief Sorcery object details */
456         SORCERY_OBJECT(details);
457         /*! \brief Optional name of an endpoint that is only allowed to publish to this resource */
458         char *endpoint;
459         /*! \brief Mapping for event types to configuration */
460         struct ast_variable *events;
461 };
462
463 static const char *sip_subscription_roles_map[] = {
464         [AST_SIP_SUBSCRIBER] = "Subscriber",
465         [AST_SIP_NOTIFIER] = "Notifier"
466 };
467
/*! \brief Read/write-locked list of subscription trees */
AST_RWLIST_HEAD_STATIC(subscriptions, sip_subscription_tree);

/*! \brief Read/write-locked list of body generators */
AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
/*! \brief Read/write-locked list of body supplements */
AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
472
473 static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
474 static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
475                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
476 static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
477                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
478 static void pubsub_on_client_refresh(pjsip_evsub *sub);
479 static void pubsub_on_server_timeout(pjsip_evsub *sub);
480  
481 static pjsip_evsub_user pubsub_cb = {
482         .on_evsub_state = pubsub_on_evsub_state,
483         .on_rx_refresh = pubsub_on_rx_refresh,
484         .on_rx_notify = pubsub_on_rx_notify,
485         .on_client_refresh = pubsub_on_client_refresh,
486         .on_server_timeout = pubsub_on_server_timeout,
487 };
488
489 /*! \brief Destructor for publication resource */
490 static void publication_resource_destroy(void *obj)
491 {
492         struct ast_sip_publication_resource *resource = obj;
493
494         ast_free(resource->endpoint);
495         ast_variables_destroy(resource->events);
496 }
497
498 /*! \brief Allocator for publication resource */
499 static void *publication_resource_alloc(const char *name)
500 {
501         return ast_sorcery_generic_alloc(sizeof(struct ast_sip_publication_resource), publication_resource_destroy);
502 }
503
504 /*! \brief Destructor for subscription persistence */
505 static void subscription_persistence_destroy(void *obj)
506 {
507         struct subscription_persistence *persistence = obj;
508
509         ast_free(persistence->endpoint);
510         ast_free(persistence->tag);
511 }
512
513 /*! \brief Allocator for subscription persistence */
514 static void *subscription_persistence_alloc(const char *name)
515 {
516         return ast_sorcery_generic_alloc(sizeof(struct subscription_persistence), subscription_persistence_destroy);
517 }
518
519 /*! \brief Function which creates initial persistence information of a subscription in sorcery */
520 static struct subscription_persistence *subscription_persistence_create(struct sip_subscription_tree *sub_tree)
521 {
522         char tag[PJ_GUID_STRING_LENGTH + 1];
523
524         /* The id of this persistence object doesn't matter as we keep it on the subscription and don't need to
525          * look it up by id at all.
526          */
527         struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(),
528                 "subscription_persistence", NULL);
529
530         pjsip_dialog *dlg = sub_tree->dlg;
531
532         if (!persistence) {
533                 return NULL;
534         }
535
536         persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub_tree->endpoint));
537         ast_copy_pj_str(tag, &dlg->local.info->tag, sizeof(tag));
538         persistence->tag = ast_strdup(tag);
539
540         ast_sorcery_create(ast_sip_get_sorcery(), persistence);
541         return persistence;
542 }
543
544 /*! \brief Function which updates persistence information of a subscription in sorcery */
545 static void subscription_persistence_update(struct sip_subscription_tree *sub_tree,
546         pjsip_rx_data *rdata)
547 {
548         pjsip_dialog *dlg;
549
550         if (!sub_tree->persistence) {
551                 return;
552         }
553
554         dlg = sub_tree->dlg;
555         sub_tree->persistence->cseq = dlg->local.cseq;
556
557         if (rdata) {
558                 int expires;
559                 pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
560
561                 expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
562                 sub_tree->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
563
564                 /* When receiving a packet on an streaming transport, it's possible to receive more than one SIP
565                  * message at a time into the rdata->pkt_info.packet buffer. However, the rdata->msg_info.msg_buf
566                  * will always point to the proper SIP message that is to be processed. When updating subscription
567                  * persistence that is pulled from persistent storage, though, the rdata->pkt_info.packet will
568                  * only ever have a single SIP message on it, and so we base persistence on that.
569                  */
570                 if (rdata->msg_info.msg_buf) {
571                         ast_copy_string(sub_tree->persistence->packet, rdata->msg_info.msg_buf,
572                                         MIN(sizeof(sub_tree->persistence->packet), rdata->msg_info.len));
573                 } else {
574                         ast_copy_string(sub_tree->persistence->packet, rdata->pkt_info.packet,
575                                         sizeof(sub_tree->persistence->packet));
576                 }
577                 ast_copy_string(sub_tree->persistence->src_name, rdata->pkt_info.src_name,
578                                 sizeof(sub_tree->persistence->src_name));
579                 sub_tree->persistence->src_port = rdata->pkt_info.src_port;
580                 ast_copy_string(sub_tree->persistence->transport_key, rdata->tp_info.transport->type_name,
581                         sizeof(sub_tree->persistence->transport_key));
582                 ast_copy_pj_str(sub_tree->persistence->local_name, &rdata->tp_info.transport->local_name.host,
583                         sizeof(sub_tree->persistence->local_name));
584                 sub_tree->persistence->local_port = rdata->tp_info.transport->local_name.port;
585         }
586
587         ast_sorcery_update(ast_sip_get_sorcery(), sub_tree->persistence);
588 }
589
590 /*! \brief Function which removes persistence of a subscription from sorcery */
591 static void subscription_persistence_remove(struct sip_subscription_tree *sub_tree)
592 {
593         if (!sub_tree->persistence) {
594                 return;
595         }
596
597         ast_sorcery_delete(ast_sip_get_sorcery(), sub_tree->persistence);
598         ao2_ref(sub_tree->persistence, -1);
599 }
600
601
602 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name);
603 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
604                 size_t num_accept, const char *body_type);
605
606 /*! \brief Retrieve a handler using the Event header of an rdata message */
607 static struct ast_sip_subscription_handler *subscription_get_handler_from_rdata(pjsip_rx_data *rdata)
608 {
609         pjsip_event_hdr *event_header;
610         char event[32];
611         struct ast_sip_subscription_handler *handler;
612
613         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
614         if (!event_header) {
615                 ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
616                 return NULL;
617         }
618         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
619
620         handler = find_sub_handler_for_event_name(event);
621         if (!handler) {
622                 ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
623         }
624
625         return handler;
626 }
627
/*!
 * \brief Accept header values that are skipped during body generator lookup
 *
 * When a SUBSCRIBE arrives, a body generator is normally chosen by
 * matching one of the request's Accept header values. That works well
 * for a subscription to a single resource. For a subscription to a
 * list, a few Accept values describe the list itself rather than its
 * resources, so they must be ignored when choosing the body generator
 * for the individual resources.
 */
const char *accept_exceptions[] = {
	"multipart/related",
	"application/rlmi+xml",
};
644
645 /*!
646  * \brief Is the Accept header from the SUBSCRIBE in the list of exceptions?
647  *
648  * \retval 1 This Accept header value is an exception to the rule.
649  * \retval 0 This Accept header is not an exception to the rule.
650  */
651 static int exceptional_accept(const pj_str_t *accept)
652 {
653         int i;
654
655         for (i = 0; i < ARRAY_LEN(accept_exceptions); ++i) {
656                 if (!pj_strcmp2(accept, accept_exceptions[i])) {
657                         return 1;
658                 }
659         }
660
661         return 0;
662 }
663
664 /*! \brief Retrieve a body generator using the Accept header of an rdata message */
665 static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata,
666         const struct ast_sip_subscription_handler *handler)
667 {
668         pjsip_accept_hdr *accept_header = (pjsip_accept_hdr *) &rdata->msg_info.msg->hdr;
669         char accept[AST_SIP_MAX_ACCEPT][64];
670         size_t num_accept_headers = 0;
671
672         while ((accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, accept_header->next))) {
673                 int i;
674
675                 for (i = 0; i < accept_header->count; ++i) {
676                         if (!exceptional_accept(&accept_header->values[i])) {
677                                 ast_copy_pj_str(accept[num_accept_headers], &accept_header->values[i], sizeof(accept[num_accept_headers]));
678                                 ++num_accept_headers;
679                         }
680                 }
681         }
682
683         if (num_accept_headers == 0) {
684                 /* If a SUBSCRIBE contains no Accept headers, then we must assume that
685                  * the default accept type for the event package is to be used.
686                  */
687                 ast_copy_string(accept[0], handler->notifier->default_accept, sizeof(accept[0]));
688                 num_accept_headers = 1;
689         }
690
691         return find_body_generator(accept, num_accept_headers, handler->body_type);
692 }
693
694 /*! \brief Check if the rdata has a Supported header containing 'eventlist'
695  *
696  *  \retval 1 rdata has an eventlist containing supported header
697  *  \retval 0 rdata doesn't have an eventlist containing supported header
698  */
699 static int ast_sip_pubsub_has_eventlist_support(pjsip_rx_data *rdata)
700 {
701         pjsip_supported_hdr *supported_header = (pjsip_supported_hdr *) &rdata->msg_info.msg->hdr;
702
703         while ((supported_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_SUPPORTED, supported_header->next))) {
704                 int i;
705
706                 for (i = 0; i < supported_header->count; i++) {
707                         if (!pj_stricmp2(&supported_header->values[i], "eventlist")) {
708                                 return 1;
709                         }
710                 }
711         }
712
713         return 0;
714 }
715
716 struct resource_tree;
717
718 /*!
719  * \brief A node for a resource tree.
720  */
721 struct tree_node {
722         AST_VECTOR(, struct tree_node *) children;
723         unsigned int full_state;
724         char resource[0];
725 };
726
727 /*!
728  * \brief Helper function for retrieving a resource list for a given event.
729  *
730  * This will retrieve a resource list that corresponds to the resource and event provided.
731  *
732  * \param resource The name of the resource list to retrieve
733  * \param event The expected event name on the resource list
734  */
735 static struct resource_list *retrieve_resource_list(const char *resource, const char *event)
736 {
737         struct resource_list *list;
738
739         list = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "resource_list", resource);
740         if (!list) {
741                 return NULL;
742         }
743
744         if (strcmp(list->event, event)) {
745                 ast_log(LOG_WARNING, "Found resource list %s, but its event type (%s) does not match SUBSCRIBE's (%s)\n",
746                                 resource, list->event, event);
747                 ao2_cleanup(list);
748                 return NULL;
749         }
750
751         return list;
752 }
753
754 /*!
755  * \brief Allocate a tree node
756  *
757  * In addition to allocating and initializing the tree node, the node is also added
758  * to the vector of visited resources. See \ref build_resource_tree for more information
759  * on the visited resources.
760  *
761  * \param resource The name of the resource for this tree node.
762  * \param visited The vector of resources that have been visited.
763  * \param if allocating a list, indicate whether full state is requested in notifications.
764  * \retval NULL Allocation failure.
765  * \retval non-NULL The newly-allocated tree_node
766  */
767 static struct tree_node *tree_node_alloc(const char *resource, struct resources *visited, unsigned int full_state)
768 {
769         struct tree_node *node;
770
771         node = ast_calloc(1, sizeof(*node) + strlen(resource) + 1);
772         if (!node) {
773                 return NULL;
774         }
775
776         strcpy(node->resource, resource);
777         if (AST_VECTOR_INIT(&node->children, 4)) {
778                 ast_free(node);
779                 return NULL;
780         }
781         node->full_state = full_state;
782
783         if (visited) {
784                 AST_VECTOR_APPEND(visited, resource);
785         }
786         return node;
787 }
788
789 /*!
790  * \brief Destructor for a tree node
791  *
792  * This function calls recursively in order to destroy
793  * all nodes lower in the tree from the given node in
794  * addition to the node itself.
795  *
796  * \param node The node to destroy.
797  */
798 static void tree_node_destroy(struct tree_node *node)
799 {
800         int i;
801         if (!node) {
802                 return;
803         }
804
805         for (i = 0; i < AST_VECTOR_SIZE(&node->children); ++i) {
806                 tree_node_destroy(AST_VECTOR_GET(&node->children, i));
807         }
808         AST_VECTOR_FREE(&node->children);
809         ast_free(node);
810 }
811
/*!
 * \brief Check whether a resource name is already in the visited set
 *
 * See \ref build_resource_tree for more information
 *
 * \param resource The resource currently being visited
 * \param visited The resources that have previously been visited
 *
 * \retval 1 A resource with the same name has been visited.
 * \retval 0 The resource has not been visited yet.
 */
static int have_visited(const char *resource, struct resources *visited)
{
	int idx = 0;

	while (idx < AST_VECTOR_SIZE(visited)) {
		if (strcmp(resource, AST_VECTOR_GET(visited, idx)) == 0) {
			return 1;
		}
		++idx;
	}

	return 0;
}
832
833 /*!
834  * \brief Build child nodes for a given parent.
835  *
836  * This iterates through the items on a resource list and creates tree nodes for each one. The
837  * tree nodes created are children of the supplied parent node. If an item in the resource
838  * list is itself a list, then this function is called recursively to provide children for
839  * the the new node.
840  *
841  * If an item in a resource list is not a list, then the supplied subscription handler is
842  * called into as if a new SUBSCRIBE for the list item were presented. The handler's response
843  * is used to determine if the node can be added to the tree or not.
844  *
845  * If a parent node ends up having no child nodes added under it, then the parent node is
846  * pruned from the tree.
847  *
848  * \param endpoint The endpoint that sent the inbound SUBSCRIBE.
849  * \param handler The subscription handler for leaf nodes in the tree.
850  * \param list The configured resource list from which the child node is being built.
851  * \param parent The parent node for these children.
852  * \param visited The resources that have already been visited.
853  */
854 static void build_node_children(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
855                 struct resource_list *list, struct tree_node *parent, struct resources *visited)
856 {
857         int i;
858
859         for (i = 0; i < AST_VECTOR_SIZE(&list->items); ++i) {
860                 struct tree_node *current;
861                 struct resource_list *child_list;
862                 const char *resource = AST_VECTOR_GET(&list->items, i);
863
864                 if (have_visited(resource, visited)) {
865                         ast_debug(1, "Already visited resource %s. Avoiding duplicate resource or potential loop.\n", resource);
866                         continue;
867                 }
868
869                 child_list = retrieve_resource_list(resource, list->event);
870                 if (!child_list) {
871                         int resp = handler->notifier->new_subscribe(endpoint, resource);
872                         if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
873                                 current = tree_node_alloc(resource, visited, 0);
874                                 if (!current) {
875                                         ast_debug(1, "Subscription to leaf resource %s was successful, but encountered"
876                                                         "allocation error afterwards\n", resource);
877                                         continue;
878                                 }
879                                 ast_debug(1, "Subscription to leaf resource %s resulted in success. Adding to parent %s\n",
880                                                 resource, parent->resource);
881                                 AST_VECTOR_APPEND(&parent->children, current);
882                         } else {
883                                 ast_debug(1, "Subscription to leaf resource %s resulted in error response %d\n",
884                                                 resource, resp);
885                         }
886                 } else {
887                         ast_debug(1, "Resource %s (child of %s) is a list\n", resource, parent->resource);
888                         current = tree_node_alloc(resource, visited, child_list->full_state);
889                         if (!current) {
890                                 ast_debug(1, "Cannot build children of resource %s due to allocation failure\n", resource);
891                                 continue;
892                         }
893                         build_node_children(endpoint, handler, child_list, current, visited);
894                         if (AST_VECTOR_SIZE(&current->children) > 0) {
895                                 ast_debug(1, "List %s had no successful children.\n", resource);
896                                 AST_VECTOR_APPEND(&parent->children, current);
897                         } else {
898                                 ast_debug(1, "List %s had successful children. Adding to parent %s\n",
899                                                 resource, parent->resource);
900                                 tree_node_destroy(current);
901                         }
902                         ao2_cleanup(child_list);
903                 }
904         }
905 }
906
/*!
 * \brief A resource tree
 *
 * The resource named in an inbound SUBSCRIBE may be a resource list, and the
 * items of that list may in turn be lists. A tree is therefore the natural
 * structure for holding the resources of one subscription.
 *
 * The tree is built when the SUBSCRIBE is received, by checking whether a
 * subscription to each individual resource would succeed. A resource that
 * can be subscribed to gets a node; one that cannot gets no node.
 *
 * This tree can be seen as a bare-bones analog of the tree of
 * ast_sip_subscriptions that will end up being created to actually carry out
 * the duties of a SIP SUBSCRIBE dialog.
 */
struct resource_tree {
	/*! Top node of the tree: the resource requested in the SUBSCRIBE */
	struct tree_node *root;
	/*! Notification batch interval, copied from the requested resource list */
	unsigned int notification_batch_interval;
};
927
928 /*!
929  * \brief Destroy a resource tree.
930  *
931  * This function makes no assumptions about how the tree itself was
932  * allocated and does not attempt to free the tree itself. Callers
933  * of this function are responsible for freeing the tree.
934  *
935  * \param tree The tree to destroy.
936  */
937 static void resource_tree_destroy(struct resource_tree *tree)
938 {
939         if (tree) {
940                 tree_node_destroy(tree->root);
941         }
942 }
943
944 /*!
945  * \brief Build a resource tree
946  *
947  * This function builds a resource tree based on the requested resource in a SUBSCRIBE request.
948  *
949  * This function also creates a container that has all resources that have been visited during
950  * creation of the tree, whether those resources resulted in a tree node being created or not.
951  * Keeping this container of visited resources allows for misconfigurations such as loops in
952  * the tree or duplicated resources to be detected.
953  *
954  * \param endpoint The endpoint that sent the SUBSCRIBE request.
955  * \param handler The subscription handler for leaf nodes in the tree.
956  * \param resource The resource requested in the SUBSCRIBE request.
957  * \param tree The tree that is to be built.
958  * \param has_eventlist_support
959  *
960  * \retval 200-299 Successfully subscribed to at least one resource.
961  * \retval 300-699 Failure to subscribe to requested resource.
962  */
963 static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
964                 const char *resource, struct resource_tree *tree, int has_eventlist_support)
965 {
966         RAII_VAR(struct resource_list *, list, NULL, ao2_cleanup);
967         struct resources visited;
968
969         if (!has_eventlist_support || !(list = retrieve_resource_list(resource, handler->event_name))) {
970                 ast_debug(1, "Subscription to resource %s is not to a list\n", resource);
971                 tree->root = tree_node_alloc(resource, NULL, 0);
972                 if (!tree->root) {
973                         return 500;
974                 }
975                 return handler->notifier->new_subscribe(endpoint, resource);
976         }
977
978         ast_debug(1, "Subscription to resource %s is a list\n", resource);
979         if (AST_VECTOR_INIT(&visited, AST_VECTOR_SIZE(&list->items))) {
980                 return 500;
981         }
982
983         tree->root = tree_node_alloc(resource, &visited, list->full_state);
984         if (!tree->root) {
985                 AST_VECTOR_FREE(&visited);
986                 return 500;
987         }
988
989         tree->notification_batch_interval = list->notification_batch_interval;
990
991         build_node_children(endpoint, handler, list, tree->root, &visited);
992         AST_VECTOR_FREE(&visited);
993
994         if (AST_VECTOR_SIZE(&tree->root->children) > 0) {
995                 return 200;
996         } else {
997                 return 500;
998         }
999 }
1000
1001 static int datastore_hash(const void *obj, int flags)
1002 {
1003         const struct ast_datastore *datastore = obj;
1004         const char *uid = flags & OBJ_KEY ? obj : datastore->uid;
1005
1006         ast_assert(uid != NULL);
1007
1008         return ast_str_hash(uid);
1009 }
1010
1011 static int datastore_cmp(void *obj, void *arg, int flags)
1012 {
1013         const struct ast_datastore *datastore1 = obj;
1014         const struct ast_datastore *datastore2 = arg;
1015         const char *uid2 = flags & OBJ_KEY ? arg : datastore2->uid;
1016
1017         ast_assert(datastore1->uid != NULL);
1018         ast_assert(uid2 != NULL);
1019
1020         return strcmp(datastore1->uid, uid2) ? 0 : CMP_MATCH | CMP_STOP;
1021 }
1022
1023 static int subscription_remove_serializer(void *obj)
1024 {
1025         struct sip_subscription_tree *sub_tree = obj;
1026
1027         /* This is why we keep the dialog on the subscription. When the subscription
1028          * is destroyed, there is no guarantee that the underlying dialog is ready
1029          * to be destroyed. Furthermore, there's no guarantee in the opposite direction
1030          * either. The dialog could be destroyed before our subscription is. We fix
1031          * this problem by keeping a reference to the dialog until it is time to
1032          * destroy the subscription. We need to have the dialog available when the
1033          * subscription is destroyed so that we can guarantee that our attempt to
1034          * remove the serializer will be successful.
1035          */
1036         ast_sip_dialog_set_serializer(sub_tree->dlg, NULL);
1037         ast_sip_dialog_set_endpoint(sub_tree->dlg, NULL);
1038         pjsip_dlg_dec_session(sub_tree->dlg, &pubsub_module);
1039
1040         return 0;
1041 }
1042
1043 static void add_subscription(struct sip_subscription_tree *obj)
1044 {
1045         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1046         AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
1047 }
1048
1049 static void remove_subscription(struct sip_subscription_tree *obj)
1050 {
1051         struct sip_subscription_tree *i;
1052         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1053         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
1054                 if (i == obj) {
1055                         AST_RWLIST_REMOVE_CURRENT(next);
1056                         if (i->root) {
1057                                 ast_debug(1, "Removing subscription to resource %s from list of subscriptions\n",
1058                                                 ast_sip_subscription_get_resource_name(i->root));
1059                         }
1060                         break;
1061                 }
1062         }
1063         AST_RWLIST_TRAVERSE_SAFE_END;
1064 }
1065
1066 static void subscription_destructor(void *obj)
1067 {
1068         struct ast_sip_subscription *sub = obj;
1069
1070         ast_debug(3, "Destroying SIP subscription to resource %s\n", sub->resource);
1071         ast_free(sub->body_text);
1072
1073         ao2_cleanup(sub->datastores);
1074 }
1075
1076 static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler,
1077                 const char *resource, struct sip_subscription_tree *tree)
1078 {
1079         struct ast_sip_subscription *sub;
1080         pjsip_sip_uri *contact_uri;
1081
1082         sub = ao2_alloc(sizeof(*sub) + strlen(resource) + 1, subscription_destructor);
1083         if (!sub) {
1084                 return NULL;
1085         }
1086         strcpy(sub->resource, resource); /* Safe */
1087
1088         sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
1089         if (!sub->datastores) {
1090                 ao2_ref(sub, -1);
1091                 return NULL;
1092         }
1093
1094         sub->body_text = ast_str_create(128);
1095         if (!sub->body_text) {
1096                 ao2_ref(sub, -1);
1097                 return NULL;
1098         }
1099
1100         sub->uri = pjsip_sip_uri_create(tree->dlg->pool, PJ_FALSE);
1101         contact_uri = pjsip_uri_get_uri(tree->dlg->local.contact->uri);
1102         pjsip_sip_uri_assign(tree->dlg->pool, sub->uri, contact_uri);
1103         pj_strdup2(tree->dlg->pool, &sub->uri->user, resource);
1104
1105         sub->handler = handler;
1106         sub->subscription_state = PJSIP_EVSUB_STATE_ACTIVE;
1107         sub->tree = tree;
1108
1109         return sub;
1110 }
1111
1112 /*!
1113  * \brief Create a tree of virtual subscriptions based on a resource tree node.
1114  *
1115  * \param handler The handler to supply to leaf subscriptions.
1116  * \param resource The requested resource for this subscription.
1117  * \param generator Body generator to use for leaf subscriptions.
1118  * \param tree The root of the subscription tree.
1119  * \param current The tree node that corresponds to the subscription being created.
1120  */
1121 static struct ast_sip_subscription *create_virtual_subscriptions(const struct ast_sip_subscription_handler *handler,
1122                 const char *resource, struct ast_sip_pubsub_body_generator *generator,
1123                 struct sip_subscription_tree *tree, struct tree_node *current)
1124 {
1125         int i;
1126         struct ast_sip_subscription *sub;
1127
1128         sub = allocate_subscription(handler, resource, tree);
1129         if (!sub) {
1130                 return NULL;
1131         }
1132
1133         sub->full_state = current->full_state;
1134         sub->body_generator = generator;
1135
1136         for (i = 0; i < AST_VECTOR_SIZE(&current->children); ++i) {
1137                 struct ast_sip_subscription *child;
1138                 struct tree_node *child_node = AST_VECTOR_GET(&current->children, i);
1139
1140                 child = create_virtual_subscriptions(handler, child_node->resource, generator,
1141                                 tree, child_node);
1142
1143                 if (!child) {
1144                         ast_debug(1, "Child subscription to resource %s could not be created\n",
1145                                         child_node->resource);
1146                         continue;
1147                 }
1148
1149                 if (AST_VECTOR_APPEND(&sub->children, child)) {
1150                         ast_debug(1, "Child subscription to resource %s could not be appended\n",
1151                                         child_node->resource);
1152                 }
1153         }
1154
1155         return sub;
1156 }
1157
1158 static void shutdown_subscriptions(struct ast_sip_subscription *sub)
1159 {
1160         int i;
1161
1162         if (!sub) {
1163                 return;
1164         }
1165
1166         if (AST_VECTOR_SIZE(&sub->children) > 0) {
1167                 for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
1168                         shutdown_subscriptions(AST_VECTOR_GET(&sub->children, i));
1169                         ao2_cleanup(AST_VECTOR_GET(&sub->children, i));
1170                 }
1171                 return;
1172         }
1173
1174         /* We notify subscription shutdown only on the tree leaves. */
1175         if (sub->handler->subscription_shutdown) {
1176                 sub->handler->subscription_shutdown(sub);
1177         }
1178 }
1179
1180 static void subscription_tree_destructor(void *obj)
1181 {
1182         struct sip_subscription_tree *sub_tree = obj;
1183
1184         remove_subscription(sub_tree);
1185
1186         subscription_persistence_remove(sub_tree);
1187         ao2_cleanup(sub_tree->endpoint);
1188
1189         if (sub_tree->dlg) {
1190                 ast_sip_push_task_synchronous(NULL, subscription_remove_serializer, sub_tree);
1191         }
1192
1193         shutdown_subscriptions(sub_tree->root);
1194         ao2_cleanup(sub_tree->root);
1195
1196         ast_taskprocessor_unreference(sub_tree->serializer);
1197         ast_module_unref(ast_module_info->self);
1198 }
1199
1200 static void subscription_setup_dialog(struct sip_subscription_tree *sub_tree, pjsip_dialog *dlg)
1201 {
1202         /* We keep a reference to the dialog until our subscription is destroyed. See
1203          * the subscription_destructor for more details
1204          */
1205         pjsip_dlg_inc_session(dlg, &pubsub_module);
1206         sub_tree->dlg = dlg;
1207         ast_sip_dialog_set_serializer(dlg, sub_tree->serializer);
1208         ast_sip_dialog_set_endpoint(dlg, sub_tree->endpoint);
1209         pjsip_evsub_set_mod_data(sub_tree->evsub, pubsub_module.id, sub_tree);
1210 }
1211
1212 static struct sip_subscription_tree *allocate_subscription_tree(struct ast_sip_endpoint *endpoint)
1213 {
1214         struct sip_subscription_tree *sub_tree;
1215
1216         sub_tree = ao2_alloc(sizeof *sub_tree, subscription_tree_destructor);
1217         if (!sub_tree) {
1218                 return NULL;
1219         }
1220
1221         ast_module_ref(ast_module_info->self);
1222
1223         sub_tree->serializer = ast_sip_create_serializer();
1224         if (!sub_tree->serializer) {
1225                 ao2_ref(sub_tree, -1);
1226                 return NULL;
1227         }
1228
1229         sub_tree->endpoint = ao2_bump(endpoint);
1230         sub_tree->notify_sched_id = -1;
1231
1232         add_subscription(sub_tree);
1233         return sub_tree;
1234 }
1235
1236 /*!
1237  * \brief Create a subscription tree based on a resource tree.
1238  *
1239  * Using the previously-determined valid resources in the provided resource tree,
1240  * a corresponding tree of ast_sip_subscriptions are created. The root of the
1241  * subscription tree is a real subscription, and the rest in the tree are
1242  * virtual subscriptions.
1243  *
1244  * \param handler The handler to use for leaf subscriptions
1245  * \param endpoint The endpoint that sent the SUBSCRIBE request
1246  * \param rdata The SUBSCRIBE content
1247  * \param resource The requested resource in the SUBSCRIBE request
1248  * \param generator The body generator to use in leaf subscriptions
1249  * \param tree The resource tree on which the subscription tree is based
1250  * \param dlg_status[out] The result of attempting to create a dialog.
1251  *
1252  * \retval NULL Could not create the subscription tree
1253  * \retval non-NULL The root of the created subscription tree
1254  */
1255
1256 static struct sip_subscription_tree *create_subscription_tree(const struct ast_sip_subscription_handler *handler,
1257                 struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
1258                 struct ast_sip_pubsub_body_generator *generator, struct resource_tree *tree,
1259                 pj_status_t *dlg_status)
1260 {
1261         struct sip_subscription_tree *sub_tree;
1262         pjsip_dialog *dlg;
1263         struct subscription_persistence *persistence;
1264
1265         sub_tree = allocate_subscription_tree(endpoint);
1266         if (!sub_tree) {
1267                 *dlg_status = PJ_ENOMEM;
1268                 return NULL;
1269         }
1270         sub_tree->role = AST_SIP_NOTIFIER;
1271
1272         dlg = ast_sip_create_dialog_uas(endpoint, rdata, dlg_status);
1273         if (!dlg) {
1274                 if (*dlg_status != PJ_EEXISTS) {
1275                         ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
1276                 }
1277                 ao2_ref(sub_tree, -1);
1278                 return NULL;
1279         }
1280
1281         persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
1282                         pubsub_module.id, MOD_DATA_PERSISTENCE);
1283         if (persistence) {
1284                 /* Update the created dialog with the persisted information */
1285                 pjsip_ua_unregister_dlg(pjsip_ua_instance(), dlg);
1286                 pj_strdup2(dlg->pool, &dlg->local.info->tag, persistence->tag);
1287                 dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag);
1288                 pjsip_ua_register_dlg(pjsip_ua_instance(), dlg);
1289                 dlg->local.cseq = persistence->cseq;
1290                 dlg->remote.cseq = persistence->cseq;
1291         }
1292
1293         pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &sub_tree->evsub);
1294         subscription_setup_dialog(sub_tree, dlg);
1295
1296         ast_sip_mod_data_set(dlg->pool, dlg->mod_data, pubsub_module.id, MOD_DATA_MSG,
1297                         pjsip_msg_clone(dlg->pool, rdata->msg_info.msg));
1298
1299         sub_tree->notification_batch_interval = tree->notification_batch_interval;
1300
1301         sub_tree->root = create_virtual_subscriptions(handler, resource, generator, sub_tree, tree->root);
1302         if (AST_VECTOR_SIZE(&sub_tree->root->children) > 0) {
1303                 sub_tree->is_list = 1;
1304         }
1305
1306         return sub_tree;
1307 }
1308
1309 static int generate_initial_notify(struct ast_sip_subscription *sub);
1310 static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state);
1311
1312 /*! \brief Callback function to perform the actual recreation of a subscription */
1313 static int subscription_persistence_recreate(void *obj, void *arg, int flags)
1314 {
1315         struct subscription_persistence *persistence = obj;
1316         pj_pool_t *pool = arg;
1317         pjsip_rx_data rdata = { { 0, }, };
1318         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
1319         struct sip_subscription_tree *sub_tree;
1320         struct ast_sip_pubsub_body_generator *generator;
1321         int resp;
1322         char *resource;
1323         size_t resource_size;
1324         pjsip_sip_uri *request_uri;
1325         struct resource_tree tree;
1326         pjsip_expires_hdr *expires_header;
1327         struct ast_sip_subscription_handler *handler;
1328
1329         /* If this subscription has already expired remove it */
1330         if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
1331                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1332                 return 0;
1333         }
1334
1335         endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
1336         if (!endpoint) {
1337                 ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
1338                         persistence->endpoint);
1339                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1340                 return 0;
1341         }
1342
1343         pj_pool_reset(pool);
1344         rdata.tp_info.pool = pool;
1345
1346         if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
1347                 persistence->transport_key, persistence->local_name, persistence->local_port)) {
1348                 ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
1349                         persistence->endpoint);
1350                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1351                 return 0;
1352         }
1353
1354         if (rdata.msg_info.msg->type != PJSIP_REQUEST_MSG) {
1355                 ast_log(LOG_NOTICE, "Endpoint %s persisted a SIP response instead of a subscribe request. Unable to reload subscription.\n",
1356                                 ast_sorcery_object_get_id(endpoint));
1357                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1358                 return 0;
1359         }
1360
1361         request_uri = pjsip_uri_get_uri(rdata.msg_info.msg->line.req.uri);
1362         resource_size = pj_strlen(&request_uri->user) + 1;
1363         resource = ast_alloca(resource_size);
1364         ast_copy_pj_str(resource, &request_uri->user, resource_size);
1365
1366         /* Update the expiration header with the new expiration */
1367         expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
1368         if (!expires_header) {
1369                 expires_header = pjsip_expires_hdr_create(pool, 0);
1370                 if (!expires_header) {
1371                         ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1372                         return 0;
1373                 }
1374                 pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
1375         }
1376         expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
1377
1378         handler = subscription_get_handler_from_rdata(&rdata);
1379         if (!handler || !handler->notifier) {
1380                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1381                 return 0;
1382         }
1383
1384         generator = subscription_get_generator_from_rdata(&rdata, handler);
1385         if (!generator) {
1386                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1387                 return 0;
1388         }
1389
1390         ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
1391                         pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
1392
1393         memset(&tree, 0, sizeof(tree));
1394         resp = build_resource_tree(endpoint, handler, resource, &tree,
1395                 ast_sip_pubsub_has_eventlist_support(&rdata));
1396         if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
1397                 pj_status_t dlg_status;
1398
1399                 sub_tree = create_subscription_tree(handler, endpoint, &rdata, resource, generator, &tree, &dlg_status);
1400                 if (!sub_tree) {
1401                         ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1402                         ast_log(LOG_WARNING, "Failed to re-create subscription for %s\n", persistence->endpoint);
1403                         return 0;
1404                 }
1405                 sub_tree->persistence = ao2_bump(persistence);
1406                 subscription_persistence_update(sub_tree, &rdata);
1407                 if (generate_initial_notify(sub_tree->root)) {
1408                         pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
1409                 } else {
1410                         send_notify(sub_tree, 1);
1411                 }
1412         } else {
1413                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1414         }
1415         resource_tree_destroy(&tree);
1416
1417         return 0;
1418 }
1419
1420 /*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
1421 static int subscription_persistence_load(void *data)
1422 {
1423         struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
1424                 "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
1425         pj_pool_t *pool;
1426
1427         pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
1428                 PJSIP_POOL_RDATA_INC);
1429         if (!pool) {
1430                 ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
1431                 return 0;
1432         }
1433
1434         ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
1435
1436         pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
1437
1438         ao2_ref(persisted_subscriptions, -1);
1439         return 0;
1440 }
1441
1442 /*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
1443 static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
1444 {
1445         struct ast_json_payload *payload;
1446         const char *type;
1447
1448         if (stasis_message_type(message) != ast_manager_get_generic_type()) {
1449                 return;
1450         }
1451
1452         payload = stasis_message_data(message);
1453         type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
1454
1455         /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
1456          * recreate SIP subscriptions.
1457          */
1458         if (strcmp(type, "FullyBooted")) {
1459                 return;
1460         }
1461
1462         /* This has to be here so the subscription is recreated when the body generator is available */
1463         ast_sip_push_task(NULL, subscription_persistence_load, NULL);
1464
1465         /* Once the system is fully booted we don't care anymore */
1466         stasis_unsubscribe(sub);
1467 }
1468
1469 typedef int (*on_subscription_t)(struct sip_subscription_tree *sub, void *arg);
1470
1471 static int for_each_subscription(on_subscription_t on_subscription, void *arg)
1472 {
1473         int num = 0;
1474         struct sip_subscription_tree *i;
1475         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1476
1477         if (!on_subscription) {
1478                 return num;
1479         }
1480
1481         AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
1482                 if (on_subscription(i, arg)) {
1483                         break;
1484                 }
1485                 ++num;
1486         }
1487         return num;
1488 }
1489
1490 static void sip_subscription_to_ami(struct sip_subscription_tree *sub_tree,
1491                                     struct ast_str **buf)
1492 {
1493         char str[256];
1494         struct ast_sip_endpoint_id_configuration *id = &sub_tree->endpoint->id;
1495
1496         ast_str_append(buf, 0, "Role: %s\r\n",
1497                        sip_subscription_roles_map[sub_tree->role]);
1498         ast_str_append(buf, 0, "Endpoint: %s\r\n",
1499                        ast_sorcery_object_get_id(sub_tree->endpoint));
1500
1501         ast_copy_pj_str(str, &sub_tree->dlg->call_id->id, sizeof(str));
1502         ast_str_append(buf, 0, "Callid: %s\r\n", str);
1503
1504         ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(sub_tree->evsub));
1505
1506         ast_callerid_merge(str, sizeof(str),
1507                            S_COR(id->self.name.valid, id->self.name.str, NULL),
1508                            S_COR(id->self.number.valid, id->self.number.str, NULL),
1509                            "Unknown");
1510
1511         ast_str_append(buf, 0, "Callerid: %s\r\n", str);
1512
1513         /* XXX This needs to be done recursively for lists */
1514         if (sub_tree->root->handler->to_ami) {
1515                 sub_tree->root->handler->to_ami(sub_tree->root, buf);
1516         }
1517 }
1518
1519
1520 void *ast_sip_subscription_get_header(const struct ast_sip_subscription *sub, const char *header)
1521 {
1522         pjsip_dialog *dlg = sub->tree->dlg;
1523         pjsip_msg *msg = ast_sip_mod_data_get(dlg->mod_data, pubsub_module.id, MOD_DATA_MSG);
1524         pj_str_t name;
1525
1526         pj_cstr(&name, header);
1527
1528         return pjsip_msg_find_hdr_by_name(msg, &name, NULL);
1529 }
1530
1531 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
1532                 struct ast_sip_endpoint *endpoint, const char *resource)
1533 {
1534         struct ast_sip_subscription *sub;
1535         pjsip_dialog *dlg;
1536         struct ast_sip_contact *contact;
1537         pj_str_t event;
1538         pjsip_tx_data *tdata;
1539         pjsip_evsub *evsub;
1540         struct sip_subscription_tree *sub_tree = NULL;
1541
1542         sub_tree = allocate_subscription_tree(endpoint);
1543         if (!sub_tree) {
1544                 return NULL;
1545         }
1546
1547         sub = allocate_subscription(handler, resource, sub_tree);
1548         if (!sub) {
1549                 ao2_cleanup(sub_tree);
1550                 return NULL;
1551         }
1552
1553         contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors);
1554         if (!contact || ast_strlen_zero(contact->uri)) {
1555                 ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n",
1556                                 ast_sorcery_object_get_id(endpoint));
1557                 ao2_ref(sub_tree, -1);
1558                 ao2_cleanup(contact);
1559                 return NULL;
1560         }
1561
1562         dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL);
1563         ao2_cleanup(contact);
1564         if (!dlg) {
1565                 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
1566                 ao2_ref(sub_tree, -1);
1567                 return NULL;
1568         }
1569
1570         pj_cstr(&event, handler->event_name);
1571         pjsip_evsub_create_uac(dlg, &pubsub_cb, &event, 0, &sub_tree->evsub);
1572         subscription_setup_dialog(sub_tree, dlg);
1573
1574         evsub = sub_tree->evsub;
1575
1576         if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
1577                 pjsip_evsub_send_request(evsub, tdata);
1578         } else {
1579                 /* pjsip_evsub_terminate will result in pubsub_on_evsub_state,
1580                  * being called and terminating the subscription. Therefore, we don't
1581                  * need to decrease the reference count of sub here.
1582                  */
1583                 pjsip_evsub_terminate(evsub, PJ_TRUE);
1584                 ao2_ref(sub_tree, -1);
1585                 return NULL;
1586         }
1587
1588         return sub;
1589 }
1590
1591 struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
1592 {
1593         ast_assert(sub->tree->endpoint != NULL);
1594         return ao2_bump(sub->tree->endpoint);
1595 }
1596
1597 struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
1598 {
1599         ast_assert(sub->tree->serializer != NULL);
1600         return sub->tree->serializer;
1601 }
1602
1603 /*!
1604  * \brief Pre-allocate a buffer for the transmission
1605  *
1606  * Typically, we let PJSIP do this step for us when we send a request. PJSIP's buffer
1607  * allocation algorithm is to allocate a buffer of PJSIP_MAX_PKT_LEN bytes and attempt
1608  * to write the packet to the allocated buffer. If the buffer is too small to hold the
1609  * packet, then we get told the message is too long to be sent.
1610  *
1611  * When dealing with SIP NOTIFY, especially with RLS, it is possible to exceed
1612  * PJSIP_MAX_PKT_LEN. Rather than accepting the limitation imposed on us by default,
1613  * we instead take the strategy of pre-allocating the buffer, testing for ourselves
1614  * if the message will fit, and resizing the buffer as required.
1615  *
1616  * RFC 3261 says that a SIP UDP request can be up to 65535 bytes long. We're capping
1617  * it at 64000 for a couple of reasons:
1618  * 1) Allocating more than 64K at a time is hard to justify
1619  * 2) If the message goes through proxies, those proxies will want to add Via and
1620  *    Record-Route headers, making the message even larger. Giving some space for
1621  *    those headers is a nice thing to do.
1622  *
1623  * RFC 3261 does not place an upper limit on the size of TCP requests, but we are
1624  * going to impose the same 64K limit as a memory savings.
1625  *
1626  * \param tdata The tdata onto which to allocate a buffer
1627  * \retval 0 Success
1628  * \retval -1 The message is too large
1629  */
1630 static int allocate_tdata_buffer(pjsip_tx_data *tdata)
1631 {
1632         int buf_size;
1633         int size = -1;
1634         char *buf;
1635
1636         for (buf_size = PJSIP_MAX_PKT_LEN; size == -1 && buf_size < 64000; buf_size *= 2) {
1637                 buf = pj_pool_alloc(tdata->pool, buf_size);
1638                 size = pjsip_msg_print(tdata->msg, buf, buf_size);
1639         }
1640
1641         if (size == -1) {
1642                 return -1;
1643         }
1644
1645         tdata->buf.start = buf;
1646         tdata->buf.cur = tdata->buf.start;
1647         tdata->buf.end = tdata->buf.start + buf_size;
1648
1649         return 0;
1650 }
1651
1652 static int sip_subscription_send_request(struct sip_subscription_tree *sub_tree, pjsip_tx_data *tdata)
1653 {
1654 #ifdef TEST_FRAMEWORK
1655         struct ast_sip_endpoint *endpoint = sub_tree->endpoint;
1656 #endif
1657         int res;
1658
1659         if (allocate_tdata_buffer(tdata)) {
1660                 ast_log(LOG_ERROR, "SIP request %s is too large to send.\n", tdata->info);
1661                 return -1;
1662         }
1663
1664         res = pjsip_evsub_send_request(sub_tree->evsub, tdata) == PJ_SUCCESS ? 0 : -1;
1665         subscription_persistence_update(sub_tree, NULL);
1666
1667         ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
1668                 "StateText: %s\r\n"
1669                 "Endpoint: %s\r\n",
1670                 pjsip_evsub_get_state_name(sub_tree->evsub),
1671                 ast_sorcery_object_get_id(endpoint));
1672
1673         return res;
1674 }
1675
1676 /*!
1677  * \brief Add a resource XML element to an RLMI body
1678  *
1679  * Each resource element represents a subscribed resource in the list. This function currently
1680  * will unconditionally add an instance element to each created resource element. Instance
1681  * elements refer to later parts in the multipart body.
1682  *
1683  * \param pool PJLIB allocation pool
1684  * \param cid Content-ID header of the resource
1685  * \param resource_name Name of the resource
1686  * \param resource_uri URI of the resource
1687  * \param state State of the subscribed resource
1688  */
1689 static void add_rlmi_resource(pj_pool_t *pool, pj_xml_node *rlmi, const pjsip_generic_string_hdr *cid,
1690                 const char *resource_name, const pjsip_sip_uri *resource_uri, pjsip_evsub_state state)
1691 {
1692         static pj_str_t cid_name = { "cid", 3 };
1693         pj_xml_node *resource;
1694         pj_xml_node *name;
1695         pj_xml_node *instance;
1696         pj_xml_attr *cid_attr;
1697         char id[6];
1698         char uri[PJSIP_MAX_URL_SIZE];
1699
1700         /* This creates a string representing the Content-ID without the enclosing < > */
1701         const pj_str_t cid_stripped = {
1702                 .ptr = cid->hvalue.ptr + 1,
1703                 .slen = cid->hvalue.slen - 2,
1704         };
1705
1706         resource = ast_sip_presence_xml_create_node(pool, rlmi, "resource");
1707         name = ast_sip_presence_xml_create_node(pool, resource, "name");
1708         instance = ast_sip_presence_xml_create_node(pool, resource, "instance");
1709
1710         pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, resource_uri, uri, sizeof(uri));
1711         ast_sip_presence_xml_create_attr(pool, resource, "uri", uri);
1712
1713         pj_strdup2(pool, &name->content, resource_name);
1714
1715         ast_generate_random_string(id, sizeof(id));
1716
1717         ast_sip_presence_xml_create_attr(pool, instance, "id", id);
1718         ast_sip_presence_xml_create_attr(pool, instance, "state",
1719                         state == PJSIP_EVSUB_STATE_TERMINATED ? "terminated" : "active");
1720
1721         /* Use the PJLIB-util XML library directly here since we are using a
1722          * pj_str_t
1723          */
1724
1725         cid_attr = pj_xml_attr_new(pool, &cid_name, &cid_stripped);
1726         pj_xml_add_attr(instance, cid_attr);
1727 }
1728
/*!
 * \brief A multipart body part and meta-information
 *
 * Once a multipart body part (the pjsip_multipart_part) has been
 * built, it is hard to inspect without undoing much of the work
 * that went into creating it. This structure therefore keeps the
 * meta-information about the body part alongside the part itself.
 *
 * The main consumer of this is the creator of the RLMI body
 * part of a multipart resource list body.
 *
 * The structure itself is heap allocated (see allocate_body_part()
 * and free_body_parts()); freeing it does not free the members it
 * points to.
 */
struct body_part {
	/*! Content-ID header for the body part */
	pjsip_generic_string_hdr *cid;
	/*! Name of the subscribed resource represented in the body part */
	const char *resource;
	/*! URI of the subscribed resource represented in the body part */
	pjsip_sip_uri *uri;
	/*! Subscription state of the resource represented in the body part */
	pjsip_evsub_state state;
	/*! The actual body part that will be present in the multipart body */
	pjsip_multipart_part *part;
};

/*!
 * \brief Type declaration for a vector of pointers to body part structures
 */
AST_VECTOR(body_part_list, struct body_part *);
1757
1758 /*!
1759  * \brief Create a Content-ID header
1760  *
1761  * Content-ID headers are required by RFC2387 for multipart/related
1762  * bodies. They serve as identifiers for each part of the multipart body.
1763  *
1764  * \param pool PJLIB allocation pool
1765  * \param sub Subscription to a resource
1766  */
1767 static pjsip_generic_string_hdr *generate_content_id_hdr(pj_pool_t *pool,
1768                 const struct ast_sip_subscription *sub)
1769 {
1770         static const pj_str_t cid_name = { "Content-ID", 10 };
1771         pjsip_generic_string_hdr *cid;
1772         char id[6];
1773         size_t alloc_size;
1774         pj_str_t cid_value;
1775
1776         /* '<' + '@' + '>' = 3. pj_str_t does not require a null-terminator */
1777         alloc_size = sizeof(id) + pj_strlen(&sub->uri->host) + 3;
1778         cid_value.ptr = pj_pool_alloc(pool, alloc_size);
1779         cid_value.slen = sprintf(cid_value.ptr, "<%s@%.*s>",
1780                         ast_generate_random_string(id, sizeof(id)),
1781                         (int) pj_strlen(&sub->uri->host), pj_strbuf(&sub->uri->host));
1782         cid = pjsip_generic_string_hdr_create(pool, &cid_name, &cid_value);
1783
1784         return cid;
1785 }
1786
1787 static int rlmi_print_body(struct pjsip_msg_body *msg_body, char *buf, pj_size_t size)
1788 {
1789         int num_printed;
1790         pj_xml_node *rlmi = msg_body->data;
1791
1792         num_printed = pj_xml_print(rlmi, buf, size, PJ_TRUE);
1793         if (num_printed <= AST_PJSIP_XML_PROLOG_LEN) {
1794                 return -1;
1795         }
1796
1797         return num_printed;
1798 }
1799
1800 static void *rlmi_clone_data(pj_pool_t *pool, const void *data, unsigned len)
1801 {
1802         const pj_xml_node *rlmi = data;
1803
1804         return pj_xml_clone(pool, rlmi);
1805 }
1806
1807 /*!
1808  * \brief Create an RLMI body part for a multipart resource list body
1809  *
1810  * RLMI (Resource list meta information) is a special body type that lists
1811  * the subscribed resources and tells subscribers the number of subscribed
1812  * resources and what other body parts are in the multipart body. The
1813  * RLMI body also has a version number that a subscriber can use to ensure
1814  * that the locally-stored state corresponds to server state.
1815  *
1816  * \param pool The allocation pool
1817  * \param sub The subscription representing the subscribed resource list
1818  * \param body_parts A container of body parts that RLMI will refer to
1819  * \param full_state Indicates whether this is a full or partial state notification
1820  * \return The multipart part representing the RLMI body
1821  */
1822 static pjsip_multipart_part *build_rlmi_body(pj_pool_t *pool, struct ast_sip_subscription *sub,
1823                 struct body_part_list *body_parts, unsigned int full_state)
1824 {
1825         static const pj_str_t rlmi_type = { "application", 11 };
1826         static const pj_str_t rlmi_subtype = { "rlmi+xml", 8 };
1827         pj_xml_node *rlmi;
1828         pj_xml_node *name;
1829         pjsip_multipart_part *rlmi_part;
1830         char version_str[32];
1831         char uri[PJSIP_MAX_URL_SIZE];
1832         pjsip_generic_string_hdr *cid;
1833         int i;
1834
1835         rlmi = ast_sip_presence_xml_create_node(pool, NULL, "list");
1836         ast_sip_presence_xml_create_attr(pool, rlmi, "xmlns", "urn:ietf:params:xml:ns:rlmi");
1837
1838         ast_sip_subscription_get_local_uri(sub, uri, sizeof(uri));
1839         ast_sip_presence_xml_create_attr(pool, rlmi, "uri", uri);
1840
1841         snprintf(version_str, sizeof(version_str), "%u", sub->version++);
1842         ast_sip_presence_xml_create_attr(pool, rlmi, "version", version_str);
1843         ast_sip_presence_xml_create_attr(pool, rlmi, "fullState", full_state ? "true" : "false");
1844
1845         name = ast_sip_presence_xml_create_node(pool, rlmi, "name");
1846         pj_strdup2(pool, &name->content, ast_sip_subscription_get_resource_name(sub));
1847
1848         for (i = 0; i < AST_VECTOR_SIZE(body_parts); ++i) {
1849                 const struct body_part *part = AST_VECTOR_GET(body_parts, i);
1850
1851                 add_rlmi_resource(pool, rlmi, part->cid, part->resource, part->uri, part->state);
1852         }
1853
1854         rlmi_part = pjsip_multipart_create_part(pool);
1855
1856         rlmi_part->body = PJ_POOL_ZALLOC_T(pool, pjsip_msg_body);
1857         pj_strdup(pool, &rlmi_part->body->content_type.type, &rlmi_type);
1858         pj_strdup(pool, &rlmi_part->body->content_type.subtype, &rlmi_subtype);
1859         pj_list_init(&rlmi_part->body->content_type.param);
1860
1861         rlmi_part->body->data = pj_xml_clone(pool, rlmi);
1862         rlmi_part->body->clone_data = rlmi_clone_data;
1863         rlmi_part->body->print_body = rlmi_print_body;
1864
1865         cid = generate_content_id_hdr(pool, sub);
1866         pj_list_insert_before(&rlmi_part->hdr, cid);
1867
1868         return rlmi_part;
1869 }
1870
1871 static pjsip_msg_body *generate_notify_body(pj_pool_t *pool, struct ast_sip_subscription *root,
1872                 unsigned int force_full_state);
1873
/*!
 * \brief Destroy a list of body parts
 *
 * Frees every body_part structure held in the vector and then the
 * vector's own storage.
 *
 * \param parts The container of parts to destroy
 */
static void free_body_parts(struct body_part_list *parts)
{
	int idx = 0;

	while (idx < AST_VECTOR_SIZE(parts)) {
		ast_free(AST_VECTOR_GET(parts, idx));
		++idx;
	}

	AST_VECTOR_FREE(parts);
}
1890
1891 /*!
1892  * \brief Allocate and initialize a body part structure
1893  *
1894  * \param pool PJLIB allocation pool
1895  * \param sub Subscription representing a subscribed resource
1896  */
1897 static struct body_part *allocate_body_part(pj_pool_t *pool, const struct ast_sip_subscription *sub)
1898 {
1899         struct body_part *bp;
1900
1901         bp = ast_calloc(1, sizeof(*bp));
1902         if (!bp) {
1903                 return NULL;
1904         }
1905
1906         bp->cid = generate_content_id_hdr(pool, sub);
1907         bp->resource = sub->resource;
1908         bp->state = sub->subscription_state;
1909         bp->uri = sub->uri;
1910
1911         return bp;
1912 }
1913
1914 /*!
1915  * \brief Create a multipart body part for a subscribed resource
1916  *
1917  * \param pool PJLIB allocation pool
1918  * \param sub The subscription representing a subscribed resource
1919  * \param parts A vector of parts to append the created part to.
1920  * \param use_full_state Unused locally, but may be passed to other functions
1921  */
1922 static void build_body_part(pj_pool_t *pool, struct ast_sip_subscription *sub,
1923                 struct body_part_list *parts, unsigned int use_full_state)
1924 {
1925         struct body_part *bp;
1926         pjsip_msg_body *body;
1927
1928         bp = allocate_body_part(pool, sub);
1929         if (!bp) {
1930                 return;
1931         }
1932
1933         body = generate_notify_body(pool, sub, use_full_state);
1934         if (!body) {
1935                 /* Partial state was requested and the resource has not changed state */
1936                 ast_free(bp);
1937                 return;
1938         }
1939
1940         bp->part = pjsip_multipart_create_part(pool);
1941         bp->part->body = body;
1942         pj_list_insert_before(&bp->part->hdr, bp->cid);
1943
1944         AST_VECTOR_APPEND(parts, bp);
1945 }
1946
1947 /*!
1948  * \brief Create and initialize the PJSIP multipart body structure for a resource list subscription
1949  *
1950  * \param pool
1951  * \return The multipart message body
1952  */
1953 static pjsip_msg_body *create_multipart_body(pj_pool_t *pool)
1954 {
1955         pjsip_media_type media_type;
1956         pjsip_param *media_type_param;
1957         char boundary[6];
1958         pj_str_t pj_boundary;
1959
1960         pjsip_media_type_init2(&media_type, "multipart", "related");
1961
1962         media_type_param = pj_pool_alloc(pool, sizeof(*media_type_param));
1963         pj_list_init(media_type_param);
1964
1965         pj_strdup2(pool, &media_type_param->name, "type");
1966         pj_strdup2(pool, &media_type_param->value, "\"application/rlmi+xml\"");
1967
1968         pj_list_insert_before(&media_type.param, media_type_param);
1969
1970         pj_cstr(&pj_boundary, ast_generate_random_string(boundary, sizeof(boundary)));
1971         return pjsip_multipart_create(pool, &media_type, &pj_boundary);
1972 }
1973
1974 /*!
1975  * \brief Create a resource list body for NOTIFY requests
1976  *
1977  * Resource list bodies are multipart/related bodies. The first part of the multipart body
1978  * is an RLMI body that describes the rest of the parts to come. The other parts of the body
1979  * convey state of individual subscribed resources.
1980  *
1981  * \param pool PJLIB allocation pool
1982  * \param sub Subscription details from which to generate body
1983  * \param force_full_state If true, ignore resource list settings and send a full state notification
1984  * \return The generated multipart/related body
1985  */
1986 static pjsip_msg_body *generate_list_body(pj_pool_t *pool, struct ast_sip_subscription *sub,
1987                 unsigned int force_full_state)
1988 {
1989         int i;
1990         pjsip_multipart_part *rlmi_part;
1991         pjsip_msg_body *multipart;
1992         struct body_part_list body_parts;
1993         unsigned int use_full_state = force_full_state ? 1 : sub->full_state;
1994
1995         if (AST_VECTOR_INIT(&body_parts, AST_VECTOR_SIZE(&sub->children))) {
1996                 return NULL;
1997         }
1998
1999         for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
2000                 build_body_part(pool, AST_VECTOR_GET(&sub->children, i), &body_parts, use_full_state);
2001         }
2002
2003         /* This can happen if issuing partial state and no children of the list have changed state */
2004         if (AST_VECTOR_SIZE(&body_parts) == 0) {
2005                 return NULL;
2006         }
2007
2008         multipart = create_multipart_body(pool);
2009
2010         rlmi_part = build_rlmi_body(pool, sub, &body_parts, use_full_state);
2011         if (!rlmi_part) {
2012                 return NULL;
2013         }
2014         pjsip_multipart_add_part(pool, multipart, rlmi_part);
2015
2016         for (i = 0; i < AST_VECTOR_SIZE(&body_parts); ++i) {
2017                 pjsip_multipart_add_part(pool, multipart, AST_VECTOR_GET(&body_parts, i)->part);
2018         }
2019
2020         free_body_parts(&body_parts);
2021         return multipart;
2022 }
2023
2024 /*!
2025  * \brief Create the body for a NOTIFY request.
2026  *
2027  * \param pool The pool used for allocations
2028  * \param root The root of the subscription tree
2029  * \param force_full_state If true, ignore resource list settings and send a full state notification
2030  */
2031 static pjsip_msg_body *generate_notify_body(pj_pool_t *pool, struct ast_sip_subscription *root,
2032                 unsigned int force_full_state)
2033 {
2034         pjsip_msg_body *body;
2035
2036         if (AST_VECTOR_SIZE(&root->children) == 0) {
2037                 if (force_full_state || root->body_changed) {
2038                         /* Not a list. We've already generated the body and saved it on the subscription.
2039                          * Use that directly.
2040                          */
2041                         pj_str_t type;
2042                         pj_str_t subtype;
2043                         pj_str_t text;
2044
2045                         pj_cstr(&type, ast_sip_subscription_get_body_type(root));
2046                         pj_cstr(&subtype, ast_sip_subscription_get_body_subtype(root));
2047                         pj_cstr(&text, ast_str_buffer(root->body_text));
2048
2049                         body = pjsip_msg_body_create(pool, &type, &subtype, &text);
2050                         root->body_changed = 0;
2051                 } else {
2052                         body = NULL;
2053                 }
2054         } else {
2055                 body = generate_list_body(pool, root, force_full_state);
2056         }
2057
2058         return body;
2059 }
2060
2061 /*!
2062  * \brief Shortcut method to create a Require: eventlist header
2063  */
2064 static pjsip_require_hdr *create_require_eventlist(pj_pool_t *pool)
2065 {
2066         pjsip_require_hdr *require;
2067
2068         require = pjsip_require_hdr_create(pool);
2069         pj_strdup2(pool, &require->values[0], "eventlist");
2070         require->count = 1;
2071
2072         return require;
2073 }
2074
2075 /*!
2076  * \brief Send a NOTIFY request to a subscriber
2077  *
2078  * \param sub_tree The subscription tree representing the subscription
2079  * \param force_full_state If true, ignore resource list settings and send full resource list state.
2080  * \retval 0 Success
2081  * \retval non-zero Failure
2082  */
2083 static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state)
2084 {
2085         pjsip_evsub *evsub = sub_tree->evsub;
2086         pjsip_tx_data *tdata;
2087
2088         if (ast_shutdown_final()
2089                 && sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED
2090                 && sub_tree->persistence) {
2091                 return 0;
2092         }
2093
2094         if (pjsip_evsub_notify(evsub, sub_tree->root->subscription_state,
2095                                 NULL, NULL, &tdata) != PJ_SUCCESS) {
2096                 return -1;
2097         }
2098
2099         tdata->msg->body = generate_notify_body(tdata->pool, sub_tree->root, force_full_state);
2100         if (!tdata->msg->body) {
2101                 pjsip_tx_data_dec_ref(tdata);
2102                 return -1;
2103         }
2104
2105         if (sub_tree->is_list) {
2106                 pjsip_require_hdr *require = create_require_eventlist(tdata->pool);
2107                 pjsip_msg_add_hdr(tdata->msg, (pjsip_hdr *) require);
2108         }
2109
2110         if (sip_subscription_send_request(sub_tree, tdata)) {
2111                 return -1;
2112         }
2113
2114         sub_tree->send_scheduled_notify = 0;
2115
2116         return 0;
2117 }
2118
2119 static int serialized_send_notify(void *userdata)
2120 {
2121         struct sip_subscription_tree *sub_tree = userdata;
2122
2123         /* It's possible that between when the notification was scheduled
2124          * and now, that a new SUBSCRIBE arrived, requiring full state to be
2125          * sent out in an immediate NOTIFY. If that has happened, we need to
2126          * bail out here instead of sending the batched NOTIFY.
2127          */
2128         if (!sub_tree->send_scheduled_notify) {
2129                 ao2_cleanup(sub_tree);
2130                 return 0;
2131         }
2132
2133         send_notify(sub_tree, 0);
2134         ast_test_suite_event_notify("SUBSCRIPTION_STATE_CHANGED",
2135                         "Resource: %s",
2136                         sub_tree->root->resource);
2137         sub_tree->notify_sched_id = -1;
2138         ao2_cleanup(sub_tree);
2139         return 0;
2140 }
2141
2142 static int sched_cb(const void *data)
2143 {
2144         struct sip_subscription_tree *sub_tree = (struct sip_subscription_tree *) data;
2145
2146         /* We don't need to bump the refcount of sub_tree since we bumped it when scheduling this task */
2147         ast_sip_push_task(sub_tree->serializer, serialized_send_notify, sub_tree);
2148         return 0;
2149 }
2150
2151 static int schedule_notification(struct sip_subscription_tree *sub_tree)
2152 {
2153         /* There's already a notification scheduled */
2154         if (sub_tree->notify_sched_id > -1) {
2155                 return 0;
2156         }
2157
2158         sub_tree->notify_sched_id = ast_sched_add(sched, sub_tree->notification_batch_interval, sched_cb, ao2_bump(sub_tree));
2159         if (sub_tree->notify_sched_id < 0) {
2160                 return -1;
2161         }
2162
2163         sub_tree->send_scheduled_notify = 1;
2164         return 0;
2165 }
2166
2167 int ast_sip_subscription_notify(struct ast_sip_subscription *sub, struct ast_sip_body_data *notify_data,
2168                 int terminate)
2169 {
2170         if (ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub),
2171                                 ast_sip_subscription_get_body_subtype(sub), notify_data, &sub->body_text)) {
2172                 return -1;
2173         }
2174
2175         sub->body_changed = 1;
2176         if (terminate) {
2177                 sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
2178         }
2179
2180         if (sub->tree->notification_batch_interval) {
2181                 return schedule_notification(sub->tree);
2182         } else {
2183                 int res;
2184                 /* See the note in pubsub_on_rx_refresh() for why sub->tree is refbumped here */
2185                 ao2_ref(sub->tree, +1);
2186                 res = send_notify(sub->tree, 0);
2187                 ast_test_suite_event_notify(terminate ? "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_STATE_CHANGED",
2188                                 "Resource: %s",
2189                                 sub->tree->root->resource);
2190                 ao2_ref(sub->tree, -1);
2191
2192                 return res;
2193         }
2194 }
2195
2196 void ast_sip_subscription_get_local_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
2197 {
2198         pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, sub->uri, buf, size);
2199 }
2200
2201 void ast_sip_subscription_get_remote_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
2202 {
2203         pjsip_dialog *dlg = sub->tree->dlg;
2204         ast_copy_pj_str(buf, &dlg->remote.info_str, size);
2205 }
2206
2207 const char *ast_sip_subscription_get_resource_name(struct ast_sip_subscription *sub)
2208 {
2209         return sub->resource;
2210 }
2211
2212 int ast_sip_subscription_is_terminated(const struct ast_sip_subscription *sub)
2213 {
2214         return sub->subscription_state == PJSIP_EVSUB_STATE_TERMINATED ? 1 : 0;
2215 }
2216
2217 static int sip_subscription_accept(struct sip_subscription_tree *sub_tree, pjsip_rx_data *rdata, int response)
2218 {
2219         pjsip_hdr res_hdr;
2220
2221         /* If this is a persistence recreation the subscription has already been accepted */
2222         if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) {
2223                 return 0;
2224         }
2225
2226         pj_list_init(&res_hdr);
2227         if (sub_tree->is_list) {
2228                 /* If subscribing to a list, our response has to have a Require: eventlist header in it */
2229                 pj_list_insert_before(&res_hdr, create_require_eventlist(rdata->tp_info.pool));
2230         }
2231
2232         return pjsip_evsub_accept(sub_tree->evsub, rdata, response, &res_hdr) == PJ_SUCCESS ? 0 : -1;
2233 }
2234
2235 static void subscription_datastore_destroy(void *obj)
2236 {
2237         struct ast_datastore *datastore = obj;
2238
2239         /* Using the destroy function (if present) destroy the data */
2240         if (datastore->info->destroy != NULL && datastore->data != NULL) {
2241                 datastore->info->destroy(datastore->data);
2242                 datastore->data = NULL;
2243         }
2244
2245         ast_free((void *) datastore->uid);
2246         datastore->uid = NULL;
2247 }
2248
2249 struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
2250 {
2251         RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
2252         char uuid_buf[AST_UUID_STR_LEN];
2253         const char *uid_ptr = uid;
2254
2255         if (!info) {
2256                 return NULL;
2257         }
2258
2259         datastore = ao2_alloc(sizeof(*datastore), subscription_datastore_destroy);
2260         if (!datastore) {
2261                 return NULL;
2262         }
2263
2264         datastore->info = info;
2265         if (ast_strlen_zero(uid)) {
2266                 /* They didn't provide an ID so we'll provide one ourself */
2267                 uid_ptr = ast_uuid_generate_str(uuid_buf, sizeof(uuid_buf));
2268         }
2269
2270         datastore->uid = ast_strdup(uid_ptr);
2271         if (!datastore->uid) {
2272                 return NULL;
2273         }
2274
2275         ao2_ref(datastore, +1);
2276         return datastore;
2277 }
2278
2279 int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
2280 {
2281         ast_assert(datastore != NULL);
2282         ast_assert(datastore->info != NULL);
2283         ast_assert(!ast_strlen_zero(datastore->uid));
2284
2285         if (!ao2_link(subscription->datastores, datastore)) {
2286                 return -1;
2287         }
2288         return 0;
2289 }
2290
2291 struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
2292 {
2293         return ao2_find(subscription->datastores, name, OBJ_KEY);
2294 }
2295
2296 void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
2297 {
2298         ao2_find(subscription->datastores, name, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
2299 }
2300
2301 int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
2302 {
2303         ast_assert(datastore != NULL);
2304         ast_assert(datastore->info != NULL);
2305         ast_assert(!ast_strlen_zero(datastore->uid));
2306
2307         if (!ao2_link(publication->datastores, datastore)) {
2308                 return -1;
2309         }
2310         return 0;
2311 }
2312
2313 struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
2314 {
2315         return ao2_find(publication->datastores, name, OBJ_KEY);
2316 }
2317
2318 void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
2319 {
2320         ao2_callback(publication->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
2321 }
2322
/*! \brief Read/write-locked list of registered PUBLISH handlers */
AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
2324
2325 static int publication_hash_fn(const void *obj, const int flags)
2326 {
2327         const struct ast_sip_publication *publication = obj;
2328         const int *entity_tag = obj;
2329
2330         return flags & OBJ_KEY ? *entity_tag : publication->entity_tag;
2331 }
2332
2333 static int publication_cmp_fn(void *obj, void *arg, int flags)
2334 {
2335         const struct ast_sip_publication *publication1 = obj;
2336         const struct ast_sip_publication *publication2 = arg;
2337         const int *entity_tag = arg;
2338
2339         return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ?
2340                 CMP_MATCH | CMP_STOP : 0);
2341 }
2342
2343 static void publish_add_handler(struct ast_sip_publish_handler *handler)
2344 {
2345         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2346         AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
2347 }
2348
2349 int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
2350 {
2351         if (ast_strlen_zero(handler->event_name)) {
2352                 ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
2353                 return -1;
2354         }
2355
2356         if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS,
2357                 publication_hash_fn, publication_cmp_fn))) {
2358                 ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n",
2359                         handler->event_name);
2360                 return -1;
2361         }
2362
2363         publish_add_handler(handler);
2364
2365         ast_module_ref(ast_module_info->self);
2366
2367         return 0;
2368 }
2369
2370 void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
2371 {
2372         struct ast_sip_publish_handler *iter;
2373         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2374         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
2375                 if (handler == iter) {
2376                         AST_RWLIST_REMOVE_CURRENT(next);
2377                         ao2_cleanup(handler->publications);
2378                         ast_module_unref(ast_module_info->self);
2379                         break;
2380                 }
2381         }
2382         AST_RWLIST_TRAVERSE_SAFE_END;
2383 }
2384
/*! \brief Read/write-locked list of registered subscription handlers */
AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
2386
2387 static void sub_add_handler(struct ast_sip_subscription_handler *handler)
2388 {
2389         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2390         AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
2391         ast_module_ref(ast_module_info->self);
2392 }
2393
2394 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name)
2395 {
2396         struct ast_sip_subscription_handler *iter;
2397         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
2398
2399         AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
2400                 if (!strcmp(iter->event_name, event_name)) {
2401                         break;
2402                 }
2403         }
2404         return iter;
2405 }
2406
2407 int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler)
2408 {
2409         pj_str_t event;
2410         pj_str_t accept[AST_SIP_MAX_ACCEPT] = { {0, }, };
2411         struct ast_sip_subscription_handler *existing;
2412         int i = 0;
2413
2414         if (ast_strlen_zero(handler->event_name)) {
2415                 ast_log(LOG_ERROR, "No event package specified for subscription handler. Cannot register\n");
2416                 return -1;
2417         }
2418
2419         existing = find_sub_handler_for_event_name(handler->event_name);
2420         if (existing) {
2421                 ast_log(LOG_ERROR, "Unable to register subscription handler for event %s."
2422                                 "A handler is already registered\n", handler->event_name);
2423                 return -1;
2424         }
2425
2426         for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) {
2427                 pj_cstr(&accept[i], handler->accept[i]);
2428         }
2429
2430         pj_cstr(&event, handler->event_name);
2431
2432         pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
2433
2434         sub_add_handler(handler);
2435
2436         return 0;
2437 }
2438
2439 void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
2440 {
2441         struct ast_sip_subscription_handler *iter;
2442         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2443         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
2444                 if (handler == iter) {
2445                         AST_RWLIST_REMOVE_CURRENT(next);
2446                         ast_module_unref(ast_module_info->self);
2447                         break;
2448                 }
2449         }
2450         AST_RWLIST_TRAVERSE_SAFE_END;
2451 }
2452
2453 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype(const char *content_type,
2454                 const char *content_subtype)
2455 {
2456         struct ast_sip_pubsub_body_generator *iter;
2457         SCOPED_LOCK(lock, &body_generators, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
2458
2459         AST_LIST_TRAVERSE(&body_generators, iter, list) {
2460                 if (!strcmp(iter->type, content_type) &&
2461                                 !strcmp(iter->subtype, content_subtype)) {
2462                         break;
2463                 }
2464         };
2465
2466         return iter;
2467 }
2468
/*!
 * \internal
 * \brief Find a body generator for a "type/subtype" accept string.
 *
 * \param accept The accept string; it is split at the first '/'.
 *
 * \retval NULL The string lacks a type or a subtype, or no generator matches.
 * \return Otherwise the matching body generator.
 */
static struct ast_sip_pubsub_body_generator *find_body_generator_accept(const char *accept)
{
	/* Work on a stack copy since strsep() modifies its input */
	char *subtype = ast_strdupa(accept);
	char *type = strsep(&subtype, "/");

	if (ast_strlen_zero(type) || ast_strlen_zero(subtype)) {
		return NULL;
	}

	return find_body_generator_type_subtype(type, subtype);
}
2481
2482 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
2483                 size_t num_accept, const char *body_type)
2484 {
2485         int i;
2486         struct ast_sip_pubsub_body_generator *generator = NULL;
2487
2488         for (i = 0; i < num_accept; ++i) {
2489                 generator = find_body_generator_accept(accept[i]);
2490                 if (generator) {
2491                         ast_debug(3, "Body generator %p found for accept type %s\n", generator, accept[i]);
2492                         if (strcmp(generator->body_type, body_type)) {
2493                                 ast_log(LOG_WARNING, "Body generator '%s/%s'(%p) does not accept the type of data this event generates\n",
2494                                                 generator->type, generator->subtype, generator);
2495                                 generator = NULL;
2496                                 continue;
2497                         }
2498                         break;
2499                 } else {
2500                         ast_debug(3, "No body generator found for accept type %s\n", accept[i]);
2501                 }
2502         }
2503
2504         return generator;
2505 }
2506
2507 static int generate_initial_notify(struct ast_sip_subscription *sub)
2508 {
2509         void *notify_data;
2510         int res;
2511         struct ast_sip_body_data data = {
2512                 .body_type = sub->handler->body_type,
2513         };
2514
2515         if (AST_VECTOR_SIZE(&sub->children) > 0) {
2516                 int i;
2517
2518                 for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
2519                         if (generate_initial_notify(AST_VECTOR_GET(&sub->children, i))) {
2520                                 return -1;
2521                         }
2522                 }
2523
2524                 return 0;
2525         }
2526
2527         /* We notify subscription establishment only on the tree leaves. */
2528         if (sub->handler->notifier->subscription_established(sub)) {
2529                 return -1;
2530         }
2531
2532         notify_data = sub->handler->notifier->get_notify_data(sub);
2533         if (!notify_data) {
2534                 return -1;
2535         }
2536
2537         data.body_data = notify_data;
2538
2539         res = ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub),
2540                         ast_sip_subscription_get_body_subtype(sub), &data, &sub->body_text);
2541
2542         ao2_cleanup(notify_data);
2543
2544         return res;
2545 }
2546
2547 static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
2548 {
2549         pjsip_expires_hdr *expires_header;
2550         struct ast_sip_subscription_handler *handler;
2551         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
2552         struct sip_subscription_tree *sub_tree;
2553         struct ast_sip_pubsub_body_generator *generator;
2554         char *resource;
2555         pjsip_uri *request_uri;
2556         pjsip_sip_uri *request_uri_sip;
2557         size_t resource_size;
2558         int resp;
2559         struct resource_tree tree;
2560         pj_status_t dlg_status;
2561
2562         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
2563         ast_assert(endpoint != NULL);
2564
2565         if (!endpoint->subscription.allow) {
2566                 ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint));
2567                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL);
2568                 return PJ_TRUE;
2569         }
2570
2571         request_uri = rdata->msg_info.msg->line.req.uri;
2572
2573         if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
2574                 char uri_str[PJSIP_MAX_URL_SIZE];
2575
2576                 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
2577                 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
2578                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
2579                 return PJ_TRUE;
2580         }
2581
2582         request_uri_sip = pjsip_uri_get_uri(request_uri);
2583         resource_size = pj_strlen(&request_uri_sip->user) + 1;
2584         resource = ast_alloca(resource_size);
2585         ast_copy_pj_str(resource, &request_uri_sip->user, resource_size);
2586
2587         expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next);
2588
2589         if (expires_header) {
2590                 if (expires_header->ivalue == 0) {
2591                         ast_log(LOG_WARNING, "Subscription request from endpoint %s rejected. Expiration of 0 is invalid\n",
2592                                 ast_sorcery_object_get_id(endpoint));
2593                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
2594                                 return PJ_TRUE;
2595                 }
2596                 if (expires_header->ivalue < endpoint->subscription.minexpiry) {
2597                         ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %u\n",
2598                                 expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry);
2599                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL);
2600                         return PJ_TRUE;
2601                 }
2602         }
2603
2604         handler = subscription_get_handler_from_rdata(rdata);
2605         if (!handler) {
2606                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2607                 return PJ_TRUE;
2608         }
2609
2610         generator = subscription_get_generator_from_rdata(rdata, handler);
2611         if (!generator) {
2612                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2613                 return PJ_TRUE;
2614         }
2615
2616         memset(&tree, 0, sizeof(tree));
2617         resp = build_resource_tree(endpoint, handler, resource, &tree,
2618                 ast_sip_pubsub_has_eventlist_support(rdata));
2619         if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
2620                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
2621                 resource_tree_destroy(&tree);
2622                 return PJ_TRUE;
2623         }
2624
2625         sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator, &tree, &dlg_status);
2626         if (!sub_tree) {
2627                 if (dlg_status != PJ_EEXISTS) {
2628                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
2629                 }
2630         } else {
2631                 sub_tree->persistence = subscription_persistence_create(sub_tree);
2632                 subscription_persistence_update(sub_tree, rdata);
2633                 sip_subscription_accept(sub_tree, rdata, resp);
2634                 if (generate_initial_notify(sub_tree->root)) {
2635                         pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
2636                 } else {
2637                         send_notify(sub_tree, 1);
2638                         ast_test_suite_event_notify("SUBSCRIPTION_ESTABLISHED",
2639                                         "Resource: %s",
2640                                         sub_tree->root->resource);
2641                 }
2642         }
2643
2644         resource_tree_destroy(&tree);
2645         return PJ_TRUE;
2646 }
2647
2648 static struct ast_sip_publish_handler *find_pub_handler(const char *event)
2649 {
2650         struct ast_sip_publish_handler *iter = NULL;
2651         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
2652
2653         AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) {
2654                 if (strcmp(event, iter->event_name)) {
2655                         ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
2656                         continue;
2657                 }
2658                 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
2659                 break;
2660         }
2661
2662         return iter;
2663 }
2664
2665 static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata,
2666         pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id)
2667 {
2668         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
2669
2670         if (etag_hdr) {
2671                 char etag[pj_strlen(&etag_hdr->hvalue) + 1];
2672
2673                 ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag));
2674
2675                 if (sscanf(etag, "%30d", entity_id) != 1) {
2676                         return SIP_PUBLISH_UNKNOWN;
2677                 }
2678         }
2679
2680         *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
2681
2682         if (!(*expires)) {
2683                 return SIP_PUBLISH_REMOVE;
2684         } else if (!etag_hdr && rdata->msg_info.msg->body) {
2685                 return SIP_PUBLISH_INITIAL;
2686         } else if (etag_hdr && !rdata->msg_info.msg->body) {
2687                 return SIP_PUBLISH_REFRESH;
2688         } else if (etag_hdr && rdata->msg_info.msg->body) {
2689                 return SIP_PUBLISH_MODIFY;
2690         }
2691
2692         return SIP_PUBLISH_UNKNOWN;
2693 }
2694
2695 /*! \brief Internal destructor for publications */
2696 static void publication_destroy_fn(void *obj)
2697 {
2698         struct ast_sip_publication *publication = obj;
2699
2700         ast_debug(3, "Destroying SIP publication\n");
2701
2702         ao2_cleanup(publication->datastores);
2703         ao2_cleanup(publication->endpoint);
2704 }
2705
2706 static struct ast_sip_publication *sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
2707         const char *resource, const char *event_configuration_name)
2708 {
2709         struct ast_sip_publication *publication;
2710         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
2711         size_t resource_len = strlen(resource) + 1, event_configuration_name_len = strlen(event_configuration_name) + 1;
2712         char *dst;
2713
2714         ast_assert(endpoint != NULL);
2715
2716         if (!(publication = ao2_alloc(sizeof(*publication) + resource_len + event_configuration_name_len, publication_destroy_fn))) {
2717                 return NULL;
2718         }
2719
2720         if (!(publication->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp))) {
2721                 ao2_ref(publication, -1);
2722                 return NULL;
2723         }
2724
2725         publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
2726         ao2_ref(endpoint, +1);
2727         publication->endpoint = endpoint;
2728         publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
2729         publication->sched_id = -1;
2730         dst = publication->data;
2731         publication->resource = strcpy(dst, resource);
2732         dst += resource_len;
2733         publication->event_configuration_name = strcpy(dst, event_configuration_name);
2734
2735         return publication;
2736 }
2737
2738 static int sip_publication_respond(struct ast_sip_publication *pub, int status_code,
2739                 pjsip_rx_data *rdata)
2740 {
2741         pj_status_t status;
2742         pjsip_tx_data *tdata;
2743         pjsip_transaction *tsx;
2744
2745         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, &tdata) != PJ_SUCCESS) {
2746                 return -1;
2747         }
2748
2749         if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) {
2750                 RAII_VAR(char *, entity_tag, NULL, ast_free_ptr);
2751                 RAII_VAR(char *, expires, NULL, ast_free_ptr);
2752
2753                 if ((ast_asprintf(&entity_tag, "%d", pub->entity_tag) < 0) ||
2754                         (ast_asprintf(&expires, "%d", pub->expires) < 0)) {
2755                         pjsip_tx_data_dec_ref(tdata);
2756                         return -1;
2757                 }
2758
2759                 ast_sip_add_header(tdata, "SIP-ETag", entity_tag);
2760                 ast_sip_add_header(tdata, "Expires", expires);
2761         }
2762
2763         if ((status = pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx)) != PJ_SUCCESS) {
2764                 return -1;
2765         }
2766
2767         pjsip_tsx_recv_msg(tsx, rdata);
2768
2769         if (pjsip_tsx_send_msg(tsx, tdata) != PJ_SUCCESS) {
2770                 return -1;
2771         }
2772
2773         return 0;
2774 }
2775
2776 static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
2777         struct ast_sip_publish_handler *handler)
2778 {
2779         struct ast_sip_publication *publication;
2780         char *resource_name;
2781         size_t resource_size;
2782         RAII_VAR(struct ast_sip_publication_resource *, resource, NULL, ao2_cleanup);
2783         struct ast_variable *event_configuration_name = NULL;
2784         pjsip_uri *request_uri;
2785         pjsip_sip_uri *request_uri_sip;
2786         int resp;
2787
2788         request_uri = rdata->msg_info.msg->line.req.uri;
2789
2790         if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
2791                 char uri_str[PJSIP_MAX_URL_SIZE];
2792
2793                 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
2794                 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
2795                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
2796                 return NULL;
2797         }
2798
2799         request_uri_sip = pjsip_uri_get_uri(request_uri);
2800         resource_size = pj_strlen(&request_uri_sip->user) + 1;
2801         resource_name = ast_alloca(resource_size);
2802         ast_copy_pj_str(resource_name, &request_uri_sip->user, resource_size);
2803
2804         resource = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "inbound-publication", resource_name);
2805         if (!resource) {
2806                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
2807                 return NULL;
2808         }
2809
2810         if (!ast_strlen_zero(resource->endpoint) && strcmp(resource->endpoint, ast_sorcery_object_get_id(endpoint))) {
2811                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL);
2812                 return NULL;
2813         }
2814
2815         for (event_configuration_name = resource->events; event_configuration_name; event_configuration_name = event_configuration_name->next) {
2816                 if (!strcmp(event_configuration_name->name, handler->event_name)) {
2817                         break;
2818                 }
2819         }
2820
2821         if (!event_configuration_name) {
2822                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
2823                 return NULL;
2824         }
2825
2826         resp = handler->new_publication(endpoint, resource_name, event_configuration_name->value);
2827
2828         if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
2829                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
2830                 return NULL;
2831         }
2832
2833         publication = sip_create_publication(endpoint, rdata, S_OR(resource_name, ""), event_configuration_name->value);
2834
2835         if (!publication) {
2836                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
2837                 return NULL;
2838         }
2839
2840         publication->handler = handler;
2841         if (publication->handler->publication_state_change(publication, rdata->msg_info.msg->body,
2842                         AST_SIP_PUBLISH_STATE_INITIALIZED)) {
2843                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
2844                 ao2_cleanup(publication);
2845                 return NULL;
2846         }
2847
2848         sip_publication_respond(publication, resp, rdata);
2849
2850         return publication;
2851 }
2852
2853 static int publish_expire_callback(void *data)
2854 {
2855         RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup);
2856
2857         if (publication->handler->publish_expire) {
2858                 publication->handler->publish_expire(publication);
2859         }
2860
2861         return 0;
2862 }
2863
2864 static int publish_expire(const void *data)
2865 {
2866         struct ast_sip_publication *publication = (struct ast_sip_publication*)data;
2867
2868         ao2_unlink(publication->handler->publications, publication);
2869         publication->sched_id = -1;
2870
2871         if (ast_sip_push_task(NULL, publish_expire_callback, publication)) {
2872                 ao2_cleanup(publication);
2873         }
2874
2875         return 0;
2876 }
2877
2878 static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata)
2879 {
2880         pjsip_event_hdr *event_header;
2881         struct ast_sip_publish_handler *handler;
2882         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
2883         char event[32];
2884         static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 };
2885         pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL);
2886         enum sip_publish_type publish_type;
2887         RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup);
2888         int expires = 0, entity_id, response = 0;
2889
2890         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
2891         ast_assert(endpoint != NULL);
2892
2893         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
2894         if (!event_header) {
2895                 ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n");
2896                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2897                 return PJ_TRUE;
2898         }
2899         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
2900
2901         handler = find_pub_handler(event);
2902         if (!handler) {
2903                 ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event);
2904                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2905                 return PJ_TRUE;
2906         }
2907
2908         publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id);
2909
2910         /* If this is not an initial publish ensure that a publication is present */
2911         if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) {
2912                 if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) {
2913                         static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 };
2914
2915                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed,
2916                                 NULL, NULL);
2917                         return PJ_TRUE;
2918                 }
2919
2920                 /* Per the RFC every response has to have a new entity tag */
2921                 publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
2922
2923                 /* Update the expires here so that the created responses will contain the correct value */
2924                 publication->expires = expires;
2925         }
2926
2927         switch (publish_type) {
2928                 case SIP_PUBLISH_INITIAL:
2929                         publication = publish_request_initial(endpoint, rdata, handler);
2930                         break;
2931                 case SIP_PUBLISH_REFRESH:
2932                 case SIP_PUBLISH_MODIFY:
2933                         if (handler->publication_state_change(publication, rdata->msg_info.msg->body,
2934                                                 AST_SIP_PUBLISH_STATE_ACTIVE)) {
2935                                 /* If an error occurs we want to terminate the publication */
2936                                 expires = 0;
2937                         }
2938                         response = 200;
2939                         break;
2940                 case SIP_PUBLISH_REMOVE:
2941                         handler->publication_state_change(publication, rdata->msg_info.msg->body,
2942                                         AST_SIP_PUBLISH_STATE_TERMINATED);
2943                         response = 200;
2944                         break;
2945                 case SIP_PUBLISH_UNKNOWN:
2946                 default:
2947                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
2948                         break;
2949         }
2950
2951         if (publication) {
2952                 if (expires) {
2953                         ao2_link(handler->publications, publication);
2954
2955                         AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication,
2956                                                 ao2_ref(publication, -1), ao2_ref(publication, -1), ao2_ref(publication, +1));
2957                 } else {
2958                         AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1));
2959                 }
2960         }
2961
2962         if (response) {
2963                 sip_publication_respond(publication, response, rdata);
2964         }
2965
2966         return PJ_TRUE;
2967 }
2968
2969 struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub)
2970 {
2971         return pub->endpoint;
2972 }
2973
2974 const char *ast_sip_publication_get_resource(const struct ast_sip_publication *pub)
2975 {
2976         return pub->resource;
2977 }
2978
2979 const char *ast_sip_publication_get_event_configuration(const struct ast_sip_publication *pub)
2980 {
2981         return pub->event_configuration_name;
2982 }
2983
2984 int ast_sip_pubsub_register_body_generator(struct ast_sip_pubsub_body_generator *generator)
2985 {
2986         struct ast_sip_pubsub_body_generator *existing;
2987         pj_str_t accept;
2988         pj_size_t accept_len;
2989
2990         existing = find_body_generator_type_subtype(generator->type, generator->subtype);
2991         if (existing) {
2992                 ast_log(LOG_WARNING, "Cannot register body generator of %s/%s."
2993                                 "One is already registered.\n", generator->type, generator->subtype);
2994                 return -1;
2995         }
2996
2997         AST_RWLIST_WRLOCK(&body_generators);
2998         AST_LIST_INSERT_HEAD(&body_generators, generator, list);
2999         AST_RWLIST_UNLOCK(&body_generators);
3000
3001         /* Lengths of type and subtype plus a slash. */
3002         accept_len = strlen(generator->type) + strlen(generator->subtype) + 1;
3003
3004         /* Add room for null terminator that sprintf() will set. */
3005         pj_strset(&accept, ast_alloca(accept_len + 1), accept_len);
3006         sprintf((char *) pj_strbuf(&accept), "%s/%s", generator->type, generator->subtype);/* Safe */
3007
3008         pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module,
3009                         PJSIP_H_ACCEPT, NULL, 1, &accept);
3010
3011         return 0;
3012 }
3013
3014 void ast_sip_pubsub_unregister_body_generator(struct ast_sip_pubsub_body_generator *generator)
3015 {
3016         struct ast_sip_pubsub_body_generator *iter;
3017         SCOPED_LOCK(lock, &body_generators, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
3018
3019         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_generators, iter, list) {
3020                 if (iter == generator) {
3021                         AST_LIST_REMOVE_CURRENT(list);
3022                         break;
3023                 }
3024         }
3025         AST_RWLIST_TRAVERSE_SAFE_END;
3026 }
3027
3028 int ast_sip_pubsub_register_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
3029 {
3030         AST_RWLIST_WRLOCK(&body_supplements);
3031         AST_RWLIST_INSERT_TAIL(&body_supplements, supplement, list);
3032         AST_RWLIST_UNLOCK(&body_supplements);
3033
3034         return 0;
3035 }
3036
3037 void ast_sip_pubsub_unregister_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
3038 {
3039         struct ast_sip_pubsub_body_supplement *iter;
3040         SCOPED_LOCK(lock, &body_supplements, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
3041
3042         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_supplements, iter, list) {
3043                 if (iter == supplement) {
3044                         AST_LIST_REMOVE_CURRENT(list);
3045                         break;
3046                 }
3047         }
3048         AST_RWLIST_TRAVERSE_SAFE_END;
3049 }
3050
3051 const char *ast_sip_subscription_get_body_type(struct ast_sip_subscription *sub)
3052 {
3053         return sub->body_generator->type;
3054 }
3055
3056 const char *ast_sip_subscription_get_body_subtype(struct ast_sip_subscription *sub)
3057 {
3058         return sub->body_generator->subtype;
3059 }
3060
3061 int ast_sip_pubsub_generate_body_content(const char *type, const char *subtype,
3062                 struct ast_sip_body_data *data, struct ast_str **str)
3063 {
3064         struct ast_sip_pubsub_body_supplement *supplement;
3065         struct ast_sip_pubsub_body_generator *generator;
3066         int res = 0;
3067         void *body;
3068
3069         generator = find_body_generator_type_subtype(type, subtype);
3070         if (!generator) {
3071                 ast_log(LOG_WARNING, "Unable to find a body generator for %s/%s\n",
3072                                 type, subtype);
3073                 return -1;
3074         }
3075
3076         if (strcmp(data->body_type, generator->body_type)) {
3077                 ast_log(LOG_WARNING, "Body generator does not accept the type of data provided\n");
3078                 return -1;
3079         }
3080
3081         body = generator->allocate_body(data->body_data);
3082         if (!body) {
3083                 ast_log(LOG_WARNING, "Unable to allocate a NOTIFY body of type %s/%s\n",
3084                                 type, subtype);
3085                 return -1;
3086         }
3087
3088         if (generator->generate_body_content(body, data->body_data)) {
3089                 res = -1;
3090                 goto end;
3091         }
3092
3093         AST_RWLIST_RDLOCK(&body_supplements);
3094         AST_RWLIST_TRAVERSE(&body_supplements, supplement, list) {
3095                 if (!strcmp(generator->type, supplement->type) &&
3096                                 !strcmp(generator->subtype, supplement->subtype)) {
3097                         res = supplement->supplement_body(body, data->body_data);
3098                         if (res) {
3099                                 break;
3100                         }
3101                 }
3102         }
3103         AST_RWLIST_UNLOCK(&body_supplements);
3104
3105         if (!res) {
3106                 generator->to_string(body, str);
3107         }
3108
3109 end:
3110         if (generator->destroy_body) {
3111                 generator->destroy_body(body);
3112         }
3113
3114         return res;
3115 }
3116
3117 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata)
3118 {
3119         if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) {
3120                 return pubsub_on_rx_subscribe_request(rdata);
3121         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) {
3122                 return pubsub_on_rx_publish_request(rdata);
3123         }
3124
3125         return PJ_FALSE;
3126 }
3127
3128 static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
3129 {
3130         struct sip_subscription_tree *sub_tree;
3131
3132         if (pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) {
3133                 return;
3134         }
3135
3136         sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
3137         if (!sub_tree) {
3138                 return;
3139         }
3140
3141         ao2_cleanup(sub_tree);
3142
3143         pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL);
3144 }
3145
3146 static void set_state_terminated(struct ast_sip_subscription *sub)
3147 {
3148         int i;
3149
3150         sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
3151         for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
3152                 set_state_terminated(AST_VECTOR_GET(&sub->children, i));
3153         }
3154 }
3155
3156 static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
3157                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
3158 {
3159         struct sip_subscription_tree *sub_tree;
3160
3161         sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
3162         if (!sub_tree) {
3163                 return;
3164         }
3165
3166         /* If sending a NOTIFY to terminate a subscription, then pubsub_on_evsub_state()
3167          * will be called when we send the NOTIFY, and that will result in dropping the
3168          * refcount of sub_tree by one, and possibly destroying the sub_tree. We need to
3169          * hold a reference to the sub_tree until this function returns so that we don't
3170          * try to read from or write to freed memory by accident
3171          */
3172         ao2_ref(sub_tree, +1);
3173
3174         if (pjsip_evsub_get_state(evsub) == PJSIP_EVSUB_STATE_TERMINATED) {
3175                 set_state_terminated(sub_tree->root);
3176         }
3177
3178         if (send_notify(sub_tree, 1)) {
3179                 *p_st_code = 500;
3180         }
3181
3182         ast_test_suite_event_notify(sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED ?
3183                         "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_REFRESHED",
3184                         "Resource: %s", sub_tree->root->resource);
3185
3186         if (sub_tree->is_list) {
3187                 pj_list_insert_before(res_hdr, create_require_eventlist(rdata->tp_info.pool));
3188         }
3189
3190         ao2_ref(sub_tree, -1);
3191 }
3192
3193 static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code,
3194                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
3195 {
3196         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
3197
3198         if (!sub) {
3199                 return;
3200         }
3201
3202         sub->handler->subscriber->state_change(sub, rdata->msg_info.msg->body,
3203                         pjsip_evsub_get_state(evsub));
3204 }
3205
3206 static int serialized_pubsub_on_client_refresh(void *userdata)
3207 {
3208         struct sip_subscription_tree *sub_tree = userdata;
3209         pjsip_tx_data *tdata;
3210
3211         if (pjsip_evsub_initiate(sub_tree->evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
3212                 pjsip_evsub_send_request(sub_tree->evsub, tdata);
3213         } else {
3214                 pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
3215         }
3216         ao2_cleanup(sub_tree);
3217         return 0;
3218 }
3219
3220 static void pubsub_on_client_refresh(pjsip_evsub *evsub)
3221 {
3222         struct sip_subscription_tree *sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
3223
3224         ao2_ref(sub_tree, +1);
3225         ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_client_refresh, sub_tree);
3226 }
3227
3228 static int serialized_pubsub_on_server_timeout(void *userdata)
3229 {
3230         struct sip_subscription_tree *sub_tree = userdata;
3231
3232         set_state_terminated(sub_tree->root);
3233         send_notify(sub_tree, 1);
3234         ast_test_suite_event_notify("SUBSCRIPTION_TERMINATED",
3235                         "Resource: %s",
3236                         sub_tree->root->resource);
3237
3238         ao2_cleanup(sub_tree);
3239         return 0;
3240 }
3241
3242 static void pubsub_on_server_timeout(pjsip_evsub *evsub)
3243 {
3244         struct sip_subscription_tree *sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
3245
3246         if (!sub_tree) {
3247                 /* if a subscription has been terminated and the subscription
3248                    timeout/expires is less than the time it takes for all pending
3249                    transactions to end then the subscription timer will not have