6a8ec12db6c96f4dfc5cc8eb9bd5441e75f62d62
[asterisk/asterisk.git] / res / res_pjsip_pubsub.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 /*!
19  * \brief Opaque structure representing an RFC 3265 SIP subscription
20  */
21
22 /*** MODULEINFO
23         <depend>pjproject</depend>
24         <depend>res_pjsip</depend>
25         <support_level>core</support_level>
26  ***/
27
28 #include "asterisk.h"
29
30 #include <pjsip.h>
31 #include <pjsip_simple.h>
32 #include <pjlib.h>
33
34 #include "asterisk/res_pjsip_pubsub.h"
35 #include "asterisk/module.h"
36 #include "asterisk/linkedlists.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/datastore.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/taskprocessor.h"
41 #include "asterisk/sched.h"
42 #include "asterisk/res_pjsip.h"
43 #include "asterisk/callerid.h"
44 #include "asterisk/manager.h"
45 #include "asterisk/test.h"
46 #include "res_pjsip/include/res_pjsip_private.h"
47
48 /*** DOCUMENTATION
49         <manager name="PJSIPShowSubscriptionsInbound" language="en_US">
50                 <synopsis>
51                         Lists subscriptions.
52                 </synopsis>
53                 <syntax />
54                 <description>
55                         <para>
56                         Provides a listing of all inbound subscriptions.  An event <literal>InboundSubscriptionDetail</literal>
57                         is issued for each subscription object.  Once all detail events are completed an
58                         <literal>InboundSubscriptionDetailComplete</literal> event is issued.
59                         </para>
60                 </description>
61         </manager>
62         <manager name="PJSIPShowSubscriptionsOutbound" language="en_US">
63                 <synopsis>
64                         Lists subscriptions.
65                 </synopsis>
66                 <syntax />
67                 <description>
68                         <para>
69                         Provides a listing of all outbound subscriptions.  An event <literal>OutboundSubscriptionDetail</literal>
70                         is issued for each subscription object.  Once all detail events are completed an
71                         <literal>OutboundSubscriptionDetailComplete</literal> event is issued.
72                         </para>
73                 </description>
74         </manager>
75         <configInfo name="res_pjsip_pubsub" language="en_US">
76                 <synopsis>Module that implements publish and subscribe support.</synopsis>
77                 <configFile name="pjsip.conf">
78                         <configObject name="subscription_persistence">
79                                 <synopsis>Persists SIP subscriptions so they survive restarts.</synopsis>
80                                 <configOption name="packet">
81                                         <synopsis>Entire SIP SUBSCRIBE packet that created the subscription</synopsis>
82                                 </configOption>
83                                 <configOption name="src_name">
84                                         <synopsis>The source address of the subscription</synopsis>
85                                 </configOption>
86                                 <configOption name="src_port">
87                                         <synopsis>The source port of the subscription</synopsis>
88                                 </configOption>
89                                 <configOption name="transport_key">
90                                         <synopsis>The type of transport the subscription was received on</synopsis>
91                                 </configOption>
92                                 <configOption name="local_name">
93                                         <synopsis>The local address the subscription was received on</synopsis>
94                                 </configOption>
95                                 <configOption name="local_port">
96                                         <synopsis>The local port the subscription was received on</synopsis>
97                                 </configOption>
98                                 <configOption name="cseq">
99                                         <synopsis>The sequence number of the next NOTIFY to be sent</synopsis>
100                                 </configOption>
101                                 <configOption name="tag">
102                                         <synopsis>The local tag of the dialog for the subscription</synopsis>
103                                 </configOption>
104                                 <configOption name="endpoint">
105                                         <synopsis>The name of the endpoint that subscribed</synopsis>
106                                 </configOption>
107                                 <configOption name="expires">
108                                         <synopsis>The time at which the subscription expires</synopsis>
109                                 </configOption>
110                         </configObject>
111                         <configObject name="inbound-publication">
112                                 <synopsis>The configuration for inbound publications</synopsis>
113                                 <configOption name="endpoint" default="">
114                                         <synopsis>Optional name of an endpoint that is only allowed to publish to this resource</synopsis>
115                                 </configOption>
116                                 <configOption name="type">
117                                         <synopsis>Must be of type 'inbound-publication'.</synopsis>
118                                 </configOption>
119                         </configObject>
120                 </configFile>
121         </configInfo>
122  ***/
123
124 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);
125
/*! \brief PJSIP module descriptor; incoming requests are delivered to pubsub_on_rx_request() */
126 static struct pjsip_module pubsub_module = {
127         .name = { "PubSub Module", 13 },
128         .priority = PJSIP_MOD_PRIORITY_APPLICATION,
129         .on_rx_request = pubsub_on_rx_request,
130 };
131
132 #define MOD_DATA_PERSISTENCE "sub_persistence"
133 #define MOD_DATA_MSG "sub_msg"
134
135 static const pj_str_t str_event_name = { "Event", 5 };
136
137 /*! \brief Scheduler used for automatically expiring publications */
138 static struct ast_sched_context *sched;
139
140 /*! \brief Number of buckets for publications (per handler) */
141 #define PUBLICATIONS_BUCKETS 37
142
143 /*! \brief Default expiration time for PUBLISH if one is not specified */
144 #define DEFAULT_PUBLISH_EXPIRES 3600
145
146 /*! \brief Method descriptor for PUBLISH, identified as PJSIP_OTHER_METHOD with the name "PUBLISH" */
147 const pjsip_method pjsip_publish_method =
148 {
149         PJSIP_OTHER_METHOD,
150         { "PUBLISH", 7 }
151 };
152
153 /*!
154  * \brief The types of PUBLISH messages defined in RFC 3903
155  */
156 enum sip_publish_type {
157         /*!
158          * \brief Unknown
159          *
160          * \details
161          * This is not actually defined in RFC 3903. We use this as a constant
162          * to indicate that an incoming PUBLISH does not fit into any of the
163          * other categories and is thus invalid.
164          */
165         SIP_PUBLISH_UNKNOWN,
166
167         /*!
168          * \brief Initial
169          *
170          * \details
171          * The first PUBLISH sent. This will contain a non-zero Expires header
172          * as well as a body that indicates the current state of the endpoint
173          * that has sent the message. The initial PUBLISH is the only type
174          * of PUBLISH that does not contain a Sip-If-Match header.
175          */
176         SIP_PUBLISH_INITIAL,
177
178         /*!
179          * \brief Refresh
180          *
181          * \details
182          * Used to keep a published state from expiring. This will contain a
183          * non-zero Expires header but no body since its purpose is not to
184          * update state.
185          */
186         SIP_PUBLISH_REFRESH,
187
188         /*!
189          * \brief Modify
190          *
191          * \details
192          * Used to change state from its previous value. This will contain
193          * a body updating the published state. May or may not contain an
194          * Expires header.
195          */
196         SIP_PUBLISH_MODIFY,
197
198         /*!
199          * \brief Remove
200          *
201          * \details
202          * Used to remove published state from an ESC (event state compositor).
203          * This will contain an Expires header set to 0 and likely no body.
204          */
205         SIP_PUBLISH_REMOVE,
206 };
207
208 /*!
209  * Used to create new entity IDs by ESCs.
210  */
211 static int esc_etag_counter;
212
213 /*!
214  * \brief Structure representing a SIP publication
215  */
216 struct ast_sip_publication {
217         /*! \brief Publication datastores set up by handlers */
218         struct ao2_container *datastores;
219         /*! \brief Entity tag for the publication */
220         int entity_tag;
221         /*! \brief Handler for this publication */
222         struct ast_sip_publish_handler *handler;
223         /*! \brief The endpoint with which the publication is associated */
224         struct ast_sip_endpoint *endpoint;
225         /*! \brief Expiration time of the publication */
226         int expires;
227         /*! \brief Scheduled item for expiration of publication */
228         int sched_id;
229         /*! \brief The resource the publication is to */
230         char *resource;
231         /*! \brief The name of the event type configuration */
232         char *event_configuration_name;
233         /*! \brief Trailing storage for the strings above (presumably resource and event_configuration_name point into it) */
234         char data[0];
235 };
236
237
238 /*!
239  * \brief Structure used for persisting an inbound subscription
240  */
241 struct subscription_persistence {
242         /*! Sorcery object details */
243         SORCERY_OBJECT(details);
244         /*! The name of the endpoint involved in the subscription (heap-allocated, freed by the destructor) */
245         char *endpoint;
246         /*! SIP message that created or last refreshed the subscription */
247         char packet[PJSIP_MAX_PKT_LEN];
248         /*! Source address of the message */
249         char src_name[PJ_INET6_ADDRSTRLEN];
250         /*! Source port of the message */
251         int src_port;
252         /*! Local transport key type */
253         char transport_key[32];
254         /*! Local transport address */
255         char local_name[PJ_INET6_ADDRSTRLEN];
256         /*! Local transport port */
257         int local_port;
258         /*! Next CSeq to use for message */
259         unsigned int cseq;
260         /*! Local tag of the dialog (heap-allocated, freed by the destructor) */
261         char *tag;
262         /*! When this subscription expires */
263         struct timeval expires;
264 };
265
266 /*!
267  * \brief Real subscription details
268  *
269  * A real subscription is one that has a direct link to a
270  * PJSIP event subscription and its dialog.
271  */
272 struct ast_sip_real_subscription {
273         /*! The underlying PJSIP event subscription structure */
274         pjsip_evsub *evsub;
275         /*! The underlying PJSIP dialog */
276         pjsip_dialog *dlg;
277 };
278
279 /*!
280  * \brief Virtual subscription details
281  *
282  * A virtual subscription is one that does not have a direct
283  * link to a PJSIP subscription. Instead, it is a descendant
284  * of an ast_sip_subscription, reachable through \c parent. Following
285  * the ancestry will eventually lead to a real subscription.
286  */
287 struct ast_sip_virtual_subscription {
288         struct ast_sip_subscription *parent;
289 };
290
291 /*!
292  * \brief Discriminator between real and virtual subscriptions
293  */
294 enum sip_subscription_type {
295         /*!
296          * \brief A "real" subscription.
297          *
298          * Real subscriptions are at the root of a tree of subscriptions.
299          * A real subscription has a corresponding SIP subscription in the
300          * PJSIP stack.
301          */
302         SIP_SUBSCRIPTION_REAL,
303         /*!
304          * \brief A "virtual" subscription.
305          *
306          * Virtual subscriptions are the descendants of real subscriptions
307          * in a tree of subscriptions. Virtual subscriptions do not have
308          * a corresponding SIP subscription in the PJSIP stack. Instead,
309          * when a state change happens on a virtual subscription, the
310          * state change is indicated to the virtual subscription's parent.
311          */
312         SIP_SUBSCRIPTION_VIRTUAL,
313 };
314
315 /*!
316  * \brief Structure representing a SIP subscription
317  */
318 struct ast_sip_subscription {
319         /*! Subscription datastores set up by handlers */
320         struct ao2_container *datastores;
321         /*! The endpoint with which the subscription is communicating */
322         struct ast_sip_endpoint *endpoint;
323         /*! Serializer on which to place operations for this subscription */
324         struct ast_taskprocessor *serializer;
325         /*! The handler for this subscription */
326         const struct ast_sip_subscription_handler *handler;
327         /*! The role for this subscription */
328         enum ast_sip_subscription_role role;
329         /*! Indicator of real or virtual subscription; selects the active member of \c reality */
330         enum sip_subscription_type type;
331         /*! Real and virtual components of the subscription */
332         union {
333                 struct ast_sip_real_subscription real;
334                 struct ast_sip_virtual_subscription virtual;
335         } reality;
336         /*! Body generator for NOTIFYs */
337         struct ast_sip_pubsub_body_generator *body_generator;
338         /*! Persistence information */
339         struct subscription_persistence *persistence;
340         /*! Next item in the list */
341         AST_LIST_ENTRY(ast_sip_subscription) next;
342         /*! List of child subscriptions */
343         AST_LIST_HEAD_NOLOCK(,ast_sip_subscription) children;
344         /*! Name of resource being subscribed to (trailing storage) */
345         char resource[0];
346 };
347
348 /*!
349  * \brief Structure representing a publication resource
350  */
351 struct ast_sip_publication_resource {
352         /*! \brief Sorcery object details */
353         SORCERY_OBJECT(details);
354         /*! \brief Optional name of the only endpoint allowed to publish to this resource (freed by the destructor) */
355         char *endpoint;
356         /*! \brief Mapping of event types to configuration (destroyed by the destructor) */
357         struct ast_variable *events;
358 };
359
/*! \brief Human-readable names for subscription roles, indexed by enum ast_sip_subscription_role */
360 static const char *sip_subscription_roles_map[] = {
361         [AST_SIP_SUBSCRIBER] = "Subscriber",
362         [AST_SIP_NOTIFIER] = "Notifier"
363 };
364
365 AST_RWLIST_HEAD_STATIC(subscriptions, ast_sip_subscription);
366
367 AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
368 AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
369
370 static pjsip_evsub *sip_subscription_get_evsub(const struct ast_sip_subscription *sub)
371 {
372         return sub->reality.real.evsub;
373 }
374
375 static pjsip_dialog *sip_subscription_get_dlg(const struct ast_sip_subscription *sub)
376 {
377         return sub->reality.real.dlg;
378 }
379
380 /*! \brief Destructor for publication resource */
381 static void publication_resource_destroy(void *obj)
382 {
383         struct ast_sip_publication_resource *resource = obj;
384
385         ast_free(resource->endpoint);
386         ast_variables_destroy(resource->events);
387 }
388
389 /*! \brief Allocator for publication resource */
390 static void *publication_resource_alloc(const char *name)
391 {
392         return ast_sorcery_generic_alloc(sizeof(struct ast_sip_publication_resource), publication_resource_destroy);
393 }
394
395 /*! \brief Destructor for subscription persistence */
396 static void subscription_persistence_destroy(void *obj)
397 {
398         struct subscription_persistence *persistence = obj;
399
400         ast_free(persistence->endpoint);
401         ast_free(persistence->tag);
402 }
403
404 /*! \brief Allocator for subscription persistence */
405 static void *subscription_persistence_alloc(const char *name)
406 {
407         return ast_sorcery_generic_alloc(sizeof(struct subscription_persistence), subscription_persistence_destroy);
408 }
409
410 /*! \brief Function which creates initial persistence information of a subscription in sorcery */
411 static struct subscription_persistence *subscription_persistence_create(struct ast_sip_subscription *sub)
412 {
413         char tag[PJ_GUID_STRING_LENGTH + 1];
414
415         /* The id of this persistence object doesn't matter as we keep it on the subscription and don't need to
416          * look it up by id at all.
417          */
418         struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(),
419                 "subscription_persistence", NULL);
420
421         pjsip_dialog *dlg = sip_subscription_get_dlg(sub);
422
423         if (!persistence) {
424                 return NULL;
425         }
426
427         persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub->endpoint));
428         ast_copy_pj_str(tag, &dlg->local.info->tag, sizeof(tag));
429         persistence->tag = ast_strdup(tag);
430
431         ast_sorcery_create(ast_sip_get_sorcery(), persistence);
432         return persistence;
433 }
434
435 /*! \brief Function which updates persistence information of a subscription in sorcery */
436 static void subscription_persistence_update(struct ast_sip_subscription *sub,
437         pjsip_rx_data *rdata)
438 {
439         pjsip_dialog *dlg;
440
441         if (!sub->persistence) {
442                 return;
443         }
444
445         dlg = sip_subscription_get_dlg(sub);
446         sub->persistence->cseq = dlg->local.cseq;
447
448         if (rdata) {
449                 int expires;
450                 pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
451
452                 expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
453                 sub->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
454
455                 ast_copy_string(sub->persistence->packet, rdata->pkt_info.packet, sizeof(sub->persistence->packet));
456                 ast_copy_string(sub->persistence->src_name, rdata->pkt_info.src_name, sizeof(sub->persistence->src_name));
457                 sub->persistence->src_port = rdata->pkt_info.src_port;
458                 ast_copy_string(sub->persistence->transport_key, rdata->tp_info.transport->type_name,
459                         sizeof(sub->persistence->transport_key));
460                 ast_copy_pj_str(sub->persistence->local_name, &rdata->tp_info.transport->local_name.host,
461                         sizeof(sub->persistence->local_name));
462                 sub->persistence->local_port = rdata->tp_info.transport->local_name.port;
463         }
464
465         ast_sorcery_update(ast_sip_get_sorcery(), sub->persistence);
466 }
467
468 /*! \brief Function which removes persistence of a subscription from sorcery */
469 static void subscription_persistence_remove(struct ast_sip_subscription *sub)
470 {
471         if (!sub->persistence) {
472                 return;
473         }
474
475         ast_sorcery_delete(ast_sip_get_sorcery(), sub->persistence);
476         ao2_ref(sub->persistence, -1);
477 }
478
479
480 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name);
481 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
482                 size_t num_accept);
483
484 /*! \brief Retrieve a handler using the Event header of an rdata message */
485 static struct ast_sip_subscription_handler *subscription_get_handler_from_rdata(pjsip_rx_data *rdata)
486 {
487         pjsip_event_hdr *event_header;
488         char event[32];
489         struct ast_sip_subscription_handler *handler;
490
491         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
492         if (!event_header) {
493                 ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
494                 return NULL;
495         }
496         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
497
498         handler = find_sub_handler_for_event_name(event);
499         if (!handler) {
500                 ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
501         }
502
503         return handler;
504 }
505
506 /*! \brief Retrieve a body generator using the Accept header of an rdata message */
507 static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata,
508         const struct ast_sip_subscription_handler *handler)
509 {
510         pjsip_accept_hdr *accept_header;
511         char accept[AST_SIP_MAX_ACCEPT][64];
512         size_t num_accept_headers;
513
514         accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next);
515         if (accept_header) {
516                 int i;
517
518                 for (i = 0; i < accept_header->count; ++i) {
519                         ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
520                 }
521                 num_accept_headers = accept_header->count;
522         } else {
523                 /* If a SUBSCRIBE contains no Accept headers, then we must assume that
524                  * the default accept type for the event package is to be used.
525                  */
526                 ast_copy_string(accept[0], handler->notifier->default_accept, sizeof(accept[0]));
527                 num_accept_headers = 1;
528         }
529
530         return find_body_generator(accept, num_accept_headers);
531 }
532
533 static struct ast_sip_subscription *notifier_create_subscription(const struct ast_sip_subscription_handler *handler,
534                 struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
535                 struct ast_sip_pubsub_body_generator *generator);
536
537 /*! \brief Callback function to perform the actual recreation of a subscription */
538 static int subscription_persistence_recreate(void *obj, void *arg, int flags)
539 {
540         struct subscription_persistence *persistence = obj;
541         pj_pool_t *pool = arg;
542         pjsip_rx_data rdata = { { 0, }, };
543         pjsip_expires_hdr *expires_header;
544         struct ast_sip_subscription_handler *handler;
545         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
546         struct ast_sip_subscription *sub;
547         struct ast_sip_pubsub_body_generator *generator;
548         int resp;
549         char *resource;
550         size_t resource_size;
551         pjsip_sip_uri *request_uri;
552
553         /* If this subscription has already expired remove it */
554         if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
555                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
556                 return 0;
557         }
558
559         endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
560         if (!endpoint) {
561                 ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
562                         persistence->endpoint);
563                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
564                 return 0;
565         }
566
567         pj_pool_reset(pool);
568         rdata.tp_info.pool = pool;
569
570         if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
571                 persistence->transport_key, persistence->local_name, persistence->local_port)) {
572                 ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
573                         persistence->endpoint);
574                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
575                 return 0;
576         }
577
578         request_uri = pjsip_uri_get_uri(rdata.msg_info.msg->line.req.uri);
579         resource_size = pj_strlen(&request_uri->user) + 1;
580         resource = alloca(resource_size);
581         ast_copy_pj_str(resource, &request_uri->user, resource_size);
582
583         /* Update the expiration header with the new expiration */
584         expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
585         if (!expires_header) {
586                 expires_header = pjsip_expires_hdr_create(pool, 0);
587                 if (!expires_header) {
588                         ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
589                         return 0;
590                 }
591                 pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
592         }
593         expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
594
595         handler = subscription_get_handler_from_rdata(&rdata);
596         if (!handler || !handler->notifier) {
597                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
598                 return 0;
599         }
600
601         generator = subscription_get_generator_from_rdata(&rdata, handler);
602         if (!generator) {
603                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
604                 return 0;
605         }
606
607         ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
608                         pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
609
610         resp = handler->notifier->new_subscribe(endpoint, resource);
611         if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
612                 sub = notifier_create_subscription(handler, endpoint, &rdata, resource, generator);
613                 sub->persistence = ao2_bump(persistence);
614                 subscription_persistence_update(sub, &rdata);
615         } else {
616                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
617         }
618
619         return 0;
620 }
621
622 /*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
623 static int subscription_persistence_load(void *data)
624 {
625         struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
626                 "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
627         pj_pool_t *pool;
628
629         pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
630                 PJSIP_POOL_RDATA_INC);
631         if (!pool) {
632                 ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
633                 return 0;
634         }
635
636         ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
637
638         pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
639
640         ao2_ref(persisted_subscriptions, -1);
641         return 0;
642 }
643
644 /*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
645 static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
646 {
647         struct ast_json_payload *payload;
648         const char *type;
649
650         if (stasis_message_type(message) != ast_manager_get_generic_type()) {
651                 return;
652         }
653
654         payload = stasis_message_data(message);
655         type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
656
657         /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
658          * recreate SIP subscriptions.
659          */
660         if (strcmp(type, "FullyBooted")) {
661                 return;
662         }
663
664         /* This has to be here so the subscription is recreated when the body generator is available */
665         ast_sip_push_task(NULL, subscription_persistence_load, NULL);
666
667         /* Once the system is fully booted we don't care anymore */
668         stasis_unsubscribe(sub);
669 }
670
671 static void add_subscription(struct ast_sip_subscription *obj)
672 {
673         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
674         AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
675         ast_module_ref(ast_module_info->self);
676 }
677
678 static void remove_subscription(struct ast_sip_subscription *obj)
679 {
680         struct ast_sip_subscription *i;
681         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
682         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
683                 if (i == obj) {
684                         AST_RWLIST_REMOVE_CURRENT(next);
685                         ast_module_unref(ast_module_info->self);
686                         break;
687                 }
688         }
689         AST_RWLIST_TRAVERSE_SAFE_END;
690 }
691
/*! \brief Per-subscription callback used by for_each_subscription(); a non-zero return stops the traversal */
692 typedef int (*on_subscription_t)(struct ast_sip_subscription *sub, void *arg);
693
694 static int for_each_subscription(on_subscription_t on_subscription, void *arg)
695 {
696         int num = 0;
697         struct ast_sip_subscription *i;
698         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
699
700         if (!on_subscription) {
701                 return num;
702         }
703
704         AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
705                 if (on_subscription(i, arg)) {
706                         break;
707                 }
708                 ++num;
709         }
710         return num;
711 }
712
713 static void sip_subscription_to_ami(struct ast_sip_subscription *sub,
714                                     struct ast_str **buf)
715 {
716         char str[256];
717         struct ast_sip_endpoint_id_configuration *id = &sub->endpoint->id;
718
719         ast_str_append(buf, 0, "Role: %s\r\n",
720                        sip_subscription_roles_map[sub->role]);
721         ast_str_append(buf, 0, "Endpoint: %s\r\n",
722                        ast_sorcery_object_get_id(sub->endpoint));
723
724         ast_copy_pj_str(str, &sip_subscription_get_dlg(sub)->call_id->id, sizeof(str));
725         ast_str_append(buf, 0, "Callid: %s\r\n", str);
726
727         ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(
728                                sip_subscription_get_evsub(sub)));
729
730         ast_callerid_merge(str, sizeof(str),
731                            S_COR(id->self.name.valid, id->self.name.str, NULL),
732                            S_COR(id->self.number.valid, id->self.number.str, NULL),
733                            "Unknown");
734
735         ast_str_append(buf, 0, "Callerid: %s\r\n", str);
736
737         if (sub->handler->to_ami) {
738                 sub->handler->to_ami(sub, buf);
739         }
740 }
741
/*! \brief Number of buckets requested for the datastore containers of subscriptions and publications */
#define DATASTORE_BUCKETS 53

