res_pjsip_pubsub.c: Fix AMI event list counts.
[asterisk/asterisk.git] / res / res_pjsip_pubsub.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 /*!
19  * \brief Opaque structure representing an RFC 3265 SIP subscription
20  */
21
22 /*** MODULEINFO
23         <depend>pjproject</depend>
24         <depend>res_pjsip</depend>
25         <support_level>core</support_level>
26  ***/
27
28 #include "asterisk.h"
29
30 #include <pjsip.h>
31 #include <pjsip_simple.h>
32 #include <pjlib.h>
33
34 #include "asterisk/res_pjsip_pubsub.h"
35 #include "asterisk/module.h"
36 #include "asterisk/linkedlists.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/datastore.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/taskprocessor.h"
41 #include "asterisk/sched.h"
42 #include "asterisk/res_pjsip.h"
43 #include "asterisk/callerid.h"
44 #include "asterisk/manager.h"
45 #include "asterisk/test.h"
46 #include "res_pjsip/include/res_pjsip_private.h"
47 #include "asterisk/res_pjsip_presence_xml.h"
48
49 /*** DOCUMENTATION
50         <manager name="PJSIPShowSubscriptionsInbound" language="en_US">
51                 <synopsis>
52                         Lists subscriptions.
53                 </synopsis>
54                 <syntax />
55                 <description>
56                         <para>
57                         Provides a listing of all inbound subscriptions.  An event <literal>InboundSubscriptionDetail</literal>
58                         is issued for each subscription object.  Once all detail events are completed an
59                         <literal>InboundSubscriptionDetailComplete</literal> event is issued.
60                         </para>
61                 </description>
62         </manager>
63         <manager name="PJSIPShowSubscriptionsOutbound" language="en_US">
64                 <synopsis>
65                         Lists subscriptions.
66                 </synopsis>
67                 <syntax />
68                 <description>
69                         <para>
70                         Provides a listing of all outbound subscriptions.  An event <literal>OutboundSubscriptionDetail</literal>
71                         is issued for each subscription object.  Once all detail events are completed an
72                         <literal>OutboundSubscriptionDetailComplete</literal> event is issued.
73                         </para>
74                 </description>
75         </manager>
76         <manager name="PJSIPShowResourceLists" language="en_US">
77                 <synopsis>
78                         Displays settings for configured resource lists.
79                 </synopsis>
80                 <syntax />
81                 <description>
82                         <para>
83                         Provides a listing of all resource lists.  An event <literal>ResourceListDetail</literal>
84                         is issued for each resource list object.  Once all detail events are completed a
85                         <literal>ResourceListDetailComplete</literal> event is issued.
86                         </para>
87                 </description>
88         </manager>
89
90         <configInfo name="res_pjsip_pubsub" language="en_US">
91                 <synopsis>Module that implements publish and subscribe support.</synopsis>
92                 <configFile name="pjsip.conf">
93                         <configObject name="subscription_persistence">
94                                 <synopsis>Persists SIP subscriptions so they survive restarts.</synopsis>
95                                 <configOption name="packet">
96                                         <synopsis>Entire SIP SUBSCRIBE packet that created the subscription</synopsis>
97                                 </configOption>
98                                 <configOption name="src_name">
99                                         <synopsis>The source address of the subscription</synopsis>
100                                 </configOption>
101                                 <configOption name="src_port">
102                                         <synopsis>The source port of the subscription</synopsis>
103                                 </configOption>
104                                 <configOption name="transport_key">
105                                         <synopsis>The type of transport the subscription was received on</synopsis>
106                                 </configOption>
107                                 <configOption name="local_name">
108                                         <synopsis>The local address the subscription was received on</synopsis>
109                                 </configOption>
110                                 <configOption name="local_port">
111                                         <synopsis>The local port the subscription was received on</synopsis>
112                                 </configOption>
113                                 <configOption name="cseq">
114                                         <synopsis>The sequence number of the next NOTIFY to be sent</synopsis>
115                                 </configOption>
116                                 <configOption name="tag">
117                                         <synopsis>The local tag of the dialog for the subscription</synopsis>
118                                 </configOption>
119                                 <configOption name="endpoint">
120                                         <synopsis>The name of the endpoint that subscribed</synopsis>
121                                 </configOption>
122                                 <configOption name="expires">
123                                         <synopsis>The time at which the subscription expires</synopsis>
124                                 </configOption>
125                         </configObject>
126                         <configObject name="resource_list">
127                                 <synopsis>Resource list configuration parameters.</synopsis>
128                                 <description>
129                                         <para>This configuration object allows for RFC 4662 resource list subscriptions
130                                         to be specified. This can be useful to decrease the amount of subscription traffic
131                                         that a server has to process.</para>
132                                         <note>
133                                                 <para>Current limitations limit the size of SIP NOTIFY requests that Asterisk sends
134                                                 to 64000 bytes. If your resource list notifications are larger than this maximum, you
135                                                 will need to make adjustments.</para>
136                                         </note>
137                                 </description>
138                                 <configOption name="type">
139                                         <synopsis>Must be of type 'resource_list'</synopsis>
140                                 </configOption>
141                                 <configOption name="event">
142                                         <synopsis>The SIP event package that the list resource belongs to.</synopsis>
143                                         <description><para>
144                                                 The SIP event package describes the types of resources that Asterisk reports
145                                                 the state of.
146                                         </para>
147                                                 <enumlist>
148                                                         <enum name="presence"><para>
149                                                                 Device state and presence reporting.
150                                                         </para></enum>
151                                                         <enum name="dialog"><para>
152                                                                 This is identical to <replaceable>presence</replaceable>.
153                                                         </para></enum>
154                                                         <enum name="message-summary"><para>
155                                                                 Message-waiting indication (MWI) reporting.
156                                                         </para></enum>
157                                                 </enumlist>
158                                         </description>
159                                 </configOption>
160                                 <configOption name="list_item">
161                                         <synopsis>The name of a resource to report state on</synopsis>
162                                         <description>
163                                                 <para>In general Asterisk looks up list items in the following way:</para>
164                                                 <para>1. Check if the list item refers to another configured resource list.</para>
165                                                 <para>2. Pass the name of the resource off to event-package-specific handlers
166                                                    to find the specified resource.</para>
167                                                 <para>The second part means that the way the list item is specified depends
168                                                 on what type of list this is. For instance, if you have the <replaceable>event</replaceable>
169                                                 set to <literal>presence</literal>, then list items should be in the form of
170                                                 dialplan_extension@dialplan_context. For <literal>message-summary</literal> mailbox
171                                                 names should be listed.</para>
172                                         </description>
173                                 </configOption>
174                                 <configOption name="full_state" default="no">
175                                         <synopsis>Indicates if the entire list's state should be sent out.</synopsis>
176                                         <description>
177                                                 <para>If this option is enabled, and a resource changes state, then Asterisk will construct
178                                                 a notification that contains the state of all resources in the list. If the option is
179                                                 disabled, Asterisk will construct a notification that only contains the states of
180                                                 resources that have changed.</para>
181                                                 <note>
182                                                         <para>Even with this option disabled, there are certain situations where Asterisk is forced
183                                                         to send a notification with the states of all resources in the list. When a subscriber
184                                                         renews or terminates its subscription to the list, Asterisk MUST send a full state
185                                                         notification.</para>
186                                                 </note>
187                                         </description>
188                                 </configOption>
189                                 <configOption name="notification_batch_interval" default="0">
190                                         <synopsis>Time Asterisk should wait, in milliseconds, before sending notifications.</synopsis>
191                                         <description>
192                                                 <para>When a resource's state changes, it may be desired to wait a certain amount before Asterisk
193                                                 sends a notification to subscribers. This allows for other state changes to accumulate, so that
194                                                 Asterisk can communicate multiple state changes in a single notification instead of rapidly sending
195                                                 many notifications.</para>
196                                         </description>
197                                 </configOption>
198                         </configObject>
199                         <configObject name="inbound-publication">
200                                 <synopsis>The configuration for inbound publications</synopsis>
201                                 <configOption name="endpoint" default="">
202                                         <synopsis>Optional name of an endpoint that is only allowed to publish to this resource</synopsis>
203                                 </configOption>
204                                 <configOption name="type">
205                                         <synopsis>Must be of type 'inbound-publication'.</synopsis>
206                                 </configOption>
207                         </configObject>
208                 </configFile>
209         </configInfo>
210  ***/
211
212 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);
213
214 static struct pjsip_module pubsub_module = {
215         .name = { "PubSub Module", 13 },
216         .priority = PJSIP_MOD_PRIORITY_APPLICATION,
217         .on_rx_request = pubsub_on_rx_request,
218 };
219
220 #define MOD_DATA_PERSISTENCE "sub_persistence"
221 #define MOD_DATA_MSG "sub_msg"
222
223 static const pj_str_t str_event_name = { "Event", 5 };
224
225 /*! \brief Scheduler used for automatically expiring publications */
226 static struct ast_sched_context *sched;
227
228 /*! \brief Number of buckets for publications (on a per handler) */
229 #define PUBLICATIONS_BUCKETS 37
230
231 /*! \brief Default expiration time for PUBLISH if one is not specified */
232 #define DEFAULT_PUBLISH_EXPIRES 3600
233
234 /*! \brief Number of buckets for subscription datastore */
235 #define DATASTORE_BUCKETS 53
236
237 /*! \brief Default expiration for subscriptions */
238 #define DEFAULT_EXPIRES 3600
239
240 /*! \brief Defined method for PUBLISH */
241 const pjsip_method pjsip_publish_method =
242 {
243         PJSIP_OTHER_METHOD,
244         { "PUBLISH", 7 }
245 };
246
/*!
 * \brief Classification of PUBLISH requests, following RFC 3903
 */
enum sip_publish_type {
	/*!
	 * \brief Unknown
	 *
	 * \details
	 * Not an RFC 3903 category. Marks an incoming PUBLISH that matches
	 * none of the categories below and is therefore invalid.
	 */
	SIP_PUBLISH_UNKNOWN,

	/*!
	 * \brief Initial
	 *
	 * \details
	 * The first PUBLISH sent. Carries a non-zero Expires header and a
	 * body describing the current state of the sending endpoint. It is
	 * the only kind of PUBLISH without a Sip-If-Match header.
	 */
	SIP_PUBLISH_INITIAL,

	/*!
	 * \brief Refresh
	 *
	 * \details
	 * Keeps previously published state from expiring. Carries a
	 * non-zero Expires header and no body, because it does not update
	 * state.
	 */
	SIP_PUBLISH_REFRESH,

	/*!
	 * \brief Modify
	 *
	 * \details
	 * Replaces the previously published state. Carries a body with the
	 * new state; an Expires header may or may not be present.
	 */
	SIP_PUBLISH_MODIFY,

	/*!
	 * \brief Remove
	 *
	 * \details
	 * Removes published state from an ESC (Event State Compositor).
	 * Carries an Expires header set to 0 and most likely no body.
	 */
	SIP_PUBLISH_REMOVE,
};
301
302 /*!
303  * \brief A vector of strings commonly used throughout this module
304  */
305 AST_VECTOR(resources, const char *);
306
307 /*!
308  * \brief Resource list configuration item
309  */
310 struct resource_list {
311         SORCERY_OBJECT(details);
312         /*! SIP event package the list uses. */
313         char event[32];
314         /*! Strings representing resources in the list. */
315         struct resources items;
316         /*! Indicates if Asterisk sends full or partial state on notifications. */
317         unsigned int full_state;
318         /*! Time, in milliseconds Asterisk waits before sending a batched notification.*/
319         unsigned int notification_batch_interval;
320 };
321
/*!
 * Counter from which ESCs derive new entity IDs.
 */
static int esc_etag_counter;

/*!
 * \brief A single SIP publication
 */
struct ast_sip_publication {
	/*! Datastores that handlers attach to the publication */
	struct ao2_container *datastores;
	/*! \brief Entity tag identifying the publication */
	int entity_tag;
	/*! \brief Publish handler responsible for this publication */
	struct ast_sip_publish_handler *handler;
	/*! \brief Endpoint this publication is communicating with */
	struct ast_sip_endpoint *endpoint;
	/*! \brief When the publication expires */
	int expires;
	/*! \brief Scheduler item that expires the publication */
	int sched_id;
	/*! \brief Resource being published to */
	char *resource;
	/*! \brief Name of the event type configuration */
	char *event_configuration_name;
	/*!
	 * \brief Trailing storage for the strings above
	 *
	 * NOTE(review): presumably backs resource and
	 * event_configuration_name; the allocation is not visible here.
	 */
	char data[0];
};
350
351
352 /*!
353  * \brief Structure used for persisting an inbound subscription
354  */
355 struct subscription_persistence {
356         /*! Sorcery object details */
357         SORCERY_OBJECT(details);
358         /*! The name of the endpoint involved in the subscription */
359         char *endpoint;
360         /*! SIP message that creates the subscription */
361         char packet[PJSIP_MAX_PKT_LEN];
362         /*! Source address of the message */
363         char src_name[PJ_INET6_ADDRSTRLEN];
364         /*! Source port of the message */
365         int src_port;
366         /*! Local transport key type */
367         char transport_key[32];
368         /*! Local transport address */
369         char local_name[PJ_INET6_ADDRSTRLEN];
370         /*! Local transport port */
371         int local_port;
372         /*! Next CSeq to use for message */
373         unsigned int cseq;
374         /*! Local tag of the dialog */
375         char *tag;
376         /*! When this subscription expires */
377         struct timeval expires;
378 };
379
/*!
 * \brief Lifecycle states of a subscription tree
 */
