fe16c613ab86b408c4707f8ba6467f329f20c223
[asterisk/asterisk.git] / res / res_pjsip_pubsub.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 /*!
19  * \brief Opaque structure representing an RFC 3265 SIP subscription
20  */
21
22 /*** MODULEINFO
23         <depend>pjproject</depend>
24         <depend>res_pjsip</depend>
25         <support_level>core</support_level>
26  ***/
27
28 #include "asterisk.h"
29
30 #include <pjsip.h>
31 #include <pjsip_simple.h>
32 #include <pjlib.h>
33
34 #include "asterisk/res_pjsip_pubsub.h"
35 #include "asterisk/module.h"
36 #include "asterisk/linkedlists.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/datastore.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/taskprocessor.h"
41 #include "asterisk/sched.h"
42 #include "asterisk/res_pjsip.h"
43 #include "asterisk/callerid.h"
44 #include "asterisk/manager.h"
45 #include "asterisk/test.h"
46 #include "res_pjsip/include/res_pjsip_private.h"
47 #include "asterisk/res_pjsip_presence_xml.h"
48
49 /*** DOCUMENTATION
50         <manager name="PJSIPShowSubscriptionsInbound" language="en_US">
51                 <synopsis>
52                         Lists subscriptions.
53                 </synopsis>
54                 <syntax />
55                 <description>
56                         <para>
57                         Provides a listing of all inbound subscriptions.  An event <literal>InboundSubscriptionDetail</literal>
58                         is issued for each subscription object.  Once all detail events are completed an
59                         <literal>InboundSubscriptionDetailComplete</literal> event is issued.
60                         </para>
61                 </description>
62         </manager>
63         <manager name="PJSIPShowSubscriptionsOutbound" language="en_US">
64                 <synopsis>
65                         Lists subscriptions.
66                 </synopsis>
67                 <syntax />
68                 <description>
69                         <para>
70                         Provides a listing of all outbound subscriptions.  An event <literal>OutboundSubscriptionDetail</literal>
71                         is issued for each subscription object.  Once all detail events are completed an
72                         <literal>OutboundSubscriptionDetailComplete</literal> event is issued.
73                         </para>
74                 </description>
75         </manager>
76         <manager name="PJSIPShowResourceLists" language="en_US">
77                 <synopsis>
78                         Displays settings for configured resource lists.
79                 </synopsis>
80                 <syntax />
81                 <description>
82                         <para>
83                         Provides a listing of all resource lists.  An event <literal>ResourceListDetail</literal>
84                         is issued for each resource list object.  Once all detail events are completed a
85                         <literal>ResourceListDetailComplete</literal> event is issued.
86                         </para>
87                 </description>
88         </manager>
89
90         <configInfo name="res_pjsip_pubsub" language="en_US">
91                 <synopsis>Module that implements publish and subscribe support.</synopsis>
92                 <configFile name="pjsip.conf">
93                         <configObject name="subscription_persistence">
94                                 <synopsis>Persists SIP subscriptions so they survive restarts.</synopsis>
95                                 <configOption name="packet">
96                                         <synopsis>Entire SIP SUBSCRIBE packet that created the subscription</synopsis>
97                                 </configOption>
98                                 <configOption name="src_name">
99                                         <synopsis>The source address of the subscription</synopsis>
100                                 </configOption>
101                                 <configOption name="src_port">
102                                         <synopsis>The source port of the subscription</synopsis>
103                                 </configOption>
104                                 <configOption name="transport_key">
105                                         <synopsis>The type of transport the subscription was received on</synopsis>
106                                 </configOption>
107                                 <configOption name="local_name">
108                                         <synopsis>The local address the subscription was received on</synopsis>
109                                 </configOption>
110                                 <configOption name="local_port">
111                                         <synopsis>The local port the subscription was received on</synopsis>
112                                 </configOption>
113                                 <configOption name="cseq">
114                                         <synopsis>The sequence number of the next NOTIFY to be sent</synopsis>
115                                 </configOption>
116                                 <configOption name="tag">
117                                         <synopsis>The local tag of the dialog for the subscription</synopsis>
118                                 </configOption>
119                                 <configOption name="endpoint">
120                                         <synopsis>The name of the endpoint that subscribed</synopsis>
121                                 </configOption>
122                                 <configOption name="expires">
123                                         <synopsis>The time at which the subscription expires</synopsis>
124                                 </configOption>
125                         </configObject>
126                         <configObject name="resource_list">
127                                 <synopsis>Resource list configuration parameters.</synopsis>
128                                 <description>
129                                         <para>This configuration object allows for RFC 4662 resource list subscriptions
130                                         to be specified. This can be useful to decrease the amount of subscription traffic
131                                         that a server has to process.</para>
132                                         <note>
133                                                 <para>Current limitations limit the size of SIP NOTIFY requests that Asterisk sends
134                                                 to 64000 bytes. If your resource list notifications are larger than this maximum, you
135                                                 will need to make adjustments.</para>
136                                         </note>
137                                 </description>
138                                 <configOption name="type">
139                                         <synopsis>Must be of type 'resource_list'</synopsis>
140                                 </configOption>
141                                 <configOption name="event">
142                                         <synopsis>The SIP event package that the list resource belongs to.</synopsis>
143                                         <description><para>
144                                                 The SIP event package describes the types of resources that Asterisk reports
145                                                 the state of.
146                                         </para>
147                                                 <enumlist>
148                                                         <enum name="presence"><para>
149                                                                 Device state and presence reporting.
150                                                         </para></enum>
151                                                         <enum name="dialog"><para>
152                                                                 This is identical to <replaceable>presence</replaceable>.
153                                                         </para></enum>
154                                                         <enum name="message-summary"><para>
155                                                                 Message-waiting indication (MWI) reporting.
156                                                         </para></enum>
157                                                 </enumlist>
158                                         </description>
159                                 </configOption>
160                                 <configOption name="list_item">
161                                         <synopsis>The name of a resource to report state on</synopsis>
162                                         <description>
163                                                 <para>In general Asterisk looks up list items in the following way:</para>
164                                                 <para>1. Check if the list item refers to another configured resource list.</para>
165                                                 <para>2. Pass the name of the resource off to event-package-specific handlers
166                                                    to find the specified resource.</para>
167                                                 <para>The second part means that the way the list item is specified depends
168                                                 on what type of list this is. For instance, if you have the <replaceable>event</replaceable>
169                                                 set to <literal>presence</literal>, then list items should be in the form of
170                                                 dialplan_extension@dialplan_context. For <literal>message-summary</literal> mailbox
171                                                 names should be listed.</para>
172                                         </description>
173                                 </configOption>
174                                 <configOption name="full_state" default="no">
175                                         <synopsis>Indicates if the entire list's state should be sent out.</synopsis>
176                                         <description>
177                                                 <para>If this option is enabled, and a resource changes state, then Asterisk will construct
178                                                 a notification that contains the state of all resources in the list. If the option is
179                                                 disabled, Asterisk will construct a notification that only contains the states of
180                                                 resources that have changed.</para>
181                                                 <note>
182                                                         <para>Even with this option disabled, there are certain situations where Asterisk is forced
183                                                         to send a notification with the states of all resources in the list. When a subscriber
184                                                         renews or terminates its subscription to the list, Asterisk MUST send a full state
185                                                         notification.</para>
186                                                 </note>
187                                         </description>
188                                 </configOption>
189                                 <configOption name="notification_batch_interval" default="0">
190                                         <synopsis>Time Asterisk should wait, in milliseconds, before sending notifications.</synopsis>
191                                         <description>
192                                                 <para>When a resource's state changes, it may be desired to wait a certain amount before Asterisk
193                                                 sends a notification to subscribers. This allows for other state changes to accumulate, so that
194                                                 Asterisk can communicate multiple state changes in a single notification instead of rapidly sending
195                                                 many notifications.</para>
196                                         </description>
197                                 </configOption>
198                         </configObject>
199                         <configObject name="inbound-publication">
200                                 <synopsis>The configuration for inbound publications</synopsis>
201                                 <configOption name="endpoint" default="">
202                                         <synopsis>Optional name of an endpoint that is only allowed to publish to this resource</synopsis>
203                                 </configOption>
204                                 <configOption name="type">
205                                         <synopsis>Must be of type 'inbound-publication'.</synopsis>
206                                 </configOption>
207                         </configObject>
208                 </configFile>
209         </configInfo>
210  ***/
211
212 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);
213
214 static struct pjsip_module pubsub_module = {
215         .name = { "PubSub Module", 13 },
216         .priority = PJSIP_MOD_PRIORITY_APPLICATION,
217         .on_rx_request = pubsub_on_rx_request,
218 };
219
220 #define MOD_DATA_PERSISTENCE "sub_persistence"
221 #define MOD_DATA_MSG "sub_msg"
222
223 static const pj_str_t str_event_name = { "Event", 5 };
224
225 /*! \brief Scheduler used for automatically expiring publications */
226 static struct ast_sched_context *sched;
227
228 /*! \brief Number of buckets for publications (on a per-handler basis) */
229 #define PUBLICATIONS_BUCKETS 37
230
231 /*! \brief Default expiration time for PUBLISH if one is not specified */
232 #define DEFAULT_PUBLISH_EXPIRES 3600
233
234 /*! \brief Number of buckets for subscription datastore */
235 #define DATASTORE_BUCKETS 53
236
237 /*! \brief Default expiration for subscriptions */
238 #define DEFAULT_EXPIRES 3600
239
240 /*! \brief Defined method for PUBLISH */
241 const pjsip_method pjsip_publish_method =
242 {
243         PJSIP_OTHER_METHOD,
244         { "PUBLISH", 7 }
245 };
246
247 /*!
248  * \brief The types of PUBLISH messages defined in RFC 3903
249  */
250 enum sip_publish_type {
251         /*!
252          * \brief Unknown
253          *
254          * \details
255          * This actually is not defined in RFC 3903. We use this as a constant
256          * to indicate that an incoming PUBLISH does not fit into any of the
257          * other categories and is thus invalid.
258          */
259         SIP_PUBLISH_UNKNOWN,
260
261         /*!
262          * \brief Initial
263          *
264          * \details
265          * The first PUBLISH sent. This will contain a non-zero Expires header
266          * as well as a body that indicates the current state of the endpoint
267          * that has sent the message. The initial PUBLISH is the only type
268          * of PUBLISH to not contain a Sip-If-Match header in it.
269          */
270         SIP_PUBLISH_INITIAL,
271
272         /*!
273          * \brief Refresh
274          *
275          * \details
276          * Used to keep a published state from expiring. This will contain a
277          * non-zero Expires header but no body since its purpose is not to
278          * update state.
279          */
280         SIP_PUBLISH_REFRESH,
281
282         /*!
283          * \brief Modify
284          *
285          * \details
286          * Used to change state from its previous value. This will contain
287          * a body updating the published state. May or may not contain an
288          * Expires header.
289          */
290         SIP_PUBLISH_MODIFY,
291
292         /*!
293          * \brief Remove
294          *
295          * \details
296          * Used to remove published state from an ESC. This will contain
297          * an Expires header set to 0 and likely no body.
298          */
299         SIP_PUBLISH_REMOVE,
300 };
301
302 /*!
303  * \brief A vector of strings commonly used throughout this module
304  */
305 AST_VECTOR(resources, const char *);
306
307 /*!
308  * \brief Resource list configuration item
309  */
310 struct resource_list {
311         SORCERY_OBJECT(details);
312         /*! SIP event package the list uses. */
313         char event[32];
314         /*! Strings representing resources in the list. */
315         struct resources items;
316         /*! Indicates if Asterisk sends full or partial state on notifications. */
317         unsigned int full_state;
318         /*! Time, in milliseconds, Asterisk waits before sending a batched notification. */
319         unsigned int notification_batch_interval;
320 };
321
322 /*!
323  * Used to create new entity IDs by ESCs.
324  */
325 static int esc_etag_counter;
326
327 /*!
328  * \brief Structure representing a SIP publication
329  */
330 struct ast_sip_publication {
331         /*! Publication datastores set up by handlers */
332         struct ao2_container *datastores;
333         /*! \brief Entity tag for the publication */
334         int entity_tag;
335         /*! \brief Handler for this publication */
336         struct ast_sip_publish_handler *handler;
337         /*! \brief The endpoint with which the subscription is communicating */
338         struct ast_sip_endpoint *endpoint;
339         /*! \brief Expiration time of the publication */
340         int expires;
341         /*! \brief Scheduled item for expiration of publication */
342         int sched_id;
343         /*! \brief The resource the publication is to */
344         char *resource;
345         /*! \brief The name of the event type configuration */
346         char *event_configuration_name;
347         /*! \brief Data containing the above */
348         char data[0];
349 };
350
351
352 /*!
353  * \brief Structure used for persisting an inbound subscription
354  */
355 struct subscription_persistence {
356         /*! Sorcery object details */
357         SORCERY_OBJECT(details);
358         /*! The name of the endpoint involved in the subscription */
359         char *endpoint;
360         /*! SIP message that creates the subscription */
361         char packet[PJSIP_MAX_PKT_LEN];
362         /*! Source address of the message */
363         char src_name[PJ_INET6_ADDRSTRLEN];
364         /*! Source port of the message */
365         int src_port;
366         /*! Local transport key type */
367         char transport_key[32];
368         /*! Local transport address */
369         char local_name[PJ_INET6_ADDRSTRLEN];
370         /*! Local transport port */
371         int local_port;
372         /*! Next CSeq to use for message */
373         unsigned int cseq;
374         /*! Local tag of the dialog */
375         char *tag;
376         /*! When this subscription expires */
377         struct timeval expires;
378 };
379
380 /*!
381  * \brief The state of the subscription tree
382  */
383 enum sip_subscription_tree_state {
384         /*! Normal operation */
385         SIP_SUB_TREE_NORMAL = 0,
386         /*! A terminate has been requested by Asterisk, the client, or pjproject */
387         SIP_SUB_TREE_TERMINATE_PENDING,
388         /*! The terminate is in progress */
389         SIP_SUB_TREE_TERMINATE_IN_PROGRESS,
390         /*! The terminate process has finished and the subscription tree is no longer valid */
391         SIP_SUB_TREE_TERMINATED,
392 };
393
394 /*!
395  * \brief A tree of SIP subscriptions
396  *
397  * Because of the ability to subscribe to resource lists, a SIP
398  * subscription can result in a tree of subscriptions being created.
399  * This structure represents the information relevant to the subscription
400  * as a whole, to include the underlying PJSIP structure for the
401  * subscription.
402  */
403 struct sip_subscription_tree {
404         /*! The endpoint with which the subscription is communicating */
405         struct ast_sip_endpoint *endpoint;
406         /*! Serializer on which to place operations for this subscription */
407         struct ast_taskprocessor *serializer;
408         /*! The role for this subscription */
409         enum ast_sip_subscription_role role;
410         /*! Persistence information */
411         struct subscription_persistence *persistence;
412         /*! The underlying PJSIP event subscription structure */
413         pjsip_evsub *evsub;
414         /*! The underlying PJSIP dialog */
415         pjsip_dialog *dlg;
416         /*! Interval to use for batching notifications */
417         unsigned int notification_batch_interval;
418         /*! Scheduler ID for batched notification */
419         int notify_sched_id;
420         /*! Indicator if scheduled batched notification should be sent */
421         unsigned int send_scheduled_notify;
422         /*! The root of the subscription tree */
423         struct ast_sip_subscription *root;
424         /*! Is this subscription to a list? */
425         int is_list;
426         /*! Next item in the list */
427         AST_LIST_ENTRY(sip_subscription_tree) next;
428         /*! Subscription tree state */
429         enum sip_subscription_tree_state state;
430 };
431
432 /*!
433  * \brief Structure representing a "virtual" SIP subscription.
434  *
435  * This structure serves a dual purpose. Structurally, it is
436  * the constructed tree of subscriptions based on the resources
437  * being subscribed to. API-wise, this serves as the handle that
438  * subscription handlers use in order to interact with the pubsub API.
439  */
440 struct ast_sip_subscription {
441         /*! Subscription datastores set up by handlers */
442         struct ao2_container *datastores;
443         /*! The handler for this subscription */
444         const struct ast_sip_subscription_handler *handler;
445         /*! Pointer to the base of the tree */
446         struct sip_subscription_tree *tree;
447         /*! Body generator for NOTIFYs */
448         struct ast_sip_pubsub_body_generator *body_generator;
449         /*! Vector of child subscriptions */
450         AST_VECTOR(, struct ast_sip_subscription *) children;
451         /*! Saved NOTIFY body text for this subscription */
452         struct ast_str *body_text;
453         /*! Indicator that the body text has changed since the last notification */
454         int body_changed;
455         /*! The current state of the subscription */
456         pjsip_evsub_state subscription_state;
457         /*! For lists, the current version to place in the RLMI body */
458         unsigned int version;
459         /*! For lists, indicates if full state should always be communicated. */
460         unsigned int full_state;
461         /*! URI associated with the subscription */
462         pjsip_sip_uri *uri;
463         /*! Name of resource being subscribed to */
464         char resource[0];
465 };
466
467 /*!
468  * \brief Structure representing a publication resource
469  */
470 struct ast_sip_publication_resource {
471         /*! \brief Sorcery object details */
472         SORCERY_OBJECT(details);
473         /*! \brief Optional name of an endpoint that is only allowed to publish to this resource */
474         char *endpoint;
475         /*! \brief Mapping for event types to configuration */
476         struct ast_variable *events;
477 };
478
479 static const char *sip_subscription_roles_map[] = {
480         [AST_SIP_SUBSCRIBER] = "Subscriber",
481         [AST_SIP_NOTIFIER] = "Notifier"
482 };
483
484 AST_RWLIST_HEAD_STATIC(subscriptions, sip_subscription_tree);
485
486 AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
487 AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
488
489 static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
490 static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
491                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
492 static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
493                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
494 static void pubsub_on_client_refresh(pjsip_evsub *sub);
495 static void pubsub_on_server_timeout(pjsip_evsub *sub);
496  
497 static pjsip_evsub_user pubsub_cb = {
498         .on_evsub_state = pubsub_on_evsub_state,
499         .on_rx_refresh = pubsub_on_rx_refresh,
500         .on_rx_notify = pubsub_on_rx_notify,
501         .on_client_refresh = pubsub_on_client_refresh,
502         .on_server_timeout = pubsub_on_server_timeout,
503 };
504
505 /*! \brief Destructor for publication resource */
506 static void publication_resource_destroy(void *obj)
507 {
508         struct ast_sip_publication_resource *resource = obj;
509
510         ast_free(resource->endpoint);
511         ast_variables_destroy(resource->events);
512 }
513
514 /*! \brief Allocator for publication resource */
515 static void *publication_resource_alloc(const char *name)
516 {
517         return ast_sorcery_generic_alloc(sizeof(struct ast_sip_publication_resource), publication_resource_destroy);
518 }
519
520 /*! \brief Destructor for subscription persistence */
521 static void subscription_persistence_destroy(void *obj)
522 {
523         struct subscription_persistence *persistence = obj;
524
525         ast_free(persistence->endpoint);
526         ast_free(persistence->tag);
527 }
528
529 /*! \brief Allocator for subscription persistence */
530 static void *subscription_persistence_alloc(const char *name)
531 {
532         return ast_sorcery_generic_alloc(sizeof(struct subscription_persistence), subscription_persistence_destroy);
533 }
534
535 /*! \brief Function which creates initial persistence information of a subscription in sorcery */
536 static struct subscription_persistence *subscription_persistence_create(struct sip_subscription_tree *sub_tree)
537 {
538         char tag[PJ_GUID_STRING_LENGTH + 1];
539
540         /* The id of this persistence object doesn't matter as we keep it on the subscription and don't need to
541          * look it up by id at all.
542          */
543         struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(),
544                 "subscription_persistence", NULL);
545
546         pjsip_dialog *dlg = sub_tree->dlg;
547
548         if (!persistence) {
549                 return NULL;
550         }
551
552         persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub_tree->endpoint));
553         ast_copy_pj_str(tag, &dlg->local.info->tag, sizeof(tag));
554         persistence->tag = ast_strdup(tag);
555
556         ast_sorcery_create(ast_sip_get_sorcery(), persistence);
557         return persistence;
558 }
559
560 /*! \brief Function which updates persistence information of a subscription in sorcery */
561 static void subscription_persistence_update(struct sip_subscription_tree *sub_tree,
562         pjsip_rx_data *rdata)
563 {
564         pjsip_dialog *dlg;
565
566         if (!sub_tree->persistence) {
567                 return;
568         }
569
570         dlg = sub_tree->dlg;
571         sub_tree->persistence->cseq = dlg->local.cseq;
572
573         if (rdata) {
574                 int expires;
575                 pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
576
577                 expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
578                 sub_tree->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
579
580                 /* When receiving a packet on an streaming transport, it's possible to receive more than one SIP
581                  * message at a time into the rdata->pkt_info.packet buffer. However, the rdata->msg_info.msg_buf
582                  * will always point to the proper SIP message that is to be processed. When updating subscription
583                  * persistence that is pulled from persistent storage, though, the rdata->pkt_info.packet will
584                  * only ever have a single SIP message on it, and so we base persistence on that.
585                  */
586                 if (rdata->msg_info.msg_buf) {
587                         ast_copy_string(sub_tree->persistence->packet, rdata->msg_info.msg_buf,
588                                         MIN(sizeof(sub_tree->persistence->packet), rdata->msg_info.len));
589                 } else {
590                         ast_copy_string(sub_tree->persistence->packet, rdata->pkt_info.packet,
591                                         sizeof(sub_tree->persistence->packet));
592                 }
593                 ast_copy_string(sub_tree->persistence->src_name, rdata->pkt_info.src_name,
594                                 sizeof(sub_tree->persistence->src_name));
595                 sub_tree->persistence->src_port = rdata->pkt_info.src_port;
596                 ast_copy_string(sub_tree->persistence->transport_key, rdata->tp_info.transport->type_name,
597                         sizeof(sub_tree->persistence->transport_key));
598                 ast_copy_pj_str(sub_tree->persistence->local_name, &rdata->tp_info.transport->local_name.host,
599                         sizeof(sub_tree->persistence->local_name));
600                 sub_tree->persistence->local_port = rdata->tp_info.transport->local_name.port;
601         }
602
603         ast_sorcery_update(ast_sip_get_sorcery(), sub_tree->persistence);
604 }
605
606 /*! \brief Function which removes persistence of a subscription from sorcery */
607 static void subscription_persistence_remove(struct sip_subscription_tree *sub_tree)
608 {
609         if (!sub_tree->persistence) {
610                 return;
611         }
612
613         ast_sorcery_delete(ast_sip_get_sorcery(), sub_tree->persistence);
614         ao2_ref(sub_tree->persistence, -1);
615         sub_tree->persistence = NULL;
616 }
617
618
619 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name);
620 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
621                 size_t num_accept, const char *body_type);
622
623 /*! \brief Retrieve a handler using the Event header of an rdata message */
624 static struct ast_sip_subscription_handler *subscription_get_handler_from_rdata(pjsip_rx_data *rdata)
625 {
626         pjsip_event_hdr *event_header;
627         char event[32];
628         struct ast_sip_subscription_handler *handler;
629
630         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
631         if (!event_header) {
632                 ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
633                 return NULL;
634         }
635         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
636
637         handler = find_sub_handler_for_event_name(event);
638         if (!handler) {
639                 ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
640         }
641
642         return handler;
643 }
644
/*!
 * \brief Accept header values that are exceptions to the rule
 *
 * Typically, when a SUBSCRIBE arrives, we attempt to find a
 * body generator that matches one of the Accept headers in
 * the request. When subscribing to a single resource, this works
 * great. However, when subscribing to a list, things work
 * differently. Most Accept header values are fine, but there
 * are a couple that are specific to resource lists and need
 * to be ignored when searching for a body generator to use
 * for the individual resources of the subscription.
 *
 * The values listed here are compared against incoming Accept
 * values by exceptional_accept().
 */
