res_pjsip/config_transport: Allow reloading transports.
[asterisk/asterisk.git] / res / res_pjsip_pubsub.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 /*!
19  * \brief Opaque structure representing an RFC 3265 SIP subscription
20  */
21
22 /*** MODULEINFO
23         <depend>pjproject</depend>
24         <depend>res_pjsip</depend>
25         <support_level>core</support_level>
26  ***/
27
28 #include "asterisk.h"
29
30 #include <pjsip.h>
31 #include <pjsip_simple.h>
32 #include <pjlib.h>
33
34 #include "asterisk/res_pjsip_pubsub.h"
35 #include "asterisk/module.h"
36 #include "asterisk/linkedlists.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/datastore.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/taskprocessor.h"
41 #include "asterisk/sched.h"
42 #include "asterisk/res_pjsip.h"
43 #include "asterisk/callerid.h"
44 #include "asterisk/manager.h"
45 #include "asterisk/test.h"
46 #include "res_pjsip/include/res_pjsip_private.h"
47 #include "asterisk/res_pjsip_presence_xml.h"
48
49 /*** DOCUMENTATION
50         <manager name="PJSIPShowSubscriptionsInbound" language="en_US">
51                 <synopsis>
52                         Lists subscriptions.
53                 </synopsis>
54                 <syntax />
55                 <description>
56                         <para>
57                         Provides a listing of all inbound subscriptions.  An event <literal>InboundSubscriptionDetail</literal>
58                         is issued for each subscription object.  Once all detail events are completed an
59                         <literal>InboundSubscriptionDetailComplete</literal> event is issued.
60                         </para>
61                 </description>
62         </manager>
63         <manager name="PJSIPShowSubscriptionsOutbound" language="en_US">
64                 <synopsis>
65                         Lists subscriptions.
66                 </synopsis>
67                 <syntax />
68                 <description>
69                         <para>
70                         Provides a listing of all outbound subscriptions.  An event <literal>OutboundSubscriptionDetail</literal>
71                         is issued for each subscription object.  Once all detail events are completed an
72                         <literal>OutboundSubscriptionDetailComplete</literal> event is issued.
73                         </para>
74                 </description>
75         </manager>
76         <manager name="PJSIPShowResourceLists" language="en_US">
77                 <synopsis>
78                         Displays settings for configured resource lists.
79                 </synopsis>
80                 <syntax />
81                 <description>
82                         <para>
83                         Provides a listing of all resource lists.  An event <literal>ResourceListDetail</literal>
84                         is issued for each resource list object.  Once all detail events are completed a
85                         <literal>ResourceListDetailComplete</literal> event is issued.
86                         </para>
87                 </description>
88         </manager>
89
90         <configInfo name="res_pjsip_pubsub" language="en_US">
91                 <synopsis>Module that implements publish and subscribe support.</synopsis>
92                 <configFile name="pjsip.conf">
93                         <configObject name="subscription_persistence">
94                                 <synopsis>Persists SIP subscriptions so they survive restarts.</synopsis>
95                                 <configOption name="packet">
96                                         <synopsis>Entire SIP SUBSCRIBE packet that created the subscription</synopsis>
97                                 </configOption>
98                                 <configOption name="src_name">
99                                         <synopsis>The source address of the subscription</synopsis>
100                                 </configOption>
101                                 <configOption name="src_port">
102                                         <synopsis>The source port of the subscription</synopsis>
103                                 </configOption>
104                                 <configOption name="transport_key">
105                                         <synopsis>The type of transport the subscription was received on</synopsis>
106                                 </configOption>
107                                 <configOption name="local_name">
108                                         <synopsis>The local address the subscription was received on</synopsis>
109                                 </configOption>
110                                 <configOption name="local_port">
111                                         <synopsis>The local port the subscription was received on</synopsis>
112                                 </configOption>
113                                 <configOption name="cseq">
114                                         <synopsis>The sequence number of the next NOTIFY to be sent</synopsis>
115                                 </configOption>
116                                 <configOption name="tag">
117                                         <synopsis>The local tag of the dialog for the subscription</synopsis>
118                                 </configOption>
119                                 <configOption name="endpoint">
120                                         <synopsis>The name of the endpoint that subscribed</synopsis>
121                                 </configOption>
122                                 <configOption name="expires">
123                                         <synopsis>The time at which the subscription expires</synopsis>
124                                 </configOption>
125                         </configObject>
126                         <configObject name="resource_list">
127                                 <synopsis>Resource list configuration parameters.</synopsis>
128                                 <description>
129                                         <para>This configuration object allows for RFC 4662 resource list subscriptions
130                                         to be specified. This can be useful to decrease the amount of subscription traffic
131                                         that a server has to process.</para>
132                                         <note>
133                                                 <para>Current limitations limit the size of SIP NOTIFY requests that Asterisk sends
134                                                 to 64000 bytes. If your resource list notifications are larger than this maximum, you
135                                                 will need to make adjustments.</para>
136                                         </note>
137                                 </description>
138                                 <configOption name="type">
139                                         <synopsis>Must be of type 'resource_list'</synopsis>
140                                 </configOption>
141                                 <configOption name="event">
142                                         <synopsis>The SIP event package that the list resource belongs to.</synopsis>
143                                         <description><para>
144                                                 The SIP event package describes the types of resources that Asterisk reports
145                                                 the state of.
146                                         </para>
147                                                 <enumlist>
148                                                         <enum name="presence"><para>
149                                                                 Device state and presence reporting.
150                                                         </para></enum>
151                                                         <enum name="dialog"><para>
152                                                                 This is identical to <replaceable>presence</replaceable>.
153                                                         </para></enum>
154                                                         <enum name="message-summary"><para>
155                                                                 Message-waiting indication (MWI) reporting.
156                                                         </para></enum>
157                                                 </enumlist>
158                                         </description>
159                                 </configOption>
160                                 <configOption name="list_item">
161                                         <synopsis>The name of a resource to report state on</synopsis>
162                                         <description>
163                                                 <para>In general Asterisk looks up list items in the following way:</para>
164                                                 <para>1. Check if the list item refers to another configured resource list.</para>
165                                                 <para>2. Pass the name of the resource off to event-package-specific handlers
166                                                    to find the specified resource.</para>
167                                                 <para>The second part means that the way the list item is specified depends
168                                                 on what type of list this is. For instance, if you have the <replaceable>event</replaceable>
169                                                 set to <literal>presence</literal>, then list items should be in the form of
170                                                 dialplan_extension@dialplan_context. For <literal>message-summary</literal> mailbox
171                                                 names should be listed.</para>
172                                         </description>
173                                 </configOption>
174                                 <configOption name="full_state" default="no">
175                                         <synopsis>Indicates if the entire list's state should be sent out.</synopsis>
176                                         <description>
177                                                 <para>If this option is enabled, and a resource changes state, then Asterisk will construct
178                                                 a notification that contains the state of all resources in the list. If the option is
179                                                 disabled, Asterisk will construct a notification that only contains the states of
180                                                 resources that have changed.</para>
181                                                 <note>
182                                                         <para>Even with this option disabled, there are certain situations where Asterisk is forced
183                                                         to send a notification with the states of all resources in the list. When a subscriber
184                                                         renews or terminates its subscription to the list, Asterisk MUST send a full state
185                                                         notification.</para>
186                                                 </note>
187                                         </description>
188                                 </configOption>
189                                 <configOption name="notification_batch_interval" default="0">
190                                         <synopsis>Time Asterisk should wait, in milliseconds, before sending notifications.</synopsis>
191                                         <description>
192                                                 <para>When a resource's state changes, it may be desired to wait a certain amount before Asterisk
193                                                 sends a notification to subscribers. This allows for other state changes to accumulate, so that
194                                                 Asterisk can communicate multiple state changes in a single notification instead of rapidly sending
195                                                 many notifications.</para>
196                                         </description>
197                                 </configOption>
198                         </configObject>
199                         <configObject name="inbound-publication">
200                                 <synopsis>The configuration for inbound publications</synopsis>
201                                 <configOption name="endpoint" default="">
202                                         <synopsis>Optional name of an endpoint that is only allowed to publish to this resource</synopsis>
203                                 </configOption>
204                                 <configOption name="type">
205                                         <synopsis>Must be of type 'inbound-publication'.</synopsis>
206                                 </configOption>
207                         </configObject>
208                 </configFile>
209         </configInfo>
210  ***/
211
/*! \brief Handler for incoming SIP requests; referenced by pubsub_module below */
static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);

/*!
 * \brief PJSIP module descriptor for this module
 *
 * Incoming requests are handed to pubsub_on_rx_request().
 */
static struct pjsip_module pubsub_module = {
	/* 13 is the length of "PubSub Module", not counting the terminating NUL */
	.name = { "PubSub Module", 13 },
	.priority = PJSIP_MOD_PRIORITY_APPLICATION,
	.on_rx_request = pubsub_on_rx_request,
};
219
/*
 * NOTE(review): the uses of these two keys are outside the visible part of
 * the file; the names suggest they identify the persistence object and the
 * saved message when stored as module data -- confirm against the callers.
 */
#define MOD_DATA_PERSISTENCE "sub_persistence"
#define MOD_DATA_MSG "sub_msg"

/*! \brief Name of the SIP Event header; 5 is the length of "Event" */
static const pj_str_t str_event_name = { "Event", 5 };

/*! \brief Scheduler used for automatically expiring publications */
static struct ast_sched_context *sched;

/*! \brief Number of buckets for publications (on a per handler) */
#define PUBLICATIONS_BUCKETS 37

/*!
 * \brief Default expiration time for PUBLISH if one is not specified
 *
 * \note Not to be confused with DEFAULT_EXPIRES, which is the equivalent
 * default for subscriptions. Both currently have the same value.
 */
#define DEFAULT_PUBLISH_EXPIRES 3600

/*! \brief Number of buckets for subscription datastore */
#define DATASTORE_BUCKETS 53

/*!
 * \brief Default expiration for subscriptions
 *
 * \note Not to be confused with DEFAULT_PUBLISH_EXPIRES, which is the
 * equivalent default for publications.
 */
#define DEFAULT_EXPIRES 3600
239
/*!
 * \brief Defined method for PUBLISH
 *
 * PUBLISH is described to PJSIP as an "other" method identified by its
 * name; 7 is the length of "PUBLISH".
 */
const pjsip_method pjsip_publish_method =
{
	PJSIP_OTHER_METHOD,
	{ "PUBLISH", 7 }
};
246
/*!
 * \brief The types of PUBLISH messages defined in RFC 3903
 *
 * \note SIP_PUBLISH_UNKNOWN is the first enumerator and therefore has the
 * value 0.
 */
enum sip_publish_type {
	/*!
	 * \brief Unknown
	 *
	 * \details
	 * This actually is not defined in RFC 3903. We use this as a constant
	 * to indicate that an incoming PUBLISH does not fit into any of the
	 * other categories and is thus invalid.
	 */
	SIP_PUBLISH_UNKNOWN,

	/*!
	 * \brief Initial
	 *
	 * \details
	 * The first PUBLISH sent. This will contain a non-zero Expires header
	 * as well as a body that indicates the current state of the endpoint
	 * that has sent the message. The initial PUBLISH is the only type
	 * of PUBLISH to not contain a Sip-If-Match header in it.
	 */
	SIP_PUBLISH_INITIAL,

	/*!
	 * \brief Refresh
	 *
	 * \details
	 * Used to keep a published state from expiring. This will contain a
	 * non-zero Expires header but no body since its purpose is not to
	 * update state.
	 */
	SIP_PUBLISH_REFRESH,

	/*!
	 * \brief Modify
	 *
	 * \details
	 * Used to change state from its previous value. This will contain
	 * a body updating the published state. May or may not contain an
	 * Expires header.
	 */
	SIP_PUBLISH_MODIFY,

	/*!
	 * \brief Remove
	 *
	 * \details
	 * Used to remove published state from an ESC. This will contain
	 * an Expires header set to 0 and likely no body.
	 */
	SIP_PUBLISH_REMOVE,
};
301
/*!
 * \brief A vector of strings commonly used throughout this module
 *
 * Declares the type 'struct resources', which is used, for example, as the
 * item list of struct resource_list.
 */
AST_VECTOR(resources, const char *);
306
/*!
 * \brief Resource list configuration item
 *
 * The members mirror the options documented for the 'resource_list'
 * configuration object (event, list_item, full_state and
 * notification_batch_interval).
 */
struct resource_list {
	/*! Sorcery object details */
	SORCERY_OBJECT(details);
	/*! SIP event package the list uses. */
	char event[32];
	/*! Strings representing resources in the list. */
	struct resources items;
	/*! Indicates if Asterisk sends full or partial state on notifications. */
	unsigned int full_state;
	/*! Time, in milliseconds, Asterisk waits before sending a batched notification. */
	unsigned int notification_batch_interval;
};
321
/*!
 * \brief Counter used to create new entity IDs by ESCs.
 *
 * NOTE(review): the code that reads and advances this counter is outside
 * the visible part of the file; presumably the value ends up in
 * ast_sip_publication.entity_tag -- confirm.
 */
static int esc_etag_counter;
326
/*!
 * \brief Structure representing a SIP publication
 */
struct ast_sip_publication {
	/*! \brief Publication datastores set up by handlers */
	struct ao2_container *datastores;
	/*! \brief Entity tag for the publication */
	int entity_tag;
	/*! \brief Handler for this publication */
	struct ast_sip_publish_handler *handler;
	/*! \brief The endpoint with which the publication is communicating */
	struct ast_sip_endpoint *endpoint;
	/*! \brief Expiration time of the publication */
	int expires;
	/*! \brief Scheduled item for expiration of publication */
	int sched_id;
	/*! \brief The resource the publication is to */
	char *resource;
	/*! \brief The name of the event type configuration */
	char *event_configuration_name;
	/*!
	 * \brief Data containing the above
	 *
	 * Trailing variable-length storage. NOTE(review): presumably 'resource'
	 * and 'event_configuration_name' point into this buffer -- confirm
	 * against the allocator, which is outside the visible part of the file.
	 */
	char data[0];
};
350
351
/*!
 * \brief Structure used for persisting an inbound subscription
 *
 * 'endpoint' and 'tag' are heap strings released by
 * subscription_persistence_destroy(); the remaining members are stored
 * inline.
 */