enum sip_subscription_tree_state {
	/*! Operating normally */
	SIP_SUB_TREE_NORMAL = 0,
	/*! Termination was requested by Asterisk, the client, or pjproject */
	SIP_SUB_TREE_TERMINATE_PENDING,
	/*! Termination is underway */
	SIP_SUB_TREE_TERMINATE_IN_PROGRESS,
	/*! Termination has completed; the subscription tree is no longer valid */
	SIP_SUB_TREE_TERMINATED,
};
393
394 /*!
395  * \brief A tree of SIP subscriptions
396  *
397  * Because of the ability to subscribe to resource lists, a SIP
398  * subscription can result in a tree of subscriptions being created.
399  * This structure represents the information relevant to the subscription
400  * as a whole, to include the underlying PJSIP structure for the
401  * subscription.
402  */
403 struct sip_subscription_tree {
404         /*! The endpoint with which the subscription is communicating */
405         struct ast_sip_endpoint *endpoint;
406         /*! Serializer on which to place operations for this subscription */
407         struct ast_taskprocessor *serializer;
408         /*! The role for this subscription */
409         enum ast_sip_subscription_role role;
410         /*! Persistence information */
411         struct subscription_persistence *persistence;
412         /*! The underlying PJSIP event subscription structure */
413         pjsip_evsub *evsub;
414         /*! The underlying PJSIP dialog */
415         pjsip_dialog *dlg;
416         /*! Interval to use for batching notifications */
417         unsigned int notification_batch_interval;
418         /*! Scheduler ID for batched notification */
419         int notify_sched_id;
420         /*! Indicator if scheduled batched notification should be sent */
421         unsigned int send_scheduled_notify;
422         /*! The root of the subscription tree */
423         struct ast_sip_subscription *root;
424         /*! Is this subscription to a list? */
425         int is_list;
426         /*! Next item in the list */
427         AST_LIST_ENTRY(sip_subscription_tree) next;
428         /*! Subscription tree state */
429         enum sip_subscription_tree_state state;
430 };
431
432 /*!
433  * \brief Structure representing a "virtual" SIP subscription.
434  *
435  * This structure serves a dual purpose. Structurally, it is
436  * the constructed tree of subscriptions based on the resources
437  * being subscribed to. API-wise, this serves as the handle that
438  * subscription handlers use in order to interact with the pubsub API.
439  */
440 struct ast_sip_subscription {
441         /*! Subscription datastores set up by handlers */
442         struct ao2_container *datastores;
443         /*! The handler for this subscription */
444         const struct ast_sip_subscription_handler *handler;
445         /*! Pointer to the base of the tree */
446         struct sip_subscription_tree *tree;
447         /*! Body generaator for NOTIFYs */
448         struct ast_sip_pubsub_body_generator *body_generator;
449         /*! Vector of child subscriptions */
450         AST_VECTOR(, struct ast_sip_subscription *) children;
451         /*! Saved NOTIFY body text for this subscription */
452         struct ast_str *body_text;
453         /*! Indicator that the body text has changed since the last notification */
454         int body_changed;
455         /*! The current state of the subscription */
456         pjsip_evsub_state subscription_state;
457         /*! For lists, the current version to place in the RLMI body */
458         unsigned int version;
459         /*! For lists, indicates if full state should always be communicated. */
460         unsigned int full_state;
461         /*! URI associated with the subscription */
462         pjsip_sip_uri *uri;
463         /*! Name of resource being subscribed to */
464         char resource[0];
465 };
466
467 /*!
468  * \brief Structure representing a publication resource
469  */
470 struct ast_sip_publication_resource {
471         /*! \brief Sorcery object details */
472         SORCERY_OBJECT(details);
473         /*! \brief Optional name of an endpoint that is only allowed to publish to this resource */
474         char *endpoint;
475         /*! \brief Mapping for event types to configuration */
476         struct ast_variable *events;
477 };
478
479 static const char *sip_subscription_roles_map[] = {
480         [AST_SIP_SUBSCRIBER] = "Subscriber",
481         [AST_SIP_NOTIFIER] = "Notifier"
482 };
483
484 AST_RWLIST_HEAD_STATIC(subscriptions, sip_subscription_tree);
485
486 AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
487 AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
488
489 static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
490 static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
491                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
492 static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
493                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
494 static void pubsub_on_client_refresh(pjsip_evsub *sub);
495 static void pubsub_on_server_timeout(pjsip_evsub *sub);
496  
497 static pjsip_evsub_user pubsub_cb = {
498         .on_evsub_state = pubsub_on_evsub_state,
499         .on_rx_refresh = pubsub_on_rx_refresh,
500         .on_rx_notify = pubsub_on_rx_notify,
501         .on_client_refresh = pubsub_on_client_refresh,
502         .on_server_timeout = pubsub_on_server_timeout,
503 };
504
505 /*! \brief Destructor for publication resource */
506 static void publication_resource_destroy(void *obj)
507 {
508         struct ast_sip_publication_resource *resource = obj;
509
510         ast_free(resource->endpoint);
511         ast_variables_destroy(resource->events);
512 }
513
514 /*! \brief Allocator for publication resource */
515 static void *publication_resource_alloc(const char *name)
516 {
517         return ast_sorcery_generic_alloc(sizeof(struct ast_sip_publication_resource), publication_resource_destroy);
518 }
519
520 /*! \brief Destructor for subscription persistence */
521 static void subscription_persistence_destroy(void *obj)
522 {
523         struct subscription_persistence *persistence = obj;
524
525         ast_free(persistence->endpoint);
526         ast_free(persistence->tag);
527 }
528
529 /*! \brief Allocator for subscription persistence */
530 static void *subscription_persistence_alloc(const char *name)
531 {
532         return ast_sorcery_generic_alloc(sizeof(struct subscription_persistence), subscription_persistence_destroy);
533 }
534
535 /*! \brief Function which creates initial persistence information of a subscription in sorcery */
536 static struct subscription_persistence *subscription_persistence_create(struct sip_subscription_tree *sub_tree)
537 {
538         char tag[PJ_GUID_STRING_LENGTH + 1];
539
540         /* The id of this persistence object doesn't matter as we keep it on the subscription and don't need to
541          * look it up by id at all.
542          */
543         struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(),
544                 "subscription_persistence", NULL);
545
546         pjsip_dialog *dlg = sub_tree->dlg;
547
548         if (!persistence) {
549                 return NULL;
550         }
551
552         persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub_tree->endpoint));
553         ast_copy_pj_str(tag, &dlg->local.info->tag, sizeof(tag));
554         persistence->tag = ast_strdup(tag);
555
556         ast_sorcery_create(ast_sip_get_sorcery(), persistence);
557         return persistence;
558 }
559
560 /*! \brief Function which updates persistence information of a subscription in sorcery */
561 static void subscription_persistence_update(struct sip_subscription_tree *sub_tree,
562         pjsip_rx_data *rdata)
563 {
564         pjsip_dialog *dlg;
565
566         if (!sub_tree->persistence) {
567                 return;
568         }
569
570         dlg = sub_tree->dlg;
571         sub_tree->persistence->cseq = dlg->local.cseq;
572
573         if (rdata) {
574                 int expires;
575                 pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
576
577                 expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
578                 sub_tree->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
579
580                 /* When receiving a packet on an streaming transport, it's possible to receive more than one SIP
581                  * message at a time into the rdata->pkt_info.packet buffer. However, the rdata->msg_info.msg_buf
582                  * will always point to the proper SIP message that is to be processed. When updating subscription
583                  * persistence that is pulled from persistent storage, though, the rdata->pkt_info.packet will
584                  * only ever have a single SIP message on it, and so we base persistence on that.
585                  */
586                 if (rdata->msg_info.msg_buf) {
587                         ast_copy_string(sub_tree->persistence->packet, rdata->msg_info.msg_buf,
588                                         MIN(sizeof(sub_tree->persistence->packet), rdata->msg_info.len));
589                 } else {
590                         ast_copy_string(sub_tree->persistence->packet, rdata->pkt_info.packet,
591                                         sizeof(sub_tree->persistence->packet));
592                 }
593                 ast_copy_string(sub_tree->persistence->src_name, rdata->pkt_info.src_name,
594                                 sizeof(sub_tree->persistence->src_name));
595                 sub_tree->persistence->src_port = rdata->pkt_info.src_port;
596                 ast_copy_string(sub_tree->persistence->transport_key, rdata->tp_info.transport->type_name,
597                         sizeof(sub_tree->persistence->transport_key));
598                 ast_copy_pj_str(sub_tree->persistence->local_name, &rdata->tp_info.transport->local_name.host,
599                         sizeof(sub_tree->persistence->local_name));
600                 sub_tree->persistence->local_port = rdata->tp_info.transport->local_name.port;
601         }
602
603         ast_sorcery_update(ast_sip_get_sorcery(), sub_tree->persistence);
604 }
605
606 /*! \brief Function which removes persistence of a subscription from sorcery */
607 static void subscription_persistence_remove(struct sip_subscription_tree *sub_tree)
608 {
609         if (!sub_tree->persistence) {
610                 return;
611         }
612
613         ast_sorcery_delete(ast_sip_get_sorcery(), sub_tree->persistence);
614         ao2_ref(sub_tree->persistence, -1);
615         sub_tree->persistence = NULL;
616 }
617
618
619 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name);
620 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
621                 size_t num_accept, const char *body_type);
622
623 /*! \brief Retrieve a handler using the Event header of an rdata message */
624 static struct ast_sip_subscription_handler *subscription_get_handler_from_rdata(pjsip_rx_data *rdata)
625 {
626         pjsip_event_hdr *event_header;
627         char event[32];
628         struct ast_sip_subscription_handler *handler;
629
630         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
631         if (!event_header) {
632                 ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
633                 return NULL;
634         }
635         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
636
637         handler = find_sub_handler_for_event_name(event);
638         if (!handler) {
639                 ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
640         }
641
642         return handler;
643 }
644
/*!
 * \brief Accept header values that are skipped when choosing a body generator
 *
 * An inbound SUBSCRIBE normally has each of its Accept header values
 * considered when looking for a matching body generator. That is fine
 * for a subscription to a single resource. For a subscription to a
 * resource list, however, some Accept values describe the list
 * container rather than the individual resources, and must be ignored
 * when searching for the generator used for the individual resources.
 */
const char *accept_exceptions[] = {
	"multipart/related",
	"application/rlmi+xml",
};
661
662 /*!
663  * \brief Is the Accept header from the SUBSCRIBE in the list of exceptions?
664  *
665  * \retval 1 This Accept header value is an exception to the rule.
666  * \retval 0 This Accept header is not an exception to the rule.
667  */
668 static int exceptional_accept(const pj_str_t *accept)
669 {
670         int i;
671
672         for (i = 0; i < ARRAY_LEN(accept_exceptions); ++i) {
673                 if (!pj_strcmp2(accept, accept_exceptions[i])) {
674                         return 1;
675                 }
676         }
677
678         return 0;
679 }
680
681 /*! \brief Retrieve a body generator using the Accept header of an rdata message */
682 static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata,
683         const struct ast_sip_subscription_handler *handler)
684 {
685         pjsip_accept_hdr *accept_header = (pjsip_accept_hdr *) &rdata->msg_info.msg->hdr;
686         char accept[AST_SIP_MAX_ACCEPT][64];
687         size_t num_accept_headers = 0;
688
689         while ((accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, accept_header->next))) {
690                 int i;
691
692                 for (i = 0; i < accept_header->count; ++i) {
693                         if (!exceptional_accept(&accept_header->values[i])) {
694                                 ast_copy_pj_str(accept[num_accept_headers], &accept_header->values[i], sizeof(accept[num_accept_headers]));
695                                 ++num_accept_headers;
696                         }
697                 }
698         }
699
700         if (num_accept_headers == 0) {
701                 /* If a SUBSCRIBE contains no Accept headers, then we must assume that
702                  * the default accept type for the event package is to be used.
703                  */
704                 ast_copy_string(accept[0], handler->notifier->default_accept, sizeof(accept[0]));
705                 num_accept_headers = 1;
706         }
707
708         return find_body_generator(accept, num_accept_headers, handler->body_type);
709 }
710
711 /*! \brief Check if the rdata has a Supported header containing 'eventlist'
712  *
713  *  \retval 1 rdata has an eventlist containing supported header
714  *  \retval 0 rdata doesn't have an eventlist containing supported header
715  */
716 static int ast_sip_pubsub_has_eventlist_support(pjsip_rx_data *rdata)
717 {
718         pjsip_supported_hdr *supported_header = (pjsip_supported_hdr *) &rdata->msg_info.msg->hdr;
719
720         while ((supported_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_SUPPORTED, supported_header->next))) {
721                 int i;
722
723                 for (i = 0; i < supported_header->count; i++) {
724                         if (!pj_stricmp2(&supported_header->values[i], "eventlist")) {
725                                 return 1;
726                         }
727                 }
728         }
729
730         return 0;
731 }
732
733 struct resource_tree;
734
735 /*!
736  * \brief A node for a resource tree.
737  */
738 struct tree_node {
739         AST_VECTOR(, struct tree_node *) children;
740         unsigned int full_state;
741         char resource[0];
742 };
743
744 /*!
745  * \brief Helper function for retrieving a resource list for a given event.
746  *
747  * This will retrieve a resource list that corresponds to the resource and event provided.
748  *
749  * \param resource The name of the resource list to retrieve
750  * \param event The expected event name on the resource list
751  */
752 static struct resource_list *retrieve_resource_list(const char *resource, const char *event)
753 {
754         struct resource_list *list;
755
756         list = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "resource_list", resource);
757         if (!list) {
758                 return NULL;
759         }
760
761         if (strcmp(list->event, event)) {
762                 ast_log(LOG_WARNING, "Found resource list %s, but its event type (%s) does not match SUBSCRIBE's (%s)\n",
763                                 resource, list->event, event);
764                 ao2_cleanup(list);
765                 return NULL;
766         }
767
768         return list;
769 }
770
771 /*!
772  * \brief Allocate a tree node
773  *
774  * In addition to allocating and initializing the tree node, the node is also added
775  * to the vector of visited resources. See \ref build_resource_tree for more information
776  * on the visited resources.
777  *
778  * \param resource The name of the resource for this tree node.
779  * \param visited The vector of resources that have been visited.
780  * \param if allocating a list, indicate whether full state is requested in notifications.
781  * \retval NULL Allocation failure.
782  * \retval non-NULL The newly-allocated tree_node
783  */
784 static struct tree_node *tree_node_alloc(const char *resource, struct resources *visited, unsigned int full_state)
785 {
786         struct tree_node *node;
787
788         node = ast_calloc(1, sizeof(*node) + strlen(resource) + 1);
789         if (!node) {
790                 return NULL;
791         }
792
793         strcpy(node->resource, resource);
794         if (AST_VECTOR_INIT(&node->children, 4)) {
795                 ast_free(node);
796                 return NULL;
797         }
798         node->full_state = full_state;
799
800         if (visited) {
801                 AST_VECTOR_APPEND(visited, resource);
802         }
803         return node;
804 }
805
806 /*!
807  * \brief Destructor for a tree node
808  *
809  * This function calls recursively in order to destroy
810  * all nodes lower in the tree from the given node in
811  * addition to the node itself.
812  *
813  * \param node The node to destroy.
814  */
815 static void tree_node_destroy(struct tree_node *node)
816 {
817         int i;
818         if (!node) {
819                 return;
820         }
821
822         for (i = 0; i < AST_VECTOR_SIZE(&node->children); ++i) {
823                 tree_node_destroy(AST_VECTOR_GET(&node->children, i));
824         }
825         AST_VECTOR_FREE(&node->children);
826         ast_free(node);
827 }
828
/*!
 * \brief Determine if this resource has been visited already
 *
 * The comparison is by string content, not by pointer.
 * See \ref build_resource_tree for more information
 *
 * \param resource The resource currently being visited
 * \param visited The resources that have previously been visited
 *
 * \retval 1 The resource name is present in \a visited.
 * \retval 0 The resource name is not present in \a visited.
 */
static int have_visited(const char *resource, struct resources *visited)
{
	int idx = 0;

	while (idx < AST_VECTOR_SIZE(visited)) {
		if (strcmp(resource, AST_VECTOR_GET(visited, idx)) == 0) {
			return 1;
		}
		++idx;
	}

	return 0;
}
849
850 /*!
851  * \brief Build child nodes for a given parent.
852  *
853  * This iterates through the items on a resource list and creates tree nodes for each one. The
854  * tree nodes created are children of the supplied parent node. If an item in the resource
855  * list is itself a list, then this function is called recursively to provide children for
856  * the the new node.
857  *
858  * If an item in a resource list is not a list, then the supplied subscription handler is
859  * called into as if a new SUBSCRIBE for the list item were presented. The handler's response
860  * is used to determine if the node can be added to the tree or not.
861  *
862  * If a parent node ends up having no child nodes added under it, then the parent node is
863  * pruned from the tree.
864  *
865  * \param endpoint The endpoint that sent the inbound SUBSCRIBE.
866  * \param handler The subscription handler for leaf nodes in the tree.
867  * \param list The configured resource list from which the child node is being built.
868  * \param parent The parent node for these children.
869  * \param visited The resources that have already been visited.
870  */
871 static void build_node_children(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
872                 struct resource_list *list, struct tree_node *parent, struct resources *visited)
873 {
874         int i;
875
876         for (i = 0; i < AST_VECTOR_SIZE(&list->items); ++i) {
877                 struct tree_node *current;
878                 struct resource_list *child_list;
879                 const char *resource = AST_VECTOR_GET(&list->items, i);
880
881                 if (have_visited(resource, visited)) {
882                         ast_debug(1, "Already visited resource %s. Avoiding duplicate resource or potential loop.\n", resource);
883                         continue;
884                 }
885
886                 child_list = retrieve_resource_list(resource, list->event);
887                 if (!child_list) {
888                         int resp = handler->notifier->new_subscribe(endpoint, resource);
889                         if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
890                                 current = tree_node_alloc(resource, visited, 0);
891                                 if (!current) {
892                                         ast_debug(1, "Subscription to leaf resource %s was successful, but encountered"
893                                                         "allocation error afterwards\n", resource);
894                                         continue;
895                                 }
896                                 ast_debug(2, "Subscription to leaf resource %s resulted in success. Adding to parent %s\n",
897                                                 resource, parent->resource);
898                                 AST_VECTOR_APPEND(&parent->children, current);
899                         } else {
900                                 ast_debug(2, "Subscription to leaf resource %s resulted in error response %d\n",
901                                                 resource, resp);
902                         }
903                 } else {
904                         ast_debug(2, "Resource %s (child of %s) is a list\n", resource, parent->resource);
905                         current = tree_node_alloc(resource, visited, child_list->full_state);
906                         if (!current) {
907                                 ast_debug(1, "Cannot build children of resource %s due to allocation failure\n", resource);
908                                 continue;
909                         }
910                         build_node_children(endpoint, handler, child_list, current, visited);
911                         if (AST_VECTOR_SIZE(&current->children) > 0) {
912                                 ast_debug(1, "List %s had no successful children.\n", resource);
913                                 AST_VECTOR_APPEND(&parent->children, current);
914                         } else {
915                                 ast_debug(2, "List %s had successful children. Adding to parent %s\n",
916                                                 resource, parent->resource);
917                                 tree_node_destroy(current);
918                         }
919                         ao2_cleanup(child_list);
920                 }
921         }
922 }
923
/*!
 * \brief A resource tree
 *
 * The resource named in an inbound SUBSCRIBE may be a resource list, and
 * the items on that list may in turn be lists. A tree is therefore needed
 * to hold the resources.
 *
 * The tree is built when the SUBSCRIBE is received, by determining whether
 * a subscription to each individual resource would succeed. A successful
 * subscription results in a node being created in the tree; an unsuccessful
 * one results in no node.
 *
 * This tree can be seen as a bare-bones analog of the tree of ast_sip_subscriptions that
 * will end up being created to actually carry out the duties of a SIP SUBSCRIBE dialog.
 */