const char *accept_exceptions[] =  {
        "multipart/related",
        "application/rlmi+xml",
};
661
662 /*!
663  * \brief Is the Accept header from the SUBSCRIBE in the list of exceptions?
664  *
665  * \retval 1 This Accept header value is an exception to the rule.
666  * \retval 0 This Accept header is not an exception to the rule.
667  */
668 static int exceptional_accept(const pj_str_t *accept)
669 {
670         int i;
671
672         for (i = 0; i < ARRAY_LEN(accept_exceptions); ++i) {
673                 if (!pj_strcmp2(accept, accept_exceptions[i])) {
674                         return 1;
675                 }
676         }
677
678         return 0;
679 }
680
681 /*! \brief Retrieve a body generator using the Accept header of an rdata message */
682 static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata,
683         const struct ast_sip_subscription_handler *handler)
684 {
685         pjsip_accept_hdr *accept_header = (pjsip_accept_hdr *) &rdata->msg_info.msg->hdr;
686         char accept[AST_SIP_MAX_ACCEPT][64];
687         size_t num_accept_headers = 0;
688
689         while ((accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, accept_header->next))) {
690                 int i;
691
692                 for (i = 0; i < accept_header->count; ++i) {
693                         if (!exceptional_accept(&accept_header->values[i])) {
694                                 ast_copy_pj_str(accept[num_accept_headers], &accept_header->values[i], sizeof(accept[num_accept_headers]));
695                                 ++num_accept_headers;
696                         }
697                 }
698         }
699
700         if (num_accept_headers == 0) {
701                 /* If a SUBSCRIBE contains no Accept headers, then we must assume that
702                  * the default accept type for the event package is to be used.
703                  */
704                 ast_copy_string(accept[0], handler->notifier->default_accept, sizeof(accept[0]));
705                 num_accept_headers = 1;
706         }
707
708         return find_body_generator(accept, num_accept_headers, handler->body_type);
709 }
710
711 /*! \brief Check if the rdata has a Supported header containing 'eventlist'
712  *
713  *  \retval 1 rdata has an eventlist containing supported header
714  *  \retval 0 rdata doesn't have an eventlist containing supported header
715  */
716 static int ast_sip_pubsub_has_eventlist_support(pjsip_rx_data *rdata)
717 {
718         pjsip_supported_hdr *supported_header = (pjsip_supported_hdr *) &rdata->msg_info.msg->hdr;
719
720         while ((supported_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_SUPPORTED, supported_header->next))) {
721                 int i;
722
723                 for (i = 0; i < supported_header->count; i++) {
724                         if (!pj_stricmp2(&supported_header->values[i], "eventlist")) {
725                                 return 1;
726                         }
727                 }
728         }
729
730         return 0;
731 }
732
struct resource_tree;

/*!
 * \brief A node for a resource tree.
 *
 * Nodes are created by tree_node_alloc() and released, together with
 * all of their descendants, by tree_node_destroy().
 */
struct tree_node {
        /*! Child nodes of this node */
        AST_VECTOR(, struct tree_node *) children;
        /*! Whether full state is requested in notifications (meaningful for list nodes) */
        unsigned int full_state;
        /*! Name of the resource; storage is allocated together with the node */
        char resource[0];
};
743
744 /*!
745  * \brief Helper function for retrieving a resource list for a given event.
746  *
747  * This will retrieve a resource list that corresponds to the resource and event provided.
748  *
749  * \param resource The name of the resource list to retrieve
750  * \param event The expected event name on the resource list
751  */
752 static struct resource_list *retrieve_resource_list(const char *resource, const char *event)
753 {
754         struct resource_list *list;
755
756         list = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "resource_list", resource);
757         if (!list) {
758                 return NULL;
759         }
760
761         if (strcmp(list->event, event)) {
762                 ast_log(LOG_WARNING, "Found resource list %s, but its event type (%s) does not match SUBSCRIBE's (%s)\n",
763                                 resource, list->event, event);
764                 ao2_cleanup(list);
765                 return NULL;
766         }
767
768         return list;
769 }
770
771 /*!
772  * \brief Allocate a tree node
773  *
774  * In addition to allocating and initializing the tree node, the node is also added
775  * to the vector of visited resources. See \ref build_resource_tree for more information
776  * on the visited resources.
777  *
778  * \param resource The name of the resource for this tree node.
779  * \param visited The vector of resources that have been visited.
780  * \param if allocating a list, indicate whether full state is requested in notifications.
781  * \retval NULL Allocation failure.
782  * \retval non-NULL The newly-allocated tree_node
783  */
784 static struct tree_node *tree_node_alloc(const char *resource, struct resources *visited, unsigned int full_state)
785 {
786         struct tree_node *node;
787
788         node = ast_calloc(1, sizeof(*node) + strlen(resource) + 1);
789         if (!node) {
790                 return NULL;
791         }
792
793         strcpy(node->resource, resource);
794         if (AST_VECTOR_INIT(&node->children, 4)) {
795                 ast_free(node);
796                 return NULL;
797         }
798         node->full_state = full_state;
799
800         if (visited) {
801                 AST_VECTOR_APPEND(visited, resource);
802         }
803         return node;
804 }
805
806 /*!
807  * \brief Destructor for a tree node
808  *
809  * This function calls recursively in order to destroy
810  * all nodes lower in the tree from the given node in
811  * addition to the node itself.
812  *
813  * \param node The node to destroy.
814  */
815 static void tree_node_destroy(struct tree_node *node)
816 {
817         int i;
818         if (!node) {
819                 return;
820         }
821
822         for (i = 0; i < AST_VECTOR_SIZE(&node->children); ++i) {
823                 tree_node_destroy(AST_VECTOR_GET(&node->children, i));
824         }
825         AST_VECTOR_FREE(&node->children);
826         ast_free(node);
827 }
828
/*!
 * \brief Check whether a resource name is already in the visited vector
 *
 * See \ref build_resource_tree for more information
 *
 * \param resource The resource currently being visited
 * \param visited The resources that have previously been visited
 *
 * \retval 1 The resource name is present in \a visited.
 * \retval 0 The resource name is not present in \a visited.
 */
static int have_visited(const char *resource, struct resources *visited)
{
        size_t total = AST_VECTOR_SIZE(visited);
        size_t idx;

        for (idx = 0; idx < total; ++idx) {
                const char *seen = AST_VECTOR_GET(visited, idx);

                if (strcmp(resource, seen) == 0) {
                        return 1;
                }
        }

        return 0;
}
849
850 /*!
851  * \brief Build child nodes for a given parent.
852  *
853  * This iterates through the items on a resource list and creates tree nodes for each one. The
854  * tree nodes created are children of the supplied parent node. If an item in the resource
855  * list is itself a list, then this function is called recursively to provide children for
856  * the the new node.
857  *
858  * If an item in a resource list is not a list, then the supplied subscription handler is
859  * called into as if a new SUBSCRIBE for the list item were presented. The handler's response
860  * is used to determine if the node can be added to the tree or not.
861  *
862  * If a parent node ends up having no child nodes added under it, then the parent node is
863  * pruned from the tree.
864  *
865  * \param endpoint The endpoint that sent the inbound SUBSCRIBE.
866  * \param handler The subscription handler for leaf nodes in the tree.
867  * \param list The configured resource list from which the child node is being built.
868  * \param parent The parent node for these children.
869  * \param visited The resources that have already been visited.
870  */
871 static void build_node_children(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
872                 struct resource_list *list, struct tree_node *parent, struct resources *visited)
873 {
874         int i;
875
876         for (i = 0; i < AST_VECTOR_SIZE(&list->items); ++i) {
877                 struct tree_node *current;
878                 struct resource_list *child_list;
879                 const char *resource = AST_VECTOR_GET(&list->items, i);
880
881                 if (have_visited(resource, visited)) {
882                         ast_debug(1, "Already visited resource %s. Avoiding duplicate resource or potential loop.\n", resource);
883                         continue;
884                 }
885
886                 child_list = retrieve_resource_list(resource, list->event);
887                 if (!child_list) {
888                         int resp = handler->notifier->new_subscribe(endpoint, resource);
889                         if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
890                                 current = tree_node_alloc(resource, visited, 0);
891                                 if (!current) {
892                                         ast_debug(1, "Subscription to leaf resource %s was successful, but encountered"
893                                                         "allocation error afterwards\n", resource);
894                                         continue;
895                                 }
896                                 ast_debug(2, "Subscription to leaf resource %s resulted in success. Adding to parent %s\n",
897                                                 resource, parent->resource);
898                                 AST_VECTOR_APPEND(&parent->children, current);
899                         } else {
900                                 ast_debug(2, "Subscription to leaf resource %s resulted in error response %d\n",
901                                                 resource, resp);
902                         }
903                 } else {
904                         ast_debug(2, "Resource %s (child of %s) is a list\n", resource, parent->resource);
905                         current = tree_node_alloc(resource, visited, child_list->full_state);
906                         if (!current) {
907                                 ast_debug(1, "Cannot build children of resource %s due to allocation failure\n", resource);
908                                 continue;
909                         }
910                         build_node_children(endpoint, handler, child_list, current, visited);
911                         if (AST_VECTOR_SIZE(&current->children) > 0) {
912                                 ast_debug(1, "List %s had no successful children.\n", resource);
913                                 AST_VECTOR_APPEND(&parent->children, current);
914                         } else {
915                                 ast_debug(2, "List %s had successful children. Adding to parent %s\n",
916                                                 resource, parent->resource);
917                                 tree_node_destroy(current);
918                         }
919                         ao2_cleanup(child_list);
920                 }
921         }
922 }
923
/*!
 * \brief A resource tree
 *
 * When an inbound SUBSCRIBE arrives, the resource being subscribed to may
 * be a resource list. If this is the case, the resource list may contain resources
 * that are themselves lists. The structure needed to hold the resources is
 * a tree.
 *
 * Upon receipt of the SUBSCRIBE, the tree is built by determining if subscriptions
 * to the individual resources in the tree would be successful or not. Any successful
 * subscriptions result in a node in the tree being created. Any unsuccessful subscriptions
 * result in no node being created.
 *
 * This tree can be seen as a bare-bones analog of the tree of ast_sip_subscriptions that
 * will end up being created to actually carry out the duties of a SIP SUBSCRIBE dialog.
 */