struct subscription_persistence {
	/*! Sorcery object details */
	SORCERY_OBJECT(details);
	/*! The name of the endpoint involved in the subscription */
	char *endpoint;
	/*! SIP message that creates the subscription */
	char packet[PJSIP_MAX_PKT_LEN];
	/*! Source address of the message */
	char src_name[PJ_INET6_ADDRSTRLEN];
	/*! Source port of the message */
	int src_port;
	/*! Local transport key type */
	char transport_key[32];
	/*! Local transport address */
	char local_name[PJ_INET6_ADDRSTRLEN];
	/*! Local transport port */
	int local_port;
	/*! Next CSeq to use for message */
	unsigned int cseq;
	/*! Local tag of the dialog */
	char *tag;
	/*! When this subscription expires */
	struct timeval expires;
};
379
/*!
 * \brief A tree of SIP subscriptions
 *
 * Because of the ability to subscribe to resource lists, a SIP
 * subscription can result in a tree of subscriptions being created.
 * This structure represents the information relevant to the subscription
 * as a whole, to include the underlying PJSIP structure for the
 * subscription.
 *
 * Instances are linked together through the 'next' member; the module-wide
 * 'subscriptions' list is declared with this type.
 */
struct sip_subscription_tree {
	/*! The endpoint with which the subscription is communicating */
	struct ast_sip_endpoint *endpoint;
	/*! Serializer on which to place operations for this subscription */
	struct ast_taskprocessor *serializer;
	/*! The role for this subscription */
	enum ast_sip_subscription_role role;
	/*! Persistence information (may be NULL; the persistence helpers check for that) */
	struct subscription_persistence *persistence;
	/*! The underlying PJSIP event subscription structure */
	pjsip_evsub *evsub;
	/*! The underlying PJSIP dialog */
	pjsip_dialog *dlg;
	/*! Interval to use for batching notifications */
	unsigned int notification_batch_interval;
	/*! Scheduler ID for batched notification */
	int notify_sched_id;
	/*! Indicator if scheduled batched notification should be sent */
	unsigned int send_scheduled_notify;
	/*! The root of the subscription tree */
	struct ast_sip_subscription *root;
	/*! Is this subscription to a list? */
	int is_list;
	/*! Next item in the list */
	AST_LIST_ENTRY(sip_subscription_tree) next;
	/*! Indicates that a NOTIFY is currently being sent on the SIP subscription */
	int last_notify;
};
417
/*!
 * \brief Structure representing a "virtual" SIP subscription.
 *
 * This structure serves a dual purpose. Structurally, it is
 * the constructed tree of subscriptions based on the resources
 * being subscribed to. API-wise, this serves as the handle that
 * subscription handlers use in order to interact with the pubsub API.
 */
struct ast_sip_subscription {
	/*! Subscription datastores set up by handlers */
	struct ao2_container *datastores;
	/*! The handler for this subscription */
	const struct ast_sip_subscription_handler *handler;
	/*! Pointer to the base of the tree */
	struct sip_subscription_tree *tree;
	/*! Body generator for NOTIFYs */
	struct ast_sip_pubsub_body_generator *body_generator;
	/*! Vector of child subscriptions */
	AST_VECTOR(, struct ast_sip_subscription *) children;
	/*! Saved NOTIFY body text for this subscription */
	struct ast_str *body_text;
	/*! Indicator that the body text has changed since the last notification */
	int body_changed;
	/*! The current state of the subscription */
	pjsip_evsub_state subscription_state;
	/*! For lists, the current version to place in the RLMI body */
	unsigned int version;
	/*! For lists, indicates if full state should always be communicated. */
	unsigned int full_state;
	/*! URI associated with the subscription */
	pjsip_sip_uri *uri;
	/*! Name of resource being subscribed to (trailing variable-length storage) */
	char resource[0];
};
452
/*!
 * \brief Structure representing a publication resource
 *
 * Both 'endpoint' and 'events' are released by
 * publication_resource_destroy().
 */
struct ast_sip_publication_resource {
	/*! \brief Sorcery object details */
	SORCERY_OBJECT(details);
	/*! \brief Optional name of an endpoint that is only allowed to publish to this resource */
	char *endpoint;
	/*! \brief Mapping for event types to configuration */
	struct ast_variable *events;
};
464
/*! \brief Display names for subscription roles, indexed by enum ast_sip_subscription_role */
static const char *sip_subscription_roles_map[] = {
	[AST_SIP_SUBSCRIBER] = "Subscriber",
	[AST_SIP_NOTIFIER] = "Notifier"
};
469
/*! \brief Module-wide list of subscription trees (linked through sip_subscription_tree.next) */
AST_RWLIST_HEAD_STATIC(subscriptions, sip_subscription_tree);

/*! \brief Module-wide list of NOTIFY body generators */
AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
/*! \brief Module-wide list of NOTIFY body supplements */
AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
474
/* Forward declarations of the PJSIP event subscription callbacks installed in pubsub_cb */
static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
		int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
		pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
static void pubsub_on_client_refresh(pjsip_evsub *sub);
static void pubsub_on_server_timeout(pjsip_evsub *sub);

/*!
 * \brief Callback table for PJSIP event subscriptions
 *
 * Each member is set to the like-named pubsub_* function declared above.
 */
static pjsip_evsub_user pubsub_cb = {
	.on_evsub_state = pubsub_on_evsub_state,
	.on_rx_refresh = pubsub_on_rx_refresh,
	.on_rx_notify = pubsub_on_rx_notify,
	.on_client_refresh = pubsub_on_client_refresh,
	.on_server_timeout = pubsub_on_server_timeout,
};
490
491 /*! \brief Destructor for publication resource */
492 static void publication_resource_destroy(void *obj)
493 {
494         struct ast_sip_publication_resource *resource = obj;
495
496         ast_free(resource->endpoint);
497         ast_variables_destroy(resource->events);
498 }
499
500 /*! \brief Allocator for publication resource */
501 static void *publication_resource_alloc(const char *name)
502 {
503         return ast_sorcery_generic_alloc(sizeof(struct ast_sip_publication_resource), publication_resource_destroy);
504 }
505
506 /*! \brief Destructor for subscription persistence */
507 static void subscription_persistence_destroy(void *obj)
508 {
509         struct subscription_persistence *persistence = obj;
510
511         ast_free(persistence->endpoint);
512         ast_free(persistence->tag);
513 }
514
515 /*! \brief Allocator for subscription persistence */
516 static void *subscription_persistence_alloc(const char *name)
517 {
518         return ast_sorcery_generic_alloc(sizeof(struct subscription_persistence), subscription_persistence_destroy);
519 }
520
521 /*! \brief Function which creates initial persistence information of a subscription in sorcery */
522 static struct subscription_persistence *subscription_persistence_create(struct sip_subscription_tree *sub_tree)
523 {
524         char tag[PJ_GUID_STRING_LENGTH + 1];
525
526         /* The id of this persistence object doesn't matter as we keep it on the subscription and don't need to
527          * look it up by id at all.
528          */
529         struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(),
530                 "subscription_persistence", NULL);
531
532         pjsip_dialog *dlg = sub_tree->dlg;
533
534         if (!persistence) {
535                 return NULL;
536         }
537
538         persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub_tree->endpoint));
539         ast_copy_pj_str(tag, &dlg->local.info->tag, sizeof(tag));
540         persistence->tag = ast_strdup(tag);
541
542         ast_sorcery_create(ast_sip_get_sorcery(), persistence);
543         return persistence;
544 }
545
546 /*! \brief Function which updates persistence information of a subscription in sorcery */
547 static void subscription_persistence_update(struct sip_subscription_tree *sub_tree,
548         pjsip_rx_data *rdata)
549 {
550         pjsip_dialog *dlg;
551
552         if (!sub_tree->persistence) {
553                 return;
554         }
555
556         dlg = sub_tree->dlg;
557         sub_tree->persistence->cseq = dlg->local.cseq;
558
559         if (rdata) {
560                 int expires;
561                 pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
562
563                 expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
564                 sub_tree->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
565
566                 /* When receiving a packet on an streaming transport, it's possible to receive more than one SIP
567                  * message at a time into the rdata->pkt_info.packet buffer. However, the rdata->msg_info.msg_buf
568                  * will always point to the proper SIP message that is to be processed. When updating subscription
569                  * persistence that is pulled from persistent storage, though, the rdata->pkt_info.packet will
570                  * only ever have a single SIP message on it, and so we base persistence on that.
571                  */
572                 if (rdata->msg_info.msg_buf) {
573                         ast_copy_string(sub_tree->persistence->packet, rdata->msg_info.msg_buf,
574                                         MIN(sizeof(sub_tree->persistence->packet), rdata->msg_info.len));
575                 } else {
576                         ast_copy_string(sub_tree->persistence->packet, rdata->pkt_info.packet,
577                                         sizeof(sub_tree->persistence->packet));
578                 }
579                 ast_copy_string(sub_tree->persistence->src_name, rdata->pkt_info.src_name,
580                                 sizeof(sub_tree->persistence->src_name));
581                 sub_tree->persistence->src_port = rdata->pkt_info.src_port;
582                 ast_copy_string(sub_tree->persistence->transport_key, rdata->tp_info.transport->type_name,
583                         sizeof(sub_tree->persistence->transport_key));
584                 ast_copy_pj_str(sub_tree->persistence->local_name, &rdata->tp_info.transport->local_name.host,
585                         sizeof(sub_tree->persistence->local_name));
586                 sub_tree->persistence->local_port = rdata->tp_info.transport->local_name.port;
587         }
588
589         ast_sorcery_update(ast_sip_get_sorcery(), sub_tree->persistence);
590 }
591
592 /*! \brief Function which removes persistence of a subscription from sorcery */
593 static void subscription_persistence_remove(struct sip_subscription_tree *sub_tree)
594 {
595         if (!sub_tree->persistence) {
596                 return;
597         }
598
599         ast_sorcery_delete(ast_sip_get_sorcery(), sub_tree->persistence);
600         ao2_ref(sub_tree->persistence, -1);
601         sub_tree->persistence = NULL;
602 }
603
604
/* Forward declarations of lookup helpers used before their definitions */
static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name);
static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
		size_t num_accept, const char *body_type);
608
609 /*! \brief Retrieve a handler using the Event header of an rdata message */
610 static struct ast_sip_subscription_handler *subscription_get_handler_from_rdata(pjsip_rx_data *rdata)
611 {
612         pjsip_event_hdr *event_header;
613         char event[32];
614         struct ast_sip_subscription_handler *handler;
615
616         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
617         if (!event_header) {
618                 ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
619                 return NULL;
620         }
621         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
622
623         handler = find_sub_handler_for_event_name(event);
624         if (!handler) {
625                 ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
626         }
627
628         return handler;
629 }
630
/*!
 * \brief Accept headers that are exceptions to the rule
 *
 * Typically, when a SUBSCRIBE arrives, we attempt to find a
 * body generator that matches one of the Accept headers in
 * the request. When subscribing to a single resource, this works
 * great. However, when subscribing to a list, things work
 * differently. Most Accept header values are fine, but there
 * are a couple that are endemic to resource lists that need
 * to be ignored when searching for a body generator to use
 * for the individual resources of the subscription.
 *
 * NOTE(review): this array is not declared static. In the visible part of
 * the file only exceptional_accept() reads it -- confirm whether external
 * linkage is intended.
 */
const char *accept_exceptions[] =  {
	"multipart/related",
	"application/rlmi+xml",
};
647
648 /*!
649  * \brief Is the Accept header from the SUBSCRIBE in the list of exceptions?
650  *
651  * \retval 1 This Accept header value is an exception to the rule.
652  * \retval 0 This Accept header is not an exception to the rule.
653  */
654 static int exceptional_accept(const pj_str_t *accept)
655 {
656         int i;
657
658         for (i = 0; i < ARRAY_LEN(accept_exceptions); ++i) {
659                 if (!pj_strcmp2(accept, accept_exceptions[i])) {
660                         return 1;
661                 }
662         }
663
664         return 0;
665 }
666
667 /*! \brief Retrieve a body generator using the Accept header of an rdata message */
668 static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata,
669         const struct ast_sip_subscription_handler *handler)
670 {
671         pjsip_accept_hdr *accept_header = (pjsip_accept_hdr *) &rdata->msg_info.msg->hdr;
672         char accept[AST_SIP_MAX_ACCEPT][64];
673         size_t num_accept_headers = 0;
674
675         while ((accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, accept_header->next))) {
676                 int i;
677
678                 for (i = 0; i < accept_header->count; ++i) {
679                         if (!exceptional_accept(&accept_header->values[i])) {
680                                 ast_copy_pj_str(accept[num_accept_headers], &accept_header->values[i], sizeof(accept[num_accept_headers]));
681                                 ++num_accept_headers;
682                         }
683                 }
684         }
685
686         if (num_accept_headers == 0) {
687                 /* If a SUBSCRIBE contains no Accept headers, then we must assume that
688                  * the default accept type for the event package is to be used.
689                  */
690                 ast_copy_string(accept[0], handler->notifier->default_accept, sizeof(accept[0]));
691                 num_accept_headers = 1;
692         }
693
694         return find_body_generator(accept, num_accept_headers, handler->body_type);
695 }
696
697 /*! \brief Check if the rdata has a Supported header containing 'eventlist'
698  *
699  *  \retval 1 rdata has an eventlist containing supported header
700  *  \retval 0 rdata doesn't have an eventlist containing supported header
701  */
702 static int ast_sip_pubsub_has_eventlist_support(pjsip_rx_data *rdata)
703 {
704         pjsip_supported_hdr *supported_header = (pjsip_supported_hdr *) &rdata->msg_info.msg->hdr;
705
706         while ((supported_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_SUPPORTED, supported_header->next))) {
707                 int i;
708
709                 for (i = 0; i < supported_header->count; i++) {
710                         if (!pj_stricmp2(&supported_header->values[i], "eventlist")) {
711                                 return 1;
712                         }
713                 }
714         }
715
716         return 0;
717 }
718
719 struct resource_tree;
720
721 /*!
722  * \brief A node for a resource tree.
723  */
724 struct tree_node {
725         AST_VECTOR(, struct tree_node *) children;
726         unsigned int full_state;
727         char resource[0];
728 };
729
730 /*!
731  * \brief Helper function for retrieving a resource list for a given event.
732  *
733  * This will retrieve a resource list that corresponds to the resource and event provided.
734  *
735  * \param resource The name of the resource list to retrieve
736  * \param event The expected event name on the resource list
737  */
738 static struct resource_list *retrieve_resource_list(const char *resource, const char *event)
739 {
740         struct resource_list *list;
741
742         list = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "resource_list", resource);
743         if (!list) {
744                 return NULL;
745         }
746
747         if (strcmp(list->event, event)) {
748                 ast_log(LOG_WARNING, "Found resource list %s, but its event type (%s) does not match SUBSCRIBE's (%s)\n",
749                                 resource, list->event, event);
750                 ao2_cleanup(list);
751                 return NULL;
752         }
753
754         return list;
755 }
756
757 /*!
758  * \brief Allocate a tree node
759  *
760  * In addition to allocating and initializing the tree node, the node is also added
761  * to the vector of visited resources. See \ref build_resource_tree for more information
762  * on the visited resources.
763  *
764  * \param resource The name of the resource for this tree node.
765  * \param visited The vector of resources that have been visited.
766  * \param if allocating a list, indicate whether full state is requested in notifications.
767  * \retval NULL Allocation failure.
768  * \retval non-NULL The newly-allocated tree_node
769  */
770 static struct tree_node *tree_node_alloc(const char *resource, struct resources *visited, unsigned int full_state)
771 {
772         struct tree_node *node;
773
774         node = ast_calloc(1, sizeof(*node) + strlen(resource) + 1);
775         if (!node) {
776                 return NULL;
777         }
778
779         strcpy(node->resource, resource);
780         if (AST_VECTOR_INIT(&node->children, 4)) {
781                 ast_free(node);
782                 return NULL;
783         }
784         node->full_state = full_state;
785
786         if (visited) {
787                 AST_VECTOR_APPEND(visited, resource);
788         }
789         return node;
790 }
791
792 /*!
793  * \brief Destructor for a tree node
794  *
795  * This function calls recursively in order to destroy
796  * all nodes lower in the tree from the given node in
797  * addition to the node itself.
798  *
799  * \param node The node to destroy.
800  */
801 static void tree_node_destroy(struct tree_node *node)
802 {
803         int i;
804         if (!node) {
805                 return;
806         }
807
808         for (i = 0; i < AST_VECTOR_SIZE(&node->children); ++i) {
809                 tree_node_destroy(AST_VECTOR_GET(&node->children, i));
810         }
811         AST_VECTOR_FREE(&node->children);
812         ast_free(node);
813 }
814
/*!
 * \brief Check whether a resource name is already in the visited vector
 *
 * See \ref build_resource_tree for more information
 *
 * \param resource The resource currently being visited
 * \param visited The resources that have previously been visited
 *
 * \retval 1 An entry equal to \a resource is present.
 * \retval 0 No entry matches.
 */