struct resource_tree {
	/*! Top node of the tree, corresponding to the requested resource */
	struct tree_node *root;
	/*! Copied from the resource list when the requested resource is a list */
	unsigned int notification_batch_interval;
};
944
945 /*!
946  * \brief Destroy a resource tree.
947  *
948  * This function makes no assumptions about how the tree itself was
949  * allocated and does not attempt to free the tree itself. Callers
950  * of this function are responsible for freeing the tree.
951  *
952  * \param tree The tree to destroy.
953  */
954 static void resource_tree_destroy(struct resource_tree *tree)
955 {
956         if (tree) {
957                 tree_node_destroy(tree->root);
958         }
959 }
960
961 /*!
962  * \brief Build a resource tree
963  *
964  * This function builds a resource tree based on the requested resource in a SUBSCRIBE request.
965  *
966  * This function also creates a container that has all resources that have been visited during
967  * creation of the tree, whether those resources resulted in a tree node being created or not.
968  * Keeping this container of visited resources allows for misconfigurations such as loops in
969  * the tree or duplicated resources to be detected.
970  *
971  * \param endpoint The endpoint that sent the SUBSCRIBE request.
972  * \param handler The subscription handler for leaf nodes in the tree.
973  * \param resource The resource requested in the SUBSCRIBE request.
974  * \param tree The tree that is to be built.
975  * \param has_eventlist_support
976  *
977  * \retval 200-299 Successfully subscribed to at least one resource.
978  * \retval 300-699 Failure to subscribe to requested resource.
979  */
980 static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
981                 const char *resource, struct resource_tree *tree, int has_eventlist_support)
982 {
983         RAII_VAR(struct resource_list *, list, NULL, ao2_cleanup);
984         struct resources visited;
985
986         if (!has_eventlist_support || !(list = retrieve_resource_list(resource, handler->event_name))) {
987                 ast_debug(2, "Subscription to resource %s is not to a list\n", resource);
988                 tree->root = tree_node_alloc(resource, NULL, 0);
989                 if (!tree->root) {
990                         return 500;
991                 }
992                 return handler->notifier->new_subscribe(endpoint, resource);
993         }
994
995         ast_debug(2, "Subscription to resource %s is a list\n", resource);
996         if (AST_VECTOR_INIT(&visited, AST_VECTOR_SIZE(&list->items))) {
997                 return 500;
998         }
999
1000         tree->root = tree_node_alloc(resource, &visited, list->full_state);
1001         if (!tree->root) {
1002                 AST_VECTOR_FREE(&visited);
1003                 return 500;
1004         }
1005
1006         tree->notification_batch_interval = list->notification_batch_interval;
1007
1008         build_node_children(endpoint, handler, list, tree->root, &visited);
1009         AST_VECTOR_FREE(&visited);
1010
1011         if (AST_VECTOR_SIZE(&tree->root->children) > 0) {
1012                 return 200;
1013         } else {
1014                 return 500;
1015         }
1016 }
1017
1018 static void add_subscription(struct sip_subscription_tree *obj)
1019 {
1020         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1021         AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
1022 }
1023
1024 static void remove_subscription(struct sip_subscription_tree *obj)
1025 {
1026         struct sip_subscription_tree *i;
1027         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1028         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
1029                 if (i == obj) {
1030                         AST_RWLIST_REMOVE_CURRENT(next);
1031                         if (i->root) {
1032                                 ast_debug(2, "Removing subscription to resource %s from list of subscriptions\n",
1033                                                 ast_sip_subscription_get_resource_name(i->root));
1034                         }
1035                         break;
1036                 }
1037         }
1038         AST_RWLIST_TRAVERSE_SAFE_END;
1039 }
1040
1041 static void destroy_subscription(struct ast_sip_subscription *sub)
1042 {
1043         ast_debug(3, "Destroying SIP subscription to resource %s\n", sub->resource);
1044         ast_free(sub->body_text);
1045
1046         AST_VECTOR_FREE(&sub->children);
1047         ao2_cleanup(sub->datastores);
1048         ast_free(sub);
1049 }
1050
1051 static void destroy_subscriptions(struct ast_sip_subscription *root)
1052 {
1053         int i;
1054
1055         if (!root) {
1056                 return;
1057         }
1058
1059         for (i = 0; i < AST_VECTOR_SIZE(&root->children); ++i) {
1060                 struct ast_sip_subscription *child;
1061
1062                 child = AST_VECTOR_GET(&root->children, i);
1063                 destroy_subscriptions(child);
1064         }
1065
1066         destroy_subscription(root);
1067 }
1068
1069 static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler,
1070                 const char *resource, struct sip_subscription_tree *tree)
1071 {
1072         struct ast_sip_subscription *sub;
1073         pjsip_sip_uri *contact_uri;
1074
1075         sub = ast_calloc(1, sizeof(*sub) + strlen(resource) + 1);
1076         if (!sub) {
1077                 return NULL;
1078         }
1079         strcpy(sub->resource, resource); /* Safe */
1080
1081         sub->datastores = ast_datastores_alloc();
1082         if (!sub->datastores) {
1083                 destroy_subscription(sub);
1084                 return NULL;
1085         }
1086
1087         sub->body_text = ast_str_create(128);
1088         if (!sub->body_text) {
1089                 destroy_subscription(sub);
1090                 return NULL;
1091         }
1092
1093         sub->uri = pjsip_sip_uri_create(tree->dlg->pool, PJ_FALSE);
1094         contact_uri = pjsip_uri_get_uri(tree->dlg->local.contact->uri);
1095         pjsip_sip_uri_assign(tree->dlg->pool, sub->uri, contact_uri);
1096         pj_strdup2(tree->dlg->pool, &sub->uri->user, resource);
1097
1098         sub->handler = handler;
1099         sub->subscription_state = PJSIP_EVSUB_STATE_ACTIVE;
1100         sub->tree = ao2_bump(tree);
1101
1102         return sub;
1103 }
1104
1105 /*!
1106  * \brief Create a tree of virtual subscriptions based on a resource tree node.
1107  *
1108  * \param handler The handler to supply to leaf subscriptions.
1109  * \param resource The requested resource for this subscription.
1110  * \param generator Body generator to use for leaf subscriptions.
1111  * \param tree The root of the subscription tree.
1112  * \param current The tree node that corresponds to the subscription being created.
1113  */
1114 static struct ast_sip_subscription *create_virtual_subscriptions(const struct ast_sip_subscription_handler *handler,
1115                 const char *resource, struct ast_sip_pubsub_body_generator *generator,
1116                 struct sip_subscription_tree *tree, struct tree_node *current)
1117 {
1118         int i;
1119         struct ast_sip_subscription *sub;
1120
1121         sub = allocate_subscription(handler, resource, tree);
1122         if (!sub) {
1123                 return NULL;
1124         }
1125
1126         sub->full_state = current->full_state;
1127         sub->body_generator = generator;
1128         AST_VECTOR_INIT(&sub->children, AST_VECTOR_SIZE(&current->children));
1129
1130         for (i = 0; i < AST_VECTOR_SIZE(&current->children); ++i) {
1131                 struct ast_sip_subscription *child;
1132                 struct tree_node *child_node = AST_VECTOR_GET(&current->children, i);
1133
1134                 child = create_virtual_subscriptions(handler, child_node->resource, generator,
1135                                 tree, child_node);
1136
1137                 if (!child) {
1138                         ast_debug(1, "Child subscription to resource %s could not be created\n",
1139                                         child_node->resource);
1140                         continue;
1141                 }
1142
1143                 if (AST_VECTOR_APPEND(&sub->children, child)) {
1144                         ast_debug(1, "Child subscription to resource %s could not be appended\n",
1145                                         child_node->resource);
1146                 }
1147         }
1148
1149         return sub;
1150 }
1151
1152 static void shutdown_subscriptions(struct ast_sip_subscription *sub)
1153 {
1154         int i;
1155
1156         if (!sub) {
1157                 return;
1158         }
1159
1160         if (AST_VECTOR_SIZE(&sub->children) > 0) {
1161                 for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
1162                         shutdown_subscriptions(AST_VECTOR_GET(&sub->children, i));
1163                 }
1164                 return;
1165         }
1166
1167         /* We notify subscription shutdown only on the tree leaves. */
1168         if (sub->handler->subscription_shutdown) {
1169                 sub->handler->subscription_shutdown(sub);
1170         }
1171 }
1172 static int subscription_unreference_dialog(void *obj)
1173 {
1174         struct sip_subscription_tree *sub_tree = obj;
1175
1176         /* This is why we keep the dialog on the subscription. When the subscription
1177          * is destroyed, there is no guarantee that the underlying dialog is ready
1178          * to be destroyed. Furthermore, there's no guarantee in the opposite direction
1179          * either. The dialog could be destroyed before our subscription is. We fix
1180          * this problem by keeping a reference to the dialog until it is time to
1181          * destroy the subscription. We need to have the dialog available when the
1182          * subscription is destroyed so that we can guarantee that our attempt to
1183          * remove the serializer will be successful.
1184          */
1185         pjsip_dlg_dec_session(sub_tree->dlg, &pubsub_module);
1186         sub_tree->dlg = NULL;
1187
1188         return 0;
1189 }
1190
1191 static void subscription_tree_destructor(void *obj)
1192 {
1193         struct sip_subscription_tree *sub_tree = obj;
1194
1195         ast_debug(3, "Destroying subscription tree %p\n", sub_tree);
1196
1197         ao2_cleanup(sub_tree->endpoint);
1198
1199         destroy_subscriptions(sub_tree->root);
1200
1201         if (sub_tree->dlg) {
1202                 ast_sip_push_task_synchronous(sub_tree->serializer, subscription_unreference_dialog, sub_tree);
1203         }
1204
1205         ast_taskprocessor_unreference(sub_tree->serializer);
1206         ast_module_unref(ast_module_info->self);
1207 }
1208
1209 void ast_sip_subscription_destroy(struct ast_sip_subscription *sub)
1210 {
1211         ast_debug(3, "Removing subscription %p reference to subscription tree %p\n", sub, sub->tree);
1212         ao2_cleanup(sub->tree);
1213 }
1214
1215 static void subscription_setup_dialog(struct sip_subscription_tree *sub_tree, pjsip_dialog *dlg)
1216 {
1217         sub_tree->dlg = dlg;
1218         ast_sip_dialog_set_serializer(dlg, sub_tree->serializer);
1219         ast_sip_dialog_set_endpoint(dlg, sub_tree->endpoint);
1220         pjsip_evsub_set_mod_data(sub_tree->evsub, pubsub_module.id, sub_tree);
1221         pjsip_dlg_inc_session(dlg, &pubsub_module);
1222 }
1223
1224 static struct sip_subscription_tree *allocate_subscription_tree(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
1225 {
1226         struct sip_subscription_tree *sub_tree;
1227
1228         sub_tree = ao2_alloc(sizeof *sub_tree, subscription_tree_destructor);
1229         if (!sub_tree) {
1230                 return NULL;
1231         }
1232
1233         ast_module_ref(ast_module_info->self);
1234
1235         if (rdata) {
1236                 /*
1237                  * We must continue using the serializer that the original
1238                  * SUBSCRIBE came in on for the dialog.  There may be
1239                  * retransmissions already enqueued in the original
1240                  * serializer that can result in reentrancy and message
1241                  * sequencing problems.
1242                  */
1243                 sub_tree->serializer = ast_sip_get_distributor_serializer(rdata);
1244         } else {
1245                 char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
1246
1247                 /* Create name with seq number appended. */
1248                 ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/pubsub/%s",
1249                         ast_sorcery_object_get_id(endpoint));
1250
1251                 sub_tree->serializer = ast_sip_create_serializer(tps_name);
1252         }
1253         if (!sub_tree->serializer) {
1254                 ao2_ref(sub_tree, -1);
1255                 return NULL;
1256         }
1257
1258         sub_tree->endpoint = ao2_bump(endpoint);
1259         sub_tree->notify_sched_id = -1;
1260
1261         return sub_tree;
1262 }
1263
1264 /*!
1265  * \brief Create a subscription tree based on a resource tree.
1266  *
1267  * Using the previously-determined valid resources in the provided resource tree,
1268  * a corresponding tree of ast_sip_subscriptions are created. The root of the
1269  * subscription tree is a real subscription, and the rest in the tree are
1270  * virtual subscriptions.
1271  *
1272  * \param handler The handler to use for leaf subscriptions
1273  * \param endpoint The endpoint that sent the SUBSCRIBE request
1274  * \param rdata The SUBSCRIBE content
1275  * \param resource The requested resource in the SUBSCRIBE request
1276  * \param generator The body generator to use in leaf subscriptions
1277  * \param tree The resource tree on which the subscription tree is based
1278  * \param dlg_status[out] The result of attempting to create a dialog.
1279  *
1280  * \retval NULL Could not create the subscription tree
1281  * \retval non-NULL The root of the created subscription tree
1282  */
1283
1284 static struct sip_subscription_tree *create_subscription_tree(const struct ast_sip_subscription_handler *handler,
1285                 struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
1286                 struct ast_sip_pubsub_body_generator *generator, struct resource_tree *tree,
1287                 pj_status_t *dlg_status)
1288 {
1289         struct sip_subscription_tree *sub_tree;
1290         pjsip_dialog *dlg;
1291         struct subscription_persistence *persistence;
1292
1293         sub_tree = allocate_subscription_tree(endpoint, rdata);
1294         if (!sub_tree) {
1295                 *dlg_status = PJ_ENOMEM;
1296                 return NULL;
1297         }
1298         sub_tree->role = AST_SIP_NOTIFIER;
1299
1300         dlg = ast_sip_create_dialog_uas(endpoint, rdata, dlg_status);
1301         if (!dlg) {
1302                 if (*dlg_status != PJ_EEXISTS) {
1303                         ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
1304                 }
1305                 ao2_ref(sub_tree, -1);
1306                 return NULL;
1307         }
1308
1309         persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
1310                         pubsub_module.id, MOD_DATA_PERSISTENCE);
1311         if (persistence) {
1312                 /* Update the created dialog with the persisted information */
1313                 pjsip_ua_unregister_dlg(pjsip_ua_instance(), dlg);
1314                 pj_strdup2(dlg->pool, &dlg->local.info->tag, persistence->tag);
1315                 dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag);
1316                 pjsip_ua_register_dlg(pjsip_ua_instance(), dlg);
1317                 dlg->local.cseq = persistence->cseq;
1318                 dlg->remote.cseq = persistence->cseq;
1319         }
1320
1321         pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &sub_tree->evsub);
1322         subscription_setup_dialog(sub_tree, dlg);
1323
1324 #ifdef HAVE_PJSIP_EVSUB_GRP_LOCK
1325         pjsip_evsub_add_ref(sub_tree->evsub);
1326 #endif
1327
1328         ast_sip_mod_data_set(dlg->pool, dlg->mod_data, pubsub_module.id, MOD_DATA_MSG,
1329                         pjsip_msg_clone(dlg->pool, rdata->msg_info.msg));
1330
1331         sub_tree->notification_batch_interval = tree->notification_batch_interval;
1332
1333         sub_tree->root = create_virtual_subscriptions(handler, resource, generator, sub_tree, tree->root);
1334         if (AST_VECTOR_SIZE(&sub_tree->root->children) > 0) {
1335                 sub_tree->is_list = 1;
1336         }
1337
1338         add_subscription(sub_tree);
1339
1340         return sub_tree;
1341 }
1342
/* Forward declarations of notification helpers that are used before their definitions. */
static int initial_notify_task(void *obj);
static int send_notify(struct sip_subscription_tree *sub_tree,
		unsigned int force_full_state);