struct resource_tree {
        /*! Root node of the tree; corresponds to the resource requested in the SUBSCRIBE */
        struct tree_node *root;
        /*! Batch interval copied from the resource list when the root is a list (units not shown here) */
        unsigned int notification_batch_interval;
};
944
945 /*!
946  * \brief Destroy a resource tree.
947  *
948  * This function makes no assumptions about how the tree itself was
949  * allocated and does not attempt to free the tree itself. Callers
950  * of this function are responsible for freeing the tree.
951  *
952  * \param tree The tree to destroy.
953  */
954 static void resource_tree_destroy(struct resource_tree *tree)
955 {
956         if (tree) {
957                 tree_node_destroy(tree->root);
958         }
959 }
960
961 /*!
962  * \brief Build a resource tree
963  *
964  * This function builds a resource tree based on the requested resource in a SUBSCRIBE request.
965  *
966  * This function also creates a container that has all resources that have been visited during
967  * creation of the tree, whether those resources resulted in a tree node being created or not.
968  * Keeping this container of visited resources allows for misconfigurations such as loops in
969  * the tree or duplicated resources to be detected.
970  *
971  * \param endpoint The endpoint that sent the SUBSCRIBE request.
972  * \param handler The subscription handler for leaf nodes in the tree.
973  * \param resource The resource requested in the SUBSCRIBE request.
974  * \param tree The tree that is to be built.
975  * \param has_eventlist_support
976  *
977  * \retval 200-299 Successfully subscribed to at least one resource.
978  * \retval 300-699 Failure to subscribe to requested resource.
979  */
980 static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
981                 const char *resource, struct resource_tree *tree, int has_eventlist_support)
982 {
983         RAII_VAR(struct resource_list *, list, NULL, ao2_cleanup);
984         struct resources visited;
985
986         if (!has_eventlist_support || !(list = retrieve_resource_list(resource, handler->event_name))) {
987                 ast_debug(2, "Subscription to resource %s is not to a list\n", resource);
988                 tree->root = tree_node_alloc(resource, NULL, 0);
989                 if (!tree->root) {
990                         return 500;
991                 }
992                 return handler->notifier->new_subscribe(endpoint, resource);
993         }
994
995         ast_debug(2, "Subscription to resource %s is a list\n", resource);
996         if (AST_VECTOR_INIT(&visited, AST_VECTOR_SIZE(&list->items))) {
997                 return 500;
998         }
999
1000         tree->root = tree_node_alloc(resource, &visited, list->full_state);
1001         if (!tree->root) {
1002                 AST_VECTOR_FREE(&visited);
1003                 return 500;
1004         }
1005
1006         tree->notification_batch_interval = list->notification_batch_interval;
1007
1008         build_node_children(endpoint, handler, list, tree->root, &visited);
1009         AST_VECTOR_FREE(&visited);
1010
1011         if (AST_VECTOR_SIZE(&tree->root->children) > 0) {
1012                 return 200;
1013         } else {
1014                 return 500;
1015         }
1016 }
1017
1018 static void add_subscription(struct sip_subscription_tree *obj)
1019 {
1020         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1021         AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
1022 }
1023
1024 static void remove_subscription(struct sip_subscription_tree *obj)
1025 {
1026         struct sip_subscription_tree *i;
1027         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1028         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
1029                 if (i == obj) {
1030                         AST_RWLIST_REMOVE_CURRENT(next);
1031                         if (i->root) {
1032                                 ast_debug(2, "Removing subscription to resource %s from list of subscriptions\n",
1033                                                 ast_sip_subscription_get_resource_name(i->root));
1034                         }
1035                         break;
1036                 }
1037         }
1038         AST_RWLIST_TRAVERSE_SAFE_END;
1039 }
1040
1041 static void destroy_subscription(struct ast_sip_subscription *sub)
1042 {
1043         ast_debug(3, "Destroying SIP subscription to resource %s\n", sub->resource);
1044         ast_free(sub->body_text);
1045
1046         AST_VECTOR_FREE(&sub->children);
1047         ao2_cleanup(sub->datastores);
1048         ast_free(sub);
1049 }
1050
1051 static void destroy_subscriptions(struct ast_sip_subscription *root)
1052 {
1053         int i;
1054
1055         if (!root) {
1056                 return;
1057         }
1058
1059         for (i = 0; i < AST_VECTOR_SIZE(&root->children); ++i) {
1060                 struct ast_sip_subscription *child;
1061
1062                 child = AST_VECTOR_GET(&root->children, i);
1063                 destroy_subscriptions(child);
1064         }
1065
1066         destroy_subscription(root);
1067 }
1068
1069 static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler,
1070                 const char *resource, struct sip_subscription_tree *tree)
1071 {
1072         struct ast_sip_subscription *sub;
1073         pjsip_sip_uri *contact_uri;
1074
1075         sub = ast_calloc(1, sizeof(*sub) + strlen(resource) + 1);
1076         if (!sub) {
1077                 return NULL;
1078         }
1079         strcpy(sub->resource, resource); /* Safe */
1080
1081         sub->datastores = ast_datastores_alloc();
1082         if (!sub->datastores) {
1083                 destroy_subscription(sub);
1084                 return NULL;
1085         }
1086
1087         sub->body_text = ast_str_create(128);
1088         if (!sub->body_text) {
1089                 destroy_subscription(sub);
1090                 return NULL;
1091         }
1092
1093         sub->uri = pjsip_sip_uri_create(tree->dlg->pool, PJ_FALSE);
1094         contact_uri = pjsip_uri_get_uri(tree->dlg->local.contact->uri);
1095         pjsip_sip_uri_assign(tree->dlg->pool, sub->uri, contact_uri);
1096         pj_strdup2(tree->dlg->pool, &sub->uri->user, resource);
1097
1098         sub->handler = handler;
1099         sub->subscription_state = PJSIP_EVSUB_STATE_ACTIVE;
1100         sub->tree = ao2_bump(tree);
1101
1102         return sub;
1103 }
1104
1105 /*!
1106  * \brief Create a tree of virtual subscriptions based on a resource tree node.
1107  *
1108  * \param handler The handler to supply to leaf subscriptions.
1109  * \param resource The requested resource for this subscription.
1110  * \param generator Body generator to use for leaf subscriptions.
1111  * \param tree The root of the subscription tree.
1112  * \param current The tree node that corresponds to the subscription being created.
1113  */
1114 static struct ast_sip_subscription *create_virtual_subscriptions(const struct ast_sip_subscription_handler *handler,
1115                 const char *resource, struct ast_sip_pubsub_body_generator *generator,
1116                 struct sip_subscription_tree *tree, struct tree_node *current)
1117 {
1118         int i;
1119         struct ast_sip_subscription *sub;
1120
1121         sub = allocate_subscription(handler, resource, tree);
1122         if (!sub) {
1123                 return NULL;
1124         }
1125
1126         sub->full_state = current->full_state;
1127         sub->body_generator = generator;
1128         AST_VECTOR_INIT(&sub->children, AST_VECTOR_SIZE(&current->children));
1129
1130         for (i = 0; i < AST_VECTOR_SIZE(&current->children); ++i) {
1131                 struct ast_sip_subscription *child;
1132                 struct tree_node *child_node = AST_VECTOR_GET(&current->children, i);
1133
1134                 child = create_virtual_subscriptions(handler, child_node->resource, generator,
1135                                 tree, child_node);
1136
1137                 if (!child) {
1138                         ast_debug(1, "Child subscription to resource %s could not be created\n",
1139                                         child_node->resource);
1140                         continue;
1141                 }
1142
1143                 if (AST_VECTOR_APPEND(&sub->children, child)) {
1144                         ast_debug(1, "Child subscription to resource %s could not be appended\n",
1145                                         child_node->resource);
1146                 }
1147         }
1148
1149         return sub;
1150 }
1151
1152 static void shutdown_subscriptions(struct ast_sip_subscription *sub)
1153 {
1154         int i;
1155
1156         if (!sub) {
1157                 return;
1158         }
1159
1160         if (AST_VECTOR_SIZE(&sub->children) > 0) {
1161                 for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
1162                         shutdown_subscriptions(AST_VECTOR_GET(&sub->children, i));
1163                 }
1164                 return;
1165         }
1166
1167         /* We notify subscription shutdown only on the tree leaves. */
1168         if (sub->handler->subscription_shutdown) {
1169                 sub->handler->subscription_shutdown(sub);
1170         }
1171 }
1172 static int subscription_unreference_dialog(void *obj)
1173 {
1174         struct sip_subscription_tree *sub_tree = obj;
1175
1176         /* This is why we keep the dialog on the subscription. When the subscription
1177          * is destroyed, there is no guarantee that the underlying dialog is ready
1178          * to be destroyed. Furthermore, there's no guarantee in the opposite direction
1179          * either. The dialog could be destroyed before our subscription is. We fix
1180          * this problem by keeping a reference to the dialog until it is time to
1181          * destroy the subscription. We need to have the dialog available when the
1182          * subscription is destroyed so that we can guarantee that our attempt to
1183          * remove the serializer will be successful.
1184          */
1185         pjsip_dlg_dec_session(sub_tree->dlg, &pubsub_module);
1186         sub_tree->dlg = NULL;
1187
1188         return 0;
1189 }
1190
/*!
 * \brief ao2 destructor for a subscription tree.
 *
 * Releases the endpoint reference, destroys all subscriptions in the tree,
 * drops the dialog session reference (if a dialog is still attached), and
 * releases the serializer and the module reference taken when the tree
 * was allocated.
 *
 * \param obj The subscription tree being destroyed.
 */
static void subscription_tree_destructor(void *obj)
{
        struct sip_subscription_tree *sub_tree = obj;

        ast_debug(3, "Destroying subscription tree %p\n", sub_tree);

        ao2_cleanup(sub_tree->endpoint);

        destroy_subscriptions(sub_tree->root);

        if (sub_tree->dlg) {
                /* The dialog is released from a task pushed to the tree's serializer */
                ast_sip_push_task_synchronous(sub_tree->serializer, subscription_unreference_dialog, sub_tree);
        }

        /* The serializer is still needed by the task above, so it is released afterwards */
        ast_taskprocessor_unreference(sub_tree->serializer);
        /* Balances the ast_module_ref() done in allocate_subscription_tree() */
        ast_module_unref(ast_module_info->self);
}
1208
1209 void ast_sip_subscription_destroy(struct ast_sip_subscription *sub)
1210 {
1211         ast_debug(3, "Removing subscription %p reference to subscription tree %p\n", sub, sub->tree);
1212         ao2_cleanup(sub->tree);
1213 }
1214
/*!
 * \brief Attach a dialog to a subscription tree.
 *
 * Stores the dialog on the tree, associates the tree's serializer and
 * endpoint with the dialog, stores the tree as this module's data on the
 * tree's evsub, and takes a session reference on the dialog.
 *
 * \param sub_tree The subscription tree; its evsub, serializer and endpoint are read here.
 * \param dlg The dialog to attach.
 */
static void subscription_setup_dialog(struct sip_subscription_tree *sub_tree, pjsip_dialog *dlg)
{
        sub_tree->dlg = dlg;
        ast_sip_dialog_set_serializer(dlg, sub_tree->serializer);
        ast_sip_dialog_set_endpoint(dlg, sub_tree->endpoint);
        /* Lets the tree be recovered from the evsub via this module's id */
        pjsip_evsub_set_mod_data(sub_tree->evsub, pubsub_module.id, sub_tree);
        /* Released again by subscription_unreference_dialog() */
        pjsip_dlg_inc_session(dlg, &pubsub_module);
}
1223
1224 static struct sip_subscription_tree *allocate_subscription_tree(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
1225 {
1226         struct sip_subscription_tree *sub_tree;
1227
1228         sub_tree = ao2_alloc(sizeof *sub_tree, subscription_tree_destructor);
1229         if (!sub_tree) {
1230                 return NULL;
1231         }
1232
1233         ast_module_ref(ast_module_info->self);
1234
1235         if (rdata) {
1236                 /*
1237                  * We must continue using the serializer that the original
1238                  * SUBSCRIBE came in on for the dialog.  There may be
1239                  * retransmissions already enqueued in the original
1240                  * serializer that can result in reentrancy and message
1241                  * sequencing problems.
1242                  */
1243                 sub_tree->serializer = ast_sip_get_distributor_serializer(rdata);
1244         } else {
1245                 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
1246
1247                 /* Create name with seq number appended. */
1248                 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/pubsub/%s",
1249                         ast_sorcery_object_get_id(endpoint));
1250
1251                 sub_tree->serializer = ast_sip_create_serializer(tps_name);
1252         }
1253         if (!sub_tree->serializer) {
1254                 ao2_ref(sub_tree, -1);
1255                 return NULL;
1256         }
1257
1258         sub_tree->endpoint = ao2_bump(endpoint);
1259         sub_tree->notify_sched_id = -1;
1260
1261         return sub_tree;
1262 }
1263
/*!
 * \brief Create a subscription tree based on a resource tree.
 *
 * Using the previously-determined valid resources in the provided resource tree,
 * a corresponding tree of ast_sip_subscriptions is created. The root of the
 * subscription tree is a real subscription, and the rest in the tree are
 * virtual subscriptions.
 *
 * A UAS dialog and a PJSIP event subscription are created for the request, the
 * request message is cloned into the dialog's module data, and the finished
 * tree is added to the global list of subscriptions.
 *
 * \param handler The handler to use for leaf subscriptions
 * \param endpoint The endpoint that sent the SUBSCRIBE request
 * \param rdata The SUBSCRIBE content
 * \param resource The requested resource in the SUBSCRIBE request
 * \param generator The body generator to use in leaf subscriptions
 * \param tree The resource tree on which the subscription tree is based
 * \param dlg_status[out] The result of attempting to create a dialog.
 *        Set to PJ_ENOMEM if the subscription tree could not be allocated.
 *
 * \retval NULL Could not create the subscription tree
 * \retval non-NULL The created subscription tree
 */

static struct sip_subscription_tree *create_subscription_tree(const struct ast_sip_subscription_handler *handler,
                struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
                struct ast_sip_pubsub_body_generator *generator, struct resource_tree *tree,
                pj_status_t *dlg_status)
{
        struct sip_subscription_tree *sub_tree;
        pjsip_dialog *dlg;
        struct subscription_persistence *persistence;

        sub_tree = allocate_subscription_tree(endpoint, rdata);
        if (!sub_tree) {
                *dlg_status = PJ_ENOMEM;
                return NULL;
        }
        sub_tree->role = AST_SIP_NOTIFIER;

        dlg = ast_sip_create_dialog_uas(endpoint, rdata, dlg_status);
        if (!dlg) {
                /* PJ_EEXISTS is not reported as a warning */
                if (*dlg_status != PJ_EEXISTS) {
                        ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
                }
                ao2_ref(sub_tree, -1);
                return NULL;
        }

        /* Persistence data is present when the request was attached to a persisted subscription */
        persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
                        pubsub_module.id, MOD_DATA_PERSISTENCE);
        if (persistence) {
                /* Update the created dialog with the persisted information */
                /* The dialog is unregistered and re-registered around the local tag change */
                pjsip_ua_unregister_dlg(pjsip_ua_instance(), dlg);
                pj_strdup2(dlg->pool, &dlg->local.info->tag, persistence->tag);
                dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag);
                pjsip_ua_register_dlg(pjsip_ua_instance(), dlg);
                /* Both the local and the remote CSeq are taken from the persisted value */
                dlg->local.cseq = persistence->cseq;
                dlg->remote.cseq = persistence->cseq;
        }

        /* NOTE(review): the result of pjsip_evsub_create_uas() is not checked here - confirm evsub is always valid */
        pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &sub_tree->evsub);
        subscription_setup_dialog(sub_tree, dlg);

#ifdef HAVE_PJSIP_EVSUB_GRP_LOCK
        pjsip_evsub_add_ref(sub_tree->evsub);
#endif

        /* Keep a copy of the request in the dialog's module data */
        ast_sip_mod_data_set(dlg->pool, dlg->mod_data, pubsub_module.id, MOD_DATA_MSG,
                        pjsip_msg_clone(dlg->pool, rdata->msg_info.msg));

        sub_tree->notification_batch_interval = tree->notification_batch_interval;

        /*
         * NOTE(review): create_virtual_subscriptions() returns NULL when the root
         * subscription cannot be allocated, and root is dereferenced below without
         * a check - confirm whether an allocation failure needs handling here.
         */
        sub_tree->root = create_virtual_subscriptions(handler, resource, generator, sub_tree, tree->root);
        if (AST_VECTOR_SIZE(&sub_tree->root->children) > 0) {
                /* A root with children means the subscription is to a resource list */
                sub_tree->is_list = 1;
        }

        add_subscription(sub_tree);

        return sub_tree;
}
1342
/* Forward declarations of notification helpers needed before their definitions */
static int initial_notify_task(void *obj);
static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state);
1345
/*!
 * \brief Data passed to sub_persistence_recreate(), the continuation of
 * persistent subscription recreation that runs under the distributor serializer.
 */