static int have_visited(const char *resource, struct resources *visited)
{
	int idx = 0;

	while (idx < AST_VECTOR_SIZE(visited)) {
		if (strcmp(resource, AST_VECTOR_GET(visited, idx)) == 0) {
			return 1;
		}
		++idx;
	}

	return 0;
}
835
836 /*!
837  * \brief Build child nodes for a given parent.
838  *
839  * This iterates through the items on a resource list and creates tree nodes for each one. The
840  * tree nodes created are children of the supplied parent node. If an item in the resource
841  * list is itself a list, then this function is called recursively to provide children for
842  * the the new node.
843  *
844  * If an item in a resource list is not a list, then the supplied subscription handler is
845  * called into as if a new SUBSCRIBE for the list item were presented. The handler's response
846  * is used to determine if the node can be added to the tree or not.
847  *
848  * If a parent node ends up having no child nodes added under it, then the parent node is
849  * pruned from the tree.
850  *
851  * \param endpoint The endpoint that sent the inbound SUBSCRIBE.
852  * \param handler The subscription handler for leaf nodes in the tree.
853  * \param list The configured resource list from which the child node is being built.
854  * \param parent The parent node for these children.
855  * \param visited The resources that have already been visited.
856  */
857 static void build_node_children(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
858                 struct resource_list *list, struct tree_node *parent, struct resources *visited)
859 {
860         int i;
861
862         for (i = 0; i < AST_VECTOR_SIZE(&list->items); ++i) {
863                 struct tree_node *current;
864                 struct resource_list *child_list;
865                 const char *resource = AST_VECTOR_GET(&list->items, i);
866
867                 if (have_visited(resource, visited)) {
868                         ast_debug(1, "Already visited resource %s. Avoiding duplicate resource or potential loop.\n", resource);
869                         continue;
870                 }
871
872                 child_list = retrieve_resource_list(resource, list->event);
873                 if (!child_list) {
874                         int resp = handler->notifier->new_subscribe(endpoint, resource);
875                         if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
876                                 current = tree_node_alloc(resource, visited, 0);
877                                 if (!current) {
878                                         ast_debug(1, "Subscription to leaf resource %s was successful, but encountered"
879                                                         "allocation error afterwards\n", resource);
880                                         continue;
881                                 }
882                                 ast_debug(1, "Subscription to leaf resource %s resulted in success. Adding to parent %s\n",
883                                                 resource, parent->resource);
884                                 AST_VECTOR_APPEND(&parent->children, current);
885                         } else {
886                                 ast_debug(1, "Subscription to leaf resource %s resulted in error response %d\n",
887                                                 resource, resp);
888                         }
889                 } else {
890                         ast_debug(1, "Resource %s (child of %s) is a list\n", resource, parent->resource);
891                         current = tree_node_alloc(resource, visited, child_list->full_state);
892                         if (!current) {
893                                 ast_debug(1, "Cannot build children of resource %s due to allocation failure\n", resource);
894                                 continue;
895                         }
896                         build_node_children(endpoint, handler, child_list, current, visited);
897                         if (AST_VECTOR_SIZE(&current->children) > 0) {
898                                 ast_debug(1, "List %s had no successful children.\n", resource);
899                                 AST_VECTOR_APPEND(&parent->children, current);
900                         } else {
901                                 ast_debug(1, "List %s had successful children. Adding to parent %s\n",
902                                                 resource, parent->resource);
903                                 tree_node_destroy(current);
904                         }
905                         ao2_cleanup(child_list);
906                 }
907         }
908 }
909
/*!
 * \brief A resource tree
 *
 * The resource named in an inbound SUBSCRIBE may be a resource list, and
 * a resource list may in turn contain resources that are themselves lists.
 * A tree is therefore the structure needed to hold the resources.
 *
 * The tree is built on receipt of the SUBSCRIBE by determining whether a
 * subscription to each individual resource would succeed. Each successful
 * subscription results in a node in the tree; an unsuccessful one results
 * in no node being created.
 *
 * This tree can be seen as a bare-bones analog of the tree of
 * ast_sip_subscriptions that will end up being created to actually carry
 * out the duties of a SIP SUBSCRIBE dialog.
 */
struct resource_tree {
	/*! Top node of the tree */
	struct tree_node *root;
	/*! Batch interval taken from the resource list when the root is a list */
	unsigned int notification_batch_interval;
};
930
931 /*!
932  * \brief Destroy a resource tree.
933  *
934  * This function makes no assumptions about how the tree itself was
935  * allocated and does not attempt to free the tree itself. Callers
936  * of this function are responsible for freeing the tree.
937  *
938  * \param tree The tree to destroy.
939  */
940 static void resource_tree_destroy(struct resource_tree *tree)
941 {
942         if (tree) {
943                 tree_node_destroy(tree->root);
944         }
945 }
946
947 /*!
948  * \brief Build a resource tree
949  *
950  * This function builds a resource tree based on the requested resource in a SUBSCRIBE request.
951  *
952  * This function also creates a container that has all resources that have been visited during
953  * creation of the tree, whether those resources resulted in a tree node being created or not.
954  * Keeping this container of visited resources allows for misconfigurations such as loops in
955  * the tree or duplicated resources to be detected.
956  *
957  * \param endpoint The endpoint that sent the SUBSCRIBE request.
958  * \param handler The subscription handler for leaf nodes in the tree.
959  * \param resource The resource requested in the SUBSCRIBE request.
960  * \param tree The tree that is to be built.
961  * \param has_eventlist_support
962  *
963  * \retval 200-299 Successfully subscribed to at least one resource.
964  * \retval 300-699 Failure to subscribe to requested resource.
965  */
966 static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler,
967                 const char *resource, struct resource_tree *tree, int has_eventlist_support)
968 {
969         RAII_VAR(struct resource_list *, list, NULL, ao2_cleanup);
970         struct resources visited;
971
972         if (!has_eventlist_support || !(list = retrieve_resource_list(resource, handler->event_name))) {
973                 ast_debug(1, "Subscription to resource %s is not to a list\n", resource);
974                 tree->root = tree_node_alloc(resource, NULL, 0);
975                 if (!tree->root) {
976                         return 500;
977                 }
978                 return handler->notifier->new_subscribe(endpoint, resource);
979         }
980
981         ast_debug(1, "Subscription to resource %s is a list\n", resource);
982         if (AST_VECTOR_INIT(&visited, AST_VECTOR_SIZE(&list->items))) {
983                 return 500;
984         }
985
986         tree->root = tree_node_alloc(resource, &visited, list->full_state);
987         if (!tree->root) {
988                 AST_VECTOR_FREE(&visited);
989                 return 500;
990         }
991
992         tree->notification_batch_interval = list->notification_batch_interval;
993
994         build_node_children(endpoint, handler, list, tree->root, &visited);
995         AST_VECTOR_FREE(&visited);
996
997         if (AST_VECTOR_SIZE(&tree->root->children) > 0) {
998                 return 200;
999         } else {
1000                 return 500;
1001         }
1002 }
1003
1004 static int datastore_hash(const void *obj, int flags)
1005 {
1006         const struct ast_datastore *datastore = obj;
1007         const char *uid = flags & OBJ_KEY ? obj : datastore->uid;
1008
1009         ast_assert(uid != NULL);
1010
1011         return ast_str_hash(uid);
1012 }
1013
1014 static int datastore_cmp(void *obj, void *arg, int flags)
1015 {
1016         const struct ast_datastore *datastore1 = obj;
1017         const struct ast_datastore *datastore2 = arg;
1018         const char *uid2 = flags & OBJ_KEY ? arg : datastore2->uid;
1019
1020         ast_assert(datastore1->uid != NULL);
1021         ast_assert(uid2 != NULL);
1022
1023         return strcmp(datastore1->uid, uid2) ? 0 : CMP_MATCH | CMP_STOP;
1024 }
1025
1026 static void add_subscription(struct sip_subscription_tree *obj)
1027 {
1028         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1029         AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
1030 }
1031
1032 static void remove_subscription(struct sip_subscription_tree *obj)
1033 {
1034         struct sip_subscription_tree *i;
1035         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1036         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
1037                 if (i == obj) {
1038                         AST_RWLIST_REMOVE_CURRENT(next);
1039                         if (i->root) {
1040                                 ast_debug(1, "Removing subscription to resource %s from list of subscriptions\n",
1041                                                 ast_sip_subscription_get_resource_name(i->root));
1042                         }
1043                         break;
1044                 }
1045         }
1046         AST_RWLIST_TRAVERSE_SAFE_END;
1047 }
1048
1049 static void destroy_subscription(struct ast_sip_subscription *sub)
1050 {
1051         ast_debug(3, "Destroying SIP subscription to resource %s\n", sub->resource);
1052         ast_free(sub->body_text);
1053
1054         AST_VECTOR_FREE(&sub->children);
1055         ao2_cleanup(sub->datastores);
1056         ast_free(sub);
1057 }
1058
1059 static void destroy_subscriptions(struct ast_sip_subscription *root)
1060 {
1061         int i;
1062
1063         if (!root) {
1064                 return;
1065         }
1066
1067         for (i = 0; i < AST_VECTOR_SIZE(&root->children); ++i) {
1068                 struct ast_sip_subscription *child;
1069
1070                 child = AST_VECTOR_GET(&root->children, i);
1071                 destroy_subscriptions(child);
1072         }
1073
1074         destroy_subscription(root);
1075 }
1076
1077 static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler,
1078                 const char *resource, struct sip_subscription_tree *tree)
1079 {
1080         struct ast_sip_subscription *sub;
1081         pjsip_sip_uri *contact_uri;
1082
1083         sub = ast_calloc(1, sizeof(*sub) + strlen(resource) + 1);
1084         if (!sub) {
1085                 return NULL;
1086         }
1087         strcpy(sub->resource, resource); /* Safe */
1088
1089         sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
1090         if (!sub->datastores) {
1091                 destroy_subscription(sub);
1092                 return NULL;
1093         }
1094
1095         sub->body_text = ast_str_create(128);
1096         if (!sub->body_text) {
1097                 destroy_subscription(sub);
1098                 return NULL;
1099         }
1100
1101         sub->uri = pjsip_sip_uri_create(tree->dlg->pool, PJ_FALSE);
1102         contact_uri = pjsip_uri_get_uri(tree->dlg->local.contact->uri);
1103         pjsip_sip_uri_assign(tree->dlg->pool, sub->uri, contact_uri);
1104         pj_strdup2(tree->dlg->pool, &sub->uri->user, resource);
1105
1106         sub->handler = handler;
1107         sub->subscription_state = PJSIP_EVSUB_STATE_ACTIVE;
1108         sub->tree = ao2_bump(tree);
1109
1110         return sub;
1111 }
1112
1113 /*!
1114  * \brief Create a tree of virtual subscriptions based on a resource tree node.
1115  *
1116  * \param handler The handler to supply to leaf subscriptions.
1117  * \param resource The requested resource for this subscription.
1118  * \param generator Body generator to use for leaf subscriptions.
1119  * \param tree The root of the subscription tree.
1120  * \param current The tree node that corresponds to the subscription being created.
1121  */
1122 static struct ast_sip_subscription *create_virtual_subscriptions(const struct ast_sip_subscription_handler *handler,
1123                 const char *resource, struct ast_sip_pubsub_body_generator *generator,
1124                 struct sip_subscription_tree *tree, struct tree_node *current)
1125 {
1126         int i;
1127         struct ast_sip_subscription *sub;
1128
1129         sub = allocate_subscription(handler, resource, tree);
1130         if (!sub) {
1131                 return NULL;
1132         }
1133
1134         sub->full_state = current->full_state;
1135         sub->body_generator = generator;
1136         AST_VECTOR_INIT(&sub->children, AST_VECTOR_SIZE(&current->children));
1137
1138         for (i = 0; i < AST_VECTOR_SIZE(&current->children); ++i) {
1139                 struct ast_sip_subscription *child;
1140                 struct tree_node *child_node = AST_VECTOR_GET(&current->children, i);
1141
1142                 child = create_virtual_subscriptions(handler, child_node->resource, generator,
1143                                 tree, child_node);
1144
1145                 if (!child) {
1146                         ast_debug(1, "Child subscription to resource %s could not be created\n",
1147                                         child_node->resource);
1148                         continue;
1149                 }
1150
1151                 if (AST_VECTOR_APPEND(&sub->children, child)) {
1152                         ast_debug(1, "Child subscription to resource %s could not be appended\n",
1153                                         child_node->resource);
1154                 }
1155         }
1156
1157         return sub;
1158 }
1159
1160 static void shutdown_subscriptions(struct ast_sip_subscription *sub)
1161 {
1162         int i;
1163
1164         if (!sub) {
1165                 return;
1166         }
1167
1168         if (AST_VECTOR_SIZE(&sub->children) > 0) {
1169                 for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
1170                         shutdown_subscriptions(AST_VECTOR_GET(&sub->children, i));
1171                 }
1172                 return;
1173         }
1174
1175         /* We notify subscription shutdown only on the tree leaves. */
1176         if (sub->handler->subscription_shutdown) {
1177                 sub->handler->subscription_shutdown(sub);
1178         }
1179 }
1180 static int subscription_unreference_dialog(void *obj)
1181 {
1182         struct sip_subscription_tree *sub_tree = obj;
1183
1184         /* This is why we keep the dialog on the subscription. When the subscription
1185          * is destroyed, there is no guarantee that the underlying dialog is ready
1186          * to be destroyed. Furthermore, there's no guarantee in the opposite direction
1187          * either. The dialog could be destroyed before our subscription is. We fix
1188          * this problem by keeping a reference to the dialog until it is time to
1189          * destroy the subscription. We need to have the dialog available when the
1190          * subscription is destroyed so that we can guarantee that our attempt to
1191          * remove the serializer will be successful.
1192          */
1193         pjsip_dlg_dec_session(sub_tree->dlg, &pubsub_module);
1194         sub_tree->dlg = NULL;
1195
1196         return 0;
1197 }
1198
1199 static void subscription_tree_destructor(void *obj)
1200 {
1201         struct sip_subscription_tree *sub_tree = obj;
1202
1203         ast_debug(3, "Destroying subscription tree %p\n", sub_tree);
1204
1205         ao2_cleanup(sub_tree->endpoint);
1206
1207         destroy_subscriptions(sub_tree->root);
1208
1209         if (sub_tree->dlg) {
1210                 ast_sip_push_task_synchronous(sub_tree->serializer, subscription_unreference_dialog, sub_tree);
1211         }
1212
1213         ast_taskprocessor_unreference(sub_tree->serializer);
1214         ast_module_unref(ast_module_info->self);
1215 }
1216
1217 void ast_sip_subscription_destroy(struct ast_sip_subscription *sub)
1218 {
1219         ast_debug(3, "Removing subscription %p reference to subscription tree %p\n", sub, sub->tree);
1220         ao2_cleanup(sub->tree);
1221 }
1222
1223 static void subscription_setup_dialog(struct sip_subscription_tree *sub_tree, pjsip_dialog *dlg)
1224 {
1225         sub_tree->dlg = dlg;
1226         ast_sip_dialog_set_serializer(dlg, sub_tree->serializer);
1227         ast_sip_dialog_set_endpoint(dlg, sub_tree->endpoint);
1228         pjsip_evsub_set_mod_data(sub_tree->evsub, pubsub_module.id, sub_tree);
1229         pjsip_dlg_inc_session(dlg, &pubsub_module);
1230 }
1231
1232 static struct sip_subscription_tree *allocate_subscription_tree(struct ast_sip_endpoint *endpoint)
1233 {
1234         struct sip_subscription_tree *sub_tree;
1235         char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
1236
1237         sub_tree = ao2_alloc(sizeof *sub_tree, subscription_tree_destructor);
1238         if (!sub_tree) {
1239                 return NULL;
1240         }
1241
1242         ast_module_ref(ast_module_info->self);
1243
1244         /* Create name with seq number appended. */
1245         ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/pubsub/%s",
1246                 ast_sorcery_object_get_id(endpoint));
1247
1248         sub_tree->serializer = ast_sip_create_serializer(tps_name);
1249         if (!sub_tree->serializer) {
1250                 ao2_ref(sub_tree, -1);
1251                 return NULL;
1252         }
1253
1254         sub_tree->endpoint = ao2_bump(endpoint);
1255         sub_tree->notify_sched_id = -1;
1256
1257         return sub_tree;
1258 }
1259
1260 /*!
1261  * \brief Create a subscription tree based on a resource tree.
1262  *
1263  * Using the previously-determined valid resources in the provided resource tree,
1264  * a corresponding tree of ast_sip_subscriptions are created. The root of the
1265  * subscription tree is a real subscription, and the rest in the tree are
1266  * virtual subscriptions.
1267  *
1268  * \param handler The handler to use for leaf subscriptions
1269  * \param endpoint The endpoint that sent the SUBSCRIBE request
1270  * \param rdata The SUBSCRIBE content
1271  * \param resource The requested resource in the SUBSCRIBE request
1272  * \param generator The body generator to use in leaf subscriptions
1273  * \param tree The resource tree on which the subscription tree is based
1274  * \param dlg_status[out] The result of attempting to create a dialog.
1275  *
1276  * \retval NULL Could not create the subscription tree
1277  * \retval non-NULL The root of the created subscription tree
1278  */
1279
1280 static struct sip_subscription_tree *create_subscription_tree(const struct ast_sip_subscription_handler *handler,
1281                 struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
1282                 struct ast_sip_pubsub_body_generator *generator, struct resource_tree *tree,
1283                 pj_status_t *dlg_status)
1284 {
1285         struct sip_subscription_tree *sub_tree;
1286         pjsip_dialog *dlg;
1287         struct subscription_persistence *persistence;
1288
1289         sub_tree = allocate_subscription_tree(endpoint);
1290         if (!sub_tree) {
1291                 *dlg_status = PJ_ENOMEM;
1292                 return NULL;
1293         }
1294         sub_tree->role = AST_SIP_NOTIFIER;
1295
1296         dlg = ast_sip_create_dialog_uas(endpoint, rdata, dlg_status);
1297         if (!dlg) {
1298                 if (*dlg_status != PJ_EEXISTS) {
1299                         ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
1300                 }
1301                 ao2_ref(sub_tree, -1);
1302                 return NULL;
1303         }
1304
1305         persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
1306                         pubsub_module.id, MOD_DATA_PERSISTENCE);
1307         if (persistence) {
1308                 /* Update the created dialog with the persisted information */
1309                 pjsip_ua_unregister_dlg(pjsip_ua_instance(), dlg);
1310                 pj_strdup2(dlg->pool, &dlg->local.info->tag, persistence->tag);
1311                 dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag);
1312                 pjsip_ua_register_dlg(pjsip_ua_instance(), dlg);
1313                 dlg->local.cseq = persistence->cseq;
1314                 dlg->remote.cseq = persistence->cseq;
1315         }
1316
1317         pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &sub_tree->evsub);
1318         subscription_setup_dialog(sub_tree, dlg);
1319
1320         ast_sip_mod_data_set(dlg->pool, dlg->mod_data, pubsub_module.id, MOD_DATA_MSG,
1321                         pjsip_msg_clone(dlg->pool, rdata->msg_info.msg));
1322
1323         sub_tree->notification_batch_interval = tree->notification_batch_interval;
1324
1325         sub_tree->root = create_virtual_subscriptions(handler, resource, generator, sub_tree, tree->root);
1326         if (AST_VECTOR_SIZE(&sub_tree->root->children) > 0) {
1327                 sub_tree->is_list = 1;
1328         }
1329
1330         add_subscription(sub_tree);
1331
1332         return sub_tree;
1333 }
1334
/* Forward declarations. */
static int initial_notify_task(void *obj);
static int send_notify(struct sip_subscription_tree *sub_tree,
	unsigned int force_full_state);