1345
1346 /*! Persistent subscription recreation continuation under distributor serializer data */
1347 struct persistence_recreate_data {
1348         struct subscription_persistence *persistence;
1349         pjsip_rx_data *rdata;
1350 };
1351
1352 /*!
1353  * \internal
1354  * \brief subscription_persistence_recreate continuation under distributor serializer.
1355  * \since 13.10.0
1356  *
1357  * \retval 0 on success.
1358  * \retval -1 on error.
1359  */
1360 static int sub_persistence_recreate(void *obj)
1361 {
1362         struct persistence_recreate_data *recreate_data = obj;
1363         struct subscription_persistence *persistence = recreate_data->persistence;
1364         pjsip_rx_data *rdata = recreate_data->rdata;
1365         struct ast_sip_endpoint *endpoint;
1366         struct sip_subscription_tree *sub_tree;
1367         struct ast_sip_pubsub_body_generator *generator;
1368         struct ast_sip_subscription_handler *handler;
1369         char *resource;
1370         pjsip_sip_uri *request_uri;
1371         size_t resource_size;
1372         int resp;
1373         struct resource_tree tree;
1374         pjsip_expires_hdr *expires_header;
1375
1376         request_uri = pjsip_uri_get_uri(rdata->msg_info.msg->line.req.uri);
1377         resource_size = pj_strlen(&request_uri->user) + 1;
1378         resource = ast_alloca(resource_size);
1379         ast_copy_pj_str(resource, &request_uri->user, resource_size);
1380
1381         /*
1382          * We may want to match without any user options getting
1383          * in the way.
1384          */
1385         AST_SIP_USER_OPTIONS_TRUNCATE_CHECK(resource);
1386
1387         handler = subscription_get_handler_from_rdata(rdata);
1388         if (!handler || !handler->notifier) {
1389                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not get subscription handler.\n",
1390                         persistence->endpoint);
1391                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1392                 return 0;
1393         }
1394
1395         generator = subscription_get_generator_from_rdata(rdata, handler);
1396         if (!generator) {
1397                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Body generator not available.\n",
1398                         persistence->endpoint);
1399                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1400                 return 0;
1401         }
1402
1403         ast_sip_mod_data_set(rdata->tp_info.pool, rdata->endpt_info.mod_data,
1404                 pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
1405
1406         /* Getting the endpoint may take some time that can affect the expiration. */
1407         endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint",
1408                 persistence->endpoint);
1409         if (!endpoint) {
1410                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: The endpoint was not found\n",
1411                         persistence->endpoint);
1412                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1413                 ao2_ref(endpoint, -1);
1414                 return 0;
1415         }
1416
1417         /* Update the expiration header with the new expiration */
1418         expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES,
1419                 rdata->msg_info.msg->hdr.next);
1420         if (!expires_header) {
1421                 expires_header = pjsip_expires_hdr_create(rdata->tp_info.pool, 0);
1422                 if (!expires_header) {
1423                         ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not update expires header.\n",
1424                                 persistence->endpoint);
1425                         ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1426                         ao2_ref(endpoint, -1);
1427                         return 0;
1428                 }
1429                 pjsip_msg_add_hdr(rdata->msg_info.msg, (pjsip_hdr *) expires_header);
1430         }
1431         expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
1432         if (expires_header->ivalue <= 0) {
1433                 /* The subscription expired since we started recreating the subscription. */
1434                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1435                 ao2_ref(endpoint, -1);
1436                 return 0;
1437         }
1438
1439         memset(&tree, 0, sizeof(tree));
1440         resp = build_resource_tree(endpoint, handler, resource, &tree,
1441                 ast_sip_pubsub_has_eventlist_support(rdata));
1442         if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
1443                 pj_status_t dlg_status;
1444
1445                 sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator,
1446                         &tree, &dlg_status);
1447                 if (!sub_tree) {
1448                         if (dlg_status != PJ_EEXISTS) {
1449                                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not create subscription tree.\n",
1450                                         persistence->endpoint);
1451                                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1452                         }
1453                 } else {
1454                         sub_tree->persistence = ao2_bump(persistence);
1455                         subscription_persistence_update(sub_tree, rdata);
1456                         if (ast_sip_push_task(sub_tree->serializer, initial_notify_task,
1457                                 ao2_bump(sub_tree))) {
1458                                 /* Could not send initial subscribe NOTIFY */
1459                                 pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
1460                                 ao2_ref(sub_tree, -1);
1461                         }
1462                 }
1463         } else {
1464                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1465         }
1466         resource_tree_destroy(&tree);
1467         ao2_ref(endpoint, -1);
1468
1469         return 0;
1470 }
1471
1472 /*! \brief Callback function to perform the actual recreation of a subscription */
1473 static int subscription_persistence_recreate(void *obj, void *arg, int flags)
1474 {
1475         struct subscription_persistence *persistence = obj;
1476         pj_pool_t *pool = arg;
1477         struct ast_taskprocessor *serializer;
1478         pjsip_rx_data rdata;
1479         struct persistence_recreate_data recreate_data;
1480
1481         /* If this subscription has already expired remove it */
1482         if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
1483                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1484                 return 0;
1485         }
1486
1487         memset(&rdata, 0, sizeof(rdata));
1488         pj_pool_reset(pool);
1489         rdata.tp_info.pool = pool;
1490
1491         if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
1492                 persistence->transport_key, persistence->local_name, persistence->local_port)) {
1493                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: The message could not be parsed\n",
1494                         persistence->endpoint);
1495                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1496                 return 0;
1497         }
1498
1499         if (rdata.msg_info.msg->type != PJSIP_REQUEST_MSG) {
1500                 ast_log(LOG_NOTICE, "Failed recreating '%s' subscription: Stored a SIP response instead of a request.\n",
1501                         persistence->endpoint);
1502                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1503                 return 0;
1504         }
1505
1506         /* Continue the remainder in the distributor serializer */
1507         serializer = ast_sip_get_distributor_serializer(&rdata);
1508         if (!serializer) {
1509                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not get distributor serializer.\n",
1510                         persistence->endpoint);
1511                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1512                 return 0;
1513         }
1514         recreate_data.persistence = persistence;
1515         recreate_data.rdata = &rdata;
1516         if (ast_sip_push_task_synchronous(serializer, sub_persistence_recreate, &recreate_data)) {
1517                 ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not continue under distributor serializer.\n",
1518                         persistence->endpoint);
1519                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1520         }
1521         ast_taskprocessor_unreference(serializer);
1522
1523         return 0;
1524 }
1525
1526 /*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
1527 static int subscription_persistence_load(void *data)
1528 {
1529         struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
1530                 "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
1531         pj_pool_t *pool;
1532
1533         pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
1534                 PJSIP_POOL_RDATA_INC);
1535         if (!pool) {
1536                 ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
1537                 return 0;
1538         }
1539
1540         ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
1541
1542         pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
1543
1544         ao2_ref(persisted_subscriptions, -1);
1545         return 0;
1546 }
1547
1548 /*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
1549 static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
1550 {
1551         struct ast_json_payload *payload;
1552         const char *type;
1553
1554         if (stasis_message_type(message) != ast_manager_get_generic_type()) {
1555                 return;
1556         }
1557
1558         payload = stasis_message_data(message);
1559         type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
1560
1561         /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
1562          * recreate SIP subscriptions.
1563          */
1564         if (strcmp(type, "FullyBooted")) {
1565                 return;
1566         }
1567
1568         /* This has to be here so the subscription is recreated when the body generator is available */
1569         ast_sip_push_task(NULL, subscription_persistence_load, NULL);
1570
1571         /* Once the system is fully booted we don't care anymore */
1572         stasis_unsubscribe(sub);
1573 }
1574
1575 typedef int (*on_subscription_t)(struct sip_subscription_tree *sub, void *arg);
1576
1577 static int for_each_subscription(on_subscription_t on_subscription, void *arg)
1578 {
1579         int num = 0;
1580         struct sip_subscription_tree *i;
1581         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1582
1583         if (!on_subscription) {
1584                 return num;
1585         }
1586
1587         AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
1588                 if (on_subscription(i, arg)) {
1589                         break;
1590                 }
1591                 ++num;
1592         }
1593         return num;
1594 }
1595
1596 static void sip_subscription_to_ami(struct sip_subscription_tree *sub_tree,
1597                                     struct ast_str **buf)
1598 {
1599         char str[256];
1600         struct ast_sip_endpoint_id_configuration *id = &sub_tree->endpoint->id;
1601
1602         ast_str_append(buf, 0, "Role: %s\r\n",
1603                        sip_subscription_roles_map[sub_tree->role]);
1604         ast_str_append(buf, 0, "Endpoint: %s\r\n",
1605                        ast_sorcery_object_get_id(sub_tree->endpoint));
1606
1607         if (sub_tree->dlg) {
1608                 ast_copy_pj_str(str, &sub_tree->dlg->call_id->id, sizeof(str));
1609         } else {
1610                 ast_copy_string(str, "<unknown>", sizeof(str));
1611         }
1612         ast_str_append(buf, 0, "Callid: %s\r\n", str);
1613
1614         ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(sub_tree->evsub));
1615
1616         ast_callerid_merge(str, sizeof(str),
1617                            S_COR(id->self.name.valid, id->self.name.str, NULL),
1618                            S_COR(id->self.number.valid, id->self.number.str, NULL),
1619                            "Unknown");
1620
1621         ast_str_append(buf, 0, "Callerid: %s\r\n", str);
1622
1623         /* XXX This needs to be done recursively for lists */
1624         if (sub_tree->root->handler->to_ami) {
1625                 sub_tree->root->handler->to_ami(sub_tree->root, buf);
1626         }
1627 }
1628
1629
1630 void *ast_sip_subscription_get_header(const struct ast_sip_subscription *sub, const char *header)
1631 {
1632         pjsip_dialog *dlg;
1633         pjsip_msg *msg;
1634         pj_str_t name;
1635
1636         dlg = sub->tree->dlg;
1637         msg = ast_sip_mod_data_get(dlg->mod_data, pubsub_module.id, MOD_DATA_MSG);
1638         pj_cstr(&name, header);
1639
1640         return pjsip_msg_find_hdr_by_name(msg, &name, NULL);
1641 }
1642
1643 /*!
1644  * \internal
1645  * \brief Wrapper for pjsip_evsub_send_request
1646  *
1647  * This function (re)sets the transport before sending to catch cases
1648  * where the transport might have changed.
1649  *
1650  * If pjproject gives us the ability to resend, we'll only reset the transport
1651  * if PJSIP_ETPNOTAVAIL is returned from send.
1652  *
1653  * \returns pj_status_t
1654  */
1655 static pj_status_t internal_pjsip_evsub_send_request(struct sip_subscription_tree *sub_tree, pjsip_tx_data *tdata)
1656 {
1657         pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, };
1658
1659         ast_sip_set_tpselector_from_transport_name(sub_tree->endpoint->transport, &selector);
1660         pjsip_dlg_set_transport(sub_tree->dlg, &selector);
1661
1662         return pjsip_evsub_send_request(sub_tree->evsub, tdata);
1663 }
1664
1665 /* XXX This function is not used. */
1666 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
1667                 struct ast_sip_endpoint *endpoint, const char *resource)
1668 {
1669         struct ast_sip_subscription *sub;
1670         pjsip_dialog *dlg;
1671         struct ast_sip_contact *contact;
1672         pj_str_t event;
1673         pjsip_tx_data *tdata;
1674         pjsip_evsub *evsub;
1675         struct sip_subscription_tree *sub_tree = NULL;
1676
1677         sub_tree = allocate_subscription_tree(endpoint, NULL);
1678         if (!sub_tree) {
1679                 return NULL;
1680         }
1681
1682         sub = allocate_subscription(handler, resource, sub_tree);
1683         if (!sub) {
1684                 ao2_cleanup(sub_tree);
1685                 return NULL;
1686         }
1687
1688         contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors);
1689         if (!contact || ast_strlen_zero(contact->uri)) {
1690                 ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n",
1691                                 ast_sorcery_object_get_id(endpoint));
1692                 ao2_ref(sub_tree, -1);
1693                 ao2_cleanup(contact);
1694                 return NULL;
1695         }
1696
1697         dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL);
1698         ao2_cleanup(contact);
1699         if (!dlg) {
1700                 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
1701                 ao2_ref(sub_tree, -1);
1702                 return NULL;
1703         }
1704
1705         pj_cstr(&event, handler->event_name);
1706         pjsip_evsub_create_uac(dlg, &pubsub_cb, &event, 0, &sub_tree->evsub);
1707         subscription_setup_dialog(sub_tree, dlg);
1708
1709         evsub = sub_tree->evsub;
1710
1711         if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
1712                 internal_pjsip_evsub_send_request(sub_tree, tdata);
1713         } else {
1714                 /* pjsip_evsub_terminate will result in pubsub_on_evsub_state,
1715                  * being called and terminating the subscription. Therefore, we don't
1716                  * need to decrease the reference count of sub here.
1717                  */
1718                 pjsip_evsub_terminate(evsub, PJ_TRUE);
1719                 ao2_ref(sub_tree, -1);
1720                 return NULL;
1721         }
1722
1723         add_subscription(sub_tree);
1724
1725         return sub;
1726 }
1727
1728 pjsip_dialog *ast_sip_subscription_get_dialog(struct ast_sip_subscription *sub)
1729 {
1730         ast_assert(sub->tree->dlg != NULL);
1731         return sub->tree->dlg;
1732 }
1733
1734 struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
1735 {
1736         ast_assert(sub->tree->endpoint != NULL);
1737         return ao2_bump(sub->tree->endpoint);
1738 }
1739
1740 struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
1741 {
1742         ast_assert(sub->tree->serializer != NULL);
1743         return sub->tree->serializer;
1744 }
1745
1746 /*!
1747  * \brief Pre-allocate a buffer for the transmission
1748  *
1749  * Typically, we let PJSIP do this step for us when we send a request. PJSIP's buffer
1750  * allocation algorithm is to allocate a buffer of PJSIP_MAX_PKT_LEN bytes and attempt
1751  * to write the packet to the allocated buffer. If the buffer is too small to hold the
1752  * packet, then we get told the message is too long to be sent.
1753  *
1754  * When dealing with SIP NOTIFY, especially with RLS, it is possible to exceed
1755  * PJSIP_MAX_PKT_LEN. Rather than accepting the limitation imposed on us by default,
1756  * we instead take the strategy of pre-allocating the buffer, testing for ourselves
1757  * if the message will fit, and resizing the buffer as required.
1758  *
1759  * RFC 3261 says that a SIP UDP request can be up to 65535 bytes long. We're capping
1760  * it at 64000 for a couple of reasons:
1761  * 1) Allocating more than 64K at a time is hard to justify
1762  * 2) If the message goes through proxies, those proxies will want to add Via and
1763  *    Record-Route headers, making the message even larger. Giving some space for
1764  *    those headers is a nice thing to do.
1765  *
1766  * RFC 3261 does not place an upper limit on the size of TCP requests, but we are
1767  * going to impose the same 64K limit as a memory savings.
1768  *
1769  * \param tdata The tdata onto which to allocate a buffer
1770  * \retval 0 Success
1771  * \retval -1 The message is too large
1772  */
1773 static int allocate_tdata_buffer(pjsip_tx_data *tdata)
1774 {
1775         int buf_size;
1776         int size = -1;
1777         char *buf;
1778
1779         for (buf_size = PJSIP_MAX_PKT_LEN; size == -1 && buf_size < 64000; buf_size *= 2) {
1780                 buf = pj_pool_alloc(tdata->pool, buf_size);
1781                 size = pjsip_msg_print(tdata->msg, buf, buf_size);
1782         }
1783
1784         if (size == -1) {
1785                 return -1;
1786         }
1787
1788         tdata->buf.start = buf;
1789         tdata->buf.cur = tdata->buf.start;
1790         tdata->buf.end = tdata->buf.start + buf_size;
1791
1792         return 0;
1793 }
1794
1795 static int sip_subscription_send_request(struct sip_subscription_tree *sub_tree, pjsip_tx_data *tdata)
1796 {
1797 #ifdef TEST_FRAMEWORK
1798         struct ast_sip_endpoint *endpoint = sub_tree->endpoint;
1799         pjsip_evsub *evsub = sub_tree->evsub;
1800 #endif
1801         int res;
1802
1803         if (allocate_tdata_buffer(tdata)) {
1804                 ast_log(LOG_ERROR, "SIP request %s is too large to send.\n", tdata->info);
1805                 pjsip_tx_data_dec_ref(tdata);
1806                 return -1;
1807         }
1808
1809         res = internal_pjsip_evsub_send_request(sub_tree, tdata);
1810
1811         subscription_persistence_update(sub_tree, NULL);
1812
1813         ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
1814                 "StateText: %s\r\n"
1815                 "Endpoint: %s\r\n",
1816                 pjsip_evsub_get_state_name(evsub),
1817                 ast_sorcery_object_get_id(endpoint));
1818
1819         return (res == PJ_SUCCESS ? 0 : -1);
1820 }
1821
1822 /*!
1823  * \brief Add a resource XML element to an RLMI body
1824  *
1825  * Each resource element represents a subscribed resource in the list. This function currently
1826  * will unconditionally add an instance element to each created resource element. Instance
1827  * elements refer to later parts in the multipart body.
1828  *
1829  * \param pool PJLIB allocation pool
1830  * \param cid Content-ID header of the resource
1831  * \param resource_name Name of the resource
1832  * \param resource_uri URI of the resource
1833  * \param state State of the subscribed resource
1834  */
1835 static void add_rlmi_resource(pj_pool_t *pool, pj_xml_node *rlmi, const pjsip_generic_string_hdr *cid,
1836                 const char *resource_name, const pjsip_sip_uri *resource_uri, pjsip_evsub_state state)
1837 {
1838         static pj_str_t cid_name = { "cid", 3 };
1839         pj_xml_node *resource;
1840         pj_xml_node *name;
1841         pj_xml_node *instance;
1842         pj_xml_attr *cid_attr;
1843         char id[6];
1844         char uri[PJSIP_MAX_URL_SIZE];
1845
1846         /* This creates a string representing the Content-ID without the enclosing < > */
1847         const pj_str_t cid_stripped = {
1848                 .ptr = cid->hvalue.ptr + 1,
1849                 .slen = cid->hvalue.slen - 2,
1850         };
1851
1852         resource = ast_sip_presence_xml_create_node(pool, rlmi, "resource");
1853         name = ast_sip_presence_xml_create_node(pool, resource, "name");
1854         instance = ast_sip_presence_xml_create_node(pool, resource, "instance");
1855
1856         pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, resource_uri, uri, sizeof(uri));
1857         ast_sip_presence_xml_create_attr(pool, resource, "uri", uri);
1858
1859         pj_strdup2(pool, &name->content, resource_name);
1860
1861         ast_generate_random_string(id, sizeof(id));
1862
1863         ast_sip_presence_xml_create_attr(pool, instance, "id", id);
1864         ast_sip_presence_xml_create_attr(pool, instance, "state",
1865                         state == PJSIP_EVSUB_STATE_TERMINATED ? "terminated" : "active");
1866
1867         /* Use the PJLIB-util XML library directly here since we are using a
1868          * pj_str_t
1869          */
1870
1871         cid_attr = pj_xml_attr_new(pool, &cid_name, &cid_stripped);
1872         pj_xml_add_attr(instance, cid_attr);
1873 }
1874
1875 /*!
1876  * \brief A multipart body part and meta-information
1877  *
1878  * When creating a multipart body part, the end result (the
1879  * pjsip_multipart_part) is hard to inspect without undoing
1880  * a lot of what was done to create it. Therefore, we use this
1881  * structure to store meta-information about the body part.
1882  *
1883  * The main consumer of this is the creator of the RLMI body
1884  * part of a multipart resource list body.
1885  */
1886 struct body_part {
1887         /*! Content-ID header for the body part */
1888         pjsip_generic_string_hdr *cid;
1889         /*! Subscribed resource represented in the body part */
1890         const char *resource;
1891         /*! URI for the subscribed body part */
1892         pjsip_sip_uri *uri;
1893         /*! Subscription state of the resource represented in the body part */
1894         pjsip_evsub_state state;
1895         /*! The actual body part that will be present in the multipart body */
1896         pjsip_multipart_part *part;
1897 };
1898
1899 /*!
1900  * \brief Type declaration for container of body part structures
1901  */
1902 AST_VECTOR(body_part_list, struct body_part *);
1903
1904 /*!
1905  * \brief Create a Content-ID header
1906  *
1907  * Content-ID headers are required by RFC2387 for multipart/related
1908  * bodies. They serve as identifiers for each part of the multipart body.
1909  *
1910  * \param pool PJLIB allocation pool
1911  * \param sub Subscription to a resource
1912  */
1913 static pjsip_generic_string_hdr *generate_content_id_hdr(pj_pool_t *pool,
1914                 const struct ast_sip_subscription *sub)
1915 {
1916         static const pj_str_t cid_name = { "Content-ID", 10 };
1917         pjsip_generic_string_hdr *cid;
1918         char id[6];
1919         size_t alloc_size;
1920         pj_str_t cid_value;
1921
1922         /* '<' + '@' + '>' = 3. pj_str_t does not require a null-terminator */
1923         alloc_size = sizeof(id) + pj_strlen(&sub->uri->host) + 3;
1924         cid_value.ptr = pj_pool_alloc(pool, alloc_size);
1925         cid_value.slen = sprintf(cid_value.ptr, "<%s@%.*s>",
1926                         ast_generate_random_string(id, sizeof(id)),
1927                         (int) pj_strlen(&sub->uri->host), pj_strbuf(&sub->uri->host));
1928         cid = pjsip_generic_string_hdr_create(pool, &cid_name, &cid_value);
1929
1930         return cid;
1931 }
1932
1933 static int rlmi_print_body(struct pjsip_msg_body *msg_body, char *buf, pj_size_t size)
1934 {
1935         int num_printed;
1936         pj_xml_node *rlmi = msg_body->data;
1937
1938         num_printed = pj_xml_print(rlmi, buf, size, PJ_TRUE);
1939         if (num_printed <= AST_PJSIP_XML_PROLOG_LEN) {
1940                 return -1;
1941         }
1942
1943         return num_printed;
1944 }
1945
1946 static void *rlmi_clone_data(pj_pool_t *pool, const void *data, unsigned len)
1947 {
1948         const pj_xml_node *rlmi = data;
1949
1950         return pj_xml_clone(pool, rlmi);
1951 }
1952
1953 /*!
1954  * \brief Create an RLMI body part for a multipart resource list body
1955  *
1956  * RLMI (Resource list meta information) is a special body type that lists
1957  * the subscribed resources and tells subscribers the number of subscribed
1958  * resources and what other body parts are in the multipart body. The
1959  * RLMI body also has a version number that a subscriber can use to ensure
1960  * that the locally-stored state corresponds to server state.
1961  *
1962  * \param pool The allocation pool
1963  * \param sub The subscription representing the subscribed resource list
1964  * \param body_parts A container of body parts that RLMI will refer to
1965  * \param full_state Indicates whether this is a full or partial state notification
1966  * \return The multipart part representing the RLMI body
1967  */
1968 static pjsip_multipart_part *build_rlmi_body(pj_pool_t *pool, struct ast_sip_subscription *sub,
1969                 struct body_part_list *body_parts, unsigned int full_state)
1970 {
1971         static const pj_str_t rlmi_type = { "application", 11 };
1972         static const pj_str_t rlmi_subtype = { "rlmi+xml", 8 };
1973         pj_xml_node *rlmi;
1974         pj_xml_node *name;
1975         pjsip_multipart_part *rlmi_part;
1976         char version_str[32];
1977         char uri[PJSIP_MAX_URL_SIZE];
1978         pjsip_generic_string_hdr *cid;
1979         int i;
1980
1981         rlmi = ast_sip_presence_xml_create_node(pool, NULL, "list");
1982         ast_sip_presence_xml_create_attr(pool, rlmi, "xmlns", "urn:ietf:params:xml:ns:rlmi");
1983
1984         ast_sip_subscription_get_local_uri(sub, uri, sizeof(uri));
1985         ast_sip_presence_xml_create_attr(pool, rlmi, "uri", uri);
1986
1987         snprintf(version_str, sizeof(version_str), "%u", sub->version++);
1988         ast_sip_presence_xml_create_attr(pool, rlmi, "version", version_str);
1989         ast_sip_presence_xml_create_attr(pool, rlmi, "fullState", full_state ? "true" : "false");
1990
1991         name = ast_sip_presence_xml_create_node(pool, rlmi, "name");
1992         pj_strdup2(pool, &name->content, ast_sip_subscription_get_resource_name(sub));
1993
1994         for (i = 0; i < AST_VECTOR_SIZE(body_parts); ++i) {
1995                 const struct body_part *part = AST_VECTOR_GET(body_parts, i);
1996
1997                 add_rlmi_resource(pool, rlmi, part->cid, part->resource, part->uri, part->state);
1998         }
1999
2000         rlmi_part = pjsip_multipart_create_part(pool);
2001
2002         rlmi_part->body = PJ_POOL_ZALLOC_T(pool, pjsip_msg_body);
2003         pj_strdup(pool, &rlmi_part->body->content_type.type, &rlmi_type);
2004         pj_strdup(pool, &rlmi_part->body->content_type.subtype, &rlmi_subtype);
2005         pj_list_init(&rlmi_part->body->content_type.param);
2006
2007         rlmi_part->body->data = pj_xml_clone(pool, rlmi);
2008         rlmi_part->body->clone_data = rlmi_clone_data;
2009         rlmi_part->body->print_body = rlmi_print_body;
2010
2011         cid = generate_content_id_hdr(pool, sub);
2012         pj_list_insert_before(&rlmi_part->hdr, cid);
2013
2014         return rlmi_part;
2015 }
2016
2017 static pjsip_msg_body *generate_notify_body(pj_pool_t *pool, struct ast_sip_subscription *root,
2018                 unsigned int force_full_state);
2019
/*!
 * \brief Destroy a list of body parts
 *
 * Frees every body_part held in the vector and then the vector's own storage.
 *
 * \param parts The container of parts to destroy
 */