struct persistence_recreate_data {
        /*! The persisted subscription being recreated */
        struct subscription_persistence *persistence;
        /*! The request from which the subscription is recreated */
        pjsip_rx_data *rdata;
};
1351
1352 /*!
1353  * \internal
1354  * \brief subscription_persistence_recreate continuation under distributor serializer.
1355  * \since 13.10.0
1356  *
1357  * \retval 0 on success.
1358  * \retval -1 on error.
1359  */
1360 static int sub_persistence_recreate(void *obj)
1361 {
1362         struct persistence_recreate_data *recreate_data = obj;
1363         struct subscription_persistence *persistence = recreate_data->persistence;
1364         pjsip_rx_data *rdata = recreate_data->rdata;
1365         struct ast_sip_endpoint *endpoint;
1366         struct sip_subscription_tree *sub_tree;
1367         struct ast_sip_pubsub_body_generator *generator;
1368         struct ast_sip_subscription_handler *handler;
1369         char *resource;
1370         pjsip_sip_uri *request_uri;
1371         size_t resource_size;
1372         int resp;
1373         struct resource_tree tree;
1374         pjsip_expires_hdr *expires_header;
1375
1376         request_uri = pjsip_uri_get_uri(rdata->msg_info.msg->line.req.uri);
1377         resource_size = pj_strlen(&request_uri->user) + 1;
1378         resource = ast_alloca(resource_size);
1379         ast_copy_pj_str(resource, &request_uri->user, resource_size);
1380
1381         handler = subscription_get_handler_from_rdata(rdata);
1382         if (!handler || !handler->notifier) {
1383                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not get subscription handler.\n",
1384                         persistence->endpoint);
1385                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1386                 return 0;
1387         }
1388
1389         generator = subscription_get_generator_from_rdata(rdata, handler);
1390         if (!generator) {
1391                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Body generator not available.\n",
1392                         persistence->endpoint);
1393                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1394                 return 0;
1395         }
1396
1397         ast_sip_mod_data_set(rdata->tp_info.pool, rdata->endpt_info.mod_data,
1398                 pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
1399
1400         /* Getting the endpoint may take some time that can affect the expiration. */
1401         endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint",
1402                 persistence->endpoint);
1403         if (!endpoint) {
1404                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: The endpoint was not found\n",
1405                         persistence->endpoint);
1406                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1407                 ao2_ref(endpoint, -1);
1408                 return 0;
1409         }
1410
1411         /* Update the expiration header with the new expiration */
1412         expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES,
1413                 rdata->msg_info.msg->hdr.next);
1414         if (!expires_header) {
1415                 expires_header = pjsip_expires_hdr_create(rdata->tp_info.pool, 0);
1416                 if (!expires_header) {
1417                         ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not update expires header.\n",
1418                                 persistence->endpoint);
1419                         ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1420                         ao2_ref(endpoint, -1);
1421                         return 0;
1422                 }
1423                 pjsip_msg_add_hdr(rdata->msg_info.msg, (pjsip_hdr *) expires_header);
1424         }
1425         expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
1426         if (expires_header->ivalue <= 0) {
1427                 /* The subscription expired since we started recreating the subscription. */
1428                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1429                 ao2_ref(endpoint, -1);
1430                 return 0;
1431         }
1432
1433         memset(&tree, 0, sizeof(tree));
1434         resp = build_resource_tree(endpoint, handler, resource, &tree,
1435                 ast_sip_pubsub_has_eventlist_support(rdata));
1436         if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
1437                 pj_status_t dlg_status;
1438
1439                 sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator,
1440                         &tree, &dlg_status);
1441                 if (!sub_tree) {
1442                         if (dlg_status != PJ_EEXISTS) {
1443                                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not create subscription tree.\n",
1444                                         persistence->endpoint);
1445                                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1446                         }
1447                 } else {
1448                         sub_tree->persistence = ao2_bump(persistence);
1449                         subscription_persistence_update(sub_tree, rdata);
1450                         if (ast_sip_push_task(sub_tree->serializer, initial_notify_task,
1451                                 ao2_bump(sub_tree))) {
1452                                 /* Could not send initial subscribe NOTIFY */
1453                                 pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
1454                                 ao2_ref(sub_tree, -1);
1455                         }
1456                 }
1457         } else {
1458                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1459         }
1460         resource_tree_destroy(&tree);
1461         ao2_ref(endpoint, -1);
1462
1463         return 0;
1464 }
1465
1466 /*! \brief Callback function to perform the actual recreation of a subscription */
1467 static int subscription_persistence_recreate(void *obj, void *arg, int flags)
1468 {
1469         struct subscription_persistence *persistence = obj;
1470         pj_pool_t *pool = arg;
1471         struct ast_taskprocessor *serializer;
1472         pjsip_rx_data rdata;
1473         struct persistence_recreate_data recreate_data;
1474
1475         /* If this subscription has already expired remove it */
1476         if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
1477                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1478                 return 0;
1479         }
1480
1481         memset(&rdata, 0, sizeof(rdata));
1482         pj_pool_reset(pool);
1483         rdata.tp_info.pool = pool;
1484
1485         if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
1486                 persistence->transport_key, persistence->local_name, persistence->local_port)) {
1487                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: The message could not be parsed\n",
1488                         persistence->endpoint);
1489                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1490                 return 0;
1491         }
1492
1493         if (rdata.msg_info.msg->type != PJSIP_REQUEST_MSG) {
1494                 ast_log(LOG_NOTICE, "Failed recreating '%s' subscription: Stored a SIP response instead of a request.\n",
1495                         persistence->endpoint);
1496                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1497                 return 0;
1498         }
1499
1500         /* Continue the remainder in the distributor serializer */
1501         serializer = ast_sip_get_distributor_serializer(&rdata);
1502         if (!serializer) {
1503                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not get distributor serializer.\n",
1504                         persistence->endpoint);
1505                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1506                 return 0;
1507         }
1508         recreate_data.persistence = persistence;
1509         recreate_data.rdata = &rdata;
1510         if (ast_sip_push_task_synchronous(serializer, sub_persistence_recreate, &recreate_data)) {
1511                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not continue under distributor serializer.\n",
1512                         persistence->endpoint);
1513                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1514         }
1515         ast_taskprocessor_unreference(serializer);
1516
1517         return 0;
1518 }
1519
1520 /*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
1521 static int subscription_persistence_load(void *data)
1522 {
1523         struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
1524                 "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
1525         pj_pool_t *pool;
1526
1527         pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
1528                 PJSIP_POOL_RDATA_INC);
1529         if (!pool) {
1530                 ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
1531                 return 0;
1532         }
1533
1534         ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
1535
1536         pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
1537
1538         ao2_ref(persisted_subscriptions, -1);
1539         return 0;
1540 }
1541
1542 /*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
1543 static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
1544 {
1545         struct ast_json_payload *payload;
1546         const char *type;
1547
1548         if (stasis_message_type(message) != ast_manager_get_generic_type()) {
1549                 return;
1550         }
1551
1552         payload = stasis_message_data(message);
1553         type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
1554
1555         /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
1556          * recreate SIP subscriptions.
1557          */
1558         if (strcmp(type, "FullyBooted")) {
1559                 return;
1560         }
1561
1562         /* This has to be here so the subscription is recreated when the body generator is available */
1563         ast_sip_push_task(NULL, subscription_persistence_load, NULL);
1564
1565         /* Once the system is fully booted we don't care anymore */
1566         stasis_unsubscribe(sub);
1567 }
1568
1569 typedef int (*on_subscription_t)(struct sip_subscription_tree *sub, void *arg);
1570
1571 static int for_each_subscription(on_subscription_t on_subscription, void *arg)
1572 {
1573         int num = 0;
1574         struct sip_subscription_tree *i;
1575         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1576
1577         if (!on_subscription) {
1578                 return num;
1579         }
1580
1581         AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
1582                 if (on_subscription(i, arg)) {
1583                         break;
1584                 }
1585                 ++num;
1586         }
1587         return num;
1588 }
1589
1590 static void sip_subscription_to_ami(struct sip_subscription_tree *sub_tree,
1591                                     struct ast_str **buf)
1592 {
1593         char str[256];
1594         struct ast_sip_endpoint_id_configuration *id = &sub_tree->endpoint->id;
1595
1596         ast_str_append(buf, 0, "Role: %s\r\n",
1597                        sip_subscription_roles_map[sub_tree->role]);
1598         ast_str_append(buf, 0, "Endpoint: %s\r\n",
1599                        ast_sorcery_object_get_id(sub_tree->endpoint));
1600
1601         if (sub_tree->dlg) {
1602                 ast_copy_pj_str(str, &sub_tree->dlg->call_id->id, sizeof(str));
1603         } else {
1604                 ast_copy_string(str, "<unknown>", sizeof(str));
1605         }
1606         ast_str_append(buf, 0, "Callid: %s\r\n", str);
1607
1608         ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(sub_tree->evsub));
1609
1610         ast_callerid_merge(str, sizeof(str),
1611                            S_COR(id->self.name.valid, id->self.name.str, NULL),
1612                            S_COR(id->self.number.valid, id->self.number.str, NULL),
1613                            "Unknown");
1614
1615         ast_str_append(buf, 0, "Callerid: %s\r\n", str);
1616
1617         /* XXX This needs to be done recursively for lists */
1618         if (sub_tree->root->handler->to_ami) {
1619                 sub_tree->root->handler->to_ami(sub_tree->root, buf);
1620         }
1621 }
1622
1623
1624 void *ast_sip_subscription_get_header(const struct ast_sip_subscription *sub, const char *header)
1625 {
1626         pjsip_dialog *dlg;
1627         pjsip_msg *msg;
1628         pj_str_t name;
1629
1630         dlg = sub->tree->dlg;
1631         msg = ast_sip_mod_data_get(dlg->mod_data, pubsub_module.id, MOD_DATA_MSG);
1632         pj_cstr(&name, header);
1633
1634         return pjsip_msg_find_hdr_by_name(msg, &name, NULL);
1635 }
1636
1637 /*!
1638  * \internal
1639  * \brief Wrapper for pjsip_evsub_send_request
1640  *
1641  * This function (re)sets the transport before sending to catch cases
1642  * where the transport might have changed.
1643  *
1644  * If pjproject gives us the ability to resend, we'll only reset the transport
1645  * if PJSIP_ETPNOTAVAIL is returned from send.
1646  *
1647  * \returns pj_status_t
1648  */
1649 static pj_status_t internal_pjsip_evsub_send_request(struct sip_subscription_tree *sub_tree, pjsip_tx_data *tdata)
1650 {
1651         pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, };
1652
1653         ast_sip_set_tpselector_from_transport_name(sub_tree->endpoint->transport, &selector);
1654         pjsip_dlg_set_transport(sub_tree->dlg, &selector);
1655
1656         return pjsip_evsub_send_request(sub_tree->evsub, tdata);
1657 }
1658
1659 /* XXX This function is not used. */
1660 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
1661                 struct ast_sip_endpoint *endpoint, const char *resource)
1662 {
1663         struct ast_sip_subscription *sub;
1664         pjsip_dialog *dlg;
1665         struct ast_sip_contact *contact;
1666         pj_str_t event;
1667         pjsip_tx_data *tdata;
1668         pjsip_evsub *evsub;
1669         struct sip_subscription_tree *sub_tree = NULL;
1670
1671         sub_tree = allocate_subscription_tree(endpoint, NULL);
1672         if (!sub_tree) {
1673                 return NULL;
1674         }
1675
1676         sub = allocate_subscription(handler, resource, sub_tree);
1677         if (!sub) {
1678                 ao2_cleanup(sub_tree);
1679                 return NULL;
1680         }
1681
1682         contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors);
1683         if (!contact || ast_strlen_zero(contact->uri)) {
1684                 ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n",
1685                                 ast_sorcery_object_get_id(endpoint));
1686                 ao2_ref(sub_tree, -1);
1687                 ao2_cleanup(contact);
1688                 return NULL;
1689         }
1690
1691         dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL);
1692         ao2_cleanup(contact);
1693         if (!dlg) {
1694                 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
1695                 ao2_ref(sub_tree, -1);
1696                 return NULL;
1697         }
1698
1699         pj_cstr(&event, handler->event_name);
1700         pjsip_evsub_create_uac(dlg, &pubsub_cb, &event, 0, &sub_tree->evsub);
1701         subscription_setup_dialog(sub_tree, dlg);
1702
1703         evsub = sub_tree->evsub;
1704
1705         if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
1706                 internal_pjsip_evsub_send_request(sub_tree, tdata);
1707         } else {
1708                 /* pjsip_evsub_terminate will result in pubsub_on_evsub_state,
1709                  * being called and terminating the subscription. Therefore, we don't
1710                  * need to decrease the reference count of sub here.
1711                  */
1712                 pjsip_evsub_terminate(evsub, PJ_TRUE);
1713                 ao2_ref(sub_tree, -1);
1714                 return NULL;
1715         }
1716
1717         add_subscription(sub_tree);
1718
1719         return sub;
1720 }
1721
1722 pjsip_dialog *ast_sip_subscription_get_dialog(struct ast_sip_subscription *sub)
1723 {
1724         ast_assert(sub->tree->dlg != NULL);
1725         return sub->tree->dlg;
1726 }
1727
1728 struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
1729 {
1730         ast_assert(sub->tree->endpoint != NULL);
1731         return ao2_bump(sub->tree->endpoint);
1732 }
1733
1734 struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
1735 {
1736         ast_assert(sub->tree->serializer != NULL);
1737         return sub->tree->serializer;
1738 }
1739
1740 /*!
1741  * \brief Pre-allocate a buffer for the transmission
1742  *
1743  * Typically, we let PJSIP do this step for us when we send a request. PJSIP's buffer
1744  * allocation algorithm is to allocate a buffer of PJSIP_MAX_PKT_LEN bytes and attempt
1745  * to write the packet to the allocated buffer. If the buffer is too small to hold the
1746  * packet, then we get told the message is too long to be sent.
1747  *
1748  * When dealing with SIP NOTIFY, especially with RLS, it is possible to exceed
1749  * PJSIP_MAX_PKT_LEN. Rather than accepting the limitation imposed on us by default,
1750  * we instead take the strategy of pre-allocating the buffer, testing for ourselves
1751  * if the message will fit, and resizing the buffer as required.
1752  *
1753  * RFC 3261 says that a SIP UDP request can be up to 65535 bytes long. We're capping
1754  * it at 64000 for a couple of reasons:
1755  * 1) Allocating more than 64K at a time is hard to justify
1756  * 2) If the message goes through proxies, those proxies will want to add Via and
1757  *    Record-Route headers, making the message even larger. Giving some space for
1758  *    those headers is a nice thing to do.
1759  *
1760  * RFC 3261 does not place an upper limit on the size of TCP requests, but we are
1761  * going to impose the same 64K limit as a memory savings.
1762  *
1763  * \param tdata The tdata onto which to allocate a buffer
1764  * \retval 0 Success
1765  * \retval -1 The message is too large
1766  */
1767 static int allocate_tdata_buffer(pjsip_tx_data *tdata)
1768 {
1769         int buf_size;
1770         int size = -1;
1771         char *buf;
1772
1773         for (buf_size = PJSIP_MAX_PKT_LEN; size == -1 && buf_size < 64000; buf_size *= 2) {
1774                 buf = pj_pool_alloc(tdata->pool, buf_size);
1775                 size = pjsip_msg_print(tdata->msg, buf, buf_size);
1776         }
1777
1778         if (size == -1) {
1779                 return -1;
1780         }
1781
1782         tdata->buf.start = buf;
1783         tdata->buf.cur = tdata->buf.start;
1784         tdata->buf.end = tdata->buf.start + buf_size;
1785
1786         return 0;
1787 }
1788
1789 static int sip_subscription_send_request(struct sip_subscription_tree *sub_tree, pjsip_tx_data *tdata)
1790 {
1791 #ifdef TEST_FRAMEWORK
1792         struct ast_sip_endpoint *endpoint = sub_tree->endpoint;
1793         pjsip_evsub *evsub = sub_tree->evsub;
1794 #endif
1795         int res;
1796
1797         if (allocate_tdata_buffer(tdata)) {
1798                 ast_log(LOG_ERROR, "SIP request %s is too large to send.\n", tdata->info);
1799                 return -1;
1800         }
1801
1802         res = internal_pjsip_evsub_send_request(sub_tree, tdata);
1803
1804         subscription_persistence_update(sub_tree, NULL);
1805
1806         ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
1807                 "StateText: %s\r\n"
1808                 "Endpoint: %s\r\n",
1809                 pjsip_evsub_get_state_name(evsub),
1810                 ast_sorcery_object_get_id(endpoint));
1811
1812         return (res == PJ_SUCCESS ? 0 : -1);
1813 }
1814
1815 /*!
1816  * \brief Add a resource XML element to an RLMI body
1817  *
1818  * Each resource element represents a subscribed resource in the list. This function currently
1819  * will unconditionally add an instance element to each created resource element. Instance
1820  * elements refer to later parts in the multipart body.
1821  *
1822  * \param pool PJLIB allocation pool
1823  * \param cid Content-ID header of the resource
1824  * \param resource_name Name of the resource
1825  * \param resource_uri URI of the resource
1826  * \param state State of the subscribed resource
1827  */
1828 static void add_rlmi_resource(pj_pool_t *pool, pj_xml_node *rlmi, const pjsip_generic_string_hdr *cid,
1829                 const char *resource_name, const pjsip_sip_uri *resource_uri, pjsip_evsub_state state)
1830 {
1831         static pj_str_t cid_name = { "cid", 3 };
1832         pj_xml_node *resource;
1833         pj_xml_node *name;
1834         pj_xml_node *instance;
1835         pj_xml_attr *cid_attr;
1836         char id[6];
1837         char uri[PJSIP_MAX_URL_SIZE];
1838
1839         /* This creates a string representing the Content-ID without the enclosing < > */
1840         const pj_str_t cid_stripped = {
1841                 .ptr = cid->hvalue.ptr + 1,
1842                 .slen = cid->hvalue.slen - 2,
1843         };
1844
1845         resource = ast_sip_presence_xml_create_node(pool, rlmi, "resource");
1846         name = ast_sip_presence_xml_create_node(pool, resource, "name");
1847         instance = ast_sip_presence_xml_create_node(pool, resource, "instance");
1848
1849         pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, resource_uri, uri, sizeof(uri));
1850         ast_sip_presence_xml_create_attr(pool, resource, "uri", uri);
1851
1852         pj_strdup2(pool, &name->content, resource_name);
1853
1854         ast_generate_random_string(id, sizeof(id));
1855
1856         ast_sip_presence_xml_create_attr(pool, instance, "id", id);
1857         ast_sip_presence_xml_create_attr(pool, instance, "state",
1858                         state == PJSIP_EVSUB_STATE_TERMINATED ? "terminated" : "active");
1859
1860         /* Use the PJLIB-util XML library directly here since we are using a
1861          * pj_str_t
1862          */
1863
1864         cid_attr = pj_xml_attr_new(pool, &cid_name, &cid_stripped);
1865         pj_xml_add_attr(instance, cid_attr);
1866 }
1867
1868 /*!
1869  * \brief A multipart body part and meta-information
1870  *
1871  * When creating a multipart body part, the end result (the
1872  * pjsip_multipart_part) is hard to inspect without undoing
1873  * a lot of what was done to create it. Therefore, we use this
1874  * structure to store meta-information about the body part.
1875  *
1876  * The main consumer of this is the creator of the RLMI body
1877  * part of a multipart resource list body.
1878  */
1879 struct body_part {
1880         /*! Content-ID header for the body part */
1881         pjsip_generic_string_hdr *cid;
1882         /*! Subscribed resource represented in the body part */
1883         const char *resource;
1884         /*! URI for the subscribed body part */
1885         pjsip_sip_uri *uri;
1886         /*! Subscription state of the resource represented in the body part */
1887         pjsip_evsub_state state;
1888         /*! The actual body part that will be present in the multipart body */
1889         pjsip_multipart_part *part;
1890 };
1891
1892 /*!
1893  * \brief Type declaration for container of body part structures
1894  */
1895 AST_VECTOR(body_part_list, struct body_part *);
1896
1897 /*!
1898  * \brief Create a Content-ID header
1899  *
1900  * Content-ID headers are required by RFC2387 for multipart/related
1901  * bodies. They serve as identifiers for each part of the multipart body.
1902  *
1903  * \param pool PJLIB allocation pool
1904  * \param sub Subscription to a resource
1905  */
1906 static pjsip_generic_string_hdr *generate_content_id_hdr(pj_pool_t *pool,
1907                 const struct ast_sip_subscription *sub)
1908 {
1909         static const pj_str_t cid_name = { "Content-ID", 10 };
1910         pjsip_generic_string_hdr *cid;
1911         char id[6];
1912         size_t alloc_size;
1913         pj_str_t cid_value;
1914
1915         /* '<' + '@' + '>' = 3. pj_str_t does not require a null-terminator */
1916         alloc_size = sizeof(id) + pj_strlen(&sub->uri->host) + 3;
1917         cid_value.ptr = pj_pool_alloc(pool, alloc_size);
1918         cid_value.slen = sprintf(cid_value.ptr, "<%s@%.*s>",
1919                         ast_generate_random_string(id, sizeof(id)),
1920                         (int) pj_strlen(&sub->uri->host), pj_strbuf(&sub->uri->host));
1921         cid = pjsip_generic_string_hdr_create(pool, &cid_name, &cid_value);
1922
1923         return cid;
1924 }
1925
1926 static int rlmi_print_body(struct pjsip_msg_body *msg_body, char *buf, pj_size_t size)
1927 {
1928         int num_printed;
1929         pj_xml_node *rlmi = msg_body->data;
1930
1931         num_printed = pj_xml_print(rlmi, buf, size, PJ_TRUE);
1932         if (num_printed <= AST_PJSIP_XML_PROLOG_LEN) {
1933                 return -1;
1934         }
1935
1936         return num_printed;
1937 }
1938
1939 static void *rlmi_clone_data(pj_pool_t *pool, const void *data, unsigned len)
1940 {
1941         const pj_xml_node *rlmi = data;
1942
1943         return pj_xml_clone(pool, rlmi);
1944 }
1945
1946 /*!
1947  * \brief Create an RLMI body part for a multipart resource list body
1948  *
1949  * RLMI (Resource list meta information) is a special body type that lists
1950  * the subscribed resources and tells subscribers the number of subscribed
1951  * resources and what other body parts are in the multipart body. The
1952  * RLMI body also has a version number that a subscriber can use to ensure
1953  * that the locally-stored state corresponds to server state.
1954  *
1955  * \param pool The allocation pool
1956  * \param sub The subscription representing the subscribed resource list
1957  * \param body_parts A container of body parts that RLMI will refer to
1958  * \param full_state Indicates whether this is a full or partial state notification
1959  * \return The multipart part representing the RLMI body
1960  */
1961 static pjsip_multipart_part *build_rlmi_body(pj_pool_t *pool, struct ast_sip_subscription *sub,
1962                 struct body_part_list *body_parts, unsigned int full_state)
1963 {
1964         static const pj_str_t rlmi_type = { "application", 11 };
1965         static const pj_str_t rlmi_subtype = { "rlmi+xml", 8 };
1966         pj_xml_node *rlmi;
1967         pj_xml_node *name;
1968         pjsip_multipart_part *rlmi_part;
1969         char version_str[32];
1970         char uri[PJSIP_MAX_URL_SIZE];
1971         pjsip_generic_string_hdr *cid;
1972         int i;
1973
1974         rlmi = ast_sip_presence_xml_create_node(pool, NULL, "list");
1975         ast_sip_presence_xml_create_attr(pool, rlmi, "xmlns", "urn:ietf:params:xml:ns:rlmi");
1976
1977         ast_sip_subscription_get_local_uri(sub, uri, sizeof(uri));
1978         ast_sip_presence_xml_create_attr(pool, rlmi, "uri", uri);
1979
1980         snprintf(version_str, sizeof(version_str), "%u", sub->version++);
1981         ast_sip_presence_xml_create_attr(pool, rlmi, "version", version_str);
1982         ast_sip_presence_xml_create_attr(pool, rlmi, "fullState", full_state ? "true" : "false");
1983
1984         name = ast_sip_presence_xml_create_node(pool, rlmi, "name");
1985         pj_strdup2(pool, &name->content, ast_sip_subscription_get_resource_name(sub));
1986
1987         for (i = 0; i < AST_VECTOR_SIZE(body_parts); ++i) {
1988                 const struct body_part *part = AST_VECTOR_GET(body_parts, i);
1989
1990                 add_rlmi_resource(pool, rlmi, part->cid, part->resource, part->uri, part->state);
1991         }
1992
1993         rlmi_part = pjsip_multipart_create_part(pool);
1994
1995         rlmi_part->body = PJ_POOL_ZALLOC_T(pool, pjsip_msg_body);
1996         pj_strdup(pool, &rlmi_part->body->content_type.type, &rlmi_type);
1997         pj_strdup(pool, &rlmi_part->body->content_type.subtype, &rlmi_subtype);
1998         pj_list_init(&rlmi_part->body->content_type.param);
1999
2000         rlmi_part->body->data = pj_xml_clone(pool, rlmi);
2001         rlmi_part->body->clone_data = rlmi_clone_data;
2002         rlmi_part->body->print_body = rlmi_print_body;
2003
2004         cid = generate_content_id_hdr(pool, sub);
2005         pj_list_insert_before(&rlmi_part->hdr, cid);
2006
2007         return rlmi_part;
2008 }
2009
2010 static pjsip_msg_body *generate_notify_body(pj_pool_t *pool, struct ast_sip_subscription *root,
2011                 unsigned int force_full_state);
2012
2013 /*!
2014  * \brief Destroy a list of body parts
2015  *
2016  * \param parts The container of parts to destroy
2017  */
2018 static void free_body_parts(struct body_part_list *parts)
2019 {
2020         int i;
2021
2022         for (i = 0; i < AST_VECTOR_SIZE(parts); ++i) {
2023                 struct body_part *part = AST_VECTOR_GET(parts, i);
2024                 ast_free(part);
2025         }
2026
2027         AST_VECTOR_FREE(parts);
2028 }
2029
2030 /*!
2031  * \brief Allocate and initialize a body part structure
2032  *
2033  * \param pool PJLIB allocation pool
2034  * \param sub Subscription representing a subscribed resource
2035  */
2036 static struct body_part *allocate_body_part(pj_pool_t *pool, const struct ast_sip_subscription *sub)
2037 {
2038         struct body_part *bp;
2039
2040         bp = ast_calloc(1, sizeof(*bp));
2041         if (!bp) {
2042                 return NULL;
2043         }
2044
2045         bp->cid = generate_content_id_hdr(pool, sub);
2046         bp->resource = sub->resource;
2047         bp->state = sub->subscription_state;
2048         bp->uri = sub->uri;
2049
2050         return bp;
2051 }
2052
2053 /*!
2054  * \brief Create a multipart body part for a subscribed resource
2055  *
2056  * \param pool PJLIB allocation pool
2057  * \param sub The subscription representing a subscribed resource
2058  * \param parts A vector of parts to append the created part to.
2059  * \param use_full_state Unused locally, but may be passed to other functions
2060  */
2061 static void build_body_part(pj_pool_t *pool, struct ast_sip_subscription *sub,
2062                 struct body_part_list *parts, unsigned int use_full_state)
2063 {
2064         struct body_part *bp;
2065         pjsip_msg_body *body;
2066
2067         bp = allocate_body_part(pool, sub);
2068         if (!bp) {
2069                 return;
2070         }
2071
2072         body = generate_notify_body(pool, sub, use_full_state);
2073         if (!body) {
2074                 /* Partial state was requested and the resource has not changed state */
2075                 ast_free(bp);
2076                 return;
2077         }
2078
2079         bp->part = pjsip_multipart_create_part(pool);
2080         bp->part->body = body;
2081         pj_list_insert_before(&bp->part->hdr, bp->cid);
2082
2083         AST_VECTOR_APPEND(parts, bp);
2084 }
2085
2086 /*!
2087  * \brief Create and initialize the PJSIP multipart body structure for a resource list subscription
2088  *
2089  * \param pool
2090  * \return The multipart message body
2091  */
2092 static pjsip_msg_body *create_multipart_body(pj_pool_t *pool)
2093 {
2094         pjsip_media_type media_type;
2095         pjsip_param *media_type_param;
2096         char boundary[6];
2097         pj_str_t pj_boundary;
2098
2099         pjsip_media_type_init2(&media_type, "multipart", "related");
2100
2101         media_type_param = pj_pool_alloc(pool, sizeof(*media_type_param));
2102         pj_list_init(media_type_param);
2103
2104         pj_strdup2(pool, &media_type_param->name, "type");
2105         pj_strdup2(pool, &media_type_param->value, "\"application/rlmi+xml\"");
2106
2107         pj_list_insert_before(&media_type.param, media_type_param);
2108
2109         pj_cstr(&pj_boundary, ast_generate_random_string(boundary, sizeof(boundary)));
2110         return pjsip_multipart_create(pool, &media_type, &pj_boundary);
2111 }
2112
2113 /*!
2114  * \brief Create a resource list body for NOTIFY requests
2115  *
2116  * Resource list bodies are multipart/related bodies. The first part of the multipart body
2117  * is an RLMI body that describes the rest of the parts to come. The other parts of the body
2118  * convey state of individual subscribed resources.
2119  *
2120  * \param pool PJLIB allocation pool
2121  * \param sub Subscription details from which to generate body
2122  * \param force_full_state If true, ignore resource list settings and send a full state notification
2123  * \return The generated multipart/related body
2124  */
2125 static pjsip_msg_body *generate_list_body(pj_pool_t *pool, struct ast_sip_subscription *sub,
2126                 unsigned int force_full_state)
2127 {
2128         int i;
2129         pjsip_multipart_part *rlmi_part;
2130         pjsip_msg_body *multipart;
2131         struct body_part_list body_parts;
2132         unsigned int use_full_state = force_full_state ? 1 : sub->full_state;
2133
2134         if (AST_VECTOR_INIT(&body_parts, AST_VECTOR_SIZE(&sub->children))) {
2135                 return NULL;
2136         }
2137
2138         for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
2139                 build_body_part(pool, AST_VECTOR_GET(&sub->children, i), &body_parts, use_full_state);
2140         }
2141
2142         /* This can happen if issuing partial state and no children of the list have changed state */
2143         if (AST_VECTOR_SIZE(&body_parts) == 0) {
2144                 return NULL;
2145         }
2146
2147         multipart = create_multipart_body(pool);
2148
2149         rlmi_part = build_rlmi_body(pool, sub, &body_parts, use_full_state);
2150         if (!rlmi_part) {
2151                 return NULL;
2152         }
2153         pjsip_multipart_add_part(pool, multipart, rlmi_part);
2154
2155         for (i = 0; i < AST_VECTOR_SIZE(&body_parts); ++i) {
2156                 pjsip_multipart_add_part(pool, multipart, AST_VECTOR_GET(&body_parts, i)->part);
2157         }
2158
2159         free_body_parts(&body_parts);
2160         return multipart;
2161 }
2162
2163 /*!
2164  * \brief Create the body for a NOTIFY request.
2165  *
2166  * \param pool The pool used for allocations
2167  * \param root The root of the subscription tree
2168  * \param force_full_state If true, ignore resource list settings and send a full state notification
2169  */
2170 static pjsip_msg_body *generate_notify_body(pj_pool_t *pool, struct ast_sip_subscription *root,
2171                 unsigned int force_full_state)
2172 {
2173         pjsip_msg_body *body;
2174
2175         if (AST_VECTOR_SIZE(&root->children) == 0) {
2176                 if (force_full_state || root->body_changed) {
2177                         /* Not a list. We've already generated the body and saved it on the subscription.
2178                          * Use that directly.
2179                          */
2180                         pj_str_t type;
2181                         pj_str_t subtype;
2182                         pj_str_t text;
2183
2184                         pj_cstr(&type, ast_sip_subscription_get_body_type(root));
2185                         pj_cstr(&subtype, ast_sip_subscription_get_body_subtype(root));
2186                         pj_cstr(&text, ast_str_buffer(root->body_text));
2187
2188                         body = pjsip_msg_body_create(pool, &type, &subtype, &text);
2189                         root->body_changed = 0;
2190                 } else {
2191                         body = NULL;
2192                 }
2193         } else {
2194                 body = generate_list_body(pool, root, force_full_state);
2195         }
2196
2197         return body;
2198 }
2199
2200 /*!
2201  * \brief Shortcut method to create a Require: eventlist header
2202  */
2203 static pjsip_require_hdr *create_require_eventlist(pj_pool_t *pool)
2204 {
2205         pjsip_require_hdr *require;
2206
2207         require = pjsip_require_hdr_create(pool);
2208         pj_strdup2(pool, &require->values[0], "eventlist");
2209         require->count = 1;
2210
2211         return require;
2212 }
2213
2214 /*!
2215  * \brief Send a NOTIFY request to a subscriber
2216  *
2217  * \pre sub_tree->dlg is locked
2218  *
2219  * \param sub_tree The subscription tree representing the subscription
2220  * \param force_full_state If true, ignore resource list settings and send full resource list state.
2221  * \retval 0 Success
2222  * \retval non-zero Failure
2223  */
2224 static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state)
2225 {
2226         pjsip_evsub *evsub = sub_tree->evsub;
2227         pjsip_tx_data *tdata;
2228
2229         if (ast_shutdown_final()
2230                 && sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED
2231                 && sub_tree->persistence) {
2232                 return 0;
2233         }
2234
2235         if (pjsip_evsub_notify(evsub, sub_tree->root->subscription_state,
2236                                 NULL, NULL, &tdata) != PJ_SUCCESS) {
2237                 return -1;
2238         }
2239
2240         tdata->msg->body = generate_notify_body(tdata->pool, sub_tree->root, force_full_state);
2241         if (!tdata->msg->body) {
2242                 pjsip_tx_data_dec_ref(tdata);
2243                 return -1;
2244         }
2245
2246         if (sub_tree->is_list) {
2247                 pjsip_require_hdr *require = create_require_eventlist(tdata->pool);
2248                 pjsip_msg_add_hdr(tdata->msg, (pjsip_hdr *) require);
2249         }
2250
2251         if (sip_subscription_send_request(sub_tree, tdata)) {
2252                 /* do not call pjsip_tx_data_dec_ref(tdata). The pjsip_dlg_send_request deletes the message on error */
2253                 return -1;
2254         }
2255
2256         sub_tree->send_scheduled_notify = 0;
2257
2258         return 0;
2259 }
2260
2261 static int serialized_send_notify(void *userdata)
2262 {
2263         struct sip_subscription_tree *sub_tree = userdata;
2264         pjsip_dialog *dlg = sub_tree->dlg;
2265
2266         pjsip_dlg_inc_lock(dlg);
2267
2268         /* It's possible that between when the notification was scheduled
2269          * and now a new SUBSCRIBE arrived requiring full state to be
2270          * sent out in an immediate NOTIFY. It's also possible that we're
2271          * already processing a terminate.  If that has happened, we need to
2272          * bail out here instead of sending the batched NOTIFY.
2273          */
2274
2275         if (sub_tree->state >= SIP_SUB_TREE_TERMINATE_IN_PROGRESS
2276                 || !sub_tree->send_scheduled_notify) {
2277                 pjsip_dlg_dec_lock(dlg);
2278                 ao2_cleanup(sub_tree);
2279                 return 0;
2280         }
2281
2282         if (sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED) {
2283                 sub_tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;
2284         }
2285
2286         send_notify(sub_tree, 0);
2287
2288         ast_test_suite_event_notify(
2289                 sub_tree->state == SIP_SUB_TREE_TERMINATED
2290                         ? "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_STATE_CHANGED",
2291                 "Resource: %s", sub_tree->root->resource);
2292
2293         sub_tree->notify_sched_id = -1;
2294         pjsip_dlg_dec_lock(dlg);
2295         ao2_cleanup(sub_tree);
2296         return 0;
2297 }
2298
2299 static int sched_cb(const void *data)
2300 {
2301         struct sip_subscription_tree *sub_tree = (struct sip_subscription_tree *) data;
2302
2303         /* We don't need to bump the refcount of sub_tree since we bumped it when scheduling this task */
2304         if (ast_sip_push_task(sub_tree->serializer, serialized_send_notify, sub_tree)) {
2305                 ao2_cleanup(sub_tree);
2306         }
2307
2308         return 0;
2309 }
2310
2311 static int schedule_notification(struct sip_subscription_tree *sub_tree)
2312 {
2313         /* There's already a notification scheduled */
2314         if (sub_tree->notify_sched_id > -1) {
2315                 return 0;
2316         }
2317
2318         sub_tree->send_scheduled_notify = 1;
2319         sub_tree->notify_sched_id = ast_sched_add(sched, sub_tree->notification_batch_interval, sched_cb, ao2_bump(sub_tree));
2320         if (sub_tree->notify_sched_id < 0) {
2321                 ao2_cleanup(sub_tree);
2322                 return -1;
2323         }
2324
2325         return 0;
2326 }
2327
2328 int ast_sip_subscription_notify(struct ast_sip_subscription *sub, struct ast_sip_body_data *notify_data,
2329                 int terminate)
2330 {
2331         int res;
2332         pjsip_dialog *dlg = sub->tree->dlg;
2333
2334         pjsip_dlg_inc_lock(dlg);
2335
2336         if (sub->tree->state != SIP_SUB_TREE_NORMAL) {
2337                 pjsip_dlg_dec_lock(dlg);
2338                 return 0;
2339         }
2340
2341         if (ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub),
2342                                 ast_sip_subscription_get_body_subtype(sub), notify_data, &sub->body_text)) {
2343                 pjsip_dlg_dec_lock(dlg);
2344                 return -1;
2345         }
2346
2347         sub->body_changed = 1;
2348         if (terminate) {
2349                 sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
2350                 sub->tree->state = SIP_SUB_TREE_TERMINATE_PENDING;
2351         }
2352
2353         if (sub->tree->notification_batch_interval) {
2354                 res = schedule_notification(sub->tree);
2355         } else {
2356                 /* See the note in pubsub_on_rx_refresh() for why sub->tree is refbumped here */
2357                 ao2_ref(sub->tree, +1);
2358                 if (terminate) {
2359                         sub->tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;
2360                 }
2361                 res = send_notify(sub->tree, 0);
2362                 ast_test_suite_event_notify(terminate ? "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_STATE_CHANGED",
2363                                 "Resource: %s",
2364                                 sub->tree->root->resource);
2365                 ao2_ref(sub->tree, -1);
2366         }
2367
2368         pjsip_dlg_dec_lock(dlg);
2369         return res;
2370 }
2371
2372 pjsip_sip_uri *ast_sip_subscription_get_sip_uri(struct ast_sip_subscription *sub)
2373 {
2374         return sub->uri;
2375 }
2376
2377 void ast_sip_subscription_get_local_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
2378 {
2379         pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, sub->uri, buf, size);
2380 }
2381
2382 void ast_sip_subscription_get_remote_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
2383 {
2384         pjsip_dialog *dlg;
2385
2386         dlg = sub->tree->dlg;
2387         ast_copy_pj_str(buf, &dlg->remote.info_str, size);
2388 }
2389
2390 const char *ast_sip_subscription_get_resource_name(struct ast_sip_subscription *sub)
2391 {
2392         return sub->resource;
2393 }
2394
2395 int ast_sip_subscription_is_terminated(const struct ast_sip_subscription *sub)
2396 {
2397         return sub->subscription_state == PJSIP_EVSUB_STATE_TERMINATED ? 1 : 0;
2398 }
2399
2400 static int sip_subscription_accept(struct sip_subscription_tree *sub_tree, pjsip_rx_data *rdata, int response)
2401 {
2402         pjsip_hdr res_hdr;
2403
2404         /* If this is a persistence recreation the subscription has already been accepted */
2405         if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) {
2406                 return 0;
2407         }
2408
2409         pj_list_init(&res_hdr);
2410         if (sub_tree->is_list) {
2411                 /* If subscribing to a list, our response has to have a Require: eventlist header in it */
2412                 pj_list_insert_before(&res_hdr, create_require_eventlist(rdata->tp_info.pool));
2413         }
2414
2415         return pjsip_evsub_accept(sub_tree->evsub, rdata, response, &res_hdr) == PJ_SUCCESS ? 0 : -1;
2416 }
2417
/*! \brief Allocate a datastore; thin wrapper around ast_datastores_alloc_datastore() */
struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
{
        struct ast_datastore *datastore = ast_datastores_alloc_datastore(info, uid);

        return datastore;
}
2422
2423 int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
2424 {
2425         return ast_datastores_add(subscription->datastores, datastore);
2426 }
2427
2428 struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
2429 {
2430         return ast_datastores_find(subscription->datastores, name);
2431 }
2432
2433 void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
2434 {
2435         ast_datastores_remove(subscription->datastores, name);
2436 }
2437
2438 struct ao2_container *ast_sip_subscription_get_datastores(const struct ast_sip_subscription *subscription)
2439 {
2440         return subscription->datastores;
2441 }
2442
2443 int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
2444 {
2445         return ast_datastores_add(publication->datastores, datastore);
2446 }
2447
2448 struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
2449 {
2450         return ast_datastores_find(publication->datastores, name);
2451 }
2452
2453 void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
2454 {
2455         ast_datastores_remove(publication->datastores, name);
2456 }
2457
2458 struct ao2_container *ast_sip_publication_get_datastores(const struct ast_sip_publication *publication)
2459 {
2460         return publication->datastores;
2461 }
2462
/*! \brief List of registered PUBLISH handlers; accessed under the list's read/write lock */
AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
2464
2465 static int publication_hash_fn(const void *obj, const int flags)
2466 {
2467         const struct ast_sip_publication *publication = obj;
2468         const int *entity_tag = obj;
2469
2470         return flags & OBJ_KEY ? *entity_tag : publication->entity_tag;
2471 }
2472
2473 static int publication_cmp_fn(void *obj, void *arg, int flags)
2474 {
2475         const struct ast_sip_publication *publication1 = obj;
2476         const struct ast_sip_publication *publication2 = arg;
2477         const int *entity_tag = arg;
2478
2479         return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ?
2480                 CMP_MATCH | CMP_STOP : 0);
2481 }
2482
2483 static void publish_add_handler(struct ast_sip_publish_handler *handler)
2484 {
2485         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2486         AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
2487 }
2488
2489 int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
2490 {
2491         if (ast_strlen_zero(handler->event_name)) {
2492                 ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
2493                 return -1;
2494         }
2495
2496         if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS,
2497                 publication_hash_fn, publication_cmp_fn))) {
2498                 ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n",
2499                         handler->event_name);
2500                 return -1;
2501         }
2502
2503         publish_add_handler(handler);
2504
2505         ast_module_ref(ast_module_info->self);
2506
2507         return 0;
2508 }
2509
2510 void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
2511 {
2512         struct ast_sip_publish_handler *iter;
2513         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2514         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
2515                 if (handler == iter) {
2516                         AST_RWLIST_REMOVE_CURRENT(next);
2517                         ao2_cleanup(handler->publications);
2518                         ast_module_unref(ast_module_info->self);
2519                         break;
2520                 }
2521         }
2522         AST_RWLIST_TRAVERSE_SAFE_END;
2523 }
2524
/*! \brief List of registered subscription handlers; accessed under the list's read/write lock */
AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
2526
2527 static void sub_add_handler(struct ast_sip_subscription_handler *handler)
2528 {
2529         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2530         AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
2531         ast_module_ref(ast_module_info->self);
2532 }
2533
2534 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name)
2535 {
2536         struct ast_sip_subscription_handler *iter;
2537         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
2538
2539         AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
2540                 if (!strcmp(iter->event_name, event_name)) {
2541                         break;
2542                 }
2543         }
2544         return iter;
2545 }
2546
2547 int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler)
2548 {
2549         pj_str_t event;
2550         pj_str_t accept[AST_SIP_MAX_ACCEPT] = { {0, }, };
2551         struct ast_sip_subscription_handler *existing;
2552         int i = 0;
2553
2554         if (ast_strlen_zero(handler->event_name)) {
2555                 ast_log(LOG_ERROR, "No event package specified for subscription handler. Cannot register\n");
2556                 return -1;
2557         }
2558
2559         existing = find_sub_handler_for_event_name(handler->event_name);
2560         if (existing) {
2561                 ast_log(LOG_ERROR, "Unable to register subscription handler for event %s."
2562                                 "A handler is already registered\n", handler->event_name);
2563                 return -1;
2564         }
2565
2566         for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) {
2567                 pj_cstr(&accept[i], handler->accept[i]);
2568         }
2569
2570         pj_cstr(&event, handler->event_name);
2571
2572         pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
2573
2574         sub_add_handler(handler);
2575
2576         return 0;
2577 }
2578
2579 void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
2580 {
2581         struct ast_sip_subscription_handler *iter;
2582         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2583         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
2584                 if (handler == iter) {
2585                         AST_RWLIST_REMOVE_CURRENT(next);
2586                         ast_module_unref(ast_module_info->self);
2587                         break;
2588                 }
2589         }
2590         AST_RWLIST_TRAVERSE_SAFE_END;
2591 }
2592
2593 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype_nolock(const char *type, const char *subtype)
2594 {
2595         struct ast_sip_pubsub_body_generator *gen;
2596
2597         AST_LIST_TRAVERSE(&body_generators, gen, list) {
2598                 if (!strcmp(gen->type, type)
2599                         && !strcmp(gen->subtype, subtype)) {
2600                         break;
2601                 }
2602         }
2603
2604         return gen;
2605 }
2606
2607 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype(const char *type, const char *subtype)
2608 {
2609         struct ast_sip_pubsub_body_generator *gen;
2610
2611         AST_RWLIST_RDLOCK(&body_generators);
2612         gen = find_body_generator_type_subtype_nolock(type, subtype);
2613         AST_RWLIST_UNLOCK(&body_generators);
2614         return gen;
2615 }
2616
/*!
 * \brief Find a body generator for a "type/subtype" accept string
 *
 * \param accept Accept value; it is split at the first '/'
 * \return The matching generator, or NULL if either half is empty or nothing matches
 */