1337
1338 /*! \brief Callback function to perform the actual recreation of a subscription */
1339 static int subscription_persistence_recreate(void *obj, void *arg, int flags)
1340 {
1341         struct subscription_persistence *persistence = obj;
1342         pj_pool_t *pool = arg;
1343         pjsip_rx_data rdata = { { 0, }, };
1344         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
1345         struct sip_subscription_tree *sub_tree;
1346         struct ast_sip_pubsub_body_generator *generator;
1347         int resp;
1348         char *resource;
1349         size_t resource_size;
1350         pjsip_sip_uri *request_uri;
1351         struct resource_tree tree;
1352         pjsip_expires_hdr *expires_header;
1353         struct ast_sip_subscription_handler *handler;
1354
1355         /* If this subscription has already expired remove it */
1356         if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
1357                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1358                 return 0;
1359         }
1360
1361         endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
1362         if (!endpoint) {
1363                 ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
1364                         persistence->endpoint);
1365                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1366                 return 0;
1367         }
1368
1369         pj_pool_reset(pool);
1370         rdata.tp_info.pool = pool;
1371
1372         if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
1373                 persistence->transport_key, persistence->local_name, persistence->local_port)) {
1374                 ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
1375                         persistence->endpoint);
1376                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1377                 return 0;
1378         }
1379
1380         if (rdata.msg_info.msg->type != PJSIP_REQUEST_MSG) {
1381                 ast_log(LOG_NOTICE, "Endpoint %s persisted a SIP response instead of a subscribe request. Unable to reload subscription.\n",
1382                                 ast_sorcery_object_get_id(endpoint));
1383                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1384                 return 0;
1385         }
1386
1387         request_uri = pjsip_uri_get_uri(rdata.msg_info.msg->line.req.uri);
1388         resource_size = pj_strlen(&request_uri->user) + 1;
1389         resource = ast_alloca(resource_size);
1390         ast_copy_pj_str(resource, &request_uri->user, resource_size);
1391
1392         /* Update the expiration header with the new expiration */
1393         expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
1394         if (!expires_header) {
1395                 expires_header = pjsip_expires_hdr_create(pool, 0);
1396                 if (!expires_header) {
1397                         ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1398                         return 0;
1399                 }
1400                 pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
1401         }
1402         expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
1403
1404         handler = subscription_get_handler_from_rdata(&rdata);
1405         if (!handler || !handler->notifier) {
1406                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1407                 return 0;
1408         }
1409
1410         generator = subscription_get_generator_from_rdata(&rdata, handler);
1411         if (!generator) {
1412                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1413                 return 0;
1414         }
1415
1416         ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
1417                         pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
1418
1419         memset(&tree, 0, sizeof(tree));
1420         resp = build_resource_tree(endpoint, handler, resource, &tree,
1421                 ast_sip_pubsub_has_eventlist_support(&rdata));
1422         if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
1423                 pj_status_t dlg_status;
1424
1425                 sub_tree = create_subscription_tree(handler, endpoint, &rdata, resource, generator, &tree, &dlg_status);
1426                 if (!sub_tree) {
1427                         ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1428                         ast_log(LOG_WARNING, "Failed to re-create subscription for %s\n", persistence->endpoint);
1429                         return 0;
1430                 }
1431                 sub_tree->persistence = ao2_bump(persistence);
1432                 subscription_persistence_update(sub_tree, &rdata);
1433                 if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ao2_bump(sub_tree))) {
1434                         pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
1435                         ao2_ref(sub_tree, -1);
1436                 }
1437         } else {
1438                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
1439         }
1440         resource_tree_destroy(&tree);
1441
1442         return 0;
1443 }
1444
1445 /*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
1446 static int subscription_persistence_load(void *data)
1447 {
1448         struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
1449                 "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
1450         pj_pool_t *pool;
1451
1452         pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
1453                 PJSIP_POOL_RDATA_INC);
1454         if (!pool) {
1455                 ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
1456                 return 0;
1457         }
1458
1459         ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
1460
1461         pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
1462
1463         ao2_ref(persisted_subscriptions, -1);
1464         return 0;
1465 }
1466
1467 /*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
1468 static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
1469 {
1470         struct ast_json_payload *payload;
1471         const char *type;
1472
1473         if (stasis_message_type(message) != ast_manager_get_generic_type()) {
1474                 return;
1475         }
1476
1477         payload = stasis_message_data(message);
1478         type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
1479
1480         /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
1481          * recreate SIP subscriptions.
1482          */
1483         if (strcmp(type, "FullyBooted")) {
1484                 return;
1485         }
1486
1487         /* This has to be here so the subscription is recreated when the body generator is available */
1488         ast_sip_push_task(NULL, subscription_persistence_load, NULL);
1489
1490         /* Once the system is fully booted we don't care anymore */
1491         stasis_unsubscribe(sub);
1492 }
1493
1494 typedef int (*on_subscription_t)(struct sip_subscription_tree *sub, void *arg);
1495
1496 static int for_each_subscription(on_subscription_t on_subscription, void *arg)
1497 {
1498         int num = 0;
1499         struct sip_subscription_tree *i;
1500         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1501
1502         if (!on_subscription) {
1503                 return num;
1504         }
1505
1506         AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
1507                 if (on_subscription(i, arg)) {
1508                         break;
1509                 }
1510                 ++num;
1511         }
1512         return num;
1513 }
1514
1515 static void sip_subscription_to_ami(struct sip_subscription_tree *sub_tree,
1516                                     struct ast_str **buf)
1517 {
1518         char str[256];
1519         struct ast_sip_endpoint_id_configuration *id = &sub_tree->endpoint->id;
1520
1521         ast_str_append(buf, 0, "Role: %s\r\n",
1522                        sip_subscription_roles_map[sub_tree->role]);
1523         ast_str_append(buf, 0, "Endpoint: %s\r\n",
1524                        ast_sorcery_object_get_id(sub_tree->endpoint));
1525
1526         if (sub_tree->dlg) {
1527                 ast_copy_pj_str(str, &sub_tree->dlg->call_id->id, sizeof(str));
1528         } else {
1529                 ast_copy_string(str, "<unknown>", sizeof(str));
1530         }
1531         ast_str_append(buf, 0, "Callid: %s\r\n", str);
1532
1533         ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(sub_tree->evsub));
1534
1535         ast_callerid_merge(str, sizeof(str),
1536                            S_COR(id->self.name.valid, id->self.name.str, NULL),
1537                            S_COR(id->self.number.valid, id->self.number.str, NULL),
1538                            "Unknown");
1539
1540         ast_str_append(buf, 0, "Callerid: %s\r\n", str);
1541
1542         /* XXX This needs to be done recursively for lists */
1543         if (sub_tree->root->handler->to_ami) {
1544                 sub_tree->root->handler->to_ami(sub_tree->root, buf);
1545         }
1546 }
1547
1548
1549 void *ast_sip_subscription_get_header(const struct ast_sip_subscription *sub, const char *header)
1550 {
1551         pjsip_dialog *dlg;
1552         pjsip_msg *msg;
1553         pj_str_t name;
1554
1555         dlg = sub->tree->dlg;
1556         msg = ast_sip_mod_data_get(dlg->mod_data, pubsub_module.id, MOD_DATA_MSG);
1557         pj_cstr(&name, header);
1558
1559         return pjsip_msg_find_hdr_by_name(msg, &name, NULL);
1560 }
1561
1562 /*!
1563  * \internal
1564  * \brief Wrapper for pjsip_evsub_send_request
1565  *
1566  * This function (re)sets the transport before sending to catch cases
1567  * where the transport might have changed.
1568  *
1569  * If pjproject gives us the ability to resend, we'll only reset the transport
1570  * if PJSIP_ETPNOTAVAIL is returned from send.
1571  *
1572  * \returns pj_status_t
1573  */
1574 static pj_status_t internal_pjsip_evsub_send_request(struct sip_subscription_tree *sub_tree, pjsip_tx_data *tdata)
1575 {
1576         pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_NONE, };
1577
1578         ast_sip_set_tpselector_from_transport_name(sub_tree->endpoint->transport, &selector);
1579         pjsip_dlg_set_transport(sub_tree->dlg, &selector);
1580
1581         return pjsip_evsub_send_request(sub_tree->evsub, tdata);
1582 }
1583
1584 /* XXX This function is not used. */
1585 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
1586                 struct ast_sip_endpoint *endpoint, const char *resource)
1587 {
1588         struct ast_sip_subscription *sub;
1589         pjsip_dialog *dlg;
1590         struct ast_sip_contact *contact;
1591         pj_str_t event;
1592         pjsip_tx_data *tdata;
1593         pjsip_evsub *evsub;
1594         struct sip_subscription_tree *sub_tree = NULL;
1595
1596         sub_tree = allocate_subscription_tree(endpoint);
1597         if (!sub_tree) {
1598                 return NULL;
1599         }
1600
1601         sub = allocate_subscription(handler, resource, sub_tree);
1602         if (!sub) {
1603                 ao2_cleanup(sub_tree);
1604                 return NULL;
1605         }
1606
1607         contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors);
1608         if (!contact || ast_strlen_zero(contact->uri)) {
1609                 ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n",
1610                                 ast_sorcery_object_get_id(endpoint));
1611                 ao2_ref(sub_tree, -1);
1612                 ao2_cleanup(contact);
1613                 return NULL;
1614         }
1615
1616         dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL);
1617         ao2_cleanup(contact);
1618         if (!dlg) {
1619                 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
1620                 ao2_ref(sub_tree, -1);
1621                 return NULL;
1622         }
1623
1624         pj_cstr(&event, handler->event_name);
1625         pjsip_evsub_create_uac(dlg, &pubsub_cb, &event, 0, &sub_tree->evsub);
1626         subscription_setup_dialog(sub_tree, dlg);
1627
1628         evsub = sub_tree->evsub;
1629
1630         if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
1631                 internal_pjsip_evsub_send_request(sub_tree, tdata);
1632         } else {
1633                 /* pjsip_evsub_terminate will result in pubsub_on_evsub_state,
1634                  * being called and terminating the subscription. Therefore, we don't
1635                  * need to decrease the reference count of sub here.
1636                  */
1637                 pjsip_evsub_terminate(evsub, PJ_TRUE);
1638                 ao2_ref(sub_tree, -1);
1639                 return NULL;
1640         }
1641
1642         add_subscription(sub_tree);
1643
1644         return sub;
1645 }
1646
1647 struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
1648 {
1649         ast_assert(sub->tree->endpoint != NULL);
1650         return ao2_bump(sub->tree->endpoint);
1651 }
1652
1653 struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
1654 {
1655         ast_assert(sub->tree->serializer != NULL);
1656         return sub->tree->serializer;
1657 }
1658
1659 /*!
1660  * \brief Pre-allocate a buffer for the transmission
1661  *
1662  * Typically, we let PJSIP do this step for us when we send a request. PJSIP's buffer
1663  * allocation algorithm is to allocate a buffer of PJSIP_MAX_PKT_LEN bytes and attempt
1664  * to write the packet to the allocated buffer. If the buffer is too small to hold the
1665  * packet, then we get told the message is too long to be sent.
1666  *
1667  * When dealing with SIP NOTIFY, especially with RLS, it is possible to exceed
1668  * PJSIP_MAX_PKT_LEN. Rather than accepting the limitation imposed on us by default,
1669  * we instead take the strategy of pre-allocating the buffer, testing for ourselves
1670  * if the message will fit, and resizing the buffer as required.
1671  *
1672  * RFC 3261 says that a SIP UDP request can be up to 65535 bytes long. We're capping
1673  * it at 64000 for a couple of reasons:
1674  * 1) Allocating more than 64K at a time is hard to justify
1675  * 2) If the message goes through proxies, those proxies will want to add Via and
1676  *    Record-Route headers, making the message even larger. Giving some space for
1677  *    those headers is a nice thing to do.
1678  *
1679  * RFC 3261 does not place an upper limit on the size of TCP requests, but we are
1680  * going to impose the same 64K limit as a memory savings.
1681  *
1682  * \param tdata The tdata onto which to allocate a buffer
1683  * \retval 0 Success
1684  * \retval -1 The message is too large
1685  */
1686 static int allocate_tdata_buffer(pjsip_tx_data *tdata)
1687 {
1688         int buf_size;
1689         int size = -1;
1690         char *buf;
1691
1692         for (buf_size = PJSIP_MAX_PKT_LEN; size == -1 && buf_size < 64000; buf_size *= 2) {
1693                 buf = pj_pool_alloc(tdata->pool, buf_size);
1694                 size = pjsip_msg_print(tdata->msg, buf, buf_size);
1695         }
1696
1697         if (size == -1) {
1698                 return -1;
1699         }
1700
1701         tdata->buf.start = buf;
1702         tdata->buf.cur = tdata->buf.start;
1703         tdata->buf.end = tdata->buf.start + buf_size;
1704
1705         return 0;
1706 }
1707
1708 static int sip_subscription_send_request(struct sip_subscription_tree *sub_tree, pjsip_tx_data *tdata)
1709 {
1710 #ifdef TEST_FRAMEWORK
1711         struct ast_sip_endpoint *endpoint = sub_tree->endpoint;
1712         pjsip_evsub *evsub = sub_tree->evsub;
1713 #endif
1714         int res;
1715
1716         if (allocate_tdata_buffer(tdata)) {
1717                 ast_log(LOG_ERROR, "SIP request %s is too large to send.\n", tdata->info);
1718                 return -1;
1719         }
1720
1721         res = internal_pjsip_evsub_send_request(sub_tree, tdata);
1722
1723         subscription_persistence_update(sub_tree, NULL);
1724
1725         ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
1726                 "StateText: %s\r\n"
1727                 "Endpoint: %s\r\n",
1728                 pjsip_evsub_get_state_name(evsub),
1729                 ast_sorcery_object_get_id(endpoint));
1730
1731         return (res == PJ_SUCCESS ? 0 : -1);
1732 }
1733
1734 /*!
1735  * \brief Add a resource XML element to an RLMI body
1736  *
1737  * Each resource element represents a subscribed resource in the list. This function currently
1738  * will unconditionally add an instance element to each created resource element. Instance
1739  * elements refer to later parts in the multipart body.
1740  *
1741  * \param pool PJLIB allocation pool
1742  * \param cid Content-ID header of the resource
1743  * \param resource_name Name of the resource
1744  * \param resource_uri URI of the resource
1745  * \param state State of the subscribed resource
1746  */
1747 static void add_rlmi_resource(pj_pool_t *pool, pj_xml_node *rlmi, const pjsip_generic_string_hdr *cid,
1748                 const char *resource_name, const pjsip_sip_uri *resource_uri, pjsip_evsub_state state)
1749 {
1750         static pj_str_t cid_name = { "cid", 3 };
1751         pj_xml_node *resource;
1752         pj_xml_node *name;
1753         pj_xml_node *instance;
1754         pj_xml_attr *cid_attr;
1755         char id[6];
1756         char uri[PJSIP_MAX_URL_SIZE];
1757
1758         /* This creates a string representing the Content-ID without the enclosing < > */
1759         const pj_str_t cid_stripped = {
1760                 .ptr = cid->hvalue.ptr + 1,
1761                 .slen = cid->hvalue.slen - 2,
1762         };
1763
1764         resource = ast_sip_presence_xml_create_node(pool, rlmi, "resource");
1765         name = ast_sip_presence_xml_create_node(pool, resource, "name");
1766         instance = ast_sip_presence_xml_create_node(pool, resource, "instance");
1767
1768         pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, resource_uri, uri, sizeof(uri));
1769         ast_sip_presence_xml_create_attr(pool, resource, "uri", uri);
1770
1771         pj_strdup2(pool, &name->content, resource_name);
1772
1773         ast_generate_random_string(id, sizeof(id));
1774
1775         ast_sip_presence_xml_create_attr(pool, instance, "id", id);
1776         ast_sip_presence_xml_create_attr(pool, instance, "state",
1777                         state == PJSIP_EVSUB_STATE_TERMINATED ? "terminated" : "active");
1778
1779         /* Use the PJLIB-util XML library directly here since we are using a
1780          * pj_str_t
1781          */
1782
1783         cid_attr = pj_xml_attr_new(pool, &cid_name, &cid_stripped);
1784         pj_xml_add_attr(instance, cid_attr);
1785 }
1786
1787 /*!
1788  * \brief A multipart body part and meta-information
1789  *
1790  * When creating a multipart body part, the end result (the
1791  * pjsip_multipart_part) is hard to inspect without undoing
1792  * a lot of what was done to create it. Therefore, we use this
1793  * structure to store meta-information about the body part.
1794  *
1795  * The main consumer of this is the creator of the RLMI body
1796  * part of a multipart resource list body.
1797  */
1798 struct body_part {
1799         /*! Content-ID header for the body part */
1800         pjsip_generic_string_hdr *cid;
1801         /*! Subscribed resource represented in the body part */
1802         const char *resource;
1803         /*! URI for the subscribed body part */
1804         pjsip_sip_uri *uri;
1805         /*! Subscription state of the resource represented in the body part */
1806         pjsip_evsub_state state;
1807         /*! The actual body part that will be present in the multipart body */
1808         pjsip_multipart_part *part;
1809 };
1810
1811 /*!
1812  * \brief Type declaration for container of body part structures
1813  */
1814 AST_VECTOR(body_part_list, struct body_part *);
1815
1816 /*!
1817  * \brief Create a Content-ID header
1818  *
1819  * Content-ID headers are required by RFC2387 for multipart/related
1820  * bodies. They serve as identifiers for each part of the multipart body.
1821  *
1822  * \param pool PJLIB allocation pool
1823  * \param sub Subscription to a resource
1824  */
1825 static pjsip_generic_string_hdr *generate_content_id_hdr(pj_pool_t *pool,
1826                 const struct ast_sip_subscription *sub)
1827 {
1828         static const pj_str_t cid_name = { "Content-ID", 10 };
1829         pjsip_generic_string_hdr *cid;
1830         char id[6];
1831         size_t alloc_size;
1832         pj_str_t cid_value;
1833
1834         /* '<' + '@' + '>' = 3. pj_str_t does not require a null-terminator */
1835         alloc_size = sizeof(id) + pj_strlen(&sub->uri->host) + 3;
1836         cid_value.ptr = pj_pool_alloc(pool, alloc_size);
1837         cid_value.slen = sprintf(cid_value.ptr, "<%s@%.*s>",
1838                         ast_generate_random_string(id, sizeof(id)),
1839                         (int) pj_strlen(&sub->uri->host), pj_strbuf(&sub->uri->host));
1840         cid = pjsip_generic_string_hdr_create(pool, &cid_name, &cid_value);
1841
1842         return cid;
1843 }
1844
1845 static int rlmi_print_body(struct pjsip_msg_body *msg_body, char *buf, pj_size_t size)
1846 {
1847         int num_printed;
1848         pj_xml_node *rlmi = msg_body->data;
1849
1850         num_printed = pj_xml_print(rlmi, buf, size, PJ_TRUE);
1851         if (num_printed <= AST_PJSIP_XML_PROLOG_LEN) {
1852                 return -1;
1853         }
1854
1855         return num_printed;
1856 }
1857
1858 static void *rlmi_clone_data(pj_pool_t *pool, const void *data, unsigned len)
1859 {
1860         const pj_xml_node *rlmi = data;
1861
1862         return pj_xml_clone(pool, rlmi);
1863 }
1864
1865 /*!
1866  * \brief Create an RLMI body part for a multipart resource list body
1867  *
1868  * RLMI (Resource list meta information) is a special body type that lists
1869  * the subscribed resources and tells subscribers the number of subscribed
1870  * resources and what other body parts are in the multipart body. The
1871  * RLMI body also has a version number that a subscriber can use to ensure
1872  * that the locally-stored state corresponds to server state.
1873  *
1874  * \param pool The allocation pool
1875  * \param sub The subscription representing the subscribed resource list
1876  * \param body_parts A container of body parts that RLMI will refer to
1877  * \param full_state Indicates whether this is a full or partial state notification
1878  * \return The multipart part representing the RLMI body
1879  */
1880 static pjsip_multipart_part *build_rlmi_body(pj_pool_t *pool, struct ast_sip_subscription *sub,
1881                 struct body_part_list *body_parts, unsigned int full_state)
1882 {
1883         static const pj_str_t rlmi_type = { "application", 11 };
1884         static const pj_str_t rlmi_subtype = { "rlmi+xml", 8 };
1885         pj_xml_node *rlmi;
1886         pj_xml_node *name;
1887         pjsip_multipart_part *rlmi_part;
1888         char version_str[32];
1889         char uri[PJSIP_MAX_URL_SIZE];
1890         pjsip_generic_string_hdr *cid;
1891         int i;
1892
1893         rlmi = ast_sip_presence_xml_create_node(pool, NULL, "list");
1894         ast_sip_presence_xml_create_attr(pool, rlmi, "xmlns", "urn:ietf:params:xml:ns:rlmi");
1895
1896         ast_sip_subscription_get_local_uri(sub, uri, sizeof(uri));
1897         ast_sip_presence_xml_create_attr(pool, rlmi, "uri", uri);
1898
1899         snprintf(version_str, sizeof(version_str), "%u", sub->version++);
1900         ast_sip_presence_xml_create_attr(pool, rlmi, "version", version_str);
1901         ast_sip_presence_xml_create_attr(pool, rlmi, "fullState", full_state ? "true" : "false");
1902
1903         name = ast_sip_presence_xml_create_node(pool, rlmi, "name");
1904         pj_strdup2(pool, &name->content, ast_sip_subscription_get_resource_name(sub));
1905
1906         for (i = 0; i < AST_VECTOR_SIZE(body_parts); ++i) {
1907                 const struct body_part *part = AST_VECTOR_GET(body_parts, i);
1908
1909                 add_rlmi_resource(pool, rlmi, part->cid, part->resource, part->uri, part->state);
1910         }
1911
1912         rlmi_part = pjsip_multipart_create_part(pool);
1913
1914         rlmi_part->body = PJ_POOL_ZALLOC_T(pool, pjsip_msg_body);
1915         pj_strdup(pool, &rlmi_part->body->content_type.type, &rlmi_type);
1916         pj_strdup(pool, &rlmi_part->body->content_type.subtype, &rlmi_subtype);
1917         pj_list_init(&rlmi_part->body->content_type.param);
1918
1919         rlmi_part->body->data = pj_xml_clone(pool, rlmi);
1920         rlmi_part->body->clone_data = rlmi_clone_data;
1921         rlmi_part->body->print_body = rlmi_print_body;
1922
1923         cid = generate_content_id_hdr(pool, sub);
1924         pj_list_insert_before(&rlmi_part->hdr, cid);
1925
1926         return rlmi_part;
1927 }
1928
1929 static pjsip_msg_body *generate_notify_body(pj_pool_t *pool, struct ast_sip_subscription *root,
1930                 unsigned int force_full_state);
1931
/*!
 * \brief Destroy a list of body parts
 *
 * Frees every body_part structure held in the vector and then the
 * vector's own storage.
 *
 * \param parts The container of parts to destroy
 */