static void free_body_parts(struct body_part_list *parts)
{
	int idx = 0;

	while (idx < AST_VECTOR_SIZE(parts)) {
		ast_free(AST_VECTOR_GET(parts, idx));
		++idx;
	}

	AST_VECTOR_FREE(parts);
}
2036
2037 /*!
2038  * \brief Allocate and initialize a body part structure
2039  *
2040  * \param pool PJLIB allocation pool
2041  * \param sub Subscription representing a subscribed resource
2042  */
2043 static struct body_part *allocate_body_part(pj_pool_t *pool, const struct ast_sip_subscription *sub)
2044 {
2045         struct body_part *bp;
2046
2047         bp = ast_calloc(1, sizeof(*bp));
2048         if (!bp) {
2049                 return NULL;
2050         }
2051
2052         bp->cid = generate_content_id_hdr(pool, sub);
2053         bp->resource = sub->resource;
2054         bp->state = sub->subscription_state;
2055         bp->uri = sub->uri;
2056
2057         return bp;
2058 }
2059
2060 /*!
2061  * \brief Create a multipart body part for a subscribed resource
2062  *
2063  * \param pool PJLIB allocation pool
2064  * \param sub The subscription representing a subscribed resource
2065  * \param parts A vector of parts to append the created part to.
2066  * \param use_full_state Unused locally, but may be passed to other functions
2067  */
2068 static void build_body_part(pj_pool_t *pool, struct ast_sip_subscription *sub,
2069                 struct body_part_list *parts, unsigned int use_full_state)
2070 {
2071         struct body_part *bp;
2072         pjsip_msg_body *body;
2073
2074         bp = allocate_body_part(pool, sub);
2075         if (!bp) {
2076                 return;
2077         }
2078
2079         body = generate_notify_body(pool, sub, use_full_state);
2080         if (!body) {
2081                 /* Partial state was requested and the resource has not changed state */
2082                 ast_free(bp);
2083                 return;
2084         }
2085
2086         bp->part = pjsip_multipart_create_part(pool);
2087         bp->part->body = body;
2088         pj_list_insert_before(&bp->part->hdr, bp->cid);
2089
2090         AST_VECTOR_APPEND(parts, bp);
2091 }
2092
2093 /*!
2094  * \brief Create and initialize the PJSIP multipart body structure for a resource list subscription
2095  *
2096  * \param pool
2097  * \return The multipart message body
2098  */
2099 static pjsip_msg_body *create_multipart_body(pj_pool_t *pool)
2100 {
2101         pjsip_media_type media_type;
2102         pjsip_param *media_type_param;
2103         char boundary[6];
2104         pj_str_t pj_boundary;
2105
2106         pjsip_media_type_init2(&media_type, "multipart", "related");
2107
2108         media_type_param = pj_pool_alloc(pool, sizeof(*media_type_param));
2109         pj_list_init(media_type_param);
2110
2111         pj_strdup2(pool, &media_type_param->name, "type");
2112         pj_strdup2(pool, &media_type_param->value, "\"application/rlmi+xml\"");
2113
2114         pj_list_insert_before(&media_type.param, media_type_param);
2115
2116         pj_cstr(&pj_boundary, ast_generate_random_string(boundary, sizeof(boundary)));
2117         return pjsip_multipart_create(pool, &media_type, &pj_boundary);
2118 }
2119
2120 /*!
2121  * \brief Create a resource list body for NOTIFY requests
2122  *
2123  * Resource list bodies are multipart/related bodies. The first part of the multipart body
2124  * is an RLMI body that describes the rest of the parts to come. The other parts of the body
2125  * convey state of individual subscribed resources.
2126  *
2127  * \param pool PJLIB allocation pool
2128  * \param sub Subscription details from which to generate body
2129  * \param force_full_state If true, ignore resource list settings and send a full state notification
2130  * \return The generated multipart/related body
2131  */
2132 static pjsip_msg_body *generate_list_body(pj_pool_t *pool, struct ast_sip_subscription *sub,
2133                 unsigned int force_full_state)
2134 {
2135         int i;
2136         pjsip_multipart_part *rlmi_part;
2137         pjsip_msg_body *multipart;
2138         struct body_part_list body_parts;
2139         unsigned int use_full_state = force_full_state ? 1 : sub->full_state;
2140
2141         if (AST_VECTOR_INIT(&body_parts, AST_VECTOR_SIZE(&sub->children))) {
2142                 return NULL;
2143         }
2144
2145         for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
2146                 build_body_part(pool, AST_VECTOR_GET(&sub->children, i), &body_parts, use_full_state);
2147         }
2148
2149         /* This can happen if issuing partial state and no children of the list have changed state */
2150         if (AST_VECTOR_SIZE(&body_parts) == 0) {
2151                 return NULL;
2152         }
2153
2154         multipart = create_multipart_body(pool);
2155
2156         rlmi_part = build_rlmi_body(pool, sub, &body_parts, use_full_state);
2157         if (!rlmi_part) {
2158                 return NULL;
2159         }
2160         pjsip_multipart_add_part(pool, multipart, rlmi_part);
2161
2162         for (i = 0; i < AST_VECTOR_SIZE(&body_parts); ++i) {
2163                 pjsip_multipart_add_part(pool, multipart, AST_VECTOR_GET(&body_parts, i)->part);
2164         }
2165
2166         free_body_parts(&body_parts);
2167         return multipart;
2168 }
2169
2170 /*!
2171  * \brief Create the body for a NOTIFY request.
2172  *
2173  * \param pool The pool used for allocations
2174  * \param root The root of the subscription tree
2175  * \param force_full_state If true, ignore resource list settings and send a full state notification
2176  */
2177 static pjsip_msg_body *generate_notify_body(pj_pool_t *pool, struct ast_sip_subscription *root,
2178                 unsigned int force_full_state)
2179 {
2180         pjsip_msg_body *body;
2181
2182         if (AST_VECTOR_SIZE(&root->children) == 0) {
2183                 if (force_full_state || root->body_changed) {
2184                         /* Not a list. We've already generated the body and saved it on the subscription.
2185                          * Use that directly.
2186                          */
2187                         pj_str_t type;
2188                         pj_str_t subtype;
2189                         pj_str_t text;
2190
2191                         pj_cstr(&type, ast_sip_subscription_get_body_type(root));
2192                         pj_cstr(&subtype, ast_sip_subscription_get_body_subtype(root));
2193                         pj_cstr(&text, ast_str_buffer(root->body_text));
2194
2195                         body = pjsip_msg_body_create(pool, &type, &subtype, &text);
2196                         root->body_changed = 0;
2197                 } else {
2198                         body = NULL;
2199                 }
2200         } else {
2201                 body = generate_list_body(pool, root, force_full_state);
2202         }
2203
2204         return body;
2205 }
2206
2207 /*!
2208  * \brief Shortcut method to create a Require: eventlist header
2209  */
2210 static pjsip_require_hdr *create_require_eventlist(pj_pool_t *pool)
2211 {
2212         pjsip_require_hdr *require;
2213
2214         require = pjsip_require_hdr_create(pool);
2215         pj_strdup2(pool, &require->values[0], "eventlist");
2216         require->count = 1;
2217
2218         return require;
2219 }
2220
2221 /*!
2222  * \brief Send a NOTIFY request to a subscriber
2223  *
2224  * \pre sub_tree->dlg is locked
2225  *
2226  * \param sub_tree The subscription tree representing the subscription
2227  * \param force_full_state If true, ignore resource list settings and send full resource list state.
2228  * \retval 0 Success
2229  * \retval non-zero Failure
2230  */
2231 static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state)
2232 {
2233         pjsip_evsub *evsub = sub_tree->evsub;
2234         pjsip_tx_data *tdata;
2235
2236         if (ast_shutdown_final()
2237                 && sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED
2238                 && sub_tree->persistence) {
2239                 return 0;
2240         }
2241
2242         if (pjsip_evsub_notify(evsub, sub_tree->root->subscription_state,
2243                                 NULL, NULL, &tdata) != PJ_SUCCESS) {
2244                 return -1;
2245         }
2246
2247         tdata->msg->body = generate_notify_body(tdata->pool, sub_tree->root, force_full_state);
2248         if (!tdata->msg->body) {
2249                 pjsip_tx_data_dec_ref(tdata);
2250                 return -1;
2251         }
2252
2253         if (sub_tree->is_list) {
2254                 pjsip_require_hdr *require = create_require_eventlist(tdata->pool);
2255                 pjsip_msg_add_hdr(tdata->msg, (pjsip_hdr *) require);
2256         }
2257
2258         if (sip_subscription_send_request(sub_tree, tdata)) {
2259                 /* do not call pjsip_tx_data_dec_ref(tdata). The pjsip_dlg_send_request deletes the message on error */
2260                 return -1;
2261         }
2262
2263         sub_tree->send_scheduled_notify = 0;
2264
2265         return 0;
2266 }
2267
2268 static int serialized_send_notify(void *userdata)
2269 {
2270         struct sip_subscription_tree *sub_tree = userdata;
2271         pjsip_dialog *dlg = sub_tree->dlg;
2272
2273         pjsip_dlg_inc_lock(dlg);
2274
2275         /* It's possible that between when the notification was scheduled
2276          * and now a new SUBSCRIBE arrived requiring full state to be
2277          * sent out in an immediate NOTIFY. It's also possible that we're
2278          * already processing a terminate.  If that has happened, we need to
2279          * bail out here instead of sending the batched NOTIFY.
2280          */
2281
2282         if (sub_tree->state >= SIP_SUB_TREE_TERMINATE_IN_PROGRESS
2283                 || !sub_tree->send_scheduled_notify) {
2284                 pjsip_dlg_dec_lock(dlg);
2285                 ao2_cleanup(sub_tree);
2286                 return 0;
2287         }
2288
2289         if (sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED) {
2290                 sub_tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;
2291         }
2292
2293         send_notify(sub_tree, 0);
2294
2295         ast_test_suite_event_notify(
2296                 sub_tree->state == SIP_SUB_TREE_TERMINATED
2297                         ? "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_STATE_CHANGED",
2298                 "Resource: %s", sub_tree->root->resource);
2299
2300         sub_tree->notify_sched_id = -1;
2301         pjsip_dlg_dec_lock(dlg);
2302         ao2_cleanup(sub_tree);
2303         return 0;
2304 }
2305
2306 static int sched_cb(const void *data)
2307 {
2308         struct sip_subscription_tree *sub_tree = (struct sip_subscription_tree *) data;
2309
2310         /* We don't need to bump the refcount of sub_tree since we bumped it when scheduling this task */
2311         if (ast_sip_push_task(sub_tree->serializer, serialized_send_notify, sub_tree)) {
2312                 ao2_cleanup(sub_tree);
2313         }
2314
2315         return 0;
2316 }
2317
2318 static int schedule_notification(struct sip_subscription_tree *sub_tree)
2319 {
2320         /* There's already a notification scheduled */
2321         if (sub_tree->notify_sched_id > -1) {
2322                 return 0;
2323         }
2324
2325         sub_tree->send_scheduled_notify = 1;
2326         sub_tree->notify_sched_id = ast_sched_add(sched, sub_tree->notification_batch_interval, sched_cb, ao2_bump(sub_tree));
2327         if (sub_tree->notify_sched_id < 0) {
2328                 ao2_cleanup(sub_tree);
2329                 return -1;
2330         }
2331
2332         return 0;
2333 }
2334
2335 int ast_sip_subscription_notify(struct ast_sip_subscription *sub, struct ast_sip_body_data *notify_data,
2336                 int terminate)
2337 {
2338         int res;
2339         pjsip_dialog *dlg = sub->tree->dlg;
2340
2341         pjsip_dlg_inc_lock(dlg);
2342
2343         if (sub->tree->state != SIP_SUB_TREE_NORMAL) {
2344                 pjsip_dlg_dec_lock(dlg);
2345                 return 0;
2346         }
2347
2348         if (ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub),
2349                                 ast_sip_subscription_get_body_subtype(sub), notify_data, &sub->body_text)) {
2350                 pjsip_dlg_dec_lock(dlg);
2351                 return -1;
2352         }
2353
2354         sub->body_changed = 1;
2355         if (terminate) {
2356                 sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
2357                 sub->tree->state = SIP_SUB_TREE_TERMINATE_PENDING;
2358         }
2359
2360         if (sub->tree->notification_batch_interval) {
2361                 res = schedule_notification(sub->tree);
2362         } else {
2363                 /* See the note in pubsub_on_rx_refresh() for why sub->tree is refbumped here */
2364                 ao2_ref(sub->tree, +1);
2365                 if (terminate) {
2366                         sub->tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;
2367                 }
2368                 res = send_notify(sub->tree, 0);
2369                 ast_test_suite_event_notify(terminate ? "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_STATE_CHANGED",
2370                                 "Resource: %s",
2371                                 sub->tree->root->resource);
2372                 ao2_ref(sub->tree, -1);
2373         }
2374
2375         pjsip_dlg_dec_lock(dlg);
2376         return res;
2377 }
2378
2379 pjsip_sip_uri *ast_sip_subscription_get_sip_uri(struct ast_sip_subscription *sub)
2380 {
2381         return sub->uri;
2382 }
2383
2384 void ast_sip_subscription_get_local_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
2385 {
2386         pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, sub->uri, buf, size);
2387 }
2388
2389 void ast_sip_subscription_get_remote_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
2390 {
2391         pjsip_dialog *dlg;
2392
2393         dlg = sub->tree->dlg;
2394         ast_copy_pj_str(buf, &dlg->remote.info_str, size);
2395 }
2396
2397 const char *ast_sip_subscription_get_resource_name(struct ast_sip_subscription *sub)
2398 {
2399         return sub->resource;
2400 }
2401
2402 int ast_sip_subscription_is_terminated(const struct ast_sip_subscription *sub)
2403 {
2404         return sub->subscription_state == PJSIP_EVSUB_STATE_TERMINATED ? 1 : 0;
2405 }
2406
2407 static int sip_subscription_accept(struct sip_subscription_tree *sub_tree, pjsip_rx_data *rdata, int response)
2408 {
2409         pjsip_hdr res_hdr;
2410
2411         /* If this is a persistence recreation the subscription has already been accepted */
2412         if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) {
2413                 return 0;
2414         }
2415
2416         pj_list_init(&res_hdr);
2417         if (sub_tree->is_list) {
2418                 /* If subscribing to a list, our response has to have a Require: eventlist header in it */
2419                 pj_list_insert_before(&res_hdr, create_require_eventlist(rdata->tp_info.pool));
2420         }
2421
2422         return pjsip_evsub_accept(sub_tree->evsub, rdata, response, &res_hdr) == PJ_SUCCESS ? 0 : -1;
2423 }
2424
/*!
 * \brief Allocate a datastore for use with a subscription
 *
 * Thin wrapper around ast_datastores_alloc_datastore().
 *
 * \param info Datastore info passed through to the allocator
 * \param uid Identifier passed through to the allocator
 * \return Whatever ast_datastores_alloc_datastore() returns
 */
struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
{
        struct ast_datastore *datastore = ast_datastores_alloc_datastore(info, uid);

        return datastore;
}
2429
2430 int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
2431 {
2432         return ast_datastores_add(subscription->datastores, datastore);
2433 }
2434
2435 struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
2436 {
2437         return ast_datastores_find(subscription->datastores, name);
2438 }
2439
2440 void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
2441 {
2442         ast_datastores_remove(subscription->datastores, name);
2443 }
2444
2445 struct ao2_container *ast_sip_subscription_get_datastores(const struct ast_sip_subscription *subscription)
2446 {
2447         return subscription->datastores;
2448 }
2449
2450 int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
2451 {
2452         return ast_datastores_add(publication->datastores, datastore);
2453 }
2454
2455 struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
2456 {
2457         return ast_datastores_find(publication->datastores, name);
2458 }
2459
2460 void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
2461 {
2462         ast_datastores_remove(publication->datastores, name);
2463 }
2464
2465 struct ao2_container *ast_sip_publication_get_datastores(const struct ast_sip_publication *publication)
2466 {
2467         return publication->datastores;
2468 }
2469
/*! \brief List of registered publish handlers; accessed in this file under the list's read/write lock */
AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
2471
2472 static int publication_hash_fn(const void *obj, const int flags)
2473 {
2474         const struct ast_sip_publication *publication = obj;
2475         const int *entity_tag = obj;
2476
2477         return flags & OBJ_KEY ? *entity_tag : publication->entity_tag;
2478 }
2479
2480 static int publication_cmp_fn(void *obj, void *arg, int flags)
2481 {
2482         const struct ast_sip_publication *publication1 = obj;
2483         const struct ast_sip_publication *publication2 = arg;
2484         const int *entity_tag = arg;
2485
2486         return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ?
2487                 CMP_MATCH | CMP_STOP : 0);
2488 }
2489
2490 static void publish_add_handler(struct ast_sip_publish_handler *handler)
2491 {
2492         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2493         AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
2494 }
2495
2496 int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
2497 {
2498         if (ast_strlen_zero(handler->event_name)) {
2499                 ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
2500                 return -1;
2501         }
2502
2503         if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS,
2504                 publication_hash_fn, publication_cmp_fn))) {
2505                 ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n",
2506                         handler->event_name);
2507                 return -1;
2508         }
2509
2510         publish_add_handler(handler);
2511
2512         ast_module_ref(ast_module_info->self);
2513
2514         return 0;
2515 }
2516
2517 void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
2518 {
2519         struct ast_sip_publish_handler *iter;
2520         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2521         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
2522                 if (handler == iter) {
2523                         AST_RWLIST_REMOVE_CURRENT(next);
2524                         ao2_cleanup(handler->publications);
2525                         ast_module_unref(ast_module_info->self);
2526                         break;
2527                 }
2528         }
2529         AST_RWLIST_TRAVERSE_SAFE_END;
2530 }
2531
/*! \brief List of registered subscription handlers; accessed in this file under the list's read/write lock */
AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
2533
2534 static void sub_add_handler(struct ast_sip_subscription_handler *handler)
2535 {
2536         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2537         AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
2538         ast_module_ref(ast_module_info->self);
2539 }
2540
2541 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name)
2542 {
2543         struct ast_sip_subscription_handler *iter;
2544         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
2545
2546         AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
2547                 if (!strcmp(iter->event_name, event_name)) {
2548                         break;
2549                 }
2550         }
2551         return iter;
2552 }
2553
2554 int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler)
2555 {
2556         pj_str_t event;
2557         pj_str_t accept[AST_SIP_MAX_ACCEPT] = { {0, }, };
2558         struct ast_sip_subscription_handler *existing;
2559         int i = 0;
2560
2561         if (ast_strlen_zero(handler->event_name)) {
2562                 ast_log(LOG_ERROR, "No event package specified for subscription handler. Cannot register\n");
2563                 return -1;
2564         }
2565
2566         existing = find_sub_handler_for_event_name(handler->event_name);
2567         if (existing) {
2568                 ast_log(LOG_ERROR, "Unable to register subscription handler for event %s."
2569                                 "A handler is already registered\n", handler->event_name);
2570                 return -1;
2571         }
2572
2573         for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) {
2574                 pj_cstr(&accept[i], handler->accept[i]);
2575         }
2576
2577         pj_cstr(&event, handler->event_name);
2578
2579         pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
2580
2581         sub_add_handler(handler);
2582
2583         return 0;
2584 }
2585
2586 void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
2587 {
2588         struct ast_sip_subscription_handler *iter;
2589         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2590         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
2591                 if (handler == iter) {
2592                         AST_RWLIST_REMOVE_CURRENT(next);
2593                         ast_module_unref(ast_module_info->self);
2594                         break;
2595                 }
2596         }
2597         AST_RWLIST_TRAVERSE_SAFE_END;
2598 }
2599
2600 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype_nolock(const char *type, const char *subtype)
2601 {
2602         struct ast_sip_pubsub_body_generator *gen;
2603
2604         AST_LIST_TRAVERSE(&body_generators, gen, list) {
2605                 if (!strcmp(gen->type, type)
2606                         && !strcmp(gen->subtype, subtype)) {
2607                         break;
2608                 }
2609         }
2610
2611         return gen;
2612 }
2613
2614 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype(const char *type, const char *subtype)
2615 {
2616         struct ast_sip_pubsub_body_generator *gen;
2617
2618         AST_RWLIST_RDLOCK(&body_generators);
2619         gen = find_body_generator_type_subtype_nolock(type, subtype);
2620         AST_RWLIST_UNLOCK(&body_generators);
2621         return gen;
2622 }
2623
/*!
 * \brief Find a body generator for a "type/subtype" Accept value
 *
 * \param accept The accept value to look up
 * \return The matching body generator, or NULL if \a accept has no '/' or an
 *         empty type or subtype, or if no generator matches
 */
static struct ast_sip_pubsub_body_generator *find_body_generator_accept(const char *accept)
{
        /* Work on a stack copy; strsep() splits it at the first '/' leaving
         * subtype pointing at the remainder (or NULL if there was no '/').
         */
        char *subtype = ast_strdupa(accept);
        char *type = strsep(&subtype, "/");

        if (ast_strlen_zero(type)) {
                return NULL;
        }
        if (ast_strlen_zero(subtype)) {
                return NULL;
        }

        return find_body_generator_type_subtype(type, subtype);
}
2636
2637 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
2638                 size_t num_accept, const char *body_type)
2639 {
2640         int i;
2641         struct ast_sip_pubsub_body_generator *generator = NULL;
2642
2643         for (i = 0; i < num_accept; ++i) {
2644                 generator = find_body_generator_accept(accept[i]);
2645                 if (generator) {
2646                         ast_debug(3, "Body generator %p found for accept type %s\n", generator, accept[i]);
2647                         if (strcmp(generator->body_type, body_type)) {
2648                                 ast_log(LOG_WARNING, "Body generator '%s/%s'(%p) does not accept the type of data this event generates\n",
2649                                                 generator->type, generator->subtype, generator);
2650                                 generator = NULL;
2651                                 continue;
2652                         }
2653                         break;
2654                 } else {
2655                         ast_debug(3, "No body generator found for accept type %s\n", accept[i]);
2656                 }
2657         }
2658
2659         return generator;
2660 }
2661
2662 static int generate_initial_notify(struct ast_sip_subscription *sub)
2663 {
2664         void *notify_data;
2665         int res;
2666         struct ast_sip_body_data data = {
2667                 .body_type = sub->handler->body_type,
2668         };
2669
2670         if (AST_VECTOR_SIZE(&sub->children) > 0) {
2671                 int i;
2672
2673                 for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
2674                         if (generate_initial_notify(AST_VECTOR_GET(&sub->children, i))) {
2675                                 return -1;
2676                         }
2677                 }
2678
2679                 return 0;
2680         }
2681
2682         /* We notify subscription establishment only on the tree leaves. */
2683         if (sub->handler->notifier->subscription_established(sub)) {
2684                 return -1;
2685         }
2686
2687         notify_data = sub->handler->notifier->get_notify_data(sub);
2688         if (!notify_data) {
2689                 return -1;
2690         }
2691
2692         data.body_data = notify_data;
2693
2694         res = ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub),
2695                         ast_sip_subscription_get_body_subtype(sub), &data, &sub->body_text);
2696
2697         ao2_cleanup(notify_data);
2698
2699         return res;
2700 }
2701
2702 static int initial_notify_task(void * obj)
2703 {
2704         struct sip_subscription_tree *sub_tree;
2705
2706         sub_tree = obj;
2707         if (generate_initial_notify(sub_tree->root)) {
2708                 pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
2709         } else {
2710                 send_notify(sub_tree, 1);
2711                 ast_test_suite_event_notify("SUBSCRIPTION_ESTABLISHED",
2712                         "Resource: %s",
2713                         sub_tree->root->resource);
2714         }
2715
2716         ao2_ref(sub_tree, -1);
2717         return 0;
2718 }
2719
/*!
 * \brief Handle an incoming out-of-dialog SUBSCRIBE request
 *
 * Validates the request, builds the resource tree for the requested
 * resource, creates the subscription tree, accepts the subscription and
 * queues the initial NOTIFY on the tree's serializer. Requests that fail
 * validation are answered statelessly with an error status.
 *
 * \param rdata The received SUBSCRIBE request
 * \retval PJ_TRUE Always
 */