static struct ast_sip_pubsub_body_generator *find_body_generator_accept(const char *accept)
{
        /* Work on a stack copy because strsep() modifies its input */
        char *subtype = ast_strdupa(accept);
        char *type = strsep(&subtype, "/");

        if (ast_strlen_zero(type) || ast_strlen_zero(subtype)) {
                return NULL;
        }

        return find_body_generator_type_subtype(type, subtype);
}
2629
2630 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
2631                 size_t num_accept, const char *body_type)
2632 {
2633         int i;
2634         struct ast_sip_pubsub_body_generator *generator = NULL;
2635
2636         for (i = 0; i < num_accept; ++i) {
2637                 generator = find_body_generator_accept(accept[i]);
2638                 if (generator) {
2639                         ast_debug(3, "Body generator %p found for accept type %s\n", generator, accept[i]);
2640                         if (strcmp(generator->body_type, body_type)) {
2641                                 ast_log(LOG_WARNING, "Body generator '%s/%s'(%p) does not accept the type of data this event generates\n",
2642                                                 generator->type, generator->subtype, generator);
2643                                 generator = NULL;
2644                                 continue;
2645                         }
2646                         break;
2647                 } else {
2648                         ast_debug(3, "No body generator found for accept type %s\n", accept[i]);
2649                 }
2650         }
2651
2652         return generator;
2653 }
2654
2655 static int generate_initial_notify(struct ast_sip_subscription *sub)
2656 {
2657         void *notify_data;
2658         int res;
2659         struct ast_sip_body_data data = {
2660                 .body_type = sub->handler->body_type,
2661         };
2662
2663         if (AST_VECTOR_SIZE(&sub->children) > 0) {
2664                 int i;
2665
2666                 for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
2667                         if (generate_initial_notify(AST_VECTOR_GET(&sub->children, i))) {
2668                                 return -1;
2669                         }
2670                 }
2671
2672                 return 0;
2673         }
2674
2675         /* We notify subscription establishment only on the tree leaves. */
2676         if (sub->handler->notifier->subscription_established(sub)) {
2677                 return -1;
2678         }
2679
2680         notify_data = sub->handler->notifier->get_notify_data(sub);
2681         if (!notify_data) {
2682                 return -1;
2683         }
2684
2685         data.body_data = notify_data;
2686
2687         res = ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub),
2688                         ast_sip_subscription_get_body_subtype(sub), &data, &sub->body_text);
2689
2690         ao2_cleanup(notify_data);
2691
2692         return res;
2693 }
2694
2695 static int initial_notify_task(void * obj)
2696 {
2697         struct sip_subscription_tree *sub_tree;
2698
2699         sub_tree = obj;
2700         if (generate_initial_notify(sub_tree->root)) {
2701                 pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
2702         } else {
2703                 send_notify(sub_tree, 1);
2704                 ast_test_suite_event_notify("SUBSCRIPTION_ESTABLISHED",
2705                         "Resource: %s",
2706                         sub_tree->root->resource);
2707         }
2708
2709         ao2_ref(sub_tree, -1);
2710         return 0;
2711 }
2712
/*!
 * \brief Handle an incoming SUBSCRIBE request
 *
 * Validates the request (endpoint permission, request URI scheme, Expires
 * value), looks up the subscription handler and body generator, builds the
 * resource tree and creates the subscription tree. On success the
 * subscription is accepted and the initial NOTIFY is queued on the tree's
 * serializer. Rejections are answered statelessly.
 *
 * \param rdata The received SUBSCRIBE
 * \retval PJ_TRUE Always
 */