static void free_body_parts(struct body_part_list *parts)
{
	int idx = 0;

	while (idx < AST_VECTOR_SIZE(parts)) {
		ast_free(AST_VECTOR_GET(parts, idx));
		idx++;
	}

	AST_VECTOR_FREE(parts);
}
1948
1949 /*!
1950  * \brief Allocate and initialize a body part structure
1951  *
1952  * \param pool PJLIB allocation pool
1953  * \param sub Subscription representing a subscribed resource
1954  */
1955 static struct body_part *allocate_body_part(pj_pool_t *pool, const struct ast_sip_subscription *sub)
1956 {
1957         struct body_part *bp;
1958
1959         bp = ast_calloc(1, sizeof(*bp));
1960         if (!bp) {
1961                 return NULL;
1962         }
1963
1964         bp->cid = generate_content_id_hdr(pool, sub);
1965         bp->resource = sub->resource;
1966         bp->state = sub->subscription_state;
1967         bp->uri = sub->uri;
1968
1969         return bp;
1970 }
1971
1972 /*!
1973  * \brief Create a multipart body part for a subscribed resource
1974  *
1975  * \param pool PJLIB allocation pool
1976  * \param sub The subscription representing a subscribed resource
1977  * \param parts A vector of parts to append the created part to.
1978  * \param use_full_state Unused locally, but may be passed to other functions
1979  */
1980 static void build_body_part(pj_pool_t *pool, struct ast_sip_subscription *sub,
1981                 struct body_part_list *parts, unsigned int use_full_state)
1982 {
1983         struct body_part *bp;
1984         pjsip_msg_body *body;
1985
1986         bp = allocate_body_part(pool, sub);
1987         if (!bp) {
1988                 return;
1989         }
1990
1991         body = generate_notify_body(pool, sub, use_full_state);
1992         if (!body) {
1993                 /* Partial state was requested and the resource has not changed state */
1994                 ast_free(bp);
1995                 return;
1996         }
1997
1998         bp->part = pjsip_multipart_create_part(pool);
1999         bp->part->body = body;
2000         pj_list_insert_before(&bp->part->hdr, bp->cid);
2001
2002         AST_VECTOR_APPEND(parts, bp);
2003 }
2004
2005 /*!
2006  * \brief Create and initialize the PJSIP multipart body structure for a resource list subscription
2007  *
2008  * \param pool
2009  * \return The multipart message body
2010  */
2011 static pjsip_msg_body *create_multipart_body(pj_pool_t *pool)
2012 {
2013         pjsip_media_type media_type;
2014         pjsip_param *media_type_param;
2015         char boundary[6];
2016         pj_str_t pj_boundary;
2017
2018         pjsip_media_type_init2(&media_type, "multipart", "related");
2019
2020         media_type_param = pj_pool_alloc(pool, sizeof(*media_type_param));
2021         pj_list_init(media_type_param);
2022
2023         pj_strdup2(pool, &media_type_param->name, "type");
2024         pj_strdup2(pool, &media_type_param->value, "\"application/rlmi+xml\"");
2025
2026         pj_list_insert_before(&media_type.param, media_type_param);
2027
2028         pj_cstr(&pj_boundary, ast_generate_random_string(boundary, sizeof(boundary)));
2029         return pjsip_multipart_create(pool, &media_type, &pj_boundary);
2030 }
2031
2032 /*!
2033  * \brief Create a resource list body for NOTIFY requests
2034  *
2035  * Resource list bodies are multipart/related bodies. The first part of the multipart body
2036  * is an RLMI body that describes the rest of the parts to come. The other parts of the body
2037  * convey state of individual subscribed resources.
2038  *
2039  * \param pool PJLIB allocation pool
2040  * \param sub Subscription details from which to generate body
2041  * \param force_full_state If true, ignore resource list settings and send a full state notification
2042  * \return The generated multipart/related body
2043  */
2044 static pjsip_msg_body *generate_list_body(pj_pool_t *pool, struct ast_sip_subscription *sub,
2045                 unsigned int force_full_state)
2046 {
2047         int i;
2048         pjsip_multipart_part *rlmi_part;
2049         pjsip_msg_body *multipart;
2050         struct body_part_list body_parts;
2051         unsigned int use_full_state = force_full_state ? 1 : sub->full_state;
2052
2053         if (AST_VECTOR_INIT(&body_parts, AST_VECTOR_SIZE(&sub->children))) {
2054                 return NULL;
2055         }
2056
2057         for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
2058                 build_body_part(pool, AST_VECTOR_GET(&sub->children, i), &body_parts, use_full_state);
2059         }
2060
2061         /* This can happen if issuing partial state and no children of the list have changed state */
2062         if (AST_VECTOR_SIZE(&body_parts) == 0) {
2063                 return NULL;
2064         }
2065
2066         multipart = create_multipart_body(pool);
2067
2068         rlmi_part = build_rlmi_body(pool, sub, &body_parts, use_full_state);
2069         if (!rlmi_part) {
2070                 return NULL;
2071         }
2072         pjsip_multipart_add_part(pool, multipart, rlmi_part);
2073
2074         for (i = 0; i < AST_VECTOR_SIZE(&body_parts); ++i) {
2075                 pjsip_multipart_add_part(pool, multipart, AST_VECTOR_GET(&body_parts, i)->part);
2076         }
2077
2078         free_body_parts(&body_parts);
2079         return multipart;
2080 }
2081
2082 /*!
2083  * \brief Create the body for a NOTIFY request.
2084  *
2085  * \param pool The pool used for allocations
2086  * \param root The root of the subscription tree
2087  * \param force_full_state If true, ignore resource list settings and send a full state notification
2088  */
2089 static pjsip_msg_body *generate_notify_body(pj_pool_t *pool, struct ast_sip_subscription *root,
2090                 unsigned int force_full_state)
2091 {
2092         pjsip_msg_body *body;
2093
2094         if (AST_VECTOR_SIZE(&root->children) == 0) {
2095                 if (force_full_state || root->body_changed) {
2096                         /* Not a list. We've already generated the body and saved it on the subscription.
2097                          * Use that directly.
2098                          */
2099                         pj_str_t type;
2100                         pj_str_t subtype;
2101                         pj_str_t text;
2102
2103                         pj_cstr(&type, ast_sip_subscription_get_body_type(root));
2104                         pj_cstr(&subtype, ast_sip_subscription_get_body_subtype(root));
2105                         pj_cstr(&text, ast_str_buffer(root->body_text));
2106
2107                         body = pjsip_msg_body_create(pool, &type, &subtype, &text);
2108                         root->body_changed = 0;
2109                 } else {
2110                         body = NULL;
2111                 }
2112         } else {
2113                 body = generate_list_body(pool, root, force_full_state);
2114         }
2115
2116         return body;
2117 }
2118
2119 /*!
2120  * \brief Shortcut method to create a Require: eventlist header
2121  */
2122 static pjsip_require_hdr *create_require_eventlist(pj_pool_t *pool)
2123 {
2124         pjsip_require_hdr *require;
2125
2126         require = pjsip_require_hdr_create(pool);
2127         pj_strdup2(pool, &require->values[0], "eventlist");
2128         require->count = 1;
2129
2130         return require;
2131 }
2132
2133 /*!
2134  * \brief Send a NOTIFY request to a subscriber
2135  *
2136  * \pre sub_tree->dlg is locked
2137  *
2138  * \param sub_tree The subscription tree representing the subscription
2139  * \param force_full_state If true, ignore resource list settings and send full resource list state.
2140  * \retval 0 Success
2141  * \retval non-zero Failure
2142  */
2143 static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state)
2144 {
2145         pjsip_evsub *evsub = sub_tree->evsub;
2146         pjsip_tx_data *tdata;
2147
2148         if (ast_shutdown_final()
2149                 && sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED
2150                 && sub_tree->persistence) {
2151                 return 0;
2152         }
2153
2154         if (pjsip_evsub_notify(evsub, sub_tree->root->subscription_state,
2155                                 NULL, NULL, &tdata) != PJ_SUCCESS) {
2156                 return -1;
2157         }
2158
2159         tdata->msg->body = generate_notify_body(tdata->pool, sub_tree->root, force_full_state);
2160         if (!tdata->msg->body) {
2161                 pjsip_tx_data_dec_ref(tdata);
2162                 return -1;
2163         }
2164
2165         if (sub_tree->is_list) {
2166                 pjsip_require_hdr *require = create_require_eventlist(tdata->pool);
2167                 pjsip_msg_add_hdr(tdata->msg, (pjsip_hdr *) require);
2168         }
2169
2170         if (sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED) {
2171                 sub_tree->last_notify = 1;
2172         }
2173         if (sip_subscription_send_request(sub_tree, tdata)) {
2174                 return -1;
2175         }
2176
2177         sub_tree->send_scheduled_notify = 0;
2178
2179         return 0;
2180 }
2181
2182 static int serialized_send_notify(void *userdata)
2183 {
2184         struct sip_subscription_tree *sub_tree = userdata;
2185         pjsip_dialog *dlg = sub_tree->dlg;
2186
2187         pjsip_dlg_inc_lock(dlg);
2188         /* It's possible that between when the notification was scheduled
2189          * and now, that a new SUBSCRIBE arrived, requiring full state to be
2190          * sent out in an immediate NOTIFY. If that has happened, we need to
2191          * bail out here instead of sending the batched NOTIFY.
2192          */
2193         if (!sub_tree->send_scheduled_notify) {
2194                 pjsip_dlg_dec_lock(dlg);
2195                 ao2_cleanup(sub_tree);
2196                 return 0;
2197         }
2198
2199         send_notify(sub_tree, 0);
2200         ast_test_suite_event_notify("SUBSCRIPTION_STATE_CHANGED",
2201                         "Resource: %s",
2202                         sub_tree->root->resource);
2203         sub_tree->notify_sched_id = -1;
2204         pjsip_dlg_dec_lock(dlg);
2205         ao2_cleanup(sub_tree);
2206         return 0;
2207 }
2208
2209 static int sched_cb(const void *data)
2210 {
2211         struct sip_subscription_tree *sub_tree = (struct sip_subscription_tree *) data;
2212
2213         /* We don't need to bump the refcount of sub_tree since we bumped it when scheduling this task */
2214         ast_sip_push_task(sub_tree->serializer, serialized_send_notify, sub_tree);
2215         return 0;
2216 }
2217
2218 static int schedule_notification(struct sip_subscription_tree *sub_tree)
2219 {
2220         /* There's already a notification scheduled */
2221         if (sub_tree->notify_sched_id > -1) {
2222                 return 0;
2223         }
2224
2225         sub_tree->notify_sched_id = ast_sched_add(sched, sub_tree->notification_batch_interval, sched_cb, ao2_bump(sub_tree));
2226         if (sub_tree->notify_sched_id < 0) {
2227                 return -1;
2228         }
2229
2230         sub_tree->send_scheduled_notify = 1;
2231         return 0;
2232 }
2233
2234 int ast_sip_subscription_notify(struct ast_sip_subscription *sub, struct ast_sip_body_data *notify_data,
2235                 int terminate)
2236 {
2237         int res;
2238         pjsip_dialog *dlg = sub->tree->dlg;
2239
2240         pjsip_dlg_inc_lock(dlg);
2241
2242         if (!sub->tree->evsub) {
2243                 pjsip_dlg_dec_lock(dlg);
2244                 return 0;
2245         }
2246
2247         if (ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub),
2248                                 ast_sip_subscription_get_body_subtype(sub), notify_data, &sub->body_text)) {
2249                 pjsip_dlg_dec_lock(dlg);
2250                 return -1;
2251         }
2252
2253         sub->body_changed = 1;
2254         if (terminate) {
2255                 sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
2256         }
2257
2258         if (sub->tree->notification_batch_interval) {
2259                 res = schedule_notification(sub->tree);
2260         } else {
2261                 /* See the note in pubsub_on_rx_refresh() for why sub->tree is refbumped here */
2262                 ao2_ref(sub->tree, +1);
2263                 res = send_notify(sub->tree, 0);
2264                 ast_test_suite_event_notify(terminate ? "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_STATE_CHANGED",
2265                                 "Resource: %s",
2266                                 sub->tree->root->resource);
2267                 ao2_ref(sub->tree, -1);
2268         }
2269
2270         pjsip_dlg_dec_lock(dlg);
2271         return res;
2272 }
2273
2274 void ast_sip_subscription_get_local_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
2275 {
2276         pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, sub->uri, buf, size);
2277 }
2278
2279 void ast_sip_subscription_get_remote_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
2280 {
2281         pjsip_dialog *dlg;
2282
2283         dlg = sub->tree->dlg;
2284         ast_copy_pj_str(buf, &dlg->remote.info_str, size);
2285 }
2286
2287 const char *ast_sip_subscription_get_resource_name(struct ast_sip_subscription *sub)
2288 {
2289         return sub->resource;
2290 }
2291
2292 int ast_sip_subscription_is_terminated(const struct ast_sip_subscription *sub)
2293 {
2294         return sub->subscription_state == PJSIP_EVSUB_STATE_TERMINATED ? 1 : 0;
2295 }
2296
2297 static int sip_subscription_accept(struct sip_subscription_tree *sub_tree, pjsip_rx_data *rdata, int response)
2298 {
2299         pjsip_hdr res_hdr;
2300
2301         /* If this is a persistence recreation the subscription has already been accepted */
2302         if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) {
2303                 return 0;
2304         }
2305
2306         pj_list_init(&res_hdr);
2307         if (sub_tree->is_list) {
2308                 /* If subscribing to a list, our response has to have a Require: eventlist header in it */
2309                 pj_list_insert_before(&res_hdr, create_require_eventlist(rdata->tp_info.pool));
2310         }
2311
2312         return pjsip_evsub_accept(sub_tree->evsub, rdata, response, &res_hdr) == PJ_SUCCESS ? 0 : -1;
2313 }
2314
2315 static void subscription_datastore_destroy(void *obj)
2316 {
2317         struct ast_datastore *datastore = obj;
2318
2319         /* Using the destroy function (if present) destroy the data */
2320         if (datastore->info->destroy != NULL && datastore->data != NULL) {
2321                 datastore->info->destroy(datastore->data);
2322                 datastore->data = NULL;
2323         }
2324
2325         ast_free((void *) datastore->uid);
2326         datastore->uid = NULL;
2327 }
2328
2329 struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
2330 {
2331         RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
2332         char uuid_buf[AST_UUID_STR_LEN];
2333         const char *uid_ptr = uid;
2334
2335         if (!info) {
2336                 return NULL;
2337         }
2338
2339         datastore = ao2_alloc(sizeof(*datastore), subscription_datastore_destroy);
2340         if (!datastore) {
2341                 return NULL;
2342         }
2343
2344         datastore->info = info;
2345         if (ast_strlen_zero(uid)) {
2346                 /* They didn't provide an ID so we'll provide one ourself */
2347                 uid_ptr = ast_uuid_generate_str(uuid_buf, sizeof(uuid_buf));
2348         }
2349
2350         datastore->uid = ast_strdup(uid_ptr);
2351         if (!datastore->uid) {
2352                 return NULL;
2353         }
2354
2355         ao2_ref(datastore, +1);
2356         return datastore;
2357 }
2358
2359 int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
2360 {
2361         ast_assert(datastore != NULL);
2362         ast_assert(datastore->info != NULL);
2363         ast_assert(!ast_strlen_zero(datastore->uid));
2364
2365         if (!ao2_link(subscription->datastores, datastore)) {
2366                 return -1;
2367         }
2368         return 0;
2369 }
2370
2371 struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
2372 {
2373         return ao2_find(subscription->datastores, name, OBJ_KEY);
2374 }
2375
2376 void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
2377 {
2378         ao2_find(subscription->datastores, name, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
2379 }
2380
2381 int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
2382 {
2383         ast_assert(datastore != NULL);
2384         ast_assert(datastore->info != NULL);
2385         ast_assert(!ast_strlen_zero(datastore->uid));
2386
2387         if (!ao2_link(publication->datastores, datastore)) {
2388                 return -1;
2389         }
2390         return 0;
2391 }
2392
2393 struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
2394 {
2395         return ao2_find(publication->datastores, name, OBJ_KEY);
2396 }
2397
2398 void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
2399 {
2400         ao2_callback(publication->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
2401 }
2402
/*!
 * \brief List of registered PUBLISH handlers
 *
 * publish_add_handler() appends to this list and
 * ast_sip_unregister_publish_handler() removes from it, both while holding
 * the list's write lock; find_pub_handler() searches it under the read lock.
 */
2403 AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
2404
2405 static int publication_hash_fn(const void *obj, const int flags)
2406 {
2407         const struct ast_sip_publication *publication = obj;
2408         const int *entity_tag = obj;
2409
2410         return flags & OBJ_KEY ? *entity_tag : publication->entity_tag;
2411 }
2412
2413 static int publication_cmp_fn(void *obj, void *arg, int flags)
2414 {
2415         const struct ast_sip_publication *publication1 = obj;
2416         const struct ast_sip_publication *publication2 = arg;
2417         const int *entity_tag = arg;
2418
2419         return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ?
2420                 CMP_MATCH | CMP_STOP : 0);
2421 }
2422
2423 static void publish_add_handler(struct ast_sip_publish_handler *handler)
2424 {
2425         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2426         AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
2427 }
2428
2429 int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
2430 {
2431         if (ast_strlen_zero(handler->event_name)) {
2432                 ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
2433                 return -1;
2434         }
2435
2436         if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS,
2437                 publication_hash_fn, publication_cmp_fn))) {
2438                 ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n",
2439                         handler->event_name);
2440                 return -1;
2441         }
2442
2443         publish_add_handler(handler);
2444
2445         ast_module_ref(ast_module_info->self);
2446
2447         return 0;
2448 }
2449
2450 void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
2451 {
2452         struct ast_sip_publish_handler *iter;
2453         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2454         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
2455                 if (handler == iter) {
2456                         AST_RWLIST_REMOVE_CURRENT(next);
2457                         ao2_cleanup(handler->publications);
2458                         ast_module_unref(ast_module_info->self);
2459                         break;
2460                 }
2461         }
2462         AST_RWLIST_TRAVERSE_SAFE_END;
2463 }
2464
/*!
 * \brief List of registered subscription handlers
 *
 * sub_add_handler() appends to this list and
 * ast_sip_unregister_subscription_handler() removes from it, both while
 * holding the list's write lock; find_sub_handler_for_event_name() searches
 * it under the read lock.
 */