/*!
 * \brief Default expiration handed to pjsip_evsub_register_pkg() when a
 * subscription handler registers its event package (presumably seconds,
 * per the PJSIP API; confirm against the PJSIP documentation)
 */
#define DEFAULT_EXPIRES 3600
745
746 static int datastore_hash(const void *obj, int flags)
747 {
748         const struct ast_datastore *datastore = obj;
749         const char *uid = flags & OBJ_KEY ? obj : datastore->uid;
750
751         ast_assert(uid != NULL);
752
753         return ast_str_hash(uid);
754 }
755
756 static int datastore_cmp(void *obj, void *arg, int flags)
757 {
758         const struct ast_datastore *datastore1 = obj;
759         const struct ast_datastore *datastore2 = arg;
760         const char *uid2 = flags & OBJ_KEY ? arg : datastore2->uid;
761
762         ast_assert(datastore1->uid != NULL);
763         ast_assert(uid2 != NULL);
764
765         return strcmp(datastore1->uid, uid2) ? 0 : CMP_MATCH | CMP_STOP;
766 }
767
768 static int subscription_remove_serializer(void *obj)
769 {
770         struct ast_sip_subscription *sub = obj;
771
772         /* This is why we keep the dialog on the subscription. When the subscription
773          * is destroyed, there is no guarantee that the underlying dialog is ready
774          * to be destroyed. Furthermore, there's no guarantee in the opposite direction
775          * either. The dialog could be destroyed before our subscription is. We fix
776          * this problem by keeping a reference to the dialog until it is time to
777          * destroy the subscription. We need to have the dialog available when the
778          * subscription is destroyed so that we can guarantee that our attempt to
779          * remove the serializer will be successful.
780          */
781         ast_sip_dialog_set_serializer(sip_subscription_get_dlg(sub), NULL);
782         pjsip_dlg_dec_session(sip_subscription_get_dlg(sub), &pubsub_module);
783
784         return 0;
785 }
786
787 static void subscription_destructor(void *obj)
788 {
789         struct ast_sip_subscription *sub = obj;
790
791         ast_debug(3, "Destroying SIP subscription\n");
792
793         subscription_persistence_remove(sub);
794
795         remove_subscription(sub);
796
797         ao2_cleanup(sub->datastores);
798         ao2_cleanup(sub->endpoint);
799
800         if (sip_subscription_get_dlg(sub)) {
801                 ast_sip_push_task_synchronous(NULL, subscription_remove_serializer, sub);
802         }
803         ast_taskprocessor_unreference(sub->serializer);
804 }
805
806
/* Forward declarations of the PJSIP evsub callbacks that are installed
 * through the pubsub_cb table.
 */