static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
{
        pjsip_expires_hdr *expires_header;
        struct ast_sip_subscription_handler *handler;
        RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
        struct sip_subscription_tree *sub_tree;
        struct ast_sip_pubsub_body_generator *generator;
        char *resource;
        pjsip_uri *request_uri;
        pjsip_sip_uri *request_uri_sip;
        size_t resource_size;
        int resp;
        struct resource_tree tree;
        pj_status_t dlg_status;

        endpoint = ast_pjsip_rdata_get_endpoint(rdata);
        ast_assert(endpoint != NULL);

        /* Endpoints that do not allow subscriptions get a 603 */
        if (!endpoint->subscription.allow) {
                ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint));
                pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL);
                return PJ_TRUE;
        }

        request_uri = rdata->msg_info.msg->line.req.uri;

        /* Only sip: and sips: request URIs are supported; anything else gets a 416 */
        if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
                char uri_str[PJSIP_MAX_URL_SIZE];

                pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
                ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
                pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
                return PJ_TRUE;
        }

        /* The user part of the request URI is the resource being subscribed to */
        request_uri_sip = pjsip_uri_get_uri(request_uri);
        resource_size = pj_strlen(&request_uri_sip->user) + 1;
        resource = ast_alloca(resource_size);
        ast_copy_pj_str(resource, &request_uri_sip->user, resource_size);

        expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next);

        /* An Expires header, when present, must be non-zero (400) and at least the endpoint's minimum (423) */
        if (expires_header) {
                if (expires_header->ivalue == 0) {
                        ast_log(LOG_WARNING, "Subscription request from endpoint %s rejected. Expiration of 0 is invalid\n",
                                ast_sorcery_object_get_id(endpoint));
                        pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
                                return PJ_TRUE;
                }
                if (expires_header->ivalue < endpoint->subscription.minexpiry) {
                        ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %u\n",
                                expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry);
                        pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL);
                        return PJ_TRUE;
                }
        }

        /* Without a handler or a body generator for this request we answer 489 */
        handler = subscription_get_handler_from_rdata(rdata);
        if (!handler) {
                pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
                return PJ_TRUE;
        }

        generator = subscription_get_generator_from_rdata(rdata, handler);
        if (!generator) {
                pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
                return PJ_TRUE;
        }

        /* build_resource_tree() returns the SIP status to use; anything outside 2xx is sent as the rejection */
        memset(&tree, 0, sizeof(tree));
        resp = build_resource_tree(endpoint, handler, resource, &tree,
                ast_sip_pubsub_has_eventlist_support(rdata));
        if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
                pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
                resource_tree_destroy(&tree);
                return PJ_TRUE;
        }

        sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator, &tree, &dlg_status);
        if (!sub_tree) {
                /* No 500 is sent when dialog creation reported PJ_EEXISTS */
                if (dlg_status != PJ_EEXISTS) {
                        pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
                }
        } else {
                sub_tree->persistence = subscription_persistence_create(sub_tree);
                subscription_persistence_update(sub_tree, rdata);
                sip_subscription_accept(sub_tree, rdata, resp);
                /* The task takes over the bumped reference; if it cannot be queued,
                 * terminate the subscription and drop that reference here.
                 */
                if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ao2_bump(sub_tree))) {
                        pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
                        ao2_ref(sub_tree, -1);
                }
        }

        resource_tree_destroy(&tree);
        return PJ_TRUE;
}
2809
2810 static struct ast_sip_publish_handler *find_pub_handler(const char *event)
2811 {
2812         struct ast_sip_publish_handler *iter = NULL;
2813         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
2814
2815         AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) {
2816                 if (strcmp(event, iter->event_name)) {
2817                         ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
2818                         continue;
2819                 }
2820                 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
2821                 break;
2822         }
2823
2824         return iter;
2825 }
2826
2827 static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata,
2828         pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id)
2829 {
2830         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
2831
2832         if (etag_hdr) {
2833                 char etag[pj_strlen(&etag_hdr->hvalue) + 1];
2834
2835                 ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag));
2836
2837                 if (sscanf(etag, "%30d", entity_id) != 1) {
2838                         return SIP_PUBLISH_UNKNOWN;
2839                 }
2840         }
2841
2842         *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
2843
2844         if (!(*expires)) {
2845                 return SIP_PUBLISH_REMOVE;
2846         } else if (!etag_hdr && rdata->msg_info.msg->body) {
2847                 return SIP_PUBLISH_INITIAL;
2848         } else if (etag_hdr && !rdata->msg_info.msg->body) {
2849                 return SIP_PUBLISH_REFRESH;
2850         } else if (etag_hdr && rdata->msg_info.msg->body) {
2851                 return SIP_PUBLISH_MODIFY;
2852         }
2853
2854         return SIP_PUBLISH_UNKNOWN;
2855 }
2856
2857 /*! \brief Internal destructor for publications */
2858 static void publication_destroy_fn(void *obj)
2859 {
2860         struct ast_sip_publication *publication = obj;
2861
2862         ast_debug(3, "Destroying SIP publication\n");
2863
2864         ao2_cleanup(publication->datastores);
2865         ao2_cleanup(publication->endpoint);
2866 }
2867
2868 static struct ast_sip_publication *sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
2869         const char *resource, const char *event_configuration_name)
2870 {
2871         struct ast_sip_publication *publication;
2872         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
2873         size_t resource_len = strlen(resource) + 1, event_configuration_name_len = strlen(event_configuration_name) + 1;
2874         char *dst;
2875
2876         ast_assert(endpoint != NULL);
2877
2878         if (!(publication = ao2_alloc(sizeof(*publication) + resource_len + event_configuration_name_len, publication_destroy_fn))) {
2879                 return NULL;
2880         }
2881
2882         if (!(publication->datastores = ast_datastores_alloc())) {
2883                 ao2_ref(publication, -1);
2884                 return NULL;
2885         }
2886
2887         publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
2888         ao2_ref(endpoint, +1);
2889         publication->endpoint = endpoint;
2890         publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
2891         publication->sched_id = -1;
2892         dst = publication->data;
2893         publication->resource = strcpy(dst, resource);
2894         dst += resource_len;
2895         publication->event_configuration_name = strcpy(dst, event_configuration_name);
2896
2897         return publication;
2898 }
2899
2900 static int sip_publication_respond(struct ast_sip_publication *pub, int status_code,
2901                 pjsip_rx_data *rdata)
2902 {
2903         pj_status_t status;
2904         pjsip_tx_data *tdata;
2905         pjsip_transaction *tsx;
2906
2907         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, &tdata) != PJ_SUCCESS) {
2908                 return -1;
2909         }
2910
2911         if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) {
2912                 RAII_VAR(char *, entity_tag, NULL, ast_free_ptr);
2913                 RAII_VAR(char *, expires, NULL, ast_free_ptr);
2914
2915                 if ((ast_asprintf(&entity_tag, "%d", pub->entity_tag) < 0) ||
2916                         (ast_asprintf(&expires, "%d", pub->expires) < 0)) {
2917                         pjsip_tx_data_dec_ref(tdata);
2918                         return -1;
2919                 }
2920
2921                 ast_sip_add_header(tdata, "SIP-ETag", entity_tag);
2922                 ast_sip_add_header(tdata, "Expires", expires);
2923         }
2924
2925         if ((status = pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx)) != PJ_SUCCESS) {
2926                 return -1;
2927         }
2928
2929         pjsip_tsx_recv_msg(tsx, rdata);
2930
2931         if (pjsip_tsx_send_msg(tsx, tdata) != PJ_SUCCESS) {
2932                 return -1;
2933         }
2934
2935         return 0;
2936 }
2937
2938 static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
2939         struct ast_sip_publish_handler *handler)
2940 {
2941         struct ast_sip_publication *publication;
2942         char *resource_name;
2943         size_t resource_size;
2944         RAII_VAR(struct ast_sip_publication_resource *, resource, NULL, ao2_cleanup);
2945         struct ast_variable *event_configuration_name = NULL;
2946         pjsip_uri *request_uri;
2947         pjsip_sip_uri *request_uri_sip;
2948         int resp;
2949
2950         request_uri = rdata->msg_info.msg->line.req.uri;
2951
2952         if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
2953                 char uri_str[PJSIP_MAX_URL_SIZE];
2954
2955                 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
2956                 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
2957                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
2958                 return NULL;
2959         }
2960
2961         request_uri_sip = pjsip_uri_get_uri(request_uri);
2962         resource_size = pj_strlen(&request_uri_sip->user) + 1;
2963         resource_name = ast_alloca(resource_size);
2964         ast_copy_pj_str(resource_name, &request_uri_sip->user, resource_size);
2965
2966         resource = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "inbound-publication", resource_name);
2967         if (!resource) {
2968                 ast_debug(1, "No 'inbound-publication' defined for resource '%s'\n", resource_name);
2969                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
2970                 return NULL;
2971         }
2972
2973         if (!ast_strlen_zero(resource->endpoint) && strcmp(resource->endpoint, ast_sorcery_object_get_id(endpoint))) {
2974                 ast_debug(1, "Resource %s has a defined endpoint '%s', but does not match endpoint '%s' that received the request\n",
2975                         resource_name, resource->endpoint, ast_sorcery_object_get_id(endpoint));
2976                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL);
2977                 return NULL;
2978         }
2979
2980         for (event_configuration_name = resource->events; event_configuration_name; event_configuration_name = event_configuration_name->next) {
2981                 if (!strcmp(event_configuration_name->name, handler->event_name)) {
2982                         break;
2983                 }
2984         }
2985
2986         if (!event_configuration_name) {
2987                 ast_debug(1, "Event '%s' is not configured for '%s'\n", handler->event_name, resource_name);
2988                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
2989                 return NULL;
2990         }
2991
2992         resp = handler->new_publication(endpoint, resource_name, event_configuration_name->value);
2993
2994         if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
2995                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
2996                 return NULL;
2997         }
2998
2999         publication = sip_create_publication(endpoint, rdata, S_OR(resource_name, ""), event_configuration_name->value);
3000
3001         if (!publication) {
3002                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
3003                 return NULL;
3004         }
3005
3006         publication->handler = handler;
3007         if (publication->handler->publication_state_change(publication, rdata->msg_info.msg->body,
3008                         AST_SIP_PUBLISH_STATE_INITIALIZED)) {
3009                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
3010                 ao2_cleanup(publication);
3011                 return NULL;
3012         }
3013
3014         sip_publication_respond(publication, resp, rdata);
3015
3016         return publication;
3017 }
3018
3019 static int publish_expire_callback(void *data)
3020 {
3021         RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup);
3022
3023         if (publication->handler->publish_expire) {
3024                 publication->handler->publish_expire(publication);
3025         }
3026
3027         return 0;
3028 }
3029
3030 static int publish_expire(const void *data)
3031 {
3032         struct ast_sip_publication *publication = (struct ast_sip_publication*)data;
3033
3034         ao2_unlink(publication->handler->publications, publication);
3035         publication->sched_id = -1;
3036
3037         if (ast_sip_push_task(NULL, publish_expire_callback, publication)) {
3038                 ao2_cleanup(publication);
3039         }
3040
3041         return 0;
3042 }
3043
3044 static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata)
3045 {
3046         pjsip_event_hdr *event_header;
3047         struct ast_sip_publish_handler *handler;
3048         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
3049         char event[32];
3050         static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 };
3051         pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL);
3052         enum sip_publish_type publish_type;
3053         RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup);
3054         int expires = 0, entity_id, response = 0;
3055
3056         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
3057         ast_assert(endpoint != NULL);
3058
3059         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
3060         if (!event_header) {
3061                 ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n");
3062                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
3063                 return PJ_TRUE;
3064         }
3065         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
3066
3067         handler = find_pub_handler(event);
3068         if (!handler) {
3069                 ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event);
3070                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
3071                 return PJ_TRUE;
3072         }
3073
3074         publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id);
3075
3076         /* If this is not an initial publish ensure that a publication is present */
3077         if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) {
3078                 if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) {
3079                         static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 };
3080
3081                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed,
3082                                 NULL, NULL);
3083                         return PJ_TRUE;
3084                 }
3085
3086                 /* Per the RFC every response has to have a new entity tag */
3087                 publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
3088
3089                 /* Update the expires here so that the created responses will contain the correct value */
3090                 publication->expires = expires;
3091         }
3092
3093         switch (publish_type) {
3094                 case SIP_PUBLISH_INITIAL:
3095                         publication = publish_request_initial(endpoint, rdata, handler);
3096                         break;
3097                 case SIP_PUBLISH_REFRESH:
3098                 case SIP_PUBLISH_MODIFY:
3099                         if (handler->publication_state_change(publication, rdata->msg_info.msg->body,
3100                                                 AST_SIP_PUBLISH_STATE_ACTIVE)) {
3101                                 /* If an error occurs we want to terminate the publication */
3102                                 expires = 0;
3103                         }
3104                         response = 200;
3105                         break;
3106                 case SIP_PUBLISH_REMOVE:
3107                         handler->publication_state_change(publication, rdata->msg_info.msg->body,
3108                                         AST_SIP_PUBLISH_STATE_TERMINATED);
3109                         response = 200;
3110                         break;
3111                 case SIP_PUBLISH_UNKNOWN:
3112                 default:
3113                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
3114                         break;
3115         }
3116
3117         if (publication) {
3118                 if (expires) {
3119                         ao2_link(handler->publications, publication);
3120
3121                         AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication,
3122                                                 ao2_ref(publication, -1), ao2_ref(publication, -1), ao2_ref(publication, +1));
3123                 } else {
3124                         AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1));
3125                 }
3126         }
3127
3128         if (response) {
3129                 sip_publication_respond(publication, response, rdata);
3130         }
3131
3132         return PJ_TRUE;
3133 }
3134
3135 struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub)
3136 {
3137         return pub->endpoint;
3138 }
3139
3140 const char *ast_sip_publication_get_resource(const struct ast_sip_publication *pub)
3141 {
3142         return pub->resource;
3143 }
3144
3145 const char *ast_sip_publication_get_event_configuration(const struct ast_sip_publication *pub)
3146 {
3147         return pub->event_configuration_name;
3148 }
3149
/*!
 * \brief Check whether a body generator exists for a content type
 *
 * \param type The content type
 * \param subtype The content subtype
 *
 * \retval 1 A body generator for type/subtype was found
 * \retval 0 No body generator was found
 */