2465 AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
2466
2467 static void sub_add_handler(struct ast_sip_subscription_handler *handler)
2468 {
2469         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2470         AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
2471         ast_module_ref(ast_module_info->self);
2472 }
2473
2474 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name)
2475 {
2476         struct ast_sip_subscription_handler *iter;
2477         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
2478
2479         AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
2480                 if (!strcmp(iter->event_name, event_name)) {
2481                         break;
2482                 }
2483         }
2484         return iter;
2485 }
2486
2487 int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler)
2488 {
2489         pj_str_t event;
2490         pj_str_t accept[AST_SIP_MAX_ACCEPT] = { {0, }, };
2491         struct ast_sip_subscription_handler *existing;
2492         int i = 0;
2493
2494         if (ast_strlen_zero(handler->event_name)) {
2495                 ast_log(LOG_ERROR, "No event package specified for subscription handler. Cannot register\n");
2496                 return -1;
2497         }
2498
2499         existing = find_sub_handler_for_event_name(handler->event_name);
2500         if (existing) {
2501                 ast_log(LOG_ERROR, "Unable to register subscription handler for event %s."
2502                                 "A handler is already registered\n", handler->event_name);
2503                 return -1;
2504         }
2505
2506         for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) {
2507                 pj_cstr(&accept[i], handler->accept[i]);
2508         }
2509
2510         pj_cstr(&event, handler->event_name);
2511
2512         pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
2513
2514         sub_add_handler(handler);
2515
2516         return 0;
2517 }
2518
2519 void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
2520 {
2521         struct ast_sip_subscription_handler *iter;
2522         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
2523         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
2524                 if (handler == iter) {
2525                         AST_RWLIST_REMOVE_CURRENT(next);
2526                         ast_module_unref(ast_module_info->self);
2527                         break;
2528                 }
2529         }
2530         AST_RWLIST_TRAVERSE_SAFE_END;
2531 }
2532
2533 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype(const char *content_type,
2534                 const char *content_subtype)
2535 {
2536         struct ast_sip_pubsub_body_generator *iter;
2537         SCOPED_LOCK(lock, &body_generators, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
2538
2539         AST_LIST_TRAVERSE(&body_generators, iter, list) {
2540                 if (!strcmp(iter->type, content_type) &&
2541                                 !strcmp(iter->subtype, content_subtype)) {
2542                         break;
2543                 }
2544         };
2545
2546         return iter;
2547 }
2548
/*!
 * \brief Find a body generator for a "type/subtype" Accept value
 *
 * \param accept Accept value; split at the first '/'
 * \return The matching body generator, or NULL if the value has no '/', has an
 *         empty type or subtype, or no generator is registered for it
 */
static struct ast_sip_pubsub_body_generator *find_body_generator_accept(const char *accept)
{
	/* Work on a stack copy so the caller's string is left untouched */
	char *type = ast_strdupa(accept);
	char *slash = strchr(type, '/');
	char *subtype;

	if (!slash) {
		return NULL;
	}

	*slash = '\0';
	subtype = slash + 1;

	if (ast_strlen_zero(type) || ast_strlen_zero(subtype)) {
		return NULL;
	}

	return find_body_generator_type_subtype(type, subtype);
}
2561
2562 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
2563                 size_t num_accept, const char *body_type)
2564 {
2565         int i;
2566         struct ast_sip_pubsub_body_generator *generator = NULL;
2567
2568         for (i = 0; i < num_accept; ++i) {
2569                 generator = find_body_generator_accept(accept[i]);
2570                 if (generator) {
2571                         ast_debug(3, "Body generator %p found for accept type %s\n", generator, accept[i]);
2572                         if (strcmp(generator->body_type, body_type)) {
2573                                 ast_log(LOG_WARNING, "Body generator '%s/%s'(%p) does not accept the type of data this event generates\n",
2574                                                 generator->type, generator->subtype, generator);
2575                                 generator = NULL;
2576                                 continue;
2577                         }
2578                         break;
2579                 } else {
2580                         ast_debug(3, "No body generator found for accept type %s\n", accept[i]);
2581                 }
2582         }
2583
2584         return generator;
2585 }
2586
2587 static int generate_initial_notify(struct ast_sip_subscription *sub)
2588 {
2589         void *notify_data;
2590         int res;
2591         struct ast_sip_body_data data = {
2592                 .body_type = sub->handler->body_type,
2593         };
2594
2595         if (AST_VECTOR_SIZE(&sub->children) > 0) {
2596                 int i;
2597
2598                 for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
2599                         if (generate_initial_notify(AST_VECTOR_GET(&sub->children, i))) {
2600                                 return -1;
2601                         }
2602                 }
2603
2604                 return 0;
2605         }
2606
2607         /* We notify subscription establishment only on the tree leaves. */
2608         if (sub->handler->notifier->subscription_established(sub)) {
2609                 return -1;
2610         }
2611
2612         notify_data = sub->handler->notifier->get_notify_data(sub);
2613         if (!notify_data) {
2614                 return -1;
2615         }
2616
2617         data.body_data = notify_data;
2618
2619         res = ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub),
2620                         ast_sip_subscription_get_body_subtype(sub), &data, &sub->body_text);
2621
2622         ao2_cleanup(notify_data);
2623
2624         return res;
2625 }
2626
2627 static int initial_notify_task(void * obj)
2628 {
2629         struct sip_subscription_tree *sub_tree;
2630
2631         sub_tree = obj;
2632         if (generate_initial_notify(sub_tree->root)) {
2633                 pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
2634         } else {
2635                 send_notify(sub_tree, 1);
2636                 ast_test_suite_event_notify("SUBSCRIPTION_ESTABLISHED",
2637                         "Resource: %s",
2638                         sub_tree->root->resource);
2639         }
2640
2641         ao2_ref(sub_tree, -1);
2642         return 0;
2643 }
2644
2645 static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
2646 {
2647         pjsip_expires_hdr *expires_header;
2648         struct ast_sip_subscription_handler *handler;
2649         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
2650         struct sip_subscription_tree *sub_tree;
2651         struct ast_sip_pubsub_body_generator *generator;
2652         char *resource;
2653         pjsip_uri *request_uri;
2654         pjsip_sip_uri *request_uri_sip;
2655         size_t resource_size;
2656         int resp;
2657         struct resource_tree tree;
2658         pj_status_t dlg_status;
2659
2660         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
2661         ast_assert(endpoint != NULL);
2662
2663         if (!endpoint->subscription.allow) {
2664                 ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint));
2665                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL);
2666                 return PJ_TRUE;
2667         }
2668
2669         request_uri = rdata->msg_info.msg->line.req.uri;
2670
2671         if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
2672                 char uri_str[PJSIP_MAX_URL_SIZE];
2673
2674                 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
2675                 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
2676                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
2677                 return PJ_TRUE;
2678         }
2679
2680         request_uri_sip = pjsip_uri_get_uri(request_uri);
2681         resource_size = pj_strlen(&request_uri_sip->user) + 1;
2682         resource = ast_alloca(resource_size);
2683         ast_copy_pj_str(resource, &request_uri_sip->user, resource_size);
2684
2685         expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next);
2686
2687         if (expires_header) {
2688                 if (expires_header->ivalue == 0) {
2689                         ast_log(LOG_WARNING, "Subscription request from endpoint %s rejected. Expiration of 0 is invalid\n",
2690                                 ast_sorcery_object_get_id(endpoint));
2691                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
2692                                 return PJ_TRUE;
2693                 }
2694                 if (expires_header->ivalue < endpoint->subscription.minexpiry) {
2695                         ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %u\n",
2696                                 expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry);
2697                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL);
2698                         return PJ_TRUE;
2699                 }
2700         }
2701
2702         handler = subscription_get_handler_from_rdata(rdata);
2703         if (!handler) {
2704                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2705                 return PJ_TRUE;
2706         }
2707
2708         generator = subscription_get_generator_from_rdata(rdata, handler);
2709         if (!generator) {
2710                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2711                 return PJ_TRUE;
2712         }
2713
2714         memset(&tree, 0, sizeof(tree));
2715         resp = build_resource_tree(endpoint, handler, resource, &tree,
2716                 ast_sip_pubsub_has_eventlist_support(rdata));
2717         if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
2718                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
2719                 resource_tree_destroy(&tree);
2720                 return PJ_TRUE;
2721         }
2722
2723         sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator, &tree, &dlg_status);
2724         if (!sub_tree) {
2725                 if (dlg_status != PJ_EEXISTS) {
2726                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
2727                 }
2728         } else {
2729                 sub_tree->persistence = subscription_persistence_create(sub_tree);
2730                 subscription_persistence_update(sub_tree, rdata);
2731                 sip_subscription_accept(sub_tree, rdata, resp);
2732                 if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ao2_bump(sub_tree))) {
2733                         pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE);
2734                         ao2_ref(sub_tree, -1);
2735                 }
2736         }
2737
2738         resource_tree_destroy(&tree);
2739         return PJ_TRUE;
2740 }
2741
2742 static struct ast_sip_publish_handler *find_pub_handler(const char *event)
2743 {
2744         struct ast_sip_publish_handler *iter = NULL;
2745         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
2746
2747         AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) {
2748                 if (strcmp(event, iter->event_name)) {
2749                         ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
2750                         continue;
2751                 }
2752                 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
2753                 break;
2754         }
2755
2756         return iter;
2757 }
2758
2759 static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata,
2760         pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id)
2761 {
2762         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
2763
2764         if (etag_hdr) {
2765                 char etag[pj_strlen(&etag_hdr->hvalue) + 1];
2766
2767                 ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag));
2768
2769                 if (sscanf(etag, "%30d", entity_id) != 1) {
2770                         return SIP_PUBLISH_UNKNOWN;
2771                 }
2772         }
2773
2774         *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
2775
2776         if (!(*expires)) {
2777                 return SIP_PUBLISH_REMOVE;
2778         } else if (!etag_hdr && rdata->msg_info.msg->body) {
2779                 return SIP_PUBLISH_INITIAL;
2780         } else if (etag_hdr && !rdata->msg_info.msg->body) {
2781                 return SIP_PUBLISH_REFRESH;
2782         } else if (etag_hdr && rdata->msg_info.msg->body) {
2783                 return SIP_PUBLISH_MODIFY;
2784         }
2785
2786         return SIP_PUBLISH_UNKNOWN;
2787 }
2788
2789 /*! \brief Internal destructor for publications */
2790 static void publication_destroy_fn(void *obj)
2791 {
2792         struct ast_sip_publication *publication = obj;
2793
2794         ast_debug(3, "Destroying SIP publication\n");
2795
2796         ao2_cleanup(publication->datastores);
2797         ao2_cleanup(publication->endpoint);
2798 }
2799
2800 static struct ast_sip_publication *sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
2801         const char *resource, const char *event_configuration_name)
2802 {
2803         struct ast_sip_publication *publication;
2804         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
2805         size_t resource_len = strlen(resource) + 1, event_configuration_name_len = strlen(event_configuration_name) + 1;
2806         char *dst;
2807
2808         ast_assert(endpoint != NULL);
2809
2810         if (!(publication = ao2_alloc(sizeof(*publication) + resource_len + event_configuration_name_len, publication_destroy_fn))) {
2811                 return NULL;
2812         }
2813
2814         if (!(publication->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp))) {
2815                 ao2_ref(publication, -1);
2816                 return NULL;
2817         }
2818
2819         publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
2820         ao2_ref(endpoint, +1);
2821         publication->endpoint = endpoint;
2822         publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
2823         publication->sched_id = -1;
2824         dst = publication->data;
2825         publication->resource = strcpy(dst, resource);
2826         dst += resource_len;
2827         publication->event_configuration_name = strcpy(dst, event_configuration_name);
2828
2829         return publication;
2830 }
2831
2832 static int sip_publication_respond(struct ast_sip_publication *pub, int status_code,
2833                 pjsip_rx_data *rdata)
2834 {
2835         pj_status_t status;
2836         pjsip_tx_data *tdata;
2837         pjsip_transaction *tsx;
2838
2839         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, &tdata) != PJ_SUCCESS) {
2840                 return -1;
2841         }
2842
2843         if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) {
2844                 RAII_VAR(char *, entity_tag, NULL, ast_free_ptr);
2845                 RAII_VAR(char *, expires, NULL, ast_free_ptr);
2846
2847                 if ((ast_asprintf(&entity_tag, "%d", pub->entity_tag) < 0) ||
2848                         (ast_asprintf(&expires, "%d", pub->expires) < 0)) {
2849                         pjsip_tx_data_dec_ref(tdata);
2850                         return -1;
2851                 }
2852
2853                 ast_sip_add_header(tdata, "SIP-ETag", entity_tag);
2854                 ast_sip_add_header(tdata, "Expires", expires);
2855         }
2856
2857         if ((status = pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx)) != PJ_SUCCESS) {
2858                 return -1;
2859         }
2860
2861         pjsip_tsx_recv_msg(tsx, rdata);
2862
2863         if (pjsip_tsx_send_msg(tsx, tdata) != PJ_SUCCESS) {
2864                 return -1;
2865         }
2866
2867         return 0;
2868 }
2869
2870 static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
2871         struct ast_sip_publish_handler *handler)
2872 {
2873         struct ast_sip_publication *publication;
2874         char *resource_name;
2875         size_t resource_size;
2876         RAII_VAR(struct ast_sip_publication_resource *, resource, NULL, ao2_cleanup);
2877         struct ast_variable *event_configuration_name = NULL;
2878         pjsip_uri *request_uri;
2879         pjsip_sip_uri *request_uri_sip;
2880         int resp;
2881
2882         request_uri = rdata->msg_info.msg->line.req.uri;
2883
2884         if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
2885                 char uri_str[PJSIP_MAX_URL_SIZE];
2886
2887                 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
2888                 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
2889                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
2890                 return NULL;
2891         }
2892
2893         request_uri_sip = pjsip_uri_get_uri(request_uri);
2894         resource_size = pj_strlen(&request_uri_sip->user) + 1;
2895         resource_name = ast_alloca(resource_size);
2896         ast_copy_pj_str(resource_name, &request_uri_sip->user, resource_size);
2897
2898         resource = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "inbound-publication", resource_name);
2899         if (!resource) {
2900                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
2901                 return NULL;
2902         }
2903
2904         if (!ast_strlen_zero(resource->endpoint) && strcmp(resource->endpoint, ast_sorcery_object_get_id(endpoint))) {
2905                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL);
2906                 return NULL;
2907         }
2908
2909         for (event_configuration_name = resource->events; event_configuration_name; event_configuration_name = event_configuration_name->next) {
2910                 if (!strcmp(event_configuration_name->name, handler->event_name)) {
2911                         break;
2912                 }
2913         }
2914
2915         if (!event_configuration_name) {
2916                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
2917                 return NULL;
2918         }
2919
2920         resp = handler->new_publication(endpoint, resource_name, event_configuration_name->value);
2921
2922         if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
2923                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
2924                 return NULL;
2925         }
2926
2927         publication = sip_create_publication(endpoint, rdata, S_OR(resource_name, ""), event_configuration_name->value);
2928
2929         if (!publication) {
2930                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
2931                 return NULL;
2932         }
2933
2934         publication->handler = handler;
2935         if (publication->handler->publication_state_change(publication, rdata->msg_info.msg->body,
2936                         AST_SIP_PUBLISH_STATE_INITIALIZED)) {
2937                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
2938                 ao2_cleanup(publication);
2939                 return NULL;
2940         }
2941
2942         sip_publication_respond(publication, resp, rdata);
2943
2944         return publication;
2945 }
2946
2947 static int publish_expire_callback(void *data)
2948 {
2949         RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup);
2950
2951         if (publication->handler->publish_expire) {
2952                 publication->handler->publish_expire(publication);
2953         }
2954
2955         return 0;
2956 }
2957
2958 static int publish_expire(const void *data)
2959 {
2960         struct ast_sip_publication *publication = (struct ast_sip_publication*)data;
2961
2962         ao2_unlink(publication->handler->publications, publication);
2963         publication->sched_id = -1;
2964
2965         if (ast_sip_push_task(NULL, publish_expire_callback, publication)) {
2966                 ao2_cleanup(publication);
2967         }
2968
2969         return 0;
2970 }
2971
2972 static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata)
2973 {
2974         pjsip_event_hdr *event_header;
2975         struct ast_sip_publish_handler *handler;
2976         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
2977         char event[32];
2978         static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 };
2979         pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL);
2980         enum sip_publish_type publish_type;
2981         RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup);
2982         int expires = 0, entity_id, response = 0;
2983
2984         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
2985         ast_assert(endpoint != NULL);
2986
2987         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
2988         if (!event_header) {
2989                 ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n");
2990                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2991                 return PJ_TRUE;
2992         }
2993         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
2994
2995         handler = find_pub_handler(event);
2996         if (!handler) {
2997                 ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event);
2998                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
2999                 return PJ_TRUE;
3000         }
3001
3002         publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id);
3003
3004         /* If this is not an initial publish ensure that a publication is present */
3005         if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) {
3006                 if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) {
3007                         static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 };
3008
3009                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed,
3010                                 NULL, NULL);
3011                         return PJ_TRUE;
3012                 }
3013
3014                 /* Per the RFC every response has to have a new entity tag */
3015                 publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
3016
3017                 /* Update the expires here so that the created responses will contain the correct value */
3018                 publication->expires = expires;
3019         }
3020
3021         switch (publish_type) {
3022                 case SIP_PUBLISH_INITIAL:
3023                         publication = publish_request_initial(endpoint, rdata, handler);
3024                         break;
3025                 case SIP_PUBLISH_REFRESH:
3026                 case SIP_PUBLISH_MODIFY:
3027                         if (handler->publication_state_change(publication, rdata->msg_info.msg->body,
3028                                                 AST_SIP_PUBLISH_STATE_ACTIVE)) {
3029                                 /* If an error occurs we want to terminate the publication */
3030                                 expires = 0;
3031                         }
3032                         response = 200;
3033                         break;
3034                 case SIP_PUBLISH_REMOVE:
3035                         handler->publication_state_change(publication, rdata->msg_info.msg->body,
3036                                         AST_SIP_PUBLISH_STATE_TERMINATED);
3037                         response = 200;
3038                         break;
3039                 case SIP_PUBLISH_UNKNOWN:
3040                 default:
3041                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
3042                         break;
3043         }
3044
3045         if (publication) {
3046                 if (expires) {
3047                         ao2_link(handler->publications, publication);
3048
3049                         AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication,
3050                                                 ao2_ref(publication, -1), ao2_ref(publication, -1), ao2_ref(publication, +1));
3051                 } else {
3052                         AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1));
3053                 }
3054         }
3055
3056         if (response) {
3057                 sip_publication_respond(publication, response, rdata);
3058         }
3059
3060         return PJ_TRUE;
3061 }
3062
3063 struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub)
3064 {
3065         return pub->endpoint;
3066 }
3067
3068 const char *ast_sip_publication_get_resource(const struct ast_sip_publication *pub)
3069 {
3070         return pub->resource;
3071 }
3072
3073 const char *ast_sip_publication_get_event_configuration(const struct ast_sip_publication *pub)
3074 {
3075         return pub->event_configuration_name;
3076 }
3077
3078 int ast_sip_pubsub_register_body_generator(struct ast_sip_pubsub_body_generator *generator)
3079 {
3080         struct ast_sip_pubsub_body_generator *existing;
3081         pj_str_t accept;
3082         pj_size_t accept_len;
3083
3084         existing = find_body_generator_type_subtype(generator->type, generator->subtype);
3085         if (existing) {
3086                 ast_log(LOG_WARNING, "Cannot register body generator of %s/%s."
3087                                 "One is already registered.\n", generator->type, generator->subtype);
3088                 return -1;
3089         }
3090
3091         AST_RWLIST_WRLOCK(&body_generators);
3092         AST_LIST_INSERT_HEAD(&body_generators, generator, list);
3093         AST_RWLIST_UNLOCK(&body_generators);
3094
3095         /* Lengths of type and subtype plus a slash. */
3096         accept_len = strlen(generator->type) + strlen(generator->subtype) + 1;
3097
3098         /* Add room for null terminator that sprintf() will set. */
3099         pj_strset(&accept, ast_alloca(accept_len + 1), accept_len);
3100         sprintf((char *) pj_strbuf(&accept), "%s/%s", generator->type, generator->subtype);/* Safe */
3101
3102         pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module,
3103                         PJSIP_H_ACCEPT, NULL, 1, &accept);
3104
3105         return 0;
3106 }
3107
3108 void ast_sip_pubsub_unregister_body_generator(struct ast_sip_pubsub_body_generator *generator)
3109 {
3110         struct ast_sip_pubsub_body_generator *iter;
3111         SCOPED_LOCK(lock, &body_generators, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
3112
3113         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_generators, iter, list) {
3114                 if (iter == generator) {
3115                         AST_LIST_REMOVE_CURRENT(list);
3116                         break;
3117                 }
3118         }
3119         AST_RWLIST_TRAVERSE_SAFE_END;
3120 }
3121
3122 int ast_sip_pubsub_register_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
3123 {
3124         AST_RWLIST_WRLOCK(&body_supplements);
3125         AST_RWLIST_INSERT_TAIL(&body_supplements, supplement, list);
3126         AST_RWLIST_UNLOCK(&body_supplements);
3127
3128         return 0;
3129 }
3130
3131 void ast_sip_pubsub_unregister_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
3132 {
3133         struct ast_sip_pubsub_body_supplement *iter;
3134         SCOPED_LOCK(lock, &body_supplements, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
3135
3136         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_supplements, iter, list) {
3137                 if (iter == supplement) {
3138                         AST_LIST_REMOVE_CURRENT(list);
3139                         break;
3140                 }
3141         }
3142         AST_RWLIST_TRAVERSE_SAFE_END;
3143 }
3144
3145 const char *ast_sip_subscription_get_body_type(struct ast_sip_subscription *sub)
3146 {
3147         return sub->body_generator->type;
3148 }
3149
3150 const char *ast_sip_subscription_get_body_subtype(struct ast_sip_subscription *sub)
3151 {
3152         return sub->body_generator->subtype;
3153 }
3154
3155 int ast_sip_pubsub_generate_body_content(const char *type, const char *subtype,
3156                 struct ast_sip_body_data *data, struct ast_str **str)
3157 {
3158         struct ast_sip_pubsub_body_supplement *supplement;
3159         struct ast_sip_pubsub_body_generator *generator;
3160         int res = 0;
3161         void *body;
3162
3163         generator = find_body_generator_type_subtype(type, subtype);
3164         if (!generator) {
3165                 ast_log(LOG_WARNING, "Unable to find a body generator for %s/%s\n",
3166                                 type, subtype);
3167                 return -1;
3168         }
3169
3170         if (strcmp(data->body_type, generator->body_type)) {
3171                 ast_log(LOG_WARNING, "Body generator does not accept the type of data provided\n");
3172                 return -1;
3173         }
3174
3175         body = generator->allocate_body(data->body_data);
3176         if (!body) {
3177                 ast_log(LOG_WARNING, "Unable to allocate a NOTIFY body of type %s/%s\n",
3178                                 type, subtype);
3179                 return -1;
3180         }
3181
3182         if (generator->generate_body_content(body, data->body_data)) {
3183                 res = -1;
3184                 goto end;
3185         }
3186
3187         AST_RWLIST_RDLOCK(&body_supplements);
3188         AST_RWLIST_TRAVERSE(&body_supplements, supplement, list) {
3189                 if (!strcmp(generator->type, supplement->type) &&
3190                                 !strcmp(generator->subtype, supplement->subtype)) {
3191                         res = supplement->supplement_body(body, data->body_data);
3192                         if (res) {
3193                                 break;
3194                         }
3195                 }
3196         }
3197         AST_RWLIST_UNLOCK(&body_supplements);
3198
3199         if (!res) {
3200                 generator->to_string(body, str);
3201         }
3202
3203 end:
3204         if (generator->destroy_body) {
3205                 generator->destroy_body(body);
3206         }
3207
3208         return res;
3209 }
3210
3211 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata)
3212 {
3213         if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) {
3214                 return pubsub_on_rx_subscribe_request(rdata);
3215         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) {
3216                 return pubsub_on_rx_publish_request(rdata);
3217         }
3218
3219         return PJ_FALSE;
3220 }
3221
3222 static void set_state_terminated(struct ast_sip_subscription *sub)
3223 {
3224         int i;
3225
3226         sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED;
3227         for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) {
3228                 set_state_terminated(AST_VECTOR_GET(&sub->children, i));
3229         }
3230 }
3231
3232 /* XXX This function and serialized_pubsub_on_rx_refresh are nearly identical */
3233 static int serialized_pubsub_on_server_timeout(void *userdata)
3234 {
3235         struct sip_subscription_tree *sub_tree = userdata;
3236         pjsip_dialog *dlg = sub_tree->dlg;
3237
3238         pjsip_dlg_inc_lock(dlg);
3239         if (!sub_tree->evsub) {
3240                 pjsip_dlg_dec_lock(dlg);
3241                 return 0;
3242         }
3243         set_state_terminated(sub_tree->root);
3244         send_notify(sub_tree, 1);
3245         ast_test_suite_event_notify("SUBSCRIPTION_TERMINATED",
3246                         "Resource: %s",
3247                         sub_tree->root->resource);
3248
3249         pjsip_dlg_dec_lock(dlg);
3250         ao2_cleanup(sub_tree);
3251         return 0;
3252 }
3253
3254 /*!
3255  * \brief PJSIP callback when underlying SIP subs