static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
{
        pjsip_expires_hdr *expires_header;
        struct ast_sip_subscription_handler *handler;
        RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
        struct sip_subscription_tree *sub_tree;
        struct ast_sip_pubsub_body_generator *generator;
        char *resource;
        pjsip_uri *request_uri;
        pjsip_sip_uri *request_uri_sip;
        size_t resource_size;
        int resp;
        struct resource_tree tree;
        pj_status_t dlg_status;

        endpoint = ast_pjsip_rdata_get_endpoint(rdata);
        ast_assert(endpoint != NULL);

        /* The endpoint must be configured to allow subscriptions */
        if (!endpoint->subscription.allow) {
                ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint));
                pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL);
                return PJ_TRUE;
        }

        request_uri = rdata->msg_info.msg->line.req.uri;

        /* Only sip: and sips: request URIs are supported */
        if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
                char uri_str[PJSIP_MAX_URL_SIZE];

                pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
                ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
                pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
                return PJ_TRUE;
        }

        /* The user portion of the request URI names the resource being subscribed to */
        request_uri_sip = pjsip_uri_get_uri(request_uri);
        resource_size = pj_strlen(&request_uri_sip->user) + 1;
        resource = ast_alloca(resource_size);
        ast_copy_pj_str(resource, &request_uri_sip->user, resource_size);

        /*
         * We may want to match without any user options getting
         * in the way.
         */
        AST_SIP_USER_OPTIONS_TRUNCATE_CHECK(resource);

        expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next);

        /* An Expires header is optional, but if present it must be non-zero
         * and no shorter than the endpoint's minimum expiry.
         */
        if (expires_header) {
                if (expires_header->ivalue == 0) {
                        ast_log(LOG_WARNING, "Subscription request from endpoint %s rejected. Expiration of 0 is invalid\n",
                                ast_sorcery_object_get_id(endpoint));
                        pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
                                return PJ_TRUE;
                }
                if (expires_header->ivalue < endpoint->subscription.minexpiry) {
                        ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %u\n",
                                expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry);
                        pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL);
                        return PJ_TRUE;
                }
        }

        /* Both a subscription handler and a body generator are required; 489 otherwise */
        handler = subscription_get_handler_from_rdata(rdata);
        if (!handler) {
                pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
                return PJ_TRUE;
        }

        generator = subscription_get_generator_from_rdata(rdata, handler);
        if (!generator) {
                pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
                return PJ_TRUE;
        }

        /* build_resource_tree() returns the SIP status to answer with; anything
         * outside the 2xx class rejects the request with that status.
         */
        memset(&tree, 0, sizeof(tree));
        resp = build_resource_tree(endpoint, handler, resource, &tree,
                ast_sip_pubsub_has_eventlist_support(rdata));
        if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
                pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
                resource_tree_destroy(&tree);
                return PJ_TRUE;
        }

        sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator, &tree, &dlg_status);
        if (!sub_tree) {
                /* No 500 is sent when dialog creation reported PJ_EEXISTS */
                if (dlg_status != PJ_EEXISTS) {
                        pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
                }
        } else {
                sub_tree->persistence = subscription_persistence_create(sub_tree);
                subscription_persistence_update(sub_tree, rdata);
                sip_subscription_accept(sub_tree, rdata, resp);
                /* The task gets its own reference to the tree; if it cannot be
                 * queued, terminate the subscription and drop that reference.
                 */
                if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ao2_bump(sub_tree))) {
                        pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
                        ao2_ref(sub_tree, -1);
                }
        }

        resource_tree_destroy(&tree);
        return PJ_TRUE;
}
2822
2823 static struct ast_sip_publish_handler *find_pub_handler(const char *event)
2824 {
2825         struct ast_sip_publish_handler *iter = NULL;
2826         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
2827
2828         AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) {
2829                 if (strcmp(event, iter->event_name)) {
2830                         ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
2831                         continue;
2832                 }
2833                 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
2834                 break;
2835         }
2836
2837         return iter;
2838 }
2839
2840 static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata,
2841         pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id)
2842 {
2843         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
2844
2845         if (etag_hdr) {
2846                 char etag[pj_strlen(&etag_hdr->hvalue) + 1];
2847
2848                 ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag));
2849
2850                 if (sscanf(etag, "%30d", entity_id) != 1) {
2851                         return SIP_PUBLISH_UNKNOWN;
2852                 }
2853         }
2854
2855         *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
2856
2857         if (!(*expires)) {
2858                 return SIP_PUBLISH_REMOVE;
2859         } else if (!etag_hdr && rdata->msg_info.msg->body) {
2860                 return SIP_PUBLISH_INITIAL;
2861         } else if (etag_hdr && !rdata->msg_info.msg->body) {
2862                 return SIP_PUBLISH_REFRESH;
2863         } else if (etag_hdr && rdata->msg_info.msg->body) {
2864                 return SIP_PUBLISH_MODIFY;
2865         }
2866
2867         return SIP_PUBLISH_UNKNOWN;
2868 }
2869
2870 /*! \brief Internal destructor for publications */
2871 static void publication_destroy_fn(void *obj)
2872 {
2873         struct ast_sip_publication *publication = obj;
2874
2875         ast_debug(3, "Destroying SIP publication\n");
2876
2877         ao2_cleanup(publication->datastores);
2878         ao2_cleanup(publication->endpoint);
2879 }
2880
2881 static struct ast_sip_publication *sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
2882         const char *resource, const char *event_configuration_name)
2883 {
2884         struct ast_sip_publication *publication;
2885         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
2886         size_t resource_len = strlen(resource) + 1, event_configuration_name_len = strlen(event_configuration_name) + 1;
2887         char *dst;
2888
2889         ast_assert(endpoint != NULL);
2890
2891         if (!(publication = ao2_alloc(sizeof(*publication) + resource_len + event_configuration_name_len, publication_destroy_fn))) {
2892                 return NULL;
2893         }
2894
2895         if (!(publication->datastores = ast_datastores_alloc())) {
2896                 ao2_ref(publication, -1);
2897                 return NULL;
2898         }
2899
2900         publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
2901         ao2_ref(endpoint, +1);
2902         publication->endpoint = endpoint;
2903         publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
2904         publication->sched_id = -1;
2905         dst = publication->data;
2906         publication->resource = strcpy(dst, resource);
2907         dst += resource_len;
2908         publication->event_configuration_name = strcpy(dst, event_configuration_name);
2909
2910         return publication;
2911 }
2912
2913 static int sip_publication_respond(struct ast_sip_publication *pub, int status_code,
2914                 pjsip_rx_data *rdata)
2915 {
2916         pjsip_tx_data *tdata;
2917         pjsip_transaction *tsx;
2918
2919         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, &tdata) != PJ_SUCCESS) {
2920                 return -1;
2921         }
2922
2923         if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) {
2924                 char buf[30];
2925
2926                 snprintf(buf, sizeof(buf), "%d", pub->entity_tag);
2927                 ast_sip_add_header(tdata, "SIP-ETag", buf);
2928
2929                 snprintf(buf, sizeof(buf), "%d", pub->expires);
2930                 ast_sip_add_header(tdata, "Expires", buf);
2931         }
2932
2933         if (pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx) != PJ_SUCCESS) {
2934                 pjsip_tx_data_dec_ref(tdata);
2935                 return -1;
2936         }
2937
2938         pjsip_tsx_recv_msg(tsx, rdata);
2939
2940         if (pjsip_tsx_send_msg(tsx, tdata) != PJ_SUCCESS) {
2941                 pjsip_tx_data_dec_ref(tdata);
2942                 return -1;
2943         }
2944
2945         return 0;
2946 }
2947
2948 static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
2949         struct ast_sip_publish_handler *handler)
2950 {
2951         struct ast_sip_publication *publication;
2952         char *resource_name;
2953         size_t resource_size;
2954         RAII_VAR(struct ast_sip_publication_resource *, resource, NULL, ao2_cleanup);
2955         struct ast_variable *event_configuration_name = NULL;
2956         pjsip_uri *request_uri;
2957         pjsip_sip_uri *request_uri_sip;
2958         int resp;
2959
2960         request_uri = rdata->msg_info.msg->line.req.uri;
2961
2962         if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
2963                 char uri_str[PJSIP_MAX_URL_SIZE];
2964
2965                 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
2966                 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
2967                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
2968                 return NULL;
2969         }
2970
2971         request_uri_sip = pjsip_uri_get_uri(request_uri);
2972         resource_size = pj_strlen(&request_uri_sip->user) + 1;
2973         resource_name = ast_alloca(resource_size);
2974         ast_copy_pj_str(resource_name, &request_uri_sip->user, resource_size);
2975
2976         /*
2977          * We may want to match without any user options getting
2978          * in the way.
2979          */
2980         AST_SIP_USER_OPTIONS_TRUNCATE_CHECK(resource_name);
2981
2982         resource = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "inbound-publication", resource_name);
2983         if (!resource) {
2984                 ast_debug(1, "No 'inbound-publication' defined for resource '%s'\n", resource_name);
2985                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
2986                 return NULL;
2987         }
2988
2989         if (!ast_strlen_zero(resource->endpoint) && strcmp(resource->endpoint, ast_sorcery_object_get_id(endpoint))) {
2990                 ast_debug(1, "Resource %s has a defined endpoint '%s', but does not match endpoint '%s' that received the request\n",
2991                         resource_name, resource->endpoint, ast_sorcery_object_get_id(endpoint));
2992                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL);
2993                 return NULL;
2994         }
2995
2996         for (event_configuration_name = resource->events; event_configuration_name; event_configuration_name = event_configuration_name->next) {
2997                 if (!strcmp(event_configuration_name->name, handler->event_name)) {
2998                         break;
2999                 }
3000         }
3001
3002         if (!event_configuration_name) {
3003                 ast_debug(1, "Event '%s' is not configured for '%s'\n", handler->event_name, resource_name);
3004                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
3005                 return NULL;
3006         }
3007
3008         resp = handler->new_publication(endpoint, resource_name, event_configuration_name->value);
3009
3010         if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
3011                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
3012                 return NULL;
3013         }
3014
3015         publication = sip_create_publication(endpoint, rdata, S_OR(resource_name, ""), event_configuration_name->value);
3016
3017         if (!publication) {
3018                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
3019                 return NULL;
3020         }
3021
3022         publication->handler = handler;
3023         if (publication->handler->publication_state_change(publication, rdata->msg_info.msg->body,
3024                         AST_SIP_PUBLISH_STATE_INITIALIZED)) {
3025                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
3026                 ao2_cleanup(publication);
3027                 return NULL;
3028         }
3029
3030         sip_publication_respond(publication, resp, rdata);
3031
3032         return publication;
3033 }
3034
3035 static int publish_expire_callback(void *data)
3036 {
3037         RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup);
3038
3039         if (publication->handler->publish_expire) {
3040                 publication->handler->publish_expire(publication);
3041         }
3042
3043         return 0;
3044 }
3045
3046 static int publish_expire(const void *data)
3047 {
3048         struct ast_sip_publication *publication = (struct ast_sip_publication*)data;
3049
3050         ao2_unlink(publication->handler->publications, publication);
3051         publication->sched_id = -1;
3052
3053         if (ast_sip_push_task(NULL, publish_expire_callback, publication)) {
3054                 ao2_cleanup(publication);
3055         }
3056
3057         return 0;
3058 }
3059
3060 static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata)
3061 {
3062         pjsip_event_hdr *event_header;
3063         struct ast_sip_publish_handler *handler;
3064         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
3065         char event[32];
3066         static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 };
3067         pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL);
3068         enum sip_publish_type publish_type;
3069         RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup);
3070         int expires = 0, entity_id, response = 0;
3071
3072         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
3073         ast_assert(endpoint != NULL);
3074
3075         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
3076         if (!event_header) {
3077                 ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n");
3078                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
3079                 return PJ_TRUE;
3080         }
3081         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
3082
3083         handler = find_pub_handler(event);
3084         if (!handler) {
3085                 ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event);
3086                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
3087                 return PJ_TRUE;
3088         }
3089
3090         publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id);
3091
3092         /* If this is not an initial publish ensure that a publication is present */
3093         if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) {
3094                 if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) {
3095                         static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 };
3096
3097                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed,
3098                                 NULL, NULL);
3099                         return PJ_TRUE;
3100                 }
3101
3102                 /* Per the RFC every response has to have a new entity tag */
3103                 publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
3104
3105                 /* Update the expires here so that the created responses will contain the correct value */
3106                 publication->expires = expires;
3107         }
3108
3109         switch (publish_type) {
3110                 case SIP_PUBLISH_INITIAL:
3111                         publication = publish_request_initial(endpoint, rdata, handler);
3112                         break;
3113                 case SIP_PUBLISH_REFRESH:
3114                 case SIP_PUBLISH_MODIFY:
3115                         if (handler->publication_state_change(publication, rdata->msg_info.msg->body,
3116                                                 AST_SIP_PUBLISH_STATE_ACTIVE)) {
3117                                 /* If an error occurs we want to terminate the publication */
3118                                 expires = 0;
3119                         }
3120                         response = 200;
3121                         break;
3122                 case SIP_PUBLISH_REMOVE:
3123                         handler->publication_state_change(publication, rdata->msg_info.msg->body,
3124                                         AST_SIP_PUBLISH_STATE_TERMINATED);
3125                         response = 200;
3126                         break;
3127                 case SIP_PUBLISH_UNKNOWN:
3128                 default:
3129                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
3130                         break;
3131         }
3132
3133         if (publication) {
3134                 if (expires) {
3135                         ao2_link(handler->publications, publication);
3136
3137                         AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication,
3138                                                 ao2_ref(publication, -1), ao2_ref(publication, -1), ao2_ref(publication, +1));
3139                 } else {
3140                         AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1));
3141                 }
3142         }
3143
3144         if (response) {
3145                 sip_publication_respond(publication, response, rdata);
3146         }
3147
3148         return PJ_TRUE;
3149 }
3150
3151 struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub)
3152 {
3153         return pub->endpoint;
3154 }
3155
3156 const char *ast_sip_publication_get_resource(const struct ast_sip_publication *pub)
3157 {
3158         return pub->resource;
3159 }
3160
3161 const char *ast_sip_publication_get_event_configuration(const struct ast_sip_publication *pub)
3162 {
3163         return pub->event_configuration_name;
3164 }
3165
/*!
 * \brief Report whether a body generator exists for a type/subtype pair.
 *
 * \retval 1 a generator is registered
 * \retval 0 no generator is registered
 */