int ast_sip_pubsub_is_body_generator_registered(const char *type, const char *subtype)
{
	if (find_body_generator_type_subtype(type, subtype)) {
		return 1;
	}

	return 0;
}
3154
3155 int ast_sip_pubsub_register_body_generator(struct ast_sip_pubsub_body_generator *generator)
3156 {
3157         struct ast_sip_pubsub_body_generator *existing;
3158         pj_str_t accept;
3159         pj_size_t accept_len;
3160
3161         AST_RWLIST_WRLOCK(&body_generators);
3162         existing = find_body_generator_type_subtype_nolock(generator->type, generator->subtype);
3163         if (existing) {
3164                 AST_RWLIST_UNLOCK(&body_generators);
3165                 ast_log(LOG_WARNING, "A body generator for %s/%s is already registered.\n",
3166                         generator->type, generator->subtype);
3167                 return -1;
3168         }
3169         AST_LIST_INSERT_HEAD(&body_generators, generator, list);
3170         AST_RWLIST_UNLOCK(&body_generators);
3171
3172         /* Lengths of type and subtype plus a slash. */
3173         accept_len = strlen(generator->type) + strlen(generator->subtype) + 1;
3174
3175         /* Add room for null terminator that sprintf() will set. */
3176         pj_strset(&accept, ast_alloca(accept_len + 1), accept_len);
3177         sprintf((char *) pj_strbuf(&accept), "%s/%s", generator->type, generator->subtype);/* Safe */
3178
3179         pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module,
3180                         PJSIP_H_ACCEPT, NULL, 1, &accept);
3181
3182         return 0;
3183 }
3184
3185 void ast_sip_pubsub_unregister_body_generator(struct ast_sip_pubsub_body_generator *generator)
3186 {
3187         struct ast_sip_pubsub_body_generator *iter;
3188         SCOPED_LOCK(lock, &body_generators, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
3189
3190         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_generators, iter, list) {
3191                 if (iter == generator) {
3192                         AST_LIST_REMOVE_CURRENT(list);
3193                         break;
3194                 }
3195         }
3196         AST_RWLIST_TRAVERSE_SAFE_END;
3197 }
3198
3199 int ast_sip_pubsub_register_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
3200 {
3201         AST_RWLIST_WRLOCK(&body_supplements);
3202         AST_RWLIST_INSERT_TAIL(&body_supplements, supplement, list);
3203         AST_RWLIST_UNLOCK(&body_supplements);
3204
3205         return 0;
3206 }
3207
3208 void ast_sip_pubsub_unregister_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
3209 {
3210         struct ast_sip_pubsub_body_supplement *iter;
3211         SCOPED_LOCK(lock, &body_supplements, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
3212
3213         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_supplements, iter, list) {
3214                 if (iter == supplement) {
3215                         AST_LIST_REMOVE_CURRENT(list);
3216                         break;
3217                 }
3218         }
3219         AST_RWLIST_TRAVERSE_SAFE_END;
3220 }
3221
3222 const char *ast_sip_subscription_get_body_type(struct ast_sip_subscription *sub)
3223 {
3224         return sub->body_generator->type;
3225 }
3226
3227 const char *ast_sip_subscription_get_body_subtype(struct ast_sip_subscription *sub)
3228 {
3229         return sub->body_generator->subtype;
3230 }
3231
3232 int ast_sip_pubsub_generate_body_content(const char *type, const char *subtype,
3233                 struct ast_sip_body_data *data, struct ast_str **str)
3234 {
3235         struct ast_sip_pubsub_body_supplement *supplement;
3236         struct ast_sip_pubsub_body_generator *generator;
3237         int res = 0;
3238         void *body;
3239
3240         generator = find_body_generator_type_subtype(type, subtype);
3241         if (!generator) {
3242                 ast_log(LOG_WARNING, "Unable to find a body generator for %s/%s\n",
3243                                 type, subtype);
3244                 return -1;
3245         }
3246
3247         if (strcmp(data->body_type, generator->body_type)) {
3248                 ast_log(LOG_WARNING, "%s/%s body generator does not accept the type of data provided\n",
3249                         type, subtype);
3250                 return -1;
3251         }
3252
3253         body = generator->allocate_body(data->body_data);
3254         if (!body) {
3255                 ast_log(LOG_WARNING, "%s/%s body generator could not to allocate a body\n",
3256                         type, subtype);
3257                 return -1;
3258         }
3259
3260         if (generator->generate_body_content(body, data->body_data)) {
3261                 res = -1;
3262                 goto end;
3263         }
3264
3265         AST_RWLIST_RDLOCK(&body_supplements);
3266         AST_RWLIST_TRAVERSE(&body_supplements, supplement, list) {
3267                 if (!strcmp(generator->type, supplement->type) &&
3268                                 !strcmp(generator->subtype, supplement->subtype)) {
3269                         res = supplement->supplement_body(body, data->body_data);
3270                         if (res) {
3271                                 break;
3272                         }
3273                 }
3274         }
3275         AST_RWLIST_UNLOCK(&body_supplements);
3276
3277         if (!res) {
3278                 generator->to_string(body, str);
3279         }
3280
3281 end:
3282         if (generator->destroy_body) {
3283                 generator->destroy_body(body);
3284         }
3285
3286         return res;
3287 }
3288
3289 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata)
3290 {
3291         if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) {
3292                 return pubsub_on_rx_subscribe_request(rdata);
3293         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) {
3294                 return pubsub_on_rx_publish_request(rdata);
3295         }
3296
3297         return PJ_FALSE;
3298 }
3299
3300 static void set_state_terminated(struct ast_sip_subscription *sub)
3301 {
3302         int i;
3303
3304         sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
3305         for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
3306                 set_state_terminated(AST_VECTOR_GET(&sub->children, i));
3307         }
3308 }
3309
3310 /*!
3311  * \brief Callback sequence for subscription terminate:
3312  *
3313  * * Client initiated:
3314  *     pjproject receives SUBSCRIBE on the subscription's serializer thread
3315  *         calls pubsub_on_rx_refresh with dialog locked
3316  *             pubsub_on_rx_refresh sets TERMINATE_PENDING
3317  *             pushes serialized_pubsub_on_refresh_timeout
3318  *             returns to pjproject
3319  *         pjproject calls pubsub_on_evsub_state
 *             pubsub_on_evsub_state checks state == TERMINATE_IN_PROGRESS (no)
3321  *             ignore and return
3322  *         pjproject unlocks dialog
3323  *     serialized_pubsub_on_refresh_timeout starts (1)
3324  *       locks dialog
3325  *       checks state == TERMINATE_PENDING
3326  *       sets TERMINATE_IN_PROGRESS
3327  *       calls send_notify (2)
3328  *           send_notify ultimately calls pjsip_evsub_send_request
3329  *               pjsip_evsub_send_request calls evsub's set_state
 *                   set_state calls pubsub_on_evsub_state
 *                       pubsub_on_evsub_state checks state == TERMINATE_IN_PROGRESS
3332  *                       removes the subscriptions
3333  *                       cleans up references to evsub
3334  *                       sets state = TERMINATED
3335  *       serialized_pubsub_on_refresh_timeout unlocks dialog
3336  *
3337  * * Subscription timer expires:
3338  *     pjproject timer expires
3339  *         locks dialog
3340  *         calls pubsub_on_server_timeout
3341  *             pubsub_on_server_timeout checks state == NORMAL
3342  *             sets TERMINATE_PENDING
3343  *             pushes serialized_pubsub_on_refresh_timeout
3344  *             returns to pjproject
3345  *         pjproject unlocks dialog
3346  *     serialized_pubsub_on_refresh_timeout starts
3347  *         See (1) Above
3348  *
3349  *
3350  * * ast_sip_subscription_notify is called
3351  *       checks state == NORMAL
3352  *       if not batched...
3353  *           sets TERMINATE_IN_PROGRESS (if terminate is requested)
3354  *           calls send_notify
3355  *               See (2) Above
3356  *       if batched...
3357  *           sets TERMINATE_PENDING
3358  *           schedules task
3359  *       scheduler runs sched_task
3360  *           sched_task pushes serialized_send_notify
3361  *       serialized_send_notify starts
3362  *           checks state <= TERMINATE_PENDING
3363  *           if state == TERMINATE_PENDING set state = TERMINATE_IN_PROGRESS
3364  *           call send_notify
3365  *               See (2) Above
3366  *
3367  */
3368
3369 /*!
3370  * \brief PJSIP callback when underlying SIP subscription changes state
3371  *
3372  * Although this function is called for every state change, we only care
3373  * about the TERMINATED state, and only when we're actually processing the final
3374  * notify (SIP_SUB_TREE_TERMINATE_IN_PROGRESS).  In this case, we do all
3375  * the subscription tree cleanup tasks and decrement the evsub reference.
3376  */
3377 static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
3378 {
3379         struct sip_subscription_tree *sub_tree;
3380
3381         ast_debug(3, "on_evsub_state called with state %s\n", pjsip_evsub_get_state_name(evsub));
3382
3383         if (pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) {
3384                 return;
3385         }
3386
3387         sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
3388         if (!sub_tree || sub_tree->state != SIP_SUB_TREE_TERMINATE_IN_PROGRESS) {
3389                 ast_debug(1, "Possible terminate race prevented %p\n", sub_tree);
3390                 return;
3391         }
3392
3393         remove_subscription(sub_tree);
3394
3395         pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL);
3396
3397 #ifdef HAVE_PJSIP_EVSUB_GRP_LOCK
3398         pjsip_evsub_dec_ref(sub_tree->evsub);
3399 #endif
3400
3401         sub_tree->evsub = NULL;
3402
3403         ast_sip_dialog_set_serializer(sub_tree->dlg, NULL);
3404         ast_sip_dialog_set_endpoint(sub_tree->dlg, NULL);
3405
3406         subscription_persistence_remove(sub_tree);
3407         shutdown_subscriptions(sub_tree->root);
3408
3409         sub_tree->state = SIP_SUB_TREE_TERMINATED;
3410         /* Remove evsub's reference to the sub_tree */
3411         ao2_ref(sub_tree, -1);
3412 }
3413
3414 static int serialized_pubsub_on_refresh_timeout(void *userdata)
3415 {
3416         struct sip_subscription_tree *sub_tree = userdata;
3417         pjsip_dialog *dlg = sub_tree->dlg;
3418
3419         pjsip_dlg_inc_lock(dlg);
3420         if (sub_tree->state >= SIP_SUB_TREE_TERMINATE_IN_PROGRESS) {
3421                 ast_debug(1, "Possible terminate race prevented %p %d\n", sub_tree->evsub, sub_tree->state);
3422                 pjsip_dlg_dec_lock(dlg);
3423                 ao2_cleanup(sub_tree);
3424                 return 0;
3425         }
3426
3427         if (sub_tree->state == SIP_SUB_TREE_TERMINATE_PENDING) {
3428                 sub_tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;
3429                 set_state_terminated(sub_tree->root);
3430         }
3431
3432         send_notify(sub_tree, 1);
3433
3434         ast_test_suite_event_notify(sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED ?
3435                                 "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_REFRESHED",
3436                                 "Resource: %s", sub_tree->root->resource);
3437
3438         pjsip_dlg_dec_lock(dlg);
3439         ao2_cleanup(sub_tree);
3440         return 0;
3441 }
3442
3443 /*!
3444  * \brief Called whenever an in-dialog SUBSCRIBE is received
3445  *
3446  * This includes both SUBSCRIBE requests that actually refresh the subscription
3447  * as well as SUBSCRIBE requests that end the subscription.
3448  *
3449  * In either case we push serialized_pubsub_on_refresh_timeout to send an
3450  * appropriate NOTIFY request.
3451  */
3452 static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
3453                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
3454 {
3455         struct sip_subscription_tree *sub_tree;
3456
3457         sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
3458         if (!sub_tree || sub_tree->state != SIP_SUB_TREE_NORMAL) {
3459                 ast_debug(1, "Possible terminate race prevented %p %d\n", sub_tree, sub_tree ? sub_tree->state : -1 );
3460                 return;
3461         }
3462
3463         /* PJSIP will set the evsub's state to terminated before calling into this function
3464          * if the Expires value of the incoming SUBSCRIBE is 0.
3465          */
3466
3467         if (pjsip_evsub_get_state(sub_tree->evsub) == PJSIP_EVSUB_STATE_TERMINATED) {
3468                 sub_tree->state = SIP_SUB_TREE_TERMINATE_PENDING;
3469         }
3470
3471         if (ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_refresh_timeout, ao2_bump(sub_tree))) {
3472                 /* If we can't push the NOTIFY refreshing task...we'll just go with it. */
3473                 ast_log(LOG_ERROR, "Failed to push task to send NOTIFY.\n");
3474                 sub_tree->state = SIP_SUB_TREE_NORMAL;
3475                 ao2_ref(sub_tree, -1);
3476         }
3477
3478         if (sub_tree->is_list) {
3479                 pj_list_insert_before(res_hdr, create_require_eventlist(rdata->tp_info.pool));
3480         }
3481 }
3482
3483 static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code,
3484                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
3485 {
3486         struct ast_sip_subscription *sub;
3487
3488         if (!(sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id))) {
3489                 return;
3490         }
3491
3492         sub->handler->subscriber->state_change(sub, rdata->msg_info.msg->body,
3493                         pjsip_evsub_get_state(evsub));
3494 }
3495
3496 static int serialized_pubsub_on_client_refresh(void *userdata)
3497 {
3498         struct sip_subscription_tree *sub_tree = userdata;
3499         pjsip_tx_data *tdata;
3500
3501         if (!sub_tree->evsub) {
3502                 ao2_cleanup(sub_tree);
3503                 return 0;
3504         }
3505
3506         if (pjsip_evsub_initiate(sub_tree->evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
3507                 pjsip_evsub_send_request(sub_tree->evsub, tdata);
3508         } else {
3509                 pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
3510         }
3511
3512         ao2_cleanup(sub_tree);
3513         return 0;
3514 }
3515
3516 static void pubsub_on_client_refresh(pjsip_evsub *evsub)
3517 {
3518         struct sip_subscription_tree *sub_tree;
3519
3520         if (!(sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id))) {
3521                 return;
3522         }
3523
3524         if (ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_client_refresh, ao2_bump(sub_tree))) {
3525                 ao2_cleanup(sub_tree);
3526         }
3527 }
3528
3529 static void pubsub_on_server_timeout(pjsip_evsub *evsub)
3530 {
3531         struct sip_subscription_tree *sub_tree;
3532
3533         /* PJSIP does not terminate the server timeout timer when a SUBSCRIBE
3534          * with Expires: 0 arrives to end a subscription, nor does it terminate
3535          * this timer when we send a NOTIFY request in response to receiving such
3536          * a SUBSCRIBE. PJSIP does not stop the server timeout timer until the
3537          * NOTIFY transaction has finished (either through receiving a response
3538          * or through a transaction timeout).
3539          *
3540          * Therefore, it is possible that we can be told that a server timeout
3541          * occurred after we already thought that the subscription had been
3542          * terminated. In such a case, we will have already removed the sub_tree
3543          * from the evsub's mod_data array.
3544          */
3545
3546         sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
3547         if (!sub_tree || sub_tree->state != SIP_SUB_TREE_NORMAL) {
3548                 ast_debug(1, "Possible terminate race prevented %p %d\n", sub_tree, sub_tree ? sub_tree->state : -1 );
3549         return;
3550         }
3551
3552         sub_tree->state = SIP_SUB_TREE_TERMINATE_PENDING;
3553         if (ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_refresh_timeout, ao2_bump(sub_tree))) {
3554                 sub_tree->state = SIP_SUB_TREE_NORMAL;
3555                 ao2_cleanup(sub_tree);
3556         }
3557 }
3558
3559 static int ami_subscription_detail(struct sip_subscription_tree *sub_tree,
3560                                    struct ast_sip_ami *ami,
3561                                    const char *event)
3562 {
3563         struct ast_str *buf;
3564
3565         buf = ast_sip_create_ami_event(event, ami);
3566         if (!buf) {
3567                 return -1;
3568         }
3569
3570         sip_subscription_to_ami(sub_tree, &buf);
3571         astman_append(ami->s, "%s\r\n", ast_str_buffer(buf));
3572         ast_free(buf);
3573         return 0;
3574 }
3575
3576 static int ami_subscription_detail_inbound(struct sip_subscription_tree *sub_tree, void *arg)
3577 {
3578         return sub_tree->role == AST_SIP_NOTIFIER ? ami_subscription_detail(
3579                 sub_tree, arg, "InboundSubscriptionDetail") : 0;
3580 }
3581
3582 static int ami_subscription_detail_outbound(struct sip_subscription_tree *sub_tree, void *arg)
3583 {
3584         return sub_tree->role == AST_SIP_SUBSCRIBER ? ami_subscription_detail(
3585                 sub_tree, arg, "OutboundSubscriptionDetail") : 0;
3586 }
3587
3588 static int ami_show_subscriptions_inbound(struct mansession *s, const struct message *m)
3589 {
3590         struct ast_sip_ami ami = { .s = s, .m = m, .action_id = astman_get_header(m, "ActionID"), };
3591         int num;
3592
3593         astman_send_listack(s, m, "Following are Events for each inbound Subscription",
3594                 "start");
3595
3596         num = for_each_subscription(ami_subscription_detail_inbound, &ami);
3597
3598         astman_send_list_complete_start(s, m, "InboundSubscriptionDetailComplete", num);
3599         astman_send_list_complete_end(s);
3600         return 0;
3601 }
3602
3603 static int ami_show_subscriptions_outbound(struct mansession *s, const struct message *m)
3604 {
3605         struct ast_sip_ami ami = { .s = s, .m = m, .action_id = astman_get_header(m, "ActionID"), };
3606         int num;
3607
3608         astman_send_listack(s, m, "Following are Events for each outbound Subscription",
3609                 "start");
3610
3611         num = for_each_subscription(ami_subscription_detail_outbound, &ami);
3612
3613         astman_send_list_complete_start(s, m, "OutboundSubscriptionDetailComplete", num);
3614         astman_send_list_complete_end(s);
3615         return 0;
3616 }
3617
3618 static int format_ami_resource_lists(void *obj, void *arg, int flags)
3619 {
3620         struct resource_list *list = obj;
3621         struct ast_sip_ami *ami = arg;
3622         struct ast_str *buf;
3623
3624         buf = ast_sip_create_ami_event("ResourceListDetail", ami);
3625         if (!buf) {
3626                 return CMP_STOP;
3627         }
3628
3629         if (ast_sip_sorcery_object_to_ami(list, &buf)) {
3630                 ast_free(buf);
3631                 return CMP_STOP;
3632         }
3633         astman_append(ami->s, "%s\r\n", ast_str_buffer(buf));
3634
3635         ast_free(buf);
3636         return 0;
3637 }
3638
3639 static int ami_show_resource_lists(struct mansession *s, const struct message *m)
3640 {
3641         struct ast_sip_ami ami = { .s = s, .m = m, .action_id = astman_get_header(m, "ActionID"), };
3642         int num;
3643         struct ao2_container *lists;
3644
3645         lists = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), "resource_list",
3646                         AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
3647
3648         if (!lists || !(num = ao2_container_count(lists))) {
3649                 astman_send_error(s, m, "No resource lists found\n");
3650                 return 0;
3651         }
3652
3653         astman_send_listack(s, m, "A listing of resource lists follows, presented as ResourceListDetail events",
3654                 "start");
3655
3656         ao2_callback(lists, OBJ_NODATA, format_ami_resource_lists, &ami);
3657
3658         astman_send_list_complete_start(s, m, "ResourceListDetailComplete", num);
3659         astman_send_list_complete_end(s);
3660         return 0;
3661 }
3662
3663 #define AMI_SHOW_SUBSCRIPTIONS_INBOUND "PJSIPShowSubscriptionsInbound"
3664 #define AMI_SHOW_SUBSCRIPTIONS_OUTBOUND "PJSIPShowSubscriptionsOutbound"
3665
3666 static int persistence_endpoint_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
3667 {
3668         struct subscription_persistence *persistence = obj;
3669
3670         persistence->endpoint = ast_strdup(var->value);
3671         return 0;
3672 }
3673
3674 static int persistence_endpoint_struct2str(const void *obj, const intptr_t *args, char **buf)
3675 {
3676         const struct subscription_persistence *persistence = obj;
3677
3678         *buf = ast_strdup(persistence->endpoint);
3679         return 0;
3680 }
3681
3682 static int persistence_tag_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
3683 {
3684         struct subscription_persistence *persistence = obj;
3685
3686         persistence->tag = ast_strdup(var->value);
3687         return 0;
3688 }
3689
3690 static int persistence_tag_struct2str(const void *obj, const intptr_t *args, char **buf)
3691 {
3692         const struct subscription_persistence *persistence = obj;
3693
3694         *buf = ast_strdup(persistence->tag);
3695         return 0;
3696 }
3697
3698 static int persistence_expires_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
3699 {
3700         struct subscription_persistence *persistence = obj;
3701         return ast_get_timeval(var->value, &persistence->expires, ast_tv(0, 0), NULL);
3702 }
3703
3704 static int persistence_expires_struct2str(const void *obj, const intptr_t *args, char **buf)
3705 {
3706         const struct subscription_persistence *persistence = obj;
3707         return (ast_asprintf(buf, "%ld", persistence->expires.tv_sec) < 0) ? -1 : 0;
3708 }
3709
3710 #define RESOURCE_LIST_INIT_SIZE 4
3711
3712 static void resource_list_destructor(void *obj)
3713 {
3714         struct resource_list *list = obj;
3715         int i;
3716
3717         for (i = 0; i < AST_VECTOR_SIZE(&list->items); ++i) {
3718                 ast_free((char *) AST_VECTOR_GET(&list->items, i));
3719         }
3720
3721         AST_VECTOR_FREE(&list->items);
3722 }
3723
3724 static void *resource_list_alloc(const char *name)
3725 {
3726         struct resource_list *list;
3727
3728         list = ast_sorcery_generic_alloc(sizeof(*list), resource_list_destructor);
3729         if (!list) {
3730                 return NULL;
3731         }
3732
3733         if (AST_VECTOR_INIT(&list->items, RESOURCE_LIST_INIT_SIZE)) {
3734                 ao2_cleanup(list);
3735                 return NULL;
3736         }
3737
3738         return list;
3739 }
3740
3741 static int item_in_vector(const struct resource_list *list, const char *item)
3742 {
3743         int i;
3744
3745         for (i = 0; i < AST_VECTOR_SIZE(&list->items); ++i) {
3746                 if (!strcmp(item, AST_VECTOR_GET(&list->items, i))) {
3747                         return 1;
3748                 }
3749         }
3750
3751         return 0;
3752 }
3753
3754 static int list_item_handler(const struct aco_option *opt,
3755                 struct ast_variable *var, void *obj)
3756 {
3757         struct resource_list *list = obj;
3758         char *items = ast_strdupa(var->value);
3759         char *item;
3760
3761         while ((item = ast_strip(strsep(&items, ",")))) {
3762                 if (ast_strlen_zero(item)) {
3763                         continue;
3764                 }
3765
3766                 if (item_in_vector(list, item)) {
3767                         ast_log(LOG_WARNING, "Ignoring duplicated list item '%s'\n", item);
3768                         continue;
3769                 }
3770                 if (AST_VECTOR_APPEND(&list->items, ast_strdup(item))) {
3771                         return -1;
3772                 }
3773         }
3774
3775         return 0;
3776 }
3777
3778 static int list_item_to_str(const void *obj, const intptr_t *args, char **buf)
3779 {
3780         const struct resource_list *list = obj;
3781         int i;
3782         struct ast_str *str = ast_str_create(32);
3783
3784         for (i = 0; i < AST_VECTOR_SIZE(&list->items); ++i) {
3785                 ast_str_append(&str, 0, "%s,", AST_VECTOR_GET(&list->items, i));
3786         }
3787
3788         /* Chop off trailing comma */
3789         ast_str_truncate(str, -1);
3790         *buf = ast_strdup(ast_str_buffer(str));
3791         ast_free(str);
3792         return 0;
3793 }