static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
                int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
                pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
static void pubsub_on_client_refresh(pjsip_evsub *sub);
static void pubsub_on_server_timeout(pjsip_evsub *sub);
814
815
/*!
 * \brief Callback table passed to pjsip_evsub_create_uas() and
 * pjsip_evsub_create_uac() for every subscription created by this module
 */
static pjsip_evsub_user pubsub_cb = {
        .on_evsub_state = pubsub_on_evsub_state,
        .on_rx_refresh = pubsub_on_rx_refresh,
        .on_rx_notify = pubsub_on_rx_notify,
        .on_client_refresh = pubsub_on_client_refresh,
        .on_server_timeout = pubsub_on_server_timeout,
};
823
824 static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler,
825                 struct ast_sip_endpoint *endpoint, const char *resource, enum ast_sip_subscription_role role)
826 {
827         struct ast_sip_subscription *sub;
828
829         sub = ao2_alloc(sizeof(*sub) + strlen(resource) + 1, subscription_destructor);
830         if (!sub) {
831                 return NULL;
832         }
833         strcpy(sub->resource, resource); /* Safe */
834
835         sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
836         if (!sub->datastores) {
837                 ao2_ref(sub, -1);
838                 return NULL;
839         }
840         sub->serializer = ast_sip_create_serializer();
841         if (!sub->serializer) {
842                 ao2_ref(sub, -1);
843                 return NULL;
844         }
845         sub->role = role;
846         sub->type = SIP_SUBSCRIPTION_REAL;
847         sub->endpoint = ao2_bump(endpoint);
848         sub->handler = handler;
849
850         return sub;
851 }
852
853 static void subscription_setup_dialog(struct ast_sip_subscription *sub, pjsip_dialog *dlg)
854 {
855         /* We keep a reference to the dialog until our subscription is destroyed. See
856          * the subscription_destructor for more details
857          */
858         pjsip_dlg_inc_session(dlg, &pubsub_module);
859         sub->reality.real.dlg = dlg;
860         ast_sip_dialog_set_serializer(dlg, sub->serializer);
861         pjsip_evsub_set_mod_data(sip_subscription_get_evsub(sub), pubsub_module.id, sub);
862 }
863
864 static struct ast_sip_subscription *notifier_create_subscription(const struct ast_sip_subscription_handler *handler,
865                 struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
866                 struct ast_sip_pubsub_body_generator *generator)
867 {
868         struct ast_sip_subscription *sub;
869         pjsip_dialog *dlg;
870         struct subscription_persistence *persistence;
871
872         sub = allocate_subscription(handler, endpoint, resource, AST_SIP_NOTIFIER);
873         if (!sub) {
874                 return NULL;
875         }
876
877         sub->body_generator = generator;
878         dlg = ast_sip_create_dialog_uas(endpoint, rdata);
879         if (!dlg) {
880                 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
881                 ao2_ref(sub, -1);
882                 return NULL;
883         }
884
885         persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
886                         pubsub_module.id, MOD_DATA_PERSISTENCE);
887         if (persistence) {
888                 /* Update the created dialog with the persisted information */
889                 pjsip_ua_unregister_dlg(pjsip_ua_instance(), dlg);
890                 pj_strdup2(dlg->pool, &dlg->local.info->tag, persistence->tag);
891                 dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag);
892                 pjsip_ua_register_dlg(pjsip_ua_instance(), dlg);
893                 dlg->local.cseq = persistence->cseq;
894                 dlg->remote.cseq = persistence->cseq;
895         }
896
897         pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &sub->reality.real.evsub);
898         subscription_setup_dialog(sub, dlg);
899
900         ast_sip_mod_data_set(dlg->pool, dlg->mod_data, pubsub_module.id, MOD_DATA_MSG,
901                         pjsip_msg_clone(dlg->pool, rdata->msg_info.msg));
902
903         add_subscription(sub);
904         return sub;
905 }
906
907 void *ast_sip_subscription_get_header(const struct ast_sip_subscription *sub, const char *header)
908 {
909         pjsip_dialog *dlg = sip_subscription_get_dlg(sub);
910         pjsip_msg *msg = ast_sip_mod_data_get(dlg->mod_data, pubsub_module.id, MOD_DATA_MSG);
911         pj_str_t name;
912
913         pj_cstr(&name, header);
914
915         return pjsip_msg_find_hdr_by_name(msg, &name, NULL);
916 }
917
918 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
919                 struct ast_sip_endpoint *endpoint, const char *resource)
920 {
921         struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub) + strlen(resource) + 1, subscription_destructor);
922         pjsip_dialog *dlg;
923         struct ast_sip_contact *contact;
924         pj_str_t event;
925         pjsip_tx_data *tdata;
926         pjsip_evsub *evsub;
927
928         sub = allocate_subscription(handler, endpoint, resource, AST_SIP_SUBSCRIBER);
929         if (!sub) {
930                 return NULL;
931         }
932
933         contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors);
934         if (!contact || ast_strlen_zero(contact->uri)) {
935                 ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n",
936                                 ast_sorcery_object_get_id(endpoint));
937                 ao2_ref(sub, -1);
938                 ao2_cleanup(contact);
939                 return NULL;
940         }
941
942         dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL);
943         ao2_cleanup(contact);
944         if (!dlg) {
945                 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
946                 ao2_ref(sub, -1);
947                 return NULL;
948         }
949
950         pj_cstr(&event, handler->event_name);
951         pjsip_evsub_create_uac(dlg, &pubsub_cb, &event, 0, &sub->reality.real.evsub);
952         subscription_setup_dialog(sub, dlg);
953
954         add_subscription(sub);
955
956         evsub = sip_subscription_get_evsub(sub);
957
958         if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
959                 pjsip_evsub_send_request(evsub, tdata);
960         } else {
961                 /* pjsip_evsub_terminate will result in pubsub_on_evsub_state,
962                  * being called and terminating the subscription. Therefore, we don't
963                  * need to decrease the reference count of sub here.
964                  */
965                 pjsip_evsub_terminate(evsub, PJ_TRUE);
966                 return NULL;
967         }
968
969         return sub;
970 }
971
972 struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
973 {
974         ast_assert(sub->endpoint != NULL);
975         ao2_ref(sub->endpoint, +1);
976         return sub->endpoint;
977 }
978
979 struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
980 {
981         ast_assert(sub->serializer != NULL);
982         return sub->serializer;
983 }
984
985 static int sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata)
986 {
987         struct ast_sip_endpoint *endpoint = ast_sip_subscription_get_endpoint(sub);
988         int res;
989
990         ao2_ref(sub, +1);
991         res = pjsip_evsub_send_request(sip_subscription_get_evsub(sub),
992                         tdata) == PJ_SUCCESS ? 0 : -1;
993
994         subscription_persistence_update(sub, NULL);
995
996         ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
997                 "StateText: %s\r\n"
998                 "Endpoint: %s\r\n",
999                 pjsip_evsub_get_state_name(sip_subscription_get_evsub(sub)),
1000                 ast_sorcery_object_get_id(endpoint));
1001         ao2_cleanup(sub);
1002         ao2_cleanup(endpoint);
1003
1004         return res;
1005 }
1006
1007 int ast_sip_subscription_notify(struct ast_sip_subscription *sub, void *notify_data,
1008                 int terminate)
1009 {
1010         struct ast_sip_body body = {
1011                 .type = ast_sip_subscription_get_body_type(sub),
1012                 .subtype = ast_sip_subscription_get_body_subtype(sub),
1013         };
1014         struct ast_str *body_text = ast_str_create(64);
1015         pjsip_evsub *evsub = sip_subscription_get_evsub(sub);
1016         pjsip_tx_data *tdata;
1017         pjsip_evsub_state state;
1018
1019         if (!body_text) {
1020                 return -1;
1021         }
1022
1023         if (ast_sip_pubsub_generate_body_content(body.type, body.subtype, notify_data, &body_text)) {
1024                 ast_free(body_text);
1025                 return -1;
1026         }
1027
1028         body.body_text = ast_str_buffer(body_text);
1029
1030         if (terminate) {
1031                 state = PJSIP_EVSUB_STATE_TERMINATED;
1032         } else {
1033                 state = pjsip_evsub_get_state(evsub) <= PJSIP_EVSUB_STATE_ACTIVE ?
1034                         PJSIP_EVSUB_STATE_ACTIVE : PJSIP_EVSUB_STATE_TERMINATED;
1035         }
1036
1037         if (pjsip_evsub_notify(evsub, state, NULL, NULL, &tdata) != PJ_SUCCESS) {
1038                 ast_free(body_text);
1039                 return -1;
1040         }
1041         if (ast_sip_add_body(tdata, &body)) {
1042                 ast_free(body_text);
1043                 pjsip_tx_data_dec_ref(tdata);
1044                 return -1;
1045         }
1046         if (sip_subscription_send_request(sub, tdata)) {
1047                 ast_free(body_text);
1048                 pjsip_tx_data_dec_ref(tdata);
1049                 return -1;
1050         }
1051
1052         return 0;
1053 }
1054
1055 void ast_sip_subscription_get_local_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
1056 {
1057         pjsip_dialog *dlg = sip_subscription_get_dlg(sub);
1058         ast_copy_pj_str(buf, &dlg->local.info_str, size);
1059 }
1060
1061 void ast_sip_subscription_get_remote_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
1062 {
1063         pjsip_dialog *dlg = sip_subscription_get_dlg(sub);
1064         ast_copy_pj_str(buf, &dlg->remote.info_str, size);
1065 }
1066
1067 const char *ast_sip_subscription_get_resource_name(struct ast_sip_subscription *sub)
1068 {
1069         return sub->resource;
1070 }
1071
1072 static int sip_subscription_accept(struct ast_sip_subscription *sub, pjsip_rx_data *rdata, int response)
1073 {
1074         /* If this is a persistence recreation the subscription has already been accepted */
1075         if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) {
1076                 return 0;
1077         }
1078
1079         return pjsip_evsub_accept(sip_subscription_get_evsub(sub), rdata, response, NULL) == PJ_SUCCESS ? 0 : -1;
1080 }
1081
1082 static void subscription_datastore_destroy(void *obj)
1083 {
1084         struct ast_datastore *datastore = obj;
1085
1086         /* Using the destroy function (if present) destroy the data */
1087         if (datastore->info->destroy != NULL && datastore->data != NULL) {
1088                 datastore->info->destroy(datastore->data);
1089                 datastore->data = NULL;
1090         }
1091
1092         ast_free((void *) datastore->uid);
1093         datastore->uid = NULL;
1094 }
1095
1096 struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
1097 {
1098         RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
1099         const char *uid_ptr = uid;
1100
1101         if (!info) {
1102                 return NULL;
1103         }
1104
1105         datastore = ao2_alloc(sizeof(*datastore), subscription_datastore_destroy);
1106         if (!datastore) {
1107                 return NULL;
1108         }
1109
1110         datastore->info = info;
1111         if (ast_strlen_zero(uid)) {
1112                 /* They didn't provide an ID so we'll provide one ourself */
1113                 struct ast_uuid *uuid = ast_uuid_generate();
1114                 char uuid_buf[AST_UUID_STR_LEN];
1115                 if (!uuid) {
1116                         return NULL;
1117                 }
1118                 uid_ptr = ast_uuid_to_str(uuid, uuid_buf, sizeof(uuid_buf));
1119                 ast_free(uuid);
1120         }
1121
1122         datastore->uid = ast_strdup(uid_ptr);
1123         if (!datastore->uid) {
1124                 return NULL;
1125         }
1126
1127         ao2_ref(datastore, +1);
1128         return datastore;
1129 }
1130
1131 int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
1132 {
1133         ast_assert(datastore != NULL);
1134         ast_assert(datastore->info != NULL);
1135         ast_assert(!ast_strlen_zero(datastore->uid));
1136
1137         if (!ao2_link(subscription->datastores, datastore)) {
1138                 return -1;
1139         }
1140         return 0;
1141 }
1142
1143 struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
1144 {
1145         return ao2_find(subscription->datastores, name, OBJ_KEY);
1146 }
1147
1148 void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
1149 {
1150         ao2_find(subscription->datastores, name, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
1151 }
1152
1153 int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
1154 {
1155         ast_assert(datastore != NULL);
1156         ast_assert(datastore->info != NULL);
1157         ast_assert(!ast_strlen_zero(datastore->uid));
1158
1159         if (!ao2_link(publication->datastores, datastore)) {
1160                 return -1;
1161         }
1162         return 0;
1163 }
1164
1165 struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
1166 {
1167         return ao2_find(publication->datastores, name, OBJ_KEY);
1168 }
1169
1170 void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
1171 {
1172         ao2_callback(publication->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
1173 }
1174
/*! \brief Read/write-locked list of the registered PUBLISH handlers */
AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
1176
1177 static int publication_hash_fn(const void *obj, const int flags)
1178 {
1179         const struct ast_sip_publication *publication = obj;
1180         const int *entity_tag = obj;
1181
1182         return flags & OBJ_KEY ? *entity_tag : publication->entity_tag;
1183 }
1184
1185 static int publication_cmp_fn(void *obj, void *arg, int flags)
1186 {
1187         const struct ast_sip_publication *publication1 = obj;
1188         const struct ast_sip_publication *publication2 = arg;
1189         const int *entity_tag = arg;
1190
1191         return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ?
1192                 CMP_MATCH | CMP_STOP : 0);
1193 }
1194
1195 static void publish_add_handler(struct ast_sip_publish_handler *handler)
1196 {
1197         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1198         AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
1199 }
1200
1201 int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
1202 {
1203         if (ast_strlen_zero(handler->event_name)) {
1204                 ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
1205                 return -1;
1206         }
1207
1208         if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS,
1209                 publication_hash_fn, publication_cmp_fn))) {
1210                 ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n",
1211                         handler->event_name);
1212                 return -1;
1213         }
1214
1215         publish_add_handler(handler);
1216
1217         ast_module_ref(ast_module_info->self);
1218
1219         return 0;
1220 }
1221
1222 void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
1223 {
1224         struct ast_sip_publish_handler *iter;
1225         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1226         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
1227                 if (handler == iter) {
1228                         AST_RWLIST_REMOVE_CURRENT(next);
1229                         ao2_cleanup(handler->publications);
1230                         ast_module_unref(ast_module_info->self);
1231                         break;
1232                 }
1233         }
1234         AST_RWLIST_TRAVERSE_SAFE_END;
1235 }
1236
/*! \brief Read/write-locked list of the registered subscription handlers */
AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
1238
1239 static void sub_add_handler(struct ast_sip_subscription_handler *handler)
1240 {
1241         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1242         AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
1243         ast_module_ref(ast_module_info->self);
1244 }
1245
1246 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name)
1247 {
1248         struct ast_sip_subscription_handler *iter;
1249         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1250
1251         AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
1252                 if (!strcmp(iter->event_name, event_name)) {
1253                         break;
1254                 }
1255         }
1256         return iter;
1257 }
1258
1259 int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler)
1260 {
1261         pj_str_t event;
1262         pj_str_t accept[AST_SIP_MAX_ACCEPT] = { {0, }, };
1263         struct ast_sip_subscription_handler *existing;
1264         int i = 0;
1265
1266         if (ast_strlen_zero(handler->event_name)) {
1267                 ast_log(LOG_ERROR, "No event package specified for subscription handler. Cannot register\n");
1268                 return -1;
1269         }
1270
1271         existing = find_sub_handler_for_event_name(handler->event_name);
1272         if (existing) {
1273                 ast_log(LOG_ERROR, "Unable to register subscription handler for event %s."
1274                                 "A handler is already registered\n", handler->event_name);
1275                 return -1;
1276         }
1277
1278         for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) {
1279                 pj_cstr(&accept[i], handler->accept[i]);
1280         }
1281
1282         pj_cstr(&event, handler->event_name);
1283
1284         pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
1285
1286         sub_add_handler(handler);
1287
1288         return 0;
1289 }
1290
1291 void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
1292 {
1293         struct ast_sip_subscription_handler *iter;
1294         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1295         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
1296                 if (handler == iter) {
1297                         AST_RWLIST_REMOVE_CURRENT(next);
1298                         ast_module_unref(ast_module_info->self);
1299                         break;
1300                 }
1301         }
1302         AST_RWLIST_TRAVERSE_SAFE_END;
1303 }
1304
1305 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype(const char *content_type,
1306                 const char *content_subtype)
1307 {
1308         struct ast_sip_pubsub_body_generator *iter;
1309         SCOPED_LOCK(lock, &body_generators, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1310
1311         AST_LIST_TRAVERSE(&body_generators, iter, list) {
1312                 if (!strcmp(iter->type, content_type) &&
1313                                 !strcmp(iter->subtype, content_subtype)) {
1314                         break;
1315                 }
1316         };
1317
1318         return iter;
1319 }
1320
/*!
 * \brief Find a body generator for a "type/subtype" string.
 *
 * \param accept The accept value to look up
 *
 * \return The matching generator, or NULL when the string does not contain
 *         both a type and a subtype or when no generator matches.
 */
static struct ast_sip_pubsub_body_generator *find_body_generator_accept(const char *accept)
{
        /* Work on a stack copy because strsep() modifies the string it splits */
        char *remainder = ast_strdupa(accept);
        char *media_type = strsep(&remainder, "/");
        char *media_subtype = remainder;

        if (ast_strlen_zero(media_type) || ast_strlen_zero(media_subtype)) {
                return NULL;
        }

        return find_body_generator_type_subtype(media_type, media_subtype);
}
1333
1334 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
1335                 size_t num_accept)
1336 {
1337         int i;
1338         struct ast_sip_pubsub_body_generator *generator = NULL;
1339
1340         for (i = 0; i < num_accept; ++i) {
1341                 generator = find_body_generator_accept(accept[i]);
1342                 if (generator) {
1343                         ast_debug(3, "Body generator %p found for accept type %s\n", generator, accept[i]);
1344                         break;
1345                 } else {
1346                         ast_debug(3, "No body generator found for accept type %s\n", accept[i]);
1347                 }
1348         }
1349
1350         return generator;
1351 }
1352
1353 static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
1354 {
1355         pjsip_expires_hdr *expires_header;
1356         struct ast_sip_subscription_handler *handler;
1357         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
1358         struct ast_sip_subscription *sub;
1359         struct ast_sip_pubsub_body_generator *generator;
1360         char *resource;
1361         pjsip_uri *request_uri;
1362         pjsip_sip_uri *request_uri_sip;
1363         size_t resource_size;
1364         int resp;
1365
1366         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
1367         ast_assert(endpoint != NULL);
1368
1369         if (!endpoint->subscription.allow) {
1370                 ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint));
1371                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL);
1372                 return PJ_TRUE;
1373         }
1374
1375         request_uri = rdata->msg_info.msg->line.req.uri;
1376
1377         if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
1378                 char uri_str[PJSIP_MAX_URL_SIZE];
1379
1380                 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
1381                 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
1382                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
1383                 return PJ_TRUE;
1384         }
1385
1386         request_uri_sip = pjsip_uri_get_uri(request_uri);
1387         resource_size = pj_strlen(&request_uri_sip->user) + 1;
1388         resource = alloca(resource_size);
1389         ast_copy_pj_str(resource, &request_uri_sip->user, resource_size);
1390
1391         expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next);
1392
1393         if (expires_header) {
1394                 if (expires_header->ivalue == 0) {
1395                         ast_log(LOG_WARNING, "Susbscription request from endpoint %s rejected. Expiration of 0 is invalid\n",
1396                                 ast_sorcery_object_get_id(endpoint));
1397                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
1398                                 return PJ_TRUE;
1399                 }
1400                 if (expires_header->ivalue < endpoint->subscription.minexpiry) {
1401                         ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %u\n",
1402                                 expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry);
1403                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL);
1404                         return PJ_TRUE;
1405                 }
1406         }
1407
1408         handler = subscription_get_handler_from_rdata(rdata);
1409         if (!handler) {
1410                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
1411                 return PJ_TRUE;
1412         }
1413
1414         generator = subscription_get_generator_from_rdata(rdata, handler);
1415         if (!generator) {
1416                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
1417                 return PJ_TRUE;
1418         }
1419
1420         resp = handler->notifier->new_subscribe(endpoint, resource);
1421         if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
1422                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
1423                 return PJ_TRUE;
1424         }
1425
1426         sub = notifier_create_subscription(handler, endpoint, rdata, resource, generator);
1427         if (!sub) {
1428                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
1429         } else {
1430                 sub->persistence = subscription_persistence_create(sub);
1431                 subscription_persistence_update(sub, rdata);
1432                 sip_subscription_accept(sub, rdata, resp);
1433                 if (handler->notifier->notify_required(sub, AST_SIP_SUBSCRIPTION_NOTIFY_REASON_STARTED)) {
1434                         pjsip_evsub_terminate(sip_subscription_get_evsub(sub), PJ_TRUE);
1435                 }
1436         }
1437
1438         return PJ_TRUE;
1439 }
1440
1441 static struct ast_sip_publish_handler *find_pub_handler(const char *event)
1442 {
1443         struct ast_sip_publish_handler *iter = NULL;
1444         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1445
1446         AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) {
1447                 if (strcmp(event, iter->event_name)) {
1448                         ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
1449                         continue;
1450                 }
1451                 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
1452                 break;
1453         }
1454
1455         return iter;
1456 }
1457
1458 static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata,
1459         pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id)
1460 {
1461         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
1462
1463         if (etag_hdr) {
1464                 char etag[pj_strlen(&etag_hdr->hvalue) + 1];
1465
1466                 ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag));
1467
1468                 if (sscanf(etag, "%30d", entity_id) != 1) {
1469                         return SIP_PUBLISH_UNKNOWN;
1470                 }
1471         }
1472
1473         *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
1474
1475         if (!(*expires)) {
1476                 return SIP_PUBLISH_REMOVE;
1477         } else if (!etag_hdr && rdata->msg_info.msg->body) {
1478                 return SIP_PUBLISH_INITIAL;
1479         } else if (etag_hdr && !rdata->msg_info.msg->body) {
1480                 return SIP_PUBLISH_REFRESH;
1481         } else if (etag_hdr && rdata->msg_info.msg->body) {
1482                 return SIP_PUBLISH_MODIFY;
1483         }
1484
1485         return SIP_PUBLISH_UNKNOWN;
1486 }
1487
1488 /*! \brief Internal destructor for publications */
1489 static void publication_destroy_fn(void *obj)
1490 {
1491         struct ast_sip_publication *publication = obj;
1492
1493         ast_debug(3, "Destroying SIP publication\n");
1494
1495         ao2_cleanup(publication->datastores);
1496         ao2_cleanup(publication->endpoint);
1497 }
1498
1499 static struct ast_sip_publication *sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
1500         const char *resource, const char *event_configuration_name)
1501 {
1502         struct ast_sip_publication *publication;
1503         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
1504         size_t resource_len = strlen(resource) + 1, event_configuration_name_len = strlen(event_configuration_name) + 1;
1505         char *dst;
1506
1507         ast_assert(endpoint != NULL);
1508
1509         if (!(publication = ao2_alloc(sizeof(*publication) + resource_len + event_configuration_name_len, publication_destroy_fn))) {
1510                 return NULL;
1511         }
1512
1513         if (!(publication->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp))) {
1514                 ao2_ref(publication, -1);
1515                 return NULL;
1516         }
1517
1518         publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
1519         ao2_ref(endpoint, +1);
1520         publication->endpoint = endpoint;
1521         publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
1522         publication->sched_id = -1;
1523         dst = publication->data;
1524         publication->resource = strcpy(dst, resource);
1525         dst += resource_len;
1526         publication->event_configuration_name = strcpy(dst, event_configuration_name);
1527
1528         return publication;
1529 }
1530
1531 static int sip_publication_respond(struct ast_sip_publication *pub, int status_code,
1532                 pjsip_rx_data *rdata)
1533 {
1534         pj_status_t status;
1535         pjsip_tx_data *tdata;
1536         pjsip_transaction *tsx;
1537
1538         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, &tdata) != PJ_SUCCESS) {
1539                 return -1;
1540         }
1541
1542         if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) {
1543                 RAII_VAR(char *, entity_tag, NULL, ast_free_ptr);
1544                 RAII_VAR(char *, expires, NULL, ast_free_ptr);
1545
1546                 if ((ast_asprintf(&entity_tag, "%d", pub->entity_tag) < 0) ||
1547                         (ast_asprintf(&expires, "%d", pub->expires) < 0)) {
1548                         pjsip_tx_data_dec_ref(tdata);
1549                         return -1;
1550                 }
1551
1552                 ast_sip_add_header(tdata, "SIP-ETag", entity_tag);
1553                 ast_sip_add_header(tdata, "Expires", expires);
1554         }
1555
1556         if ((status = pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx)) != PJ_SUCCESS) {
1557                 return -1;
1558         }
1559
1560         pjsip_tsx_recv_msg(tsx, rdata);
1561
1562         if (pjsip_tsx_send_msg(tsx, tdata) != PJ_SUCCESS) {
1563                 return -1;
1564         }
1565
1566         return 0;
1567 }
1568
1569 static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
1570         struct ast_sip_publish_handler *handler)
1571 {
1572         struct ast_sip_publication *publication;
1573         char *resource_name;
1574         size_t resource_size;
1575         RAII_VAR(struct ast_sip_publication_resource *, resource, NULL, ao2_cleanup);
1576         struct ast_variable *event_configuration_name = NULL;
1577         pjsip_uri *request_uri;
1578         pjsip_sip_uri *request_uri_sip;
1579         int resp;
1580
1581         request_uri = rdata->msg_info.msg->line.req.uri;
1582
1583         if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
1584                 char uri_str[PJSIP_MAX_URL_SIZE];
1585
1586                 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
1587                 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
1588                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
1589                 return NULL;
1590         }
1591
1592         request_uri_sip = pjsip_uri_get_uri(request_uri);
1593         resource_size = pj_strlen(&request_uri_sip->user) + 1;
1594         resource_name = alloca(resource_size);
1595         ast_copy_pj_str(resource_name, &request_uri_sip->user, resource_size);
1596
1597         resource = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "inbound-publication", resource_name);
1598         if (!resource) {
1599                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
1600                 return NULL;
1601         }
1602
1603         if (!ast_strlen_zero(resource->endpoint) && strcmp(resource->endpoint, ast_sorcery_object_get_id(endpoint))) {
1604                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL);
1605                 return NULL;
1606         }
1607
1608         for (event_configuration_name = resource->events; event_configuration_name; event_configuration_name = event_configuration_name->next) {
1609                 if (!strcmp(event_configuration_name->name, handler->event_name)) {
1610                         break;
1611                 }
1612         }
1613
1614         if (!event_configuration_name) {
1615                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
1616                 return NULL;
1617         }
1618
1619         resp = handler->new_publication(endpoint, resource_name, event_configuration_name->value);
1620
1621         if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
1622                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
1623                 return NULL;
1624         }
1625
1626         publication = sip_create_publication(endpoint, rdata, S_OR(resource_name, ""), event_configuration_name->value);
1627
1628         if (!publication) {
1629                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
1630                 return NULL;
1631         }
1632
1633         publication->handler = handler;
1634         if (publication->handler->publication_state_change(publication, rdata->msg_info.msg->body,
1635                         AST_SIP_PUBLISH_STATE_INITIALIZED)) {
1636                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
1637                 ao2_cleanup(publication);
1638                 return NULL;
1639         }
1640
1641         sip_publication_respond(publication, resp, rdata);
1642
1643         return publication;
1644 }
1645
1646 static int publish_expire_callback(void *data)
1647 {
1648         RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup);
1649
1650         if (publication->handler->publish_expire) {
1651                 publication->handler->publish_expire(publication);
1652         }
1653
1654         return 0;
1655 }
1656
1657 static int publish_expire(const void *data)
1658 {
1659         struct ast_sip_publication *publication = (struct ast_sip_publication*)data;
1660
1661         ao2_unlink(publication->handler->publications, publication);
1662         publication->sched_id = -1;
1663
1664         if (ast_sip_push_task(NULL, publish_expire_callback, publication)) {
1665                 ao2_cleanup(publication);
1666         }
1667
1668         return 0;
1669 }
1670
1671 static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata)
1672 {
1673         pjsip_event_hdr *event_header;
1674         struct ast_sip_publish_handler *handler;
1675         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
1676         char event[32];
1677         static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 };
1678         pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL);
1679         enum sip_publish_type publish_type;
1680         RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup);
1681         int expires = 0, entity_id, response = 0;
1682
1683         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
1684         ast_assert(endpoint != NULL);
1685
1686         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
1687         if (!event_header) {
1688                 ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n");
1689                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
1690                 return PJ_TRUE;
1691         }
1692         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
1693
1694         handler = find_pub_handler(event);
1695         if (!handler) {
1696                 ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event);
1697                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
1698                 return PJ_TRUE;
1699         }
1700
1701         publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id);
1702
1703         /* If this is not an initial publish ensure that a publication is present */
1704         if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) {
1705                 if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) {
1706                         static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 };
1707
1708                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed,
1709                                 NULL, NULL);
1710                         return PJ_TRUE;
1711                 }
1712
1713                 /* Per the RFC every response has to have a new entity tag */
1714                 publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
1715
1716                 /* Update the expires here so that the created responses will contain the correct value */
1717                 publication->expires = expires;
1718         }
1719
1720         switch (publish_type) {
1721                 case SIP_PUBLISH_INITIAL:
1722                         publication = publish_request_initial(endpoint, rdata, handler);
1723                         break;
1724                 case SIP_PUBLISH_REFRESH:
1725                 case SIP_PUBLISH_MODIFY:
1726                         if (handler->publication_state_change(publication, rdata->msg_info.msg->body,
1727                                                 AST_SIP_PUBLISH_STATE_ACTIVE)) {
1728                                 /* If an error occurs we want to terminate the publication */
1729                                 expires = 0;
1730                         }
1731                         response = 200;
1732                         break;
1733                 case SIP_PUBLISH_REMOVE:
1734                         handler->publication_state_change(publication, rdata->msg_info.msg->body,
1735                                         AST_SIP_PUBLISH_STATE_TERMINATED);
1736                         response = 200;
1737                         break;
1738                 case SIP_PUBLISH_UNKNOWN:
1739                 default:
1740                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
1741                         break;
1742         }
1743
1744         if (publication) {
1745                 if (expires) {
1746                         ao2_link(handler->publications, publication);
1747
1748                         AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication,
1749                                                 ao2_ref(publication, -1), ao2_ref(publication, -1), ao2_ref(publication, +1));
1750                 } else {
1751                         AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1));
1752                 }
1753         }
1754
1755         if (response) {
1756                 sip_publication_respond(publication, response, rdata);
1757         }
1758
1759         return PJ_TRUE;
1760 }
1761
1762 struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub)
1763 {
1764         return pub->endpoint;
1765 }
1766
1767 const char *ast_sip_publication_get_resource(const struct ast_sip_publication *pub)
1768 {
1769         return pub->resource;
1770 }
1771
1772 const char *ast_sip_publication_get_event_configuration(const struct ast_sip_publication *pub)
1773 {
1774         return pub->event_configuration_name;
1775 }
1776
1777 int ast_sip_pubsub_register_body_generator(struct ast_sip_pubsub_body_generator *generator)
1778 {
1779         struct ast_sip_pubsub_body_generator *existing;
1780         pj_str_t accept;
1781         pj_size_t accept_len;
1782
1783         existing = find_body_generator_type_subtype(generator->type, generator->subtype);
1784         if (existing) {
1785                 ast_log(LOG_WARNING, "Cannot register body generator of %s/%s."
1786                                 "One is already registered.\n", generator->type, generator->subtype);
1787                 return -1;
1788         }
1789
1790         AST_RWLIST_WRLOCK(&body_generators);
1791         AST_LIST_INSERT_HEAD(&body_generators, generator, list);
1792         AST_RWLIST_UNLOCK(&body_generators);
1793
1794         /* Lengths of type and subtype plus space for a slash. pj_str_t is not
1795          * null-terminated, so there is no need to allocate for the extra null
1796          * byte
1797          */
1798         accept_len = strlen(generator->type) + strlen(generator->subtype) + 1;
1799
1800         accept.ptr = alloca(accept_len);
1801         accept.slen = accept_len;
1802         /* Safe use of sprintf */
1803         sprintf(accept.ptr, "%s/%s", generator->type, generator->subtype);
1804         pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module,
1805                         PJSIP_H_ACCEPT, NULL, 1, &accept);
1806
1807         return 0;
1808 }
1809
1810 void ast_sip_pubsub_unregister_body_generator(struct ast_sip_pubsub_body_generator *generator)
1811 {
1812         struct ast_sip_pubsub_body_generator *iter;
1813         SCOPED_LOCK(lock, &body_generators, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1814
1815         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_generators, iter, list) {
1816                 if (iter == generator) {
1817                         AST_LIST_REMOVE_CURRENT(list);
1818                         break;
1819                 }
1820         }
1821         AST_RWLIST_TRAVERSE_SAFE_END;
1822 }
1823
1824 int ast_sip_pubsub_register_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
1825 {
1826         AST_RWLIST_WRLOCK(&body_supplements);
1827         AST_RWLIST_INSERT_TAIL(&body_supplements, supplement, list);
1828         AST_RWLIST_UNLOCK(&body_supplements);
1829
1830         return 0;
1831 }
1832
1833 void ast_sip_pubsub_unregister_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
1834 {
1835         struct ast_sip_pubsub_body_supplement *iter;
1836         SCOPED_LOCK(lock, &body_supplements, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1837
1838         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_supplements, iter, list) {
1839                 if (iter == supplement) {
1840                         AST_LIST_REMOVE_CURRENT(list);
1841                         break;
1842                 }
1843         }
1844         AST_RWLIST_TRAVERSE_SAFE_END;
1845 }
1846
1847 const char *ast_sip_subscription_get_body_type(struct ast_sip_subscription *sub)
1848 {
1849         return sub->body_generator->type;
1850 }
1851
1852 const char *ast_sip_subscription_get_body_subtype(struct ast_sip_subscription *sub)
1853 {
1854         return sub->body_generator->subtype;
1855 }
1856
1857 int ast_sip_pubsub_generate_body_content(const char *type, const char *subtype,
1858                 void *data, struct ast_str **str)
1859 {
1860         struct ast_sip_pubsub_body_supplement *supplement;
1861         struct ast_sip_pubsub_body_generator *generator;
1862         int res = 0;
1863         void *body;
1864
1865         generator = find_body_generator_type_subtype(type, subtype);
1866         if (!generator) {
1867                 ast_log(LOG_WARNING, "Unable to find a body generator for %s/%s\n",
1868                                 type, subtype);
1869                 return -1;
1870         }
1871
1872         body = generator->allocate_body(data);
1873         if (!body) {
1874                 ast_log(LOG_WARNING, "Unable to allocate a NOTIFY body of type %s/%s\n",
1875                                 type, subtype);
1876                 return -1;
1877         }
1878
1879         if (generator->generate_body_content(body, data)) {
1880                 res = -1;
1881                 goto end;
1882         }
1883
1884         AST_RWLIST_RDLOCK(&body_supplements);
1885         AST_RWLIST_TRAVERSE(&body_supplements, supplement, list) {
1886                 if (!strcmp(generator->type, supplement->type) &&
1887                                 !strcmp(generator->subtype, supplement->subtype)) {
1888                         res = supplement->supplement_body(body, data);
1889                         if (res) {
1890                                 break;
1891                         }
1892                 }
1893         }
1894         AST_RWLIST_UNLOCK(&body_supplements);
1895
1896         if (!res) {
1897                 generator->to_string(body, str);
1898         }
1899
1900 end:
1901         if (generator->destroy_body) {
1902                 generator->destroy_body(body);
1903         }
1904
1905         return res;
1906 }
1907
1908 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata)
1909 {
1910         if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) {
1911                 return pubsub_on_rx_subscribe_request(rdata);
1912         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) {
1913                 return pubsub_on_rx_publish_request(rdata);
1914         }
1915
1916         return PJ_FALSE;
1917 }
1918
1919 static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
1920 {
1921         struct ast_sip_subscription *sub;
1922         if (pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) {
1923                 return;
1924         }
1925
1926         sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1927         if (!sub) {
1928                 return;
1929         }
1930
1931         if (sub->handler->subscription_shutdown) {
1932                 sub->handler->subscription_shutdown(sub);
1933         }
1934         pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL);
1935 }
1936
1937 static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
1938                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
1939 {
1940         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1941         enum ast_sip_subscription_notify_reason reason;
1942
1943         if (!sub) {
1944                 return;
1945         }
1946
1947         if (pjsip_evsub_get_state(sip_subscription_get_evsub(sub)) == PJSIP_EVSUB_STATE_TERMINATED) {
1948                 reason = AST_SIP_SUBSCRIPTION_NOTIFY_REASON_TERMINATED;
1949         } else {
1950                 reason = AST_SIP_SUBSCRIPTION_NOTIFY_REASON_RENEWED;
1951         }
1952         if (sub->handler->notifier->notify_required(sub, reason)) {
1953                 *p_st_code = 500;
1954         }
1955 }
1956
1957 static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code,
1958                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
1959 {
1960         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1961
1962         if (!sub) {
1963                 return;
1964         }
1965
1966         sub->handler->subscriber->state_change(sub, rdata->msg_info.msg->body,
1967                         pjsip_evsub_get_state(evsub));
1968 }
1969
1970 static int serialized_pubsub_on_client_refresh(void *userdata)
1971 {
1972         struct ast_sip_subscription *sub = userdata;
1973         pjsip_evsub *evsub;
1974         pjsip_tx_data *tdata;
1975
1976         evsub = sip_subscription_get_evsub(sub);
1977
1978         if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
1979                 pjsip_evsub_send_request(evsub, tdata);
1980         } else {
1981                 pjsip_evsub_terminate(evsub, PJ_TRUE);
1982                 return 0;
1983         }
1984         ao2_cleanup(sub);
1985         return 0;
1986 }
1987
1988 static void pubsub_on_client_refresh(pjsip_evsub *evsub)
1989 {
1990         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1991
1992         ao2_ref(sub, +1);
1993         ast_sip_push_task(sub->serializer, serialized_pubsub_on_client_refresh, sub);
1994 }
1995
1996 static int serialized_pubsub_on_server_timeout(void *userdata)
1997 {
1998         struct ast_sip_subscription *sub = userdata;
1999
2000         sub->handler->notifier->notify_required(sub,
2001                         AST_SIP_SUBSCRIPTION_NOTIFY_REASON_TERMINATED);
2002
2003         ao2_cleanup(sub);
2004         return 0;
2005 }
2006
2007 static void pubsub_on_server_timeout(pjsip_evsub *evsub)
2008 {
2009         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
2010
2011         if (!sub) {
2012                 /* if a subscription has been terminated and the subscription
2013                    timeout/expires is less than the time it takes for all pending
2014                    transactions to end then the subscription timer will not have
2015                    been canceled yet and sub will be null, so do nothing since
2016                    the subscription has already been terminated. */
2017                 return;
2018         }
2019
2020         ao2_ref(sub, +1);
2021         ast_sip_push_task(sub->serializer, serialized_pubsub_on_server_timeout, sub);
2022 }
2023
2024 static int ami_subscription_detail(struct ast_sip_subscription *sub,
2025                                    struct ast_sip_ami *ami,
2026                                    const char *event)
2027 {
2028         RAII_VAR(struct ast_str *, buf,
2029                  ast_sip_create_ami_event(event, ami), ast_free);
2030
2031         if (!buf) {
2032                 return -1;
2033         }
2034
2035         sip_subscription_to_ami(sub, &buf);
2036         astman_append(ami->s, "%s\r\n", ast_str_buffer(buf));
2037         return 0;
2038 }
2039
2040 static int ami_subscription_detail_inbound(struct ast_sip_subscription *sub, void *arg)
2041 {
2042         return sub->role == AST_SIP_NOTIFIER ? ami_subscription_detail(
2043                 sub, arg, "InboundSubscriptionDetail") : 0;
2044 }
2045
2046 static int ami_subscription_detail_outbound(struct ast_sip_subscription *sub, void *arg)
2047 {
2048         return sub->role == AST_SIP_SUBSCRIBER ? ami_subscription_detail(
2049                 sub, arg, "OutboundSubscriptionDetail") : 0;
2050 }
2051
2052 static int ami_show_subscriptions_inbound(struct mansession *s, const struct message *m)
2053 {
2054         struct ast_sip_ami ami = { .s = s, .m = m, .action_id = astman_get_header(m, "ActionID"), };
2055         int num;
2056
2057         astman_send_listack(s, m, "Following are Events for "
2058                             "each inbound Subscription", "start");
2059
2060         num = for_each_subscription(ami_subscription_detail_inbound, &ami);
2061
2062         astman_append(s, "Event: InboundSubscriptionDetailComplete\r\n");
2063         if (!ast_strlen_zero(ami.action_id)) {
2064                 astman_append(s, "ActionID: %s\r\n", ami.action_id);
2065         }
2066         astman_append(s, "EventList: Complete\r\n"
2067                       "ListItems: %d\r\n\r\n", num);
2068         return 0;
2069 }
2070
2071 static int ami_show_subscriptions_outbound(struct mansession *s, const struct message *m)
2072 {
2073         struct ast_sip_ami ami = { .s = s, .m = m, .action_id = astman_get_header(m, "ActionID"), };
2074         int num;
2075
2076         astman_send_listack(s, m, "Following are Events for "
2077                             "each outbound Subscription", "start");
2078
2079         num = for_each_subscription(ami_subscription_detail_outbound, &ami);
2080
2081         astman_append(s, "Event: OutboundSubscriptionDetailComplete\r\n");
2082         if (!ast_strlen_zero(ami.action_id)) {
2083                 astman_append(s, "ActionID: %s\r\n", ami.action_id);
2084         }
2085         astman_append(s, "EventList: Complete\r\n"
2086                       "ListItems: %d\r\n\r\n", num);
2087         return 0;
2088 }
2089
/*! \brief Name of the AMI action that lists outbound subscriptions */
#define AMI_SHOW_SUBSCRIPTIONS_OUTBOUND "PJSIPShowSubscriptionsOutbound"
/*! \brief Name of the AMI action that lists inbound subscriptions */
#define AMI_SHOW_SUBSCRIPTIONS_INBOUND "PJSIPShowSubscriptionsInbound"
2092
2093 static int persistence_endpoint_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
2094 {
2095         struct subscription_persistence *persistence = obj;
2096
2097         persistence->endpoint = ast_strdup(var->value);
2098         return 0;
2099 }
2100
2101 static int persistence_endpoint_struct2str(const void *obj, const intptr_t *args, char **buf)
2102 {
2103         const struct subscription_persistence *persistence = obj;
2104
2105         *buf = ast_strdup(persistence->endpoint);
2106         return 0;
2107 }
2108
2109 static int persistence_tag_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
2110 {
2111         struct subscription_persistence *persistence = obj;
2112
2113         persistence->tag = ast_strdup(var->value);
2114         return 0;
2115 }
2116
2117 static int persistence_tag_struct2str(const void *obj, const intptr_t *args, char **buf)
2118 {
2119         const struct subscription_persistence *persistence = obj;
2120
2121         *buf = ast_strdup(persistence->tag);
2122         return 0;
2123 }
2124
2125 static int persistence_expires_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
2126 {
2127         struct subscription_persistence *persistence = obj;
2128         return ast_get_timeval(var->value, &persistence->expires, ast_tv(0, 0), NULL);
2129 }
2130
2131 static int persistence_expires_struct2str(const void *obj, const intptr_t *args, char **buf)
2132 {
2133         const struct subscription_persistence *persistence = obj;
2134         return (ast_asprintf(buf, "%ld", persistence->expires.tv_sec) < 0) ? -1 : 0;
2135 }
2136
2137 static int resource_endpoint_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
2138 {
2139         struct ast_sip_publication_resource *resource = obj;
2140
2141         ast_free(resource->endpoint);
2142         resource->endpoint = ast_strdup(var->value);
2143
2144         return 0;
2145 }
2146
2147 static int resource_event_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
2148 {
2149         struct ast_sip_publication_resource *resource = obj;
2150         /* The event configuration name starts with 'event_' so skip past it to get the real name */
2151         const char *event = var->name + 6;
2152         struct ast_variable *item;
2153
2154         if (ast_strlen_zero(event) || ast_strlen_zero(var->value)) {
2155                 return -1;
2156         }
2157
2158         item = ast_variable_new(event, var->value, "");
2159         if (!item) {
2160                 return -1;
2161         }
2162
2163         if (resource->events) {
2164                 item->next = resource->events;
2165         }
2166         resource->events = item;
2167
2168         return 0;
2169 }
2170
2171 static int load_module(void)
2172 {
2173         static const pj_str_t str_PUBLISH = { "PUBLISH", 7 };
2174         struct ast_sorcery *sorcery = ast_sip_get_sorcery();
2175
2176         pjsip_evsub_init_module(ast_sip_get_pjsip_endpoint());
2177
2178         if (!(sched = ast_sched_context_create())) {
2179                 ast_log(LOG_ERROR, "Could not create scheduler for publication expiration\n");
2180                 return AST_MODULE_LOAD_FAILURE;
2181         }
2182
2183         if (ast_sched_start_thread(sched)) {
2184                 ast_log(LOG_ERROR, "Could not start scheduler thread for publication expiration\n");
2185                 ast_sched_context_destroy(sched);
2186                 return AST_MODULE_LOAD_FAILURE;
2187         }
2188
2189         pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), NULL, PJSIP_H_ALLOW, NULL, 1, &str_PUBLISH);
2190
2191         if (ast_sip_register_service(&pubsub_module)) {
2192                 ast_log(LOG_ERROR, "Could not register pubsub service\n");
2193                 ast_sched_context_destroy(sched);
2194                 return AST_MODULE_LOAD_FAILURE;
2195         }
2196
2197         ast_sorcery_apply_config(sorcery, "res_pjsip_pubsub");
2198         ast_sorcery_apply_default(sorcery, "subscription_persistence", "astdb", "subscription_persistence");
2199         if (ast_sorcery_object_register(sorcery, "subscription_persistence", subscription_persistence_alloc,
2200                 NULL, NULL)) {
2201                 ast_log(LOG_ERROR, "Could not register subscription persistence object support\n");
2202                 ast_sip_unregister_service(&pubsub_module);
2203                 ast_sched_context_destroy(sched);
2204                 return AST_MODULE_LOAD_FAILURE;
2205         }
2206         ast_sorcery_object_field_register(sorcery, "subscription_persistence", "packet", "", OPT_CHAR_ARRAY_T, 0,
2207                 CHARFLDSET(struct subscription_persistence, packet));
2208         ast_sorcery_object_field_register(sorcery, "subscription_persistence", "src_name", "", OPT_CHAR_ARRAY_T, 0,
2209                 CHARFLDSET(struct subscription_persistence, src_name));
2210         ast_sorcery_object_field_register(sorcery, "subscription_persistence", "src_port", "0", OPT_UINT_T, 0,
2211                 FLDSET(struct subscription_persistence, src_port));
2212         ast_sorcery_object_field_register(sorcery, "subscription_persistence", "transport_key", "0", OPT_CHAR_ARRAY_T, 0,
2213                 CHARFLDSET(struct subscription_persistence, transport_key));
2214         ast_sorcery_object_field_register(sorcery, "subscription_persistence", "local_name", "", OPT_CHAR_ARRAY_T, 0,
2215                 CHARFLDSET(struct subscription_persistence, local_name));
2216         ast_sorcery_object_field_register(sorcery, "subscription_persistence", "local_port", "0", OPT_UINT_T, 0,
2217                 FLDSET(struct subscription_persistence, local_port));
2218         ast_sorcery_object_field_register(sorcery, "subscription_persistence", "cseq", "0", OPT_UINT_T, 0,
2219                 FLDSET(struct subscription_persistence, cseq));
2220         ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "endpoint", "",
2221                 persistence_endpoint_str2struct, persistence_endpoint_struct2str, NULL, 0, 0);
2222         ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "tag", "",
2223                 persistence_tag_str2struct, persistence_tag_struct2str, NULL, 0, 0);
2224         ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "expires", "",
2225                 persistence_expires_str2struct, persistence_expires_struct2str, NULL, 0, 0);
2226
2227         ast_sorcery_apply_default(sorcery, "inbound-publication", "config", "pjsip.conf,criteria=type=inbound-publication");
2228         if (ast_sorcery_object_register(sorcery, "inbound-publication", publication_resource_alloc,
2229                 NULL, NULL)) {
2230                 ast_log(LOG_ERROR, "Could not register subscription persistence object support\n");
2231                 ast_sip_unregister_service(&pubsub_module);
2232                 ast_sched_context_destroy(sched);
2233                 return AST_MODULE_LOAD_FAILURE;
2234         }
2235         ast_sorcery_object_field_register(sorcery, "inbound-publication", "type", "", OPT_NOOP_T, 0, 0);
2236         ast_sorcery_object_field_register_custom(sorcery, "inbound-publication", "endpoint", "",
2237                 resource_endpoint_handler, NULL, NULL, 0, 0);
2238         ast_sorcery_object_fields_register(sorcery, "inbound-publication", "^event_", resource_event_handler, NULL);
2239         ast_sorcery_reload_object(sorcery, "inbound-publication");
2240
2241         if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
2242                 ast_sip_push_task(NULL, subscription_persistence_load, NULL);
2243         } else {
2244                 stasis_subscribe(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
2245         }
2246
2247         ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM,
2248                                  ami_show_subscriptions_inbound);
2249         ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND, EVENT_FLAG_SYSTEM,
2250                                  ami_show_subscriptions_outbound);
2251
2252         return AST_MODULE_LOAD_SUCCESS;
2253 }
2254
2255 static int unload_module(void)
2256 {
2257         ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND);
2258         ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_INBOUND);
2259
2260         if (sched) {
2261                 ast_sched_context_destroy(sched);
2262         }
2263
2264         return 0;
2265 }
2266
2267 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP event resource",
2268                 .support_level = AST_MODULE_SUPPORT_CORE,
2269                 .load = load_module,
2270                 .unload = unload_module,
2271                 .load_pri = AST_MODPRI_CHANNEL_DEPEND,
2272 );