int ast_sip_pubsub_is_body_generator_registered(const char *type, const char *subtype)
{
	if (find_body_generator_type_subtype(type, subtype)) {
		return 1;
	}

	return 0;
}
3170
3171 int ast_sip_pubsub_register_body_generator(struct ast_sip_pubsub_body_generator *generator)
3172 {
3173         struct ast_sip_pubsub_body_generator *existing;
3174         pj_str_t accept;
3175         pj_size_t accept_len;
3176
3177         AST_RWLIST_WRLOCK(&body_generators);
3178         existing = find_body_generator_type_subtype_nolock(generator->type, generator->subtype);
3179         if (existing) {
3180                 AST_RWLIST_UNLOCK(&body_generators);
3181                 ast_log(LOG_WARNING, "A body generator for %s/%s is already registered.\n",
3182                         generator->type, generator->subtype);
3183                 return -1;
3184         }
3185         AST_LIST_INSERT_HEAD(&body_generators, generator, list);
3186         AST_RWLIST_UNLOCK(&body_generators);
3187
3188         /* Lengths of type and subtype plus a slash. */
3189         accept_len = strlen(generator->type) + strlen(generator->subtype) + 1;
3190
3191         /* Add room for null terminator that sprintf() will set. */
3192         pj_strset(&accept, ast_alloca(accept_len + 1), accept_len);
3193         sprintf((char *) pj_strbuf(&accept), "%s/%s", generator->type, generator->subtype);/* Safe */
3194
3195         pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module,
3196                         PJSIP_H_ACCEPT, NULL, 1, &accept);
3197
3198         return 0;
3199 }
3200
3201 void ast_sip_pubsub_unregister_body_generator(struct ast_sip_pubsub_body_generator *generator)
3202 {
3203         struct ast_sip_pubsub_body_generator *iter;
3204         SCOPED_LOCK(lock, &body_generators, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
3205
3206         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_generators, iter, list) {
3207                 if (iter == generator) {
3208                         AST_LIST_REMOVE_CURRENT(list);
3209                         break;
3210                 }
3211         }
3212         AST_RWLIST_TRAVERSE_SAFE_END;
3213 }
3214
3215 int ast_sip_pubsub_register_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
3216 {
3217         AST_RWLIST_WRLOCK(&body_supplements);
3218         AST_RWLIST_INSERT_TAIL(&body_supplements, supplement, list);
3219         AST_RWLIST_UNLOCK(&body_supplements);
3220
3221         return 0;
3222 }
3223
3224 void ast_sip_pubsub_unregister_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
3225 {
3226         struct ast_sip_pubsub_body_supplement *iter;
3227         SCOPED_LOCK(lock, &body_supplements, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
3228
3229         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_supplements, iter, list) {
3230                 if (iter == supplement) {
3231                         AST_LIST_REMOVE_CURRENT(list);
3232                         break;
3233                 }
3234         }
3235         AST_RWLIST_TRAVERSE_SAFE_END;
3236 }
3237
3238 const char *ast_sip_subscription_get_body_type(struct ast_sip_subscription *sub)
3239 {
3240         return sub->body_generator->type;
3241 }
3242
3243 const char *ast_sip_subscription_get_body_subtype(struct ast_sip_subscription *sub)
3244 {
3245         return sub->body_generator->subtype;
3246 }
3247
3248 int ast_sip_pubsub_generate_body_content(const char *type, const char *subtype,
3249                 struct ast_sip_body_data *data, struct ast_str **str)
3250 {
3251         struct ast_sip_pubsub_body_supplement *supplement;
3252         struct ast_sip_pubsub_body_generator *generator;
3253         int res = 0;
3254         void *body;
3255
3256         generator = find_body_generator_type_subtype(type, subtype);
3257         if (!generator) {
3258                 ast_log(LOG_WARNING, "Unable to find a body generator for %s/%s\n",
3259                                 type, subtype);
3260                 return -1;
3261         }
3262
3263         if (strcmp(data->body_type, generator->body_type)) {
3264                 ast_log(LOG_WARNING, "%s/%s body generator does not accept the type of data provided\n",
3265                         type, subtype);
3266                 return -1;
3267         }
3268
3269         body = generator->allocate_body(data->body_data);
3270         if (!body) {
3271                 ast_log(LOG_WARNING, "%s/%s body generator could not to allocate a body\n",
3272                         type, subtype);
3273                 return -1;
3274         }
3275
3276         if (generator->generate_body_content(body, data->body_data)) {
3277                 res = -1;
3278                 goto end;
3279         }
3280
3281         AST_RWLIST_RDLOCK(&body_supplements);
3282         AST_RWLIST_TRAVERSE(&body_supplements, supplement, list) {
3283                 if (!strcmp(generator->type, supplement->type) &&
3284                                 !strcmp(generator->subtype, supplement->subtype)) {
3285                         res = supplement->supplement_body(body, data->body_data);
3286                         if (res) {
3287                                 break;
3288                         }
3289                 }
3290         }
3291         AST_RWLIST_UNLOCK(&body_supplements);
3292
3293         if (!res) {
3294                 generator->to_string(body, str);
3295         }
3296
3297 end:
3298         if (generator->destroy_body) {
3299                 generator->destroy_body(body);
3300         }
3301
3302         return res;
3303 }
3304
3305 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata)
3306 {
3307         if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) {
3308                 return pubsub_on_rx_subscribe_request(rdata);
3309         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) {
3310                 return pubsub_on_rx_publish_request(rdata);
3311         }
3312
3313         return PJ_FALSE;
3314 }
3315
3316 static void set_state_terminated(struct ast_sip_subscription *sub)
3317 {
3318         int i;
3319
3320         sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
3321         for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
3322                 set_state_terminated(AST_VECTOR_GET(&sub->children, i));
3323         }
3324 }
3325
3326 /*!
3327  * \brief Callback sequence for subscription terminate:
3328  *
3329  * * Client initiated:
3330  *     pjproject receives SUBSCRIBE on the subscription's serializer thread
3331  *         calls pubsub_on_rx_refresh with dialog locked
3332  *             pubsub_on_rx_refresh sets TERMINATE_PENDING
3333  *             pushes serialized_pubsub_on_refresh_timeout
3334  *             returns to pjproject
3335  *         pjproject calls pubsub_on_evsub_state
 *             pubsub_on_evsub_state checks state == TERMINATE_IN_PROGRESS (no)
3337  *             ignore and return
3338  *         pjproject unlocks dialog
3339  *     serialized_pubsub_on_refresh_timeout starts (1)
3340  *       locks dialog
3341  *       checks state == TERMINATE_PENDING
3342  *       sets TERMINATE_IN_PROGRESS
3343  *       calls send_notify (2)
3344  *           send_notify ultimately calls pjsip_evsub_send_request
3345  *               pjsip_evsub_send_request calls evsub's set_state
 *                   set_state calls pubsub_on_evsub_state
 *                       pubsub_on_evsub_state checks state == TERMINATE_IN_PROGRESS
3348  *                       removes the subscriptions
3349  *                       cleans up references to evsub
3350  *                       sets state = TERMINATED
3351  *       serialized_pubsub_on_refresh_timeout unlocks dialog
3352  *
3353  * * Subscription timer expires:
3354  *     pjproject timer expires
3355  *         locks dialog
3356  *         calls pubsub_on_server_timeout
3357  *             pubsub_on_server_timeout checks state == NORMAL
3358  *             sets TERMINATE_PENDING
3359  *             pushes serialized_pubsub_on_refresh_timeout
3360  *             returns to pjproject
3361  *         pjproject unlocks dialog
3362  *     serialized_pubsub_on_refresh_timeout starts
3363  *         See (1) Above
3364  *
3365  *
3366  * * ast_sip_subscription_notify is called
3367  *       checks state == NORMAL
3368  *       if not batched...
3369  *           sets TERMINATE_IN_PROGRESS (if terminate is requested)
3370  *           calls send_notify
3371  *               See (2) Above
3372  *       if batched...
3373  *           sets TERMINATE_PENDING
3374  *           schedules task
3375  *       scheduler runs sched_task
3376  *           sched_task pushes serialized_send_notify
3377  *       serialized_send_notify starts
3378  *           checks state <= TERMINATE_PENDING
3379  *           if state == TERMINATE_PENDING set state = TERMINATE_IN_PROGRESS
3380  *           call send_notify
3381  *               See (2) Above
3382  *
3383  */
3384
3385 /*!
3386  * \brief PJSIP callback when underlying SIP subscription changes state
3387  *
3388  * Although this function is called for every state change, we only care
3389  * about the TERMINATED state, and only when we're actually processing the final
3390  * notify (SIP_SUB_TREE_TERMINATE_IN_PROGRESS).  In this case, we do all
3391  * the subscription tree cleanup tasks and decrement the evsub reference.
3392  */
3393 static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
3394 {
3395         struct sip_subscription_tree *sub_tree;
3396
3397         ast_debug(3, "on_evsub_state called with state %s\n", pjsip_evsub_get_state_name(evsub));
3398
3399         if (pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) {
3400                 return;
3401         }
3402
3403         sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
3404         if (!sub_tree || sub_tree->state != SIP_SUB_TREE_TERMINATE_IN_PROGRESS) {
3405                 ast_debug(1, "Possible terminate race prevented %p\n", sub_tree);
3406                 return;
3407         }
3408
3409         remove_subscription(sub_tree);
3410
3411         pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL);
3412
3413 #ifdef HAVE_PJSIP_EVSUB_GRP_LOCK
3414         pjsip_evsub_dec_ref(sub_tree->evsub);
3415 #endif
3416
3417         sub_tree->evsub = NULL;
3418
3419         ast_sip_dialog_set_serializer(sub_tree->dlg, NULL);
3420         ast_sip_dialog_set_endpoint(sub_tree->dlg, NULL);
3421
3422         subscription_persistence_remove(sub_tree);
3423         shutdown_subscriptions(sub_tree->root);
3424
3425         sub_tree->state = SIP_SUB_TREE_TERMINATED;
3426         /* Remove evsub's reference to the sub_tree */
3427         ao2_ref(sub_tree, -1);
3428 }
3429
3430 static int serialized_pubsub_on_refresh_timeout(void *userdata)
3431 {
3432         struct sip_subscription_tree *sub_tree = userdata;
3433         pjsip_dialog *dlg = sub_tree->dlg;
3434
3435         pjsip_dlg_inc_lock(dlg);
3436         if (sub_tree->state >= SIP_SUB_TREE_TERMINATE_IN_PROGRESS) {
3437                 ast_debug(1, "Possible terminate race prevented %p %d\n", sub_tree->evsub, sub_tree->state);
3438                 pjsip_dlg_dec_lock(dlg);
3439                 ao2_cleanup(sub_tree);
3440                 return 0;
3441         }
3442
3443         if (sub_tree->state == SIP_SUB_TREE_TERMINATE_PENDING) {
3444                 sub_tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS;
3445                 set_state_terminated(sub_tree->root);
3446         }
3447
3448         send_notify(sub_tree, 1);
3449
3450         ast_test_suite_event_notify(sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED ?
3451                                 "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_REFRESHED",
3452                                 "Resource: %s", sub_tree->root->resource);
3453
3454         pjsip_dlg_dec_lock(dlg);
3455         ao2_cleanup(sub_tree);
3456         return 0;
3457 }
3458
3459 /*!
3460  * \brief Called whenever an in-dialog SUBSCRIBE is received
3461  *
3462  * This includes both SUBSCRIBE requests that actually refresh the subscription
3463  * as well as SUBSCRIBE requests that end the subscription.
3464  *
3465  * In either case we push serialized_pubsub_on_refresh_timeout to send an
3466  * appropriate NOTIFY request.
3467  */
3468 static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
3469                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
3470 {
3471         struct sip_subscription_tree *sub_tree;
3472
3473         sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
3474         if (!sub_tree || sub_tree->state != SIP_SUB_TREE_NORMAL) {
3475                 ast_debug(1, "Possible terminate race prevented %p %d\n", sub_tree, sub_tree ? sub_tree->state : -1 );
3476                 return;
3477         }
3478
3479         /* PJSIP will set the evsub's state to terminated before calling into this function
3480          * if the Expires value of the incoming SUBSCRIBE is 0.
3481          */
3482
3483         if (pjsip_evsub_get_state(sub_tree->evsub) == PJSIP_EVSUB_STATE_TERMINATED) {
3484                 sub_tree->state = SIP_SUB_TREE_TERMINATE_PENDING;
3485         }
3486
3487         if (ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_refresh_timeout, ao2_bump(sub_tree))) {
3488                 /* If we can't push the NOTIFY refreshing task...we'll just go with it. */
3489                 ast_log(LOG_ERROR, "Failed to push task to send NOTIFY.\n");
3490                 sub_tree->state = SIP_SUB_TREE_NORMAL;
3491                 ao2_ref(sub_tree, -1);
3492         }
3493
3494         if (sub_tree->is_list) {
3495                 pj_list_insert_before(res_hdr, create_require_eventlist(rdata->tp_info.pool));
3496         }
3497 }
3498
3499 static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code,
3500                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
3501 {
3502         struct ast_sip_subscription *sub;
3503
3504         if (!(sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id))) {
3505                 return;
3506         }
3507
3508         sub->handler->subscriber->state_change(sub, rdata->msg_info.msg->body,
3509                         pjsip_evsub_get_state(evsub));
3510 }
3511
3512 static int serialized_pubsub_on_client_refresh(void *userdata)
3513 {
3514         struct sip_subscription_tree *sub_tree = userdata;
3515         pjsip_tx_data *tdata;
3516
3517         if (!sub_tree->evsub) {
3518                 ao2_cleanup(sub_tree);
3519                 return 0;
3520         }
3521
3522         if (pjsip_evsub_initiate(sub_tree->evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
3523                 pjsip_evsub_send_request(sub_tree->evsub, tdata);
3524         } else {
3525                 pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
3526         }
3527
3528         ao2_cleanup(sub_tree);
3529         return 0;
3530 }
3531
3532 static void pubsub_on_client_refresh(pjsip_evsub *evsub)
3533 {
3534         struct sip_subscription_tree *sub_tree;
3535
3536         if (!(sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id))) {
3537                 return;
3538         }
3539
3540         if (ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_client_refresh, ao2_bump(sub_tree))) {
3541                 ao2_cleanup(sub_tree);
3542         }
3543 }
3544
3545 static void pubsub_on_server_timeout(pjsip_evsub *evsub)
3546 {
3547         struct sip_subscription_tree *sub_tree;
3548
3549         /* PJSIP does not terminate the server timeout timer when a SUBSCRIBE
3550          * with Expires: 0 arrives to end a subscription, nor does it terminate
3551          * this timer when we send a NOTIFY request in response to receiving such
3552          * a SUBSCRIBE. PJSIP does not stop the server timeout timer until the
3553          * NOTIFY transaction has finished (either through receiving a response
3554          * or through a transaction timeout).
3555          *
3556          * Therefore, it is possible that we can be told that a server timeout
3557          * occurred after we already thought that the subscription had been
3558          * terminated. In such a case, we will have already removed the sub_tree
3559          * from the evsub's mod_data array.
3560          */
3561
3562         sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
3563         if (!sub_tree || sub_tree->state != SIP_SUB_TREE_NORMAL) {
3564                 ast_debug(1, "Possible terminate race prevented %p %d\n", sub_tree, sub_tree ? sub_tree->state : -1 );
3565         return;
3566         }
3567
3568         sub_tree->state = SIP_SUB_TREE_TERMINATE_PENDING;
3569         if (ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_refresh_timeout, ao2_bump(sub_tree))) {
3570                 sub_tree->state = SIP_SUB_TREE_NORMAL;
3571                 ao2_cleanup(sub_tree);
3572         }
3573 }
3574
3575 static int ami_subscription_detail(struct sip_subscription_tree *sub_tree,
3576                                    struct ast_sip_ami *ami,
3577                                    const char *event)
3578 {
3579         struct ast_str *buf;
3580
3581         buf = ast_sip_create_ami_event(event, ami);
3582         if (!buf) {
3583                 return -1;
3584         }
3585
3586         sip_subscription_to_ami(sub_tree, &buf);
3587         astman_append(ami->s, "%s\r\n", ast_str_buffer(buf));
3588         ast_free(buf);
3589
3590         ++ami->count;
3591         return 0;
3592 }
3593
3594 static int ami_subscription_detail_inbound(struct sip_subscription_tree *sub_tree, void *arg)
3595 {
3596         return sub_tree->role == AST_SIP_NOTIFIER ? ami_subscription_detail(
3597                 sub_tree, arg, "InboundSubscriptionDetail") : 0;
3598 }
3599
3600 static int ami_subscription_detail_outbound(struct sip_subscription_tree *sub_tree, void *arg)
3601 {
3602         return sub_tree->role == AST_SIP_SUBSCRIBER ? ami_subscription_detail(
3603                 sub_tree, arg, "OutboundSubscriptionDetail") : 0;
3604 }
3605
3606 static int ami_show_subscriptions_inbound(struct mansession *s, const struct message *m)
3607 {
3608         struct ast_sip_ami ami = { .s = s, .m = m, .action_id = astman_get_header(m, "ActionID"), };
3609
3610         astman_send_listack(s, m, "Following are Events for each inbound Subscription",
3611                 "start");
3612
3613         for_each_subscription(ami_subscription_detail_inbound, &ami);
3614
3615         astman_send_list_complete_start(s, m, "InboundSubscriptionDetailComplete", ami.count);
3616         astman_send_list_complete_end(s);
3617         return 0;
3618 }
3619
3620 static int ami_show_subscriptions_outbound(struct mansession *s, const struct message *m)
3621 {
3622         struct ast_sip_ami ami = { .s = s, .m = m, .action_id = astman_get_header(m, "ActionID"), };
3623
3624         astman_send_listack(s, m, "Following are Events for each outbound Subscription",
3625                 "start");
3626
3627         for_each_subscription(ami_subscription_detail_outbound, &ami);
3628
3629         astman_send_list_complete_start(s, m, "OutboundSubscriptionDetailComplete", ami.count);
3630         astman_send_list_complete_end(s);
3631         return 0;
3632 }
3633
3634 static int format_ami_resource_lists(void *obj, void *arg, int flags)
3635 {
3636         struct resource_list *list = obj;
3637         struct ast_sip_ami *ami = arg;
3638         struct ast_str *buf;
3639
3640         buf = ast_sip_create_ami_event("ResourceListDetail", ami);
3641         if (!buf) {
3642                 return CMP_STOP;
3643         }
3644
3645         if (ast_sip_sorcery_object_to_ami(list, &buf)) {
3646                 ast_free(buf);
3647                 return CMP_STOP;
3648         }
3649         astman_append(ami->s, "%s\r\n", ast_str_buffer(buf));
3650         ast_free(buf);
3651
3652         ++ami->count;
3653         return 0;
3654 }
3655
3656 static int ami_show_resource_lists(struct mansession *s, const struct message *m)
3657 {
3658         struct ast_sip_ami ami = { .s = s, .m = m, .action_id = astman_get_header(m, "ActionID"), };
3659         struct ao2_container *lists;
3660
3661         lists = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), "resource_list",
3662                         AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
3663
3664         if (!lists || !ao2_container_count(lists)) {
3665                 astman_send_error(s, m, "No resource lists found\n");
3666                 return 0;
3667         }
3668
3669         astman_send_listack(s, m, "A listing of resource lists follows, presented as ResourceListDetail events",
3670                 "start");
3671
3672         ao2_callback(lists, OBJ_NODATA, format_ami_resource_lists, &ami);
3673
3674         astman_send_list_complete_start(s, m, "ResourceListDetailComplete", ami.count);
3675         astman_send_list_complete_end(s);
3676         return 0;
3677 }
3678
/*!
 * \brief AMI action name strings for the subscription listings.
 *
 * The inbound name matches the manager action documented at the top of this
 * file. NOTE(review): presumably these are the names used when the actions
 * are registered with the manager core -- confirm at the registration site.
 */
3679 #define AMI_SHOW_SUBSCRIPTIONS_INBOUND "PJSIPShowSubscriptionsInbound"
3680 #define AMI_SHOW_SUBSCRIPTIONS_OUTBOUND "PJSIPShowSubscriptionsOutbound"
3681
3682 static int persistence_endpoint_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
3683 {
3684         struct subscription_persistence *persistence = obj;
3685
3686         persistence->endpoint = ast_strdup(var->value);
3687         return 0;
3688 }
3689
3690 static int persistence_endpoint_struct2str(const void *obj, const intptr_t *args, char **buf)
3691 {
3692         const struct subscription_persistence *persistence = obj;
3693
3694         *buf = ast_strdup(persistence->endpoint);
3695         return 0;
3696 }
3697
3698 static int persistence_tag_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
3699 {
3700         struct subscription_persistence *persistence = obj;
3701
3702         persistence->tag = ast_strdup(var->value);
3703         return 0;
3704 }
3705
3706 static int persistence_tag_struct2str(const void *obj, const intptr_t *args, char **buf)
3707 {
3708         const struct subscription_persistence *persistence = obj;
3709
3710         *buf = ast_strdup(persistence->tag);
3711         return 0;
3712 }
3713
3714 static int persistence_expires_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
3715 {
3716         struct subscription_persistence *persistence = obj;
3717         return ast_get_timeval(var->value, &persistence->expires, ast_tv(0, 0), NULL);
3718 }
3719
3720 static int persistence_expires_struct2str(const void *obj, const intptr_t *args, char **buf)
3721 {
3722         const struct subscription_persistence *persistence = obj;
3723         return (ast_asprintf(buf, "%ld", persistence->expires.tv_sec) < 0) ? -1 : 0;
3724 }
3725
/*! \brief Initial size passed to AST_VECTOR_INIT() for a resource list's item vector */
3726 #define RESOURCE_LIST_INIT_SIZE 4
3727
3728 static void resource_list_destructor(void *obj)
3729 {
3730         struct resource_list *list = obj;
3731         int i;
3732
3733         for (i = 0; i < AST_VECTOR_SIZE(&list->items); ++i) {
3734                 ast_free((char *) AST_VECTOR_GET(&list->items, i));
3735         }
3736
3737         AST_VECTOR_FREE(&list->items);
3738 }
3739
3740 static void *resource_list_alloc(const char *name)
3741 {
3742         struct resource_list *list;
3743
3744         list = ast_sorcery_generic_alloc(sizeof(*list), resource_list_destructor);
3745         if (!list) {
3746                 return NULL;
3747         }
3748
3749         if (AST_VECTOR_INIT(&list->items, RESOURCE_LIST_INIT_SIZE)) {
3750                 ao2_cleanup(list);
3751                 return NULL;
3752         }
3753
3754         return list;
3755 }
3756
3757 static int item_in_vector(const struct resource_list *list, const char *item)
3758 {
3759         int i;
3760
3761         for (i = 0; i < AST_VECTOR_SIZE(&list->items); ++i) {
3762                 if (!strcmp(item, AST_VECTOR_GET(&list->items, i))) {
3763                         return 1;
3764                 }
3765         }
3766
3767         return 0;
3768 }
3769
3770 static int list_item_handler(const struct aco_option *opt,
3771                 struct ast_variable *var, void *obj)
3772 {
3773         struct resource_list *list = obj;
3774         char *items = ast_strdupa(var->value);
3775         char *item;
3776
3777         while ((item = ast_strip(strsep(&items, ",")))) {