9032bd3f62e90529bf05400ed73e0d3a738fda3d
[asterisk/asterisk.git] / res / res_pjsip_pubsub.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 /*!
19  * \brief Opaque structure representing an RFC 3265 SIP subscription
20  */
21
22 /*** MODULEINFO
23         <depend>pjproject</depend>
24         <depend>res_pjsip</depend>
25         <support_level>core</support_level>
26  ***/
27
28 #include "asterisk.h"
29
30 #include <pjsip.h>
31 #include <pjsip_simple.h>
32 #include <pjlib.h>
33
34 #include "asterisk/res_pjsip_pubsub.h"
35 #include "asterisk/module.h"
36 #include "asterisk/linkedlists.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/datastore.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/taskprocessor.h"
41 #include "asterisk/sched.h"
42 #include "asterisk/res_pjsip.h"
43 #include "asterisk/callerid.h"
44 #include "asterisk/manager.h"
45 #include "asterisk/test.h"
46 #include "res_pjsip/include/res_pjsip_private.h"
47
48 /*** DOCUMENTATION
49         <manager name="PJSIPShowSubscriptionsInbound" language="en_US">
50                 <synopsis>
51                         Lists subscriptions.
52                 </synopsis>
53                 <syntax />
54                 <description>
55                         <para>
56                         Provides a listing of all inbound subscriptions.  An event <literal>InboundSubscriptionDetail</literal>
57                         is issued for each subscription object.  Once all detail events are completed an
58                         <literal>InboundSubscriptionDetailComplete</literal> event is issued.
59                         </para>
60                 </description>
61         </manager>
62         <manager name="PJSIPShowSubscriptionsOutbound" language="en_US">
63                 <synopsis>
64                         Lists subscriptions.
65                 </synopsis>
66                 <syntax />
67                 <description>
68                         <para>
69                         Provides a listing of all outbound subscriptions.  An event <literal>OutboundSubscriptionDetail</literal>
70                         is issued for each subscription object.  Once all detail events are completed an
71                         <literal>OutboundSubscriptionDetailComplete</literal> event is issued.
72                         </para>
73                 </description>
74         </manager>
75         <configInfo name="res_pjsip_pubsub" language="en_US">
76                 <synopsis>Module that implements publish and subscribe support.</synopsis>
77                 <configFile name="pjsip.conf">
78                         <configObject name="subscription_persistence">
79                                 <synopsis>Persists SIP subscriptions so they survive restarts.</synopsis>
80                                 <configOption name="packet">
81                                         <synopsis>Entire SIP SUBSCRIBE packet that created the subscription</synopsis>
82                                 </configOption>
83                                 <configOption name="src_name">
84                                         <synopsis>The source address of the subscription</synopsis>
85                                 </configOption>
86                                 <configOption name="src_port">
87                                         <synopsis>The source port of the subscription</synopsis>
88                                 </configOption>
89                                 <configOption name="transport_key">
90                                         <synopsis>The type of transport the subscription was received on</synopsis>
91                                 </configOption>
92                                 <configOption name="local_name">
93                                         <synopsis>The local address the subscription was received on</synopsis>
94                                 </configOption>
95                                 <configOption name="local_port">
96                                         <synopsis>The local port the subscription was received on</synopsis>
97                                 </configOption>
98                                 <configOption name="cseq">
99                                         <synopsis>The sequence number of the next NOTIFY to be sent</synopsis>
100                                 </configOption>
101                                 <configOption name="tag">
102                                         <synopsis>The local tag of the dialog for the subscription</synopsis>
103                                 </configOption>
104                                 <configOption name="endpoint">
105                                         <synopsis>The name of the endpoint that subscribed</synopsis>
106                                 </configOption>
107                                 <configOption name="expires">
108                                         <synopsis>The time at which the subscription expires</synopsis>
109                                 </configOption>
110                         </configObject>
111                 </configFile>
112         </configInfo>
113  ***/
114
115 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);
116
117 static struct pjsip_module pubsub_module = {
118         .name = { "PubSub Module", 13 },
119         .priority = PJSIP_MOD_PRIORITY_APPLICATION,
120         .on_rx_request = pubsub_on_rx_request,
121 };
122
123 #define MOD_DATA_PERSISTENCE "sub_persistence"
124 #define MOD_DATA_MSG "sub_msg"
125
126 static const pj_str_t str_event_name = { "Event", 5 };
127
128 /*! \brief Scheduler used for automatically expiring publications */
129 static struct ast_sched_context *sched;
130
131 /*! \brief Number of buckets for publications (on a per handler) */
132 #define PUBLICATIONS_BUCKETS 37
133
134 /*! \brief Default expiration time for PUBLISH if one is not specified */
135 #define DEFAULT_PUBLISH_EXPIRES 3600
136
137 /*! \brief Defined method for PUBLISH */
138 const pjsip_method pjsip_publish_method =
139 {
140         PJSIP_OTHER_METHOD,
141         { "PUBLISH", 7 }
142 };
143
144 /*!
145  * \brief The types of PUBLISH messages defined in RFC 3903
146  */
147 enum sip_publish_type {
148         /*!
149          * \brief Unknown
150          *
151          * \details
152          * This actually is not defined in RFC 3903. We use this as a constant
153          * to indicate that an incoming PUBLISH does not fit into any of the
154          * other categories and is thus invalid.
155          */
156         SIP_PUBLISH_UNKNOWN,
157
158         /*!
159          * \brief Initial
160          *
161          * \details
162          * The first PUBLISH sent. This will contain a non-zero Expires header
163          * as well as a body that indicates the current state of the endpoint
164          * that has sent the message. The initial PUBLISH is the only type
165          * of PUBLISH to not contain a Sip-If-Match header in it.
166          */
167         SIP_PUBLISH_INITIAL,
168
169         /*!
170          * \brief Refresh
171          *
172          * \details
173          * Used to keep a published state from expiring. This will contain a
174          * non-zero Expires header but no body since its purpose is not to
175          * update state.
176          */
177         SIP_PUBLISH_REFRESH,
178
179         /*!
180          * \brief Modify
181          *
182          * \details
183          * Used to change state from its previous value. This will contain
184          * a body updating the published state. May or may not contain an
185          * Expires header.
186          */
187         SIP_PUBLISH_MODIFY,
188
189         /*!
190          * \brief Remove
191          *
192          * \details
193          * Used to remove published state from an ESC. This will contain
194          * an Expires header set to 0 and likely no body.
195          */
196         SIP_PUBLISH_REMOVE,
197 };
198
199 /*!
200  * Used to create new entity IDs by ESCs.
201  */
202 static int esc_etag_counter;
203
204 /*!
205  * \brief Structure representing a SIP publication
206  */
207 struct ast_sip_publication {
208         /*! Publication datastores set up by handlers */
209         struct ao2_container *datastores;
210         /*! \brief Entity tag for the publication */
211         int entity_tag;
212         /*! \brief Handler for this publication */
213         struct ast_sip_publish_handler *handler;
214         /*! \brief The endpoint with which the subscription is communicating */
215         struct ast_sip_endpoint *endpoint;
216         /*! \brief Expiration time of the publication */
217         int expires;
218         /*! \brief Scheduled item for expiration of publication */
219         int sched_id;
220 };
221
222
223 /*!
224  * \brief Structure used for persisting an inbound subscription
225  */
226 struct subscription_persistence {
227         /*! Sorcery object details */
228         SORCERY_OBJECT(details);
229         /*! The name of the endpoint involved in the subscrption */
230         char *endpoint;
231         /*! SIP message that creates the subscription */
232         char packet[PJSIP_MAX_PKT_LEN];
233         /*! Source address of the message */
234         char src_name[PJ_INET6_ADDRSTRLEN];
235         /*! Source port of the message */
236         int src_port;
237         /*! Local transport key type */
238         char transport_key[32];
239         /*! Local transport address */
240         char local_name[PJ_INET6_ADDRSTRLEN];
241         /*! Local transport port */
242         int local_port;
243         /*! Next CSeq to use for message */
244         unsigned int cseq;
245         /*! Local tag of the dialog */
246         char *tag;
247         /*! When this subscription expires */
248         struct timeval expires;
249 };
250
251 /*!
252  * \brief Real subscription details
253  *
254  * A real subscription is one that has a direct link to a
255  * PJSIP subscription and dialog.
256  */
257 struct ast_sip_real_subscription {
258         /*! The underlying PJSIP event subscription structure */
259         pjsip_evsub *evsub;
260         /*! The underlying PJSIP dialog */
261         pjsip_dialog *dlg;
262 };
263
264 /*!
265  * \brief Virtual subscription details
266  *
267  * A virtual subscription is one that does not have a direct
268  * link to a PJSIP subscription. Instead, it is a descendent
269  * of an ast_sip_subscription. Following the ancestry will
270  * eventually lead to a real subscription.
271  */
272 struct ast_sip_virtual_subscription {
273         struct ast_sip_subscription *parent;
274 };
275
276 /*!
277  * \brief Discriminator between real and virtual subscriptions
278  */
279 enum sip_subscription_type {
280         /*!
281          * \brief a "real" subscription.
282          *
283          * Real subscriptions are at the root of a tree of subscriptions.
284          * A real subscription has a corresponding SIP subscription in the
285          * PJSIP stack.
286          */
287         SIP_SUBSCRIPTION_REAL,
288         /*!
289          * \brief a "virtual" subscription.
290          *
291          * Virtual subscriptions are the descendents of real subscriptions
292          * in a tree of subscriptions. Virtual subscriptions do not have
293          * a corresponding SIP subscription in the PJSIP stack. Instead,
294          * when a state change happens on a virtual subscription, the
295          * state change is indicated to the virtual subscription's parent.
296          */
297         SIP_SUBSCRIPTION_VIRTUAL,
298 };
299
300 /*!
301  * \brief Structure representing a SIP subscription
302  */
303 struct ast_sip_subscription {
304         /*! Subscription datastores set up by handlers */
305         struct ao2_container *datastores;
306         /*! The endpoint with which the subscription is communicating */
307         struct ast_sip_endpoint *endpoint;
308         /*! Serializer on which to place operations for this subscription */
309         struct ast_taskprocessor *serializer;
310         /*! The handler for this subscription */
311         const struct ast_sip_subscription_handler *handler;
312         /*! The role for this subscription */
313         enum ast_sip_subscription_role role;
314         /*! Indicator of real or virtual subscription */
315         enum sip_subscription_type type;
316         /*! Real and virtual components of the subscription */
317         union {
318                 struct ast_sip_real_subscription real;
319                 struct ast_sip_virtual_subscription virtual;
320         } reality;
321         /*! Body generaator for NOTIFYs */
322         struct ast_sip_pubsub_body_generator *body_generator;
323         /*! Persistence information */
324         struct subscription_persistence *persistence;
325         /*! Next item in the list */
326         AST_LIST_ENTRY(ast_sip_subscription) next;
327         /*! List of child subscriptions */
328         AST_LIST_HEAD_NOLOCK(,ast_sip_subscription) children;
329         /*! Name of resource being subscribed to */
330         char resource[0];
331 };
332
333 static const char *sip_subscription_roles_map[] = {
334         [AST_SIP_SUBSCRIBER] = "Subscriber",
335         [AST_SIP_NOTIFIER] = "Notifier"
336 };
337
338 AST_RWLIST_HEAD_STATIC(subscriptions, ast_sip_subscription);
339
340 AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
341 AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
342
343 static pjsip_evsub *sip_subscription_get_evsub(const struct ast_sip_subscription *sub)
344 {
345         return sub->reality.real.evsub;
346 }
347
348 static pjsip_dialog *sip_subscription_get_dlg(const struct ast_sip_subscription *sub)
349 {
350         return sub->reality.real.dlg;
351 }
352
353 /*! \brief Destructor for subscription persistence */
354 static void subscription_persistence_destroy(void *obj)
355 {
356         struct subscription_persistence *persistence = obj;
357
358         ast_free(persistence->endpoint);
359         ast_free(persistence->tag);
360 }
361
362 /*! \brief Allocator for subscription persistence */
363 static void *subscription_persistence_alloc(const char *name)
364 {
365         return ast_sorcery_generic_alloc(sizeof(struct subscription_persistence), subscription_persistence_destroy);
366 }
367
368 /*! \brief Function which creates initial persistence information of a subscription in sorcery */
369 static struct subscription_persistence *subscription_persistence_create(struct ast_sip_subscription *sub)
370 {
371         char tag[PJ_GUID_STRING_LENGTH + 1];
372
373         /* The id of this persistence object doesn't matter as we keep it on the subscription and don't need to
374          * look it up by id at all.
375          */
376         struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(),
377                 "subscription_persistence", NULL);
378
379         pjsip_dialog *dlg = sip_subscription_get_dlg(sub);
380
381         if (!persistence) {
382                 return NULL;
383         }
384
385         persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub->endpoint));
386         ast_copy_pj_str(tag, &dlg->local.info->tag, sizeof(tag));
387         persistence->tag = ast_strdup(tag);
388
389         ast_sorcery_create(ast_sip_get_sorcery(), persistence);
390         return persistence;
391 }
392
393 /*! \brief Function which updates persistence information of a subscription in sorcery */
394 static void subscription_persistence_update(struct ast_sip_subscription *sub,
395         pjsip_rx_data *rdata)
396 {
397         pjsip_dialog *dlg;
398
399         if (!sub->persistence) {
400                 return;
401         }
402
403         dlg = sip_subscription_get_dlg(sub);
404         sub->persistence->cseq = dlg->local.cseq;
405
406         if (rdata) {
407                 int expires;
408                 pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
409
410                 expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
411                 sub->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
412
413                 ast_copy_string(sub->persistence->packet, rdata->pkt_info.packet, sizeof(sub->persistence->packet));
414                 ast_copy_string(sub->persistence->src_name, rdata->pkt_info.src_name, sizeof(sub->persistence->src_name));
415                 sub->persistence->src_port = rdata->pkt_info.src_port;
416                 ast_copy_string(sub->persistence->transport_key, rdata->tp_info.transport->type_name,
417                         sizeof(sub->persistence->transport_key));
418                 ast_copy_pj_str(sub->persistence->local_name, &rdata->tp_info.transport->local_name.host,
419                         sizeof(sub->persistence->local_name));
420                 sub->persistence->local_port = rdata->tp_info.transport->local_name.port;
421         }
422
423         ast_sorcery_update(ast_sip_get_sorcery(), sub->persistence);
424 }
425
426 /*! \brief Function which removes persistence of a subscription from sorcery */
427 static void subscription_persistence_remove(struct ast_sip_subscription *sub)
428 {
429         if (!sub->persistence) {
430                 return;
431         }
432
433         ast_sorcery_delete(ast_sip_get_sorcery(), sub->persistence);
434         ao2_ref(sub->persistence, -1);
435 }
436
437
438 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name);
439 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
440                 size_t num_accept);
441
442 /*! \brief Retrieve a handler using the Event header of an rdata message */
443 static struct ast_sip_subscription_handler *subscription_get_handler_from_rdata(pjsip_rx_data *rdata)
444 {
445         pjsip_event_hdr *event_header;
446         char event[32];
447         struct ast_sip_subscription_handler *handler;
448
449         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
450         if (!event_header) {
451                 ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
452                 return NULL;
453         }
454         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
455
456         handler = find_sub_handler_for_event_name(event);
457         if (!handler) {
458                 ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
459         }
460
461         return handler;
462 }
463
464 /*! \brief Retrieve a body generator using the Accept header of an rdata message */
465 static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata,
466         const struct ast_sip_subscription_handler *handler)
467 {
468         pjsip_accept_hdr *accept_header;
469         char accept[AST_SIP_MAX_ACCEPT][64];
470         size_t num_accept_headers;
471
472         accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next);
473         if (accept_header) {
474                 int i;
475
476                 for (i = 0; i < accept_header->count; ++i) {
477                         ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
478                 }
479                 num_accept_headers = accept_header->count;
480         } else {
481                 /* If a SUBSCRIBE contains no Accept headers, then we must assume that
482                  * the default accept type for the event package is to be used.
483                  */
484                 ast_copy_string(accept[0], handler->notifier->default_accept, sizeof(accept[0]));
485                 num_accept_headers = 1;
486         }
487
488         return find_body_generator(accept, num_accept_headers);
489 }
490
491 static struct ast_sip_subscription *notifier_create_subscription(const struct ast_sip_subscription_handler *handler,
492                 struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
493                 struct ast_sip_pubsub_body_generator *generator);
494
495 /*! \brief Callback function to perform the actual recreation of a subscription */
496 static int subscription_persistence_recreate(void *obj, void *arg, int flags)
497 {
498         struct subscription_persistence *persistence = obj;
499         pj_pool_t *pool = arg;
500         pjsip_rx_data rdata = { { 0, }, };
501         pjsip_expires_hdr *expires_header;
502         struct ast_sip_subscription_handler *handler;
503         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
504         struct ast_sip_subscription *sub;
505         struct ast_sip_pubsub_body_generator *generator;
506         int resp;
507         char *resource;
508         size_t resource_size;
509         pjsip_sip_uri *request_uri;
510
511         /* If this subscription has already expired remove it */
512         if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
513                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
514                 return 0;
515         }
516
517         endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
518         if (!endpoint) {
519                 ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
520                         persistence->endpoint);
521                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
522                 return 0;
523         }
524
525         pj_pool_reset(pool);
526         rdata.tp_info.pool = pool;
527
528         if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
529                 persistence->transport_key, persistence->local_name, persistence->local_port)) {
530                 ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
531                         persistence->endpoint);
532                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
533                 return 0;
534         }
535
536         request_uri = pjsip_uri_get_uri(rdata.msg_info.msg->line.req.uri);
537         resource_size = pj_strlen(&request_uri->user) + 1;
538         resource = alloca(resource_size);
539         ast_copy_pj_str(resource, &request_uri->user, resource_size);
540
541         /* Update the expiration header with the new expiration */
542         expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
543         if (!expires_header) {
544                 expires_header = pjsip_expires_hdr_create(pool, 0);
545                 if (!expires_header) {
546                         ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
547                         return 0;
548                 }
549                 pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
550         }
551         expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
552
553         handler = subscription_get_handler_from_rdata(&rdata);
554         if (!handler || !handler->notifier) {
555                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
556                 return 0;
557         }
558
559         generator = subscription_get_generator_from_rdata(&rdata, handler);
560         if (!generator) {
561                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
562                 return 0;
563         }
564
565         ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
566                         pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
567
568         resp = handler->notifier->new_subscribe(endpoint, resource);
569         if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
570                 sub = notifier_create_subscription(handler, endpoint, &rdata, resource, generator);
571                 sub->persistence = ao2_bump(persistence);
572                 subscription_persistence_update(sub, &rdata);
573         } else {
574                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
575         }
576
577         return 0;
578 }
579
580 /*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
581 static int subscription_persistence_load(void *data)
582 {
583         struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
584                 "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
585         pj_pool_t *pool;
586
587         pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
588                 PJSIP_POOL_RDATA_INC);
589         if (!pool) {
590                 ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
591                 return 0;
592         }
593
594         ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
595
596         pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
597
598         ao2_ref(persisted_subscriptions, -1);
599         return 0;
600 }
601
602 /*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
603 static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
604 {
605         struct ast_json_payload *payload;
606         const char *type;
607
608         if (stasis_message_type(message) != ast_manager_get_generic_type()) {
609                 return;
610         }
611
612         payload = stasis_message_data(message);
613         type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
614
615         /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
616          * recreate SIP subscriptions.
617          */
618         if (strcmp(type, "FullyBooted")) {
619                 return;
620         }
621
622         /* This has to be here so the subscription is recreated when the body generator is available */
623         ast_sip_push_task(NULL, subscription_persistence_load, NULL);
624
625         /* Once the system is fully booted we don't care anymore */
626         stasis_unsubscribe(sub);
627 }
628
629 static void add_subscription(struct ast_sip_subscription *obj)
630 {
631         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
632         AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
633         ast_module_ref(ast_module_info->self);
634 }
635
636 static void remove_subscription(struct ast_sip_subscription *obj)
637 {
638         struct ast_sip_subscription *i;
639         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
640         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
641                 if (i == obj) {
642                         AST_RWLIST_REMOVE_CURRENT(next);
643                         ast_module_unref(ast_module_info->self);
644                         break;
645                 }
646         }
647         AST_RWLIST_TRAVERSE_SAFE_END;
648 }
649
650 typedef int (*on_subscription_t)(struct ast_sip_subscription *sub, void *arg);
651
652 static int for_each_subscription(on_subscription_t on_subscription, void *arg)
653 {
654         int num = 0;
655         struct ast_sip_subscription *i;
656         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
657
658         if (!on_subscription) {
659                 return num;
660         }
661
662         AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
663                 if (on_subscription(i, arg)) {
664                         break;
665                 }
666                 ++num;
667         }
668         return num;
669 }
670
671 static void sip_subscription_to_ami(struct ast_sip_subscription *sub,
672                                     struct ast_str **buf)
673 {
674         char str[256];
675         struct ast_sip_endpoint_id_configuration *id = &sub->endpoint->id;
676
677         ast_str_append(buf, 0, "Role: %s\r\n",
678                        sip_subscription_roles_map[sub->role]);
679         ast_str_append(buf, 0, "Endpoint: %s\r\n",
680                        ast_sorcery_object_get_id(sub->endpoint));
681
682         ast_copy_pj_str(str, &sip_subscription_get_dlg(sub)->call_id->id, sizeof(str));
683         ast_str_append(buf, 0, "Callid: %s\r\n", str);
684
685         ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(
686                                sip_subscription_get_evsub(sub)));
687
688         ast_callerid_merge(str, sizeof(str),
689                            S_COR(id->self.name.valid, id->self.name.str, NULL),
690                            S_COR(id->self.number.valid, id->self.number.str, NULL),
691                            "Unknown");
692
693         ast_str_append(buf, 0, "Callerid: %s\r\n", str);
694
695         if (sub->handler->to_ami) {
696                 sub->handler->to_ami(sub, buf);
697         }
698 }
699
700 #define DATASTORE_BUCKETS 53
701
702 #define DEFAULT_EXPIRES 3600
703
704 static int datastore_hash(const void *obj, int flags)
705 {
706         const struct ast_datastore *datastore = obj;
707         const char *uid = flags & OBJ_KEY ? obj : datastore->uid;
708
709         ast_assert(uid != NULL);
710
711         return ast_str_hash(uid);
712 }
713
714 static int datastore_cmp(void *obj, void *arg, int flags)
715 {
716         const struct ast_datastore *datastore1 = obj;
717         const struct ast_datastore *datastore2 = arg;
718         const char *uid2 = flags & OBJ_KEY ? arg : datastore2->uid;
719
720         ast_assert(datastore1->uid != NULL);
721         ast_assert(uid2 != NULL);
722
723         return strcmp(datastore1->uid, uid2) ? 0 : CMP_MATCH | CMP_STOP;
724 }
725
726 static int subscription_remove_serializer(void *obj)
727 {
728         struct ast_sip_subscription *sub = obj;
729
730         /* This is why we keep the dialog on the subscription. When the subscription
731          * is destroyed, there is no guarantee that the underlying dialog is ready
732          * to be destroyed. Furthermore, there's no guarantee in the opposite direction
733          * either. The dialog could be destroyed before our subscription is. We fix
734          * this problem by keeping a reference to the dialog until it is time to
735          * destroy the subscription. We need to have the dialog available when the
736          * subscription is destroyed so that we can guarantee that our attempt to
737          * remove the serializer will be successful.
738          */
739         ast_sip_dialog_set_serializer(sip_subscription_get_dlg(sub), NULL);
740         pjsip_dlg_dec_session(sip_subscription_get_dlg(sub), &pubsub_module);
741
742         return 0;
743 }
744
745 static void subscription_destructor(void *obj)
746 {
747         struct ast_sip_subscription *sub = obj;
748
749         ast_debug(3, "Destroying SIP subscription\n");
750
751         subscription_persistence_remove(sub);
752
753         remove_subscription(sub);
754
755         ao2_cleanup(sub->datastores);
756         ao2_cleanup(sub->endpoint);
757
758         if (sip_subscription_get_dlg(sub)) {
759                 ast_sip_push_task_synchronous(NULL, subscription_remove_serializer, sub);
760         }
761         ast_taskprocessor_unreference(sub->serializer);
762 }
763
764
765 static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
766 static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
767                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
768 static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
769                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
770 static void pubsub_on_client_refresh(pjsip_evsub *sub);
771 static void pubsub_on_server_timeout(pjsip_evsub *sub);
772
773
774 static pjsip_evsub_user pubsub_cb = {
775         .on_evsub_state = pubsub_on_evsub_state,
776         .on_rx_refresh = pubsub_on_rx_refresh,
777         .on_rx_notify = pubsub_on_rx_notify,
778         .on_client_refresh = pubsub_on_client_refresh,
779         .on_server_timeout = pubsub_on_server_timeout,
780 };
781
782 static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler,
783                 struct ast_sip_endpoint *endpoint, const char *resource, enum ast_sip_subscription_role role)
784 {
785         struct ast_sip_subscription *sub;
786
787         sub = ao2_alloc(sizeof(*sub) + strlen(resource) + 1, subscription_destructor);
788         if (!sub) {
789                 return NULL;
790         }
791         strcpy(sub->resource, resource); /* Safe */
792
793         sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
794         if (!sub->datastores) {
795                 ao2_ref(sub, -1);
796                 return NULL;
797         }
798         sub->serializer = ast_sip_create_serializer();
799         if (!sub->serializer) {
800                 ao2_ref(sub, -1);
801                 return NULL;
802         }
803         sub->role = role;
804         sub->type = SIP_SUBSCRIPTION_REAL;
805         sub->endpoint = ao2_bump(endpoint);
806         sub->handler = handler;
807
808         return sub;
809 }
810
811 static void subscription_setup_dialog(struct ast_sip_subscription *sub, pjsip_dialog *dlg)
812 {
813         /* We keep a reference to the dialog until our subscription is destroyed. See
814          * the subscription_destructor for more details
815          */
816         pjsip_dlg_inc_session(dlg, &pubsub_module);
817         sub->reality.real.dlg = dlg;
818         ast_sip_dialog_set_serializer(dlg, sub->serializer);
819         pjsip_evsub_set_mod_data(sip_subscription_get_evsub(sub), pubsub_module.id, sub);
820 }
821
822 static struct ast_sip_subscription *notifier_create_subscription(const struct ast_sip_subscription_handler *handler,
823                 struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
824                 struct ast_sip_pubsub_body_generator *generator)
825 {
826         struct ast_sip_subscription *sub;
827         pjsip_dialog *dlg;
828         struct subscription_persistence *persistence;
829
830         sub = allocate_subscription(handler, endpoint, resource, AST_SIP_NOTIFIER);
831         if (!sub) {
832                 return NULL;
833         }
834
835         sub->body_generator = generator;
836         dlg = ast_sip_create_dialog_uas(endpoint, rdata);
837         if (!dlg) {
838                 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
839                 ao2_ref(sub, -1);
840                 return NULL;
841         }
842
843         persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
844                         pubsub_module.id, MOD_DATA_PERSISTENCE);
845         if (persistence) {
846                 /* Update the created dialog with the persisted information */
847                 pjsip_ua_unregister_dlg(pjsip_ua_instance(), dlg);
848                 pj_strdup2(dlg->pool, &dlg->local.info->tag, persistence->tag);
849                 dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag);
850                 pjsip_ua_register_dlg(pjsip_ua_instance(), dlg);
851                 dlg->local.cseq = persistence->cseq;
852                 dlg->remote.cseq = persistence->cseq;
853         }
854
855         pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &sub->reality.real.evsub);
856         subscription_setup_dialog(sub, dlg);
857
858         ast_sip_mod_data_set(dlg->pool, dlg->mod_data, pubsub_module.id, MOD_DATA_MSG,
859                         pjsip_msg_clone(dlg->pool, rdata->msg_info.msg));
860
861         add_subscription(sub);
862         return sub;
863 }
864
865 void *ast_sip_subscription_get_header(const struct ast_sip_subscription *sub, const char *header)
866 {
867         pjsip_dialog *dlg = sip_subscription_get_dlg(sub);
868         pjsip_msg *msg = ast_sip_mod_data_get(dlg->mod_data, pubsub_module.id, MOD_DATA_MSG);
869         pj_str_t name;
870
871         pj_cstr(&name, header);
872
873         return pjsip_msg_find_hdr_by_name(msg, &name, NULL);
874 }
875
876 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
877                 struct ast_sip_endpoint *endpoint, const char *resource)
878 {
879         struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub) + strlen(resource) + 1, subscription_destructor);
880         pjsip_dialog *dlg;
881         struct ast_sip_contact *contact;
882         pj_str_t event;
883         pjsip_tx_data *tdata;
884         pjsip_evsub *evsub;
885
886         sub = allocate_subscription(handler, endpoint, resource, AST_SIP_SUBSCRIBER);
887         if (!sub) {
888                 return NULL;
889         }
890
891         contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors);
892         if (!contact || ast_strlen_zero(contact->uri)) {
893                 ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n",
894                                 ast_sorcery_object_get_id(endpoint));
895                 ao2_ref(sub, -1);
896                 ao2_cleanup(contact);
897                 return NULL;
898         }
899
900         dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL);
901         ao2_cleanup(contact);
902         if (!dlg) {
903                 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
904                 ao2_ref(sub, -1);
905                 return NULL;
906         }
907
908         pj_cstr(&event, handler->event_name);
909         pjsip_evsub_create_uac(dlg, &pubsub_cb, &event, 0, &sub->reality.real.evsub);
910         subscription_setup_dialog(sub, dlg);
911
912         add_subscription(sub);
913
914         evsub = sip_subscription_get_evsub(sub);
915
916         if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
917                 pjsip_evsub_send_request(evsub, tdata);
918         } else {
919                 /* pjsip_evsub_terminate will result in pubsub_on_evsub_state,
920                  * being called and terminating the subscription. Therefore, we don't
921                  * need to decrease the reference count of sub here.
922                  */
923                 pjsip_evsub_terminate(evsub, PJ_TRUE);
924                 return NULL;
925         }
926
927         return sub;
928 }
929
930 struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
931 {
932         ast_assert(sub->endpoint != NULL);
933         ao2_ref(sub->endpoint, +1);
934         return sub->endpoint;
935 }
936
937 struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
938 {
939         ast_assert(sub->serializer != NULL);
940         return sub->serializer;
941 }
942
943 static int sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata)
944 {
945         struct ast_sip_endpoint *endpoint = ast_sip_subscription_get_endpoint(sub);
946         int res;
947
948         ao2_ref(sub, +1);
949         res = pjsip_evsub_send_request(sip_subscription_get_evsub(sub),
950                         tdata) == PJ_SUCCESS ? 0 : -1;
951
952         subscription_persistence_update(sub, NULL);
953
954         ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
955                 "StateText: %s\r\n"
956                 "Endpoint: %s\r\n",
957                 pjsip_evsub_get_state_name(sip_subscription_get_evsub(sub)),
958                 ast_sorcery_object_get_id(endpoint));
959         ao2_cleanup(sub);
960         ao2_cleanup(endpoint);
961
962         return res;
963 }
964
965 int ast_sip_subscription_notify(struct ast_sip_subscription *sub, void *notify_data,
966                 int terminate)
967 {
968         struct ast_sip_body body = {
969                 .type = ast_sip_subscription_get_body_type(sub),
970                 .subtype = ast_sip_subscription_get_body_subtype(sub),
971         };
972         struct ast_str *body_text = ast_str_create(64);
973         pjsip_evsub *evsub = sip_subscription_get_evsub(sub);
974         pjsip_tx_data *tdata;
975         pjsip_evsub_state state;
976
977         if (!body_text) {
978                 return -1;
979         }
980
981         if (ast_sip_pubsub_generate_body_content(body.type, body.subtype, notify_data, &body_text)) {
982                 ast_free(body_text);
983                 return -1;
984         }
985
986         body.body_text = ast_str_buffer(body_text);
987
988         if (terminate) {
989                 state = PJSIP_EVSUB_STATE_TERMINATED;
990         } else {
991                 state = pjsip_evsub_get_state(evsub) <= PJSIP_EVSUB_STATE_ACTIVE ?
992                         PJSIP_EVSUB_STATE_ACTIVE : PJSIP_EVSUB_STATE_TERMINATED;
993         }
994
995         ast_log_backtrace();
996
997         if (pjsip_evsub_notify(evsub, state, NULL, NULL, &tdata) != PJ_SUCCESS) {
998                 ast_free(body_text);
999                 return -1;
1000         }
1001         if (ast_sip_add_body(tdata, &body)) {
1002                 ast_free(body_text);
1003                 pjsip_tx_data_dec_ref(tdata);
1004                 return -1;
1005         }
1006         if (sip_subscription_send_request(sub, tdata)) {
1007                 ast_free(body_text);
1008                 pjsip_tx_data_dec_ref(tdata);
1009                 return -1;
1010         }
1011
1012         return 0;
1013 }
1014
1015 void ast_sip_subscription_get_local_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
1016 {
1017         pjsip_dialog *dlg = sip_subscription_get_dlg(sub);
1018         ast_copy_pj_str(buf, &dlg->local.info_str, size);
1019 }
1020
1021 void ast_sip_subscription_get_remote_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
1022 {
1023         pjsip_dialog *dlg = sip_subscription_get_dlg(sub);
1024         ast_copy_pj_str(buf, &dlg->remote.info_str, size);
1025 }
1026
1027 const char *ast_sip_subscription_get_resource_name(struct ast_sip_subscription *sub)
1028 {
1029         return sub->resource;
1030 }
1031
1032 static int sip_subscription_accept(struct ast_sip_subscription *sub, pjsip_rx_data *rdata, int response)
1033 {
1034         /* If this is a persistence recreation the subscription has already been accepted */
1035         if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) {
1036                 return 0;
1037         }
1038
1039         return pjsip_evsub_accept(sip_subscription_get_evsub(sub), rdata, response, NULL) == PJ_SUCCESS ? 0 : -1;
1040 }
1041
1042 static void subscription_datastore_destroy(void *obj)
1043 {
1044         struct ast_datastore *datastore = obj;
1045
1046         /* Using the destroy function (if present) destroy the data */
1047         if (datastore->info->destroy != NULL && datastore->data != NULL) {
1048                 datastore->info->destroy(datastore->data);
1049                 datastore->data = NULL;
1050         }
1051
1052         ast_free((void *) datastore->uid);
1053         datastore->uid = NULL;
1054 }
1055
1056 struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
1057 {
1058         RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
1059         const char *uid_ptr = uid;
1060
1061         if (!info) {
1062                 return NULL;
1063         }
1064
1065         datastore = ao2_alloc(sizeof(*datastore), subscription_datastore_destroy);
1066         if (!datastore) {
1067                 return NULL;
1068         }
1069
1070         datastore->info = info;
1071         if (ast_strlen_zero(uid)) {
1072                 /* They didn't provide an ID so we'll provide one ourself */
1073                 struct ast_uuid *uuid = ast_uuid_generate();
1074                 char uuid_buf[AST_UUID_STR_LEN];
1075                 if (!uuid) {
1076                         return NULL;
1077                 }
1078                 uid_ptr = ast_uuid_to_str(uuid, uuid_buf, sizeof(uuid_buf));
1079                 ast_free(uuid);
1080         }
1081
1082         datastore->uid = ast_strdup(uid_ptr);
1083         if (!datastore->uid) {
1084                 return NULL;
1085         }
1086
1087         ao2_ref(datastore, +1);
1088         return datastore;
1089 }
1090
1091 int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
1092 {
1093         ast_assert(datastore != NULL);
1094         ast_assert(datastore->info != NULL);
1095         ast_assert(!ast_strlen_zero(datastore->uid));
1096
1097         if (!ao2_link(subscription->datastores, datastore)) {
1098                 return -1;
1099         }
1100         return 0;
1101 }
1102
1103 struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
1104 {
1105         return ao2_find(subscription->datastores, name, OBJ_KEY);
1106 }
1107
1108 void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
1109 {
1110         ao2_callback(subscription->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
1111 }
1112
1113 int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
1114 {
1115         ast_assert(datastore != NULL);
1116         ast_assert(datastore->info != NULL);
1117         ast_assert(!ast_strlen_zero(datastore->uid));
1118
1119         if (!ao2_link(publication->datastores, datastore)) {
1120                 return -1;
1121         }
1122         return 0;
1123 }
1124
1125 struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
1126 {
1127         return ao2_find(publication->datastores, name, OBJ_KEY);
1128 }
1129
1130 void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
1131 {
1132         ao2_callback(publication->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
1133 }
1134
1135 AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
1136
1137 static int publication_hash_fn(const void *obj, const int flags)
1138 {
1139         const struct ast_sip_publication *publication = obj;
1140         const int *entity_tag = obj;
1141
1142         return flags & OBJ_KEY ? *entity_tag : publication->entity_tag;
1143 }
1144
1145 static int publication_cmp_fn(void *obj, void *arg, int flags)
1146 {
1147         const struct ast_sip_publication *publication1 = obj;
1148         const struct ast_sip_publication *publication2 = arg;
1149         const int *entity_tag = arg;
1150
1151         return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ?
1152                 CMP_MATCH | CMP_STOP : 0);
1153 }
1154
1155 static void publish_add_handler(struct ast_sip_publish_handler *handler)
1156 {
1157         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1158         AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
1159 }
1160
1161 int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
1162 {
1163         if (ast_strlen_zero(handler->event_name)) {
1164                 ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
1165                 return -1;
1166         }
1167
1168         if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS,
1169                 publication_hash_fn, publication_cmp_fn))) {
1170                 ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n",
1171                         handler->event_name);
1172                 return -1;
1173         }
1174
1175         publish_add_handler(handler);
1176
1177         ast_module_ref(ast_module_info->self);
1178
1179         return 0;
1180 }
1181
1182 void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
1183 {
1184         struct ast_sip_publish_handler *iter;
1185         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1186         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
1187                 if (handler == iter) {
1188                         AST_RWLIST_REMOVE_CURRENT(next);
1189                         ao2_cleanup(handler->publications);
1190                         ast_module_unref(ast_module_info->self);
1191                         break;
1192                 }
1193         }
1194         AST_RWLIST_TRAVERSE_SAFE_END;
1195 }
1196
1197 AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
1198
1199 static void sub_add_handler(struct ast_sip_subscription_handler *handler)
1200 {
1201         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1202         AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
1203         ast_module_ref(ast_module_info->self);
1204 }
1205
1206 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name)
1207 {
1208         struct ast_sip_subscription_handler *iter;
1209         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1210
1211         AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
1212                 if (!strcmp(iter->event_name, event_name)) {
1213                         break;
1214                 }
1215         }
1216         return iter;
1217 }
1218
1219 int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler)
1220 {
1221         pj_str_t event;
1222         pj_str_t accept[AST_SIP_MAX_ACCEPT] = { {0, }, };
1223         struct ast_sip_subscription_handler *existing;
1224         int i = 0;
1225
1226         if (ast_strlen_zero(handler->event_name)) {
1227                 ast_log(LOG_ERROR, "No event package specified for subscription handler. Cannot register\n");
1228                 return -1;
1229         }
1230
1231         existing = find_sub_handler_for_event_name(handler->event_name);
1232         if (existing) {
1233                 ast_log(LOG_ERROR, "Unable to register subscription handler for event %s."
1234                                 "A handler is already registered\n", handler->event_name);
1235                 return -1;
1236         }
1237
1238         for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) {
1239                 pj_cstr(&accept[i], handler->accept[i]);
1240         }
1241
1242         pj_cstr(&event, handler->event_name);
1243
1244         pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
1245
1246         sub_add_handler(handler);
1247
1248         return 0;
1249 }
1250
1251 void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
1252 {
1253         struct ast_sip_subscription_handler *iter;
1254         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1255         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
1256                 if (handler == iter) {
1257                         AST_RWLIST_REMOVE_CURRENT(next);
1258                         ast_module_unref(ast_module_info->self);
1259                         break;
1260                 }
1261         }
1262         AST_RWLIST_TRAVERSE_SAFE_END;
1263 }
1264
1265 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype(const char *content_type,
1266                 const char *content_subtype)
1267 {
1268         struct ast_sip_pubsub_body_generator *iter;
1269         SCOPED_LOCK(lock, &body_generators, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1270
1271         AST_LIST_TRAVERSE(&body_generators, iter, list) {
1272                 if (!strcmp(iter->type, content_type) &&
1273                                 !strcmp(iter->subtype, content_subtype)) {
1274                         break;
1275                 }
1276         };
1277
1278         return iter;
1279 }
1280
1281 static struct ast_sip_pubsub_body_generator *find_body_generator_accept(const char *accept)
1282 {
1283         char *accept_copy = ast_strdupa(accept);
1284         char *subtype = accept_copy;
1285         char *type = strsep(&subtype, "/");
1286
1287         if (ast_strlen_zero(type) || ast_strlen_zero(subtype)) {
1288                 return NULL;
1289         }
1290
1291         return find_body_generator_type_subtype(type, subtype);
1292 }
1293
1294 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
1295                 size_t num_accept)
1296 {
1297         int i;
1298         struct ast_sip_pubsub_body_generator *generator = NULL;
1299
1300         for (i = 0; i < num_accept; ++i) {
1301                 generator = find_body_generator_accept(accept[i]);
1302                 if (generator) {
1303                         ast_debug(3, "Body generator %p found for accept type %s\n", generator, accept[i]);
1304                         break;
1305                 } else {
1306                         ast_debug(3, "No body generator found for accept type %s\n", accept[i]);
1307                 }
1308         }
1309
1310         return generator;
1311 }
1312
1313 static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
1314 {
1315         pjsip_expires_hdr *expires_header;
1316         struct ast_sip_subscription_handler *handler;
1317         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
1318         struct ast_sip_subscription *sub;
1319         struct ast_sip_pubsub_body_generator *generator;
1320         char *resource;
1321         pjsip_uri *request_uri;
1322         pjsip_sip_uri *request_uri_sip;
1323         size_t resource_size;
1324         int resp;
1325
1326         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
1327         ast_assert(endpoint != NULL);
1328
1329         if (!endpoint->subscription.allow) {
1330                 ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint));
1331                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL);
1332                 return PJ_TRUE;
1333         }
1334
1335         request_uri = rdata->msg_info.msg->line.req.uri;
1336
1337         if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
1338                 char uri_str[PJSIP_MAX_URL_SIZE];
1339
1340                 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
1341                 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
1342                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
1343                 return PJ_TRUE;
1344         }
1345
1346         request_uri_sip = pjsip_uri_get_uri(request_uri);
1347         resource_size = pj_strlen(&request_uri_sip->user) + 1;
1348         resource = alloca(resource_size);
1349         ast_copy_pj_str(resource, &request_uri_sip->user, resource_size);
1350
1351         expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next);
1352
1353         if (expires_header) {
1354                 if (expires_header->ivalue == 0) {
1355                         ast_log(LOG_WARNING, "Susbscription request from endpoint %s rejected. Expiration of 0 is invalid\n",
1356                                 ast_sorcery_object_get_id(endpoint));
1357                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
1358                                 return PJ_TRUE;
1359                 }
1360                 if (expires_header->ivalue < endpoint->subscription.minexpiry) {
1361                         ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %u\n",
1362                                 expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry);
1363                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL);
1364                         return PJ_TRUE;
1365                 }
1366         }
1367
1368         handler = subscription_get_handler_from_rdata(rdata);
1369         if (!handler) {
1370                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
1371                 return PJ_TRUE;
1372         }
1373
1374         generator = subscription_get_generator_from_rdata(rdata, handler);
1375         if (!generator) {
1376                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
1377                 return PJ_TRUE;
1378         }
1379
1380         resp = handler->notifier->new_subscribe(endpoint, resource);
1381         if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
1382                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
1383                 return PJ_TRUE;
1384         }
1385
1386         sub = notifier_create_subscription(handler, endpoint, rdata, resource, generator);
1387         if (!sub) {
1388                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
1389         } else {
1390                 sub->persistence = subscription_persistence_create(sub);
1391                 subscription_persistence_update(sub, rdata);
1392                 sip_subscription_accept(sub, rdata, resp);
1393                 if (handler->notifier->notify_required(sub, AST_SIP_SUBSCRIPTION_NOTIFY_REASON_STARTED)) {
1394                         pjsip_evsub_terminate(sip_subscription_get_evsub(sub), PJ_TRUE);
1395                 }
1396         }
1397
1398         return PJ_TRUE;
1399 }
1400
1401 static struct ast_sip_publish_handler *find_pub_handler(const char *event)
1402 {
1403         struct ast_sip_publish_handler *iter = NULL;
1404         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1405
1406         AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) {
1407                 if (strcmp(event, iter->event_name)) {
1408                         ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
1409                         continue;
1410                 }
1411                 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
1412                 break;
1413         }
1414
1415         return iter;
1416 }
1417
1418 static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata,
1419         pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id)
1420 {
1421         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
1422
1423         if (etag_hdr) {
1424                 char etag[pj_strlen(&etag_hdr->hvalue) + 1];
1425
1426                 ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag));
1427
1428                 if (sscanf(etag, "%30d", entity_id) != 1) {
1429                         return SIP_PUBLISH_UNKNOWN;
1430                 }
1431         }
1432
1433         *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
1434
1435         if (!(*expires)) {
1436                 return SIP_PUBLISH_REMOVE;
1437         } else if (!etag_hdr && rdata->msg_info.msg->body) {
1438                 return SIP_PUBLISH_INITIAL;
1439         } else if (etag_hdr && !rdata->msg_info.msg->body) {
1440                 return SIP_PUBLISH_REFRESH;
1441         } else if (etag_hdr && rdata->msg_info.msg->body) {
1442                 return SIP_PUBLISH_MODIFY;
1443         }
1444
1445         return SIP_PUBLISH_UNKNOWN;
1446 }
1447
1448 /*! \brief Internal destructor for publications */
1449 static void publication_destroy_fn(void *obj)
1450 {
1451         struct ast_sip_publication *publication = obj;
1452
1453         ast_debug(3, "Destroying SIP publication\n");
1454
1455         ao2_cleanup(publication->datastores);
1456         ao2_cleanup(publication->endpoint);
1457 }
1458
1459 static struct ast_sip_publication *sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
1460 {
1461         struct ast_sip_publication *publication;
1462         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
1463
1464         ast_assert(endpoint != NULL);
1465
1466         if (!(publication = ao2_alloc(sizeof(*publication), publication_destroy_fn))) {
1467                 return NULL;
1468         }
1469
1470         if (!(publication->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp))) {
1471                 ao2_ref(publication, -1);
1472                 return NULL;
1473         }
1474
1475         publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
1476         ao2_ref(endpoint, +1);
1477         publication->endpoint = endpoint;
1478         publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
1479         publication->sched_id = -1;
1480
1481         return publication;
1482 }
1483
1484 static int sip_publication_respond(struct ast_sip_publication *pub, int status_code,
1485                 pjsip_rx_data *rdata)
1486 {
1487         pj_status_t status;
1488         pjsip_tx_data *tdata;
1489         pjsip_transaction *tsx;
1490
1491         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, &tdata) != PJ_SUCCESS) {
1492                 return -1;
1493         }
1494
1495         if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) {
1496                 RAII_VAR(char *, entity_tag, NULL, ast_free_ptr);
1497                 RAII_VAR(char *, expires, NULL, ast_free_ptr);
1498
1499                 if ((ast_asprintf(&entity_tag, "%d", pub->entity_tag) < 0) ||
1500                         (ast_asprintf(&expires, "%d", pub->expires) < 0)) {
1501                         pjsip_tx_data_dec_ref(tdata);
1502                         return -1;
1503                 }
1504
1505                 ast_sip_add_header(tdata, "SIP-ETag", entity_tag);
1506                 ast_sip_add_header(tdata, "Expires", expires);
1507         }
1508
1509         if ((status = pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx)) != PJ_SUCCESS) {
1510                 return -1;
1511         }
1512
1513         pjsip_tsx_recv_msg(tsx, rdata);
1514
1515         if (pjsip_tsx_send_msg(tsx, tdata) != PJ_SUCCESS) {
1516                 return -1;
1517         }
1518
1519         return 0;
1520 }
1521
1522 static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
1523         struct ast_sip_publish_handler *handler)
1524 {
1525         struct ast_sip_publication *publication;
1526         char *resource;
1527         size_t resource_size;
1528         pjsip_uri *request_uri;
1529         pjsip_sip_uri *request_uri_sip;
1530         int resp;
1531
1532         request_uri = rdata->msg_info.msg->line.req.uri;
1533
1534         if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
1535                 char uri_str[PJSIP_MAX_URL_SIZE];
1536
1537                 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
1538                 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
1539                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
1540                 return NULL;
1541         }
1542
1543         request_uri_sip = pjsip_uri_get_uri(request_uri);
1544         resource_size = pj_strlen(&request_uri_sip->user) + 1;
1545         resource = alloca(resource_size);
1546         ast_copy_pj_str(resource, &request_uri_sip->user, resource_size);
1547
1548         resp = handler->new_publication(endpoint, resource);
1549
1550         if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
1551                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
1552                 return NULL;
1553         }
1554
1555         publication = sip_create_publication(endpoint, rdata);
1556
1557         if (!publication) {
1558                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
1559                 return NULL;
1560         }
1561
1562         publication->handler = handler;
1563         if (publication->handler->publication_state_change(publication, rdata->msg_info.msg->body,
1564                         AST_SIP_PUBLISH_STATE_INITIALIZED)) {
1565                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
1566                 ao2_cleanup(publication);
1567                 return NULL;
1568         }
1569
1570         sip_publication_respond(publication, resp, rdata);
1571
1572         return publication;
1573 }
1574
1575 static int publish_expire_callback(void *data)
1576 {
1577         RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup);
1578
1579         publication->handler->publish_expire(publication);
1580
1581         return 0;
1582 }
1583
1584 static int publish_expire(const void *data)
1585 {
1586         struct ast_sip_publication *publication = (struct ast_sip_publication*)data;
1587
1588         ao2_unlink(publication->handler->publications, publication);
1589         publication->sched_id = -1;
1590
1591         if (ast_sip_push_task(NULL, publish_expire_callback, publication)) {
1592                 ao2_cleanup(publication);
1593         }
1594
1595         return 0;
1596 }
1597
1598 static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata)
1599 {
1600         pjsip_event_hdr *event_header;
1601         struct ast_sip_publish_handler *handler;
1602         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
1603         char event[32];
1604         static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 };
1605         pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL);
1606         enum sip_publish_type publish_type;
1607         RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup);
1608         int expires = 0, entity_id;
1609
1610         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
1611         ast_assert(endpoint != NULL);
1612
1613         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
1614         if (!event_header) {
1615                 ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n");
1616                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
1617                 return PJ_TRUE;
1618         }
1619         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
1620
1621         handler = find_pub_handler(event);
1622         if (!handler) {
1623                 ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event);
1624                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
1625                 return PJ_TRUE;
1626         }
1627
1628         publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id);
1629
1630         /* If this is not an initial publish ensure that a publication is present */
1631         if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) {
1632                 if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) {
1633                         static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 };
1634
1635                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed,
1636                                 NULL, NULL);
1637                         return PJ_TRUE;
1638                 }
1639
1640                 /* Per the RFC every response has to have a new entity tag */
1641                 publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
1642
1643                 /* Update the expires here so that the created responses will contain the correct value */
1644                 publication->expires = expires;
1645         }
1646
1647         switch (publish_type) {
1648                 case SIP_PUBLISH_INITIAL:
1649                         publication = publish_request_initial(endpoint, rdata, handler);
1650                         break;
1651                 case SIP_PUBLISH_REFRESH:
1652                         sip_publication_respond(publication, 200, rdata);
1653                 case SIP_PUBLISH_MODIFY:
1654                         if (handler->publication_state_change(publication, rdata->msg_info.msg->body,
1655                                                 AST_SIP_PUBLISH_STATE_ACTIVE)) {
1656                                 /* If an error occurs we want to terminate the publication */
1657                                 expires = 0;
1658                         }
1659                         sip_publication_respond(publication, 200, rdata);
1660                         break;
1661                 case SIP_PUBLISH_REMOVE:
1662                         handler->publication_state_change(publication, rdata->msg_info.msg->body,
1663                                         AST_SIP_PUBLISH_STATE_TERMINATED);
1664                         sip_publication_respond(publication, 200, rdata);
1665                         break;
1666                 case SIP_PUBLISH_UNKNOWN:
1667                 default:
1668                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
1669                         break;
1670         }
1671
1672         if (publication) {
1673                 if (expires) {
1674                         ao2_link(handler->publications, publication);
1675
1676                         AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication,
1677                                                 ao2_ref(publication, -1), ao2_ref(publication, -1), ao2_ref(publication, +1));
1678                 } else {
1679                         AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1));
1680                 }
1681         }
1682
1683         return PJ_TRUE;
1684 }
1685
1686 struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub)
1687 {
1688         return pub->endpoint;
1689 }
1690
1691
1692 int ast_sip_pubsub_register_body_generator(struct ast_sip_pubsub_body_generator *generator)
1693 {
1694         struct ast_sip_pubsub_body_generator *existing;
1695         pj_str_t accept;
1696         pj_size_t accept_len;
1697
1698         existing = find_body_generator_type_subtype(generator->type, generator->subtype);
1699         if (existing) {
1700                 ast_log(LOG_WARNING, "Cannot register body generator of %s/%s."
1701                                 "One is already registered.\n", generator->type, generator->subtype);
1702                 return -1;
1703         }
1704
1705         AST_RWLIST_WRLOCK(&body_generators);
1706         AST_LIST_INSERT_HEAD(&body_generators, generator, list);
1707         AST_RWLIST_UNLOCK(&body_generators);
1708
1709         /* Lengths of type and subtype plus space for a slash. pj_str_t is not
1710          * null-terminated, so there is no need to allocate for the extra null
1711          * byte
1712          */
1713         accept_len = strlen(generator->type) + strlen(generator->subtype) + 1;
1714
1715         accept.ptr = alloca(accept_len);
1716         accept.slen = accept_len;
1717         /* Safe use of sprintf */
1718         sprintf(accept.ptr, "%s/%s", generator->type, generator->subtype);
1719         pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module,
1720                         PJSIP_H_ACCEPT, NULL, 1, &accept);
1721
1722         return 0;
1723 }
1724
1725 void ast_sip_pubsub_unregister_body_generator(struct ast_sip_pubsub_body_generator *generator)
1726 {
1727         struct ast_sip_pubsub_body_generator *iter;
1728         SCOPED_LOCK(lock, &body_generators, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1729
1730         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_generators, iter, list) {
1731                 if (iter == generator) {
1732                         AST_LIST_REMOVE_CURRENT(list);
1733                         break;
1734                 }
1735         }
1736         AST_RWLIST_TRAVERSE_SAFE_END;
1737 }
1738
1739 int ast_sip_pubsub_register_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
1740 {
1741         AST_RWLIST_WRLOCK(&body_supplements);
1742         AST_RWLIST_INSERT_TAIL(&body_supplements, supplement, list);
1743         AST_RWLIST_UNLOCK(&body_supplements);
1744
1745         return 0;
1746 }
1747
1748 void ast_sip_pubsub_unregister_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
1749 {
1750         struct ast_sip_pubsub_body_supplement *iter;
1751         SCOPED_LOCK(lock, &body_supplements, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1752
1753         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_supplements, iter, list) {
1754                 if (iter == supplement) {
1755                         AST_LIST_REMOVE_CURRENT(list);
1756                         break;
1757                 }
1758         }
1759         AST_RWLIST_TRAVERSE_SAFE_END;
1760 }
1761
1762 const char *ast_sip_subscription_get_body_type(struct ast_sip_subscription *sub)
1763 {
1764         return sub->body_generator->type;
1765 }
1766
1767 const char *ast_sip_subscription_get_body_subtype(struct ast_sip_subscription *sub)
1768 {
1769         return sub->body_generator->subtype;
1770 }
1771
1772 int ast_sip_pubsub_generate_body_content(const char *type, const char *subtype,
1773                 void *data, struct ast_str **str)
1774 {
1775         struct ast_sip_pubsub_body_supplement *supplement;
1776         struct ast_sip_pubsub_body_generator *generator;
1777         int res = 0;
1778         void *body;
1779
1780         generator = find_body_generator_type_subtype(type, subtype);
1781         if (!generator) {
1782                 ast_log(LOG_WARNING, "Unable to find a body generator for %s/%s\n",
1783                                 type, subtype);
1784                 return -1;
1785         }
1786
1787         body = generator->allocate_body(data);
1788         if (!body) {
1789                 ast_log(LOG_WARNING, "Unable to allocate a NOTIFY body of type %s/%s\n",
1790                                 type, subtype);
1791                 return -1;
1792         }
1793
1794         if (generator->generate_body_content(body, data)) {
1795                 res = -1;
1796                 goto end;
1797         }
1798
1799         AST_RWLIST_RDLOCK(&body_supplements);
1800         AST_RWLIST_TRAVERSE(&body_supplements, supplement, list) {
1801                 if (!strcmp(generator->type, supplement->type) &&
1802                                 !strcmp(generator->subtype, supplement->subtype)) {
1803                         res = supplement->supplement_body(body, data);
1804                         if (res) {
1805                                 break;
1806                         }
1807                 }
1808         }
1809         AST_RWLIST_UNLOCK(&body_supplements);
1810
1811         if (!res) {
1812                 generator->to_string(body, str);
1813         }
1814
1815 end:
1816         if (generator->destroy_body) {
1817                 generator->destroy_body(body);
1818         }
1819
1820         return res;
1821 }
1822
1823 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata)
1824 {
1825         if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) {
1826                 return pubsub_on_rx_subscribe_request(rdata);
1827         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) {
1828                 return pubsub_on_rx_publish_request(rdata);
1829         }
1830
1831         return PJ_FALSE;
1832 }
1833
1834 static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
1835 {
1836         struct ast_sip_subscription *sub;
1837         if (pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) {
1838                 return;
1839         }
1840
1841         sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1842         if (!sub) {
1843                 return;
1844         }
1845
1846         if (sub->handler->subscription_shutdown) {
1847                 sub->handler->subscription_shutdown(sub);
1848         }
1849         pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL);
1850 }
1851
1852 static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
1853                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
1854 {
1855         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1856         enum ast_sip_subscription_notify_reason reason;
1857
1858         if (!sub) {
1859                 return;
1860         }
1861
1862         if (pjsip_evsub_get_state(sip_subscription_get_evsub(sub)) == PJSIP_EVSUB_STATE_TERMINATED) {
1863                 reason = AST_SIP_SUBSCRIPTION_NOTIFY_REASON_TERMINATED;
1864         } else {
1865                 reason = AST_SIP_SUBSCRIPTION_NOTIFY_REASON_RENEWED;
1866         }
1867         if (sub->handler->notifier->notify_required(sub, reason)) {
1868                 *p_st_code = 500;
1869         }
1870 }
1871
1872 static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code,
1873                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
1874 {
1875         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1876
1877         if (!sub) {
1878                 return;
1879         }
1880
1881         sub->handler->subscriber->state_change(sub, rdata->msg_info.msg->body,
1882                         pjsip_evsub_get_state(evsub));
1883 }
1884
1885 static int serialized_pubsub_on_client_refresh(void *userdata)
1886 {
1887         struct ast_sip_subscription *sub = userdata;
1888         pjsip_evsub *evsub;
1889         pjsip_tx_data *tdata;
1890
1891         evsub = sip_subscription_get_evsub(sub);
1892
1893         if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
1894                 pjsip_evsub_send_request(evsub, tdata);
1895         } else {
1896                 pjsip_evsub_terminate(evsub, PJ_TRUE);
1897                 return 0;
1898         }
1899         ao2_cleanup(sub);
1900         return 0;
1901 }
1902
1903 static void pubsub_on_client_refresh(pjsip_evsub *evsub)
1904 {
1905         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1906
1907         ao2_ref(sub, +1);
1908         ast_sip_push_task(sub->serializer, serialized_pubsub_on_client_refresh, sub);
1909 }
1910
1911 static int serialized_pubsub_on_server_timeout(void *userdata)
1912 {
1913         struct ast_sip_subscription *sub = userdata;
1914
1915         sub->handler->notifier->notify_required(sub,
1916                         AST_SIP_SUBSCRIPTION_NOTIFY_REASON_TERMINATED);
1917
1918         ao2_cleanup(sub);
1919         return 0;
1920 }
1921
1922 static void pubsub_on_server_timeout(pjsip_evsub *evsub)
1923 {
1924         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1925
1926         if (!sub) {
1927                 /* if a subscription has been terminated and the subscription
1928                    timeout/expires is less than the time it takes for all pending
1929                    transactions to end then the subscription timer will not have
1930                    been canceled yet and sub will be null, so do nothing since
1931                    the subscription has already been terminated. */
1932                 return;
1933         }
1934
1935         ao2_ref(sub, +1);
1936         ast_sip_push_task(sub->serializer, serialized_pubsub_on_server_timeout, sub);
1937 }
1938
1939 static int ami_subscription_detail(struct ast_sip_subscription *sub,
1940                                    struct ast_sip_ami *ami,
1941                                    const char *event)
1942 {
1943         RAII_VAR(struct ast_str *, buf,
1944                  ast_sip_create_ami_event(event, ami), ast_free);
1945
1946         if (!buf) {
1947                 return -1;
1948         }
1949
1950         sip_subscription_to_ami(sub, &buf);
1951         astman_append(ami->s, "%s\r\n", ast_str_buffer(buf));
1952         return 0;
1953 }
1954
1955 static int ami_subscription_detail_inbound(struct ast_sip_subscription *sub, void *arg)
1956 {
1957         return sub->role == AST_SIP_NOTIFIER ? ami_subscription_detail(
1958                 sub, arg, "InboundSubscriptionDetail") : 0;
1959 }
1960
1961 static int ami_subscription_detail_outbound(struct ast_sip_subscription *sub, void *arg)
1962 {
1963         return sub->role == AST_SIP_SUBSCRIBER ? ami_subscription_detail(
1964                 sub, arg, "OutboundSubscriptionDetail") : 0;
1965 }
1966
1967 static int ami_show_subscriptions_inbound(struct mansession *s, const struct message *m)
1968 {
1969         struct ast_sip_ami ami = { .s = s, .m = m, .action_id = astman_get_header(m, "ActionID"), };
1970         int num;
1971
1972         astman_send_listack(s, m, "Following are Events for "
1973                             "each inbound Subscription", "start");
1974
1975         num = for_each_subscription(ami_subscription_detail_inbound, &ami);
1976
1977         astman_append(s, "Event: InboundSubscriptionDetailComplete\r\n");
1978         if (!ast_strlen_zero(ami.action_id)) {
1979                 astman_append(s, "ActionID: %s\r\n", ami.action_id);
1980         }
1981         astman_append(s, "EventList: Complete\r\n"
1982                       "ListItems: %d\r\n\r\n", num);
1983         return 0;
1984 }
1985
1986 static int ami_show_subscriptions_outbound(struct mansession *s, const struct message *m)
1987 {
1988         struct ast_sip_ami ami = { .s = s, .m = m, .action_id = astman_get_header(m, "ActionID"), };
1989         int num;
1990
1991         astman_send_listack(s, m, "Following are Events for "
1992                             "each outbound Subscription", "start");
1993
1994         num = for_each_subscription(ami_subscription_detail_outbound, &ami);
1995
1996         astman_append(s, "Event: OutboundSubscriptionDetailComplete\r\n");
1997         if (!ast_strlen_zero(ami.action_id)) {
1998                 astman_append(s, "ActionID: %s\r\n", ami.action_id);
1999         }
2000         astman_append(s, "EventList: Complete\r\n"
2001                       "ListItems: %d\r\n\r\n", num);
2002         return 0;
2003 }
2004
2005 #define AMI_SHOW_SUBSCRIPTIONS_INBOUND "PJSIPShowSubscriptionsInbound"
2006 #define AMI_SHOW_SUBSCRIPTIONS_OUTBOUND "PJSIPShowSubscriptionsOutbound"
2007
2008 static int persistence_endpoint_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
2009 {
2010         struct subscription_persistence *persistence = obj;
2011
2012         persistence->endpoint = ast_strdup(var->value);
2013         return 0;
2014 }
2015
2016 static int persistence_endpoint_struct2str(const void *obj, const intptr_t *args, char **buf)
2017 {
2018         const struct subscription_persistence *persistence = obj;
2019
2020         *buf = ast_strdup(persistence->endpoint);
2021         return 0;
2022 }
2023
2024 static int persistence_tag_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
2025 {
2026         struct subscription_persistence *persistence = obj;
2027
2028         persistence->tag = ast_strdup(var->value);
2029         return 0;
2030 }
2031
2032 static int persistence_tag_struct2str(const void *obj, const intptr_t *args, char **buf)
2033 {
2034         const struct subscription_persistence *persistence = obj;
2035
2036         *buf = ast_strdup(persistence->tag);
2037         return 0;
2038 }
2039
2040 static int persistence_expires_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
2041 {
2042         struct subscription_persistence *persistence = obj;
2043         return ast_get_timeval(var->value, &persistence->expires, ast_tv(0, 0), NULL);
2044 }
2045
2046 static int persistence_expires_struct2str(const void *obj, const intptr_t *args, char **buf)
2047 {
2048         const struct subscription_persistence *persistence = obj;
2049         return (ast_asprintf(buf, "%ld", persistence->expires.tv_sec) < 0) ? -1 : 0;
2050 }
2051
2052 static int load_module(void)
2053 {
2054         static const pj_str_t str_PUBLISH = { "PUBLISH", 7 };
2055         struct ast_sorcery *sorcery = ast_sip_get_sorcery();
2056
2057         pjsip_evsub_init_module(ast_sip_get_pjsip_endpoint());
2058
2059         if (!(sched = ast_sched_context_create())) {
2060                 ast_log(LOG_ERROR, "Could not create scheduler for publication expiration\n");
2061                 return AST_MODULE_LOAD_FAILURE;
2062         }
2063
2064         if (ast_sched_start_thread(sched)) {
2065                 ast_log(LOG_ERROR, "Could not start scheduler thread for publication expiration\n");
2066                 ast_sched_context_destroy(sched);
2067                 return AST_MODULE_LOAD_FAILURE;
2068         }
2069
2070         pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), NULL, PJSIP_H_ALLOW, NULL, 1, &str_PUBLISH);
2071
2072         if (ast_sip_register_service(&pubsub_module)) {
2073                 ast_log(LOG_ERROR, "Could not register pubsub service\n");
2074                 ast_sched_context_destroy(sched);
2075                 return AST_MODULE_LOAD_FAILURE;
2076         }
2077
2078         ast_sorcery_apply_config(sorcery, "res_pjsip_pubsub");
2079         ast_sorcery_apply_default(sorcery, "subscription_persistence", "astdb", "subscription_persistence");
2080         if (ast_sorcery_object_register(sorcery, "subscription_persistence", subscription_persistence_alloc,
2081                 NULL, NULL)) {
2082                 ast_log(LOG_ERROR, "Could not register subscription persistence object support\n");
2083                 ast_sip_unregister_service(&pubsub_module);
2084                 ast_sched_context_destroy(sched);
2085                 return AST_MODULE_LOAD_FAILURE;
2086         }
2087         ast_sorcery_object_field_register(sorcery, "subscription_persistence", "packet", "", OPT_CHAR_ARRAY_T, 0,
2088                 CHARFLDSET(struct subscription_persistence, packet));
2089         ast_sorcery_object_field_register(sorcery, "subscription_persistence", "src_name", "", OPT_CHAR_ARRAY_T, 0,
2090                 CHARFLDSET(struct subscription_persistence, src_name));
2091         ast_sorcery_object_field_register(sorcery, "subscription_persistence", "src_port", "0", OPT_UINT_T, 0,
2092                 FLDSET(struct subscription_persistence, src_port));
2093         ast_sorcery_object_field_register(sorcery, "subscription_persistence", "transport_key", "0", OPT_CHAR_ARRAY_T, 0,
2094                 CHARFLDSET(struct subscription_persistence, transport_key));
2095         ast_sorcery_object_field_register(sorcery, "subscription_persistence", "local_name", "", OPT_CHAR_ARRAY_T, 0,
2096                 CHARFLDSET(struct subscription_persistence, local_name));
2097         ast_sorcery_object_field_register(sorcery, "subscription_persistence", "local_port", "0", OPT_UINT_T, 0,
2098                 FLDSET(struct subscription_persistence, local_port));
2099         ast_sorcery_object_field_register(sorcery, "subscription_persistence", "cseq", "0", OPT_UINT_T, 0,
2100                 FLDSET(struct subscription_persistence, cseq));
2101         ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "endpoint", "",
2102                 persistence_endpoint_str2struct, persistence_endpoint_struct2str, NULL, 0, 0);
2103         ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "tag", "",
2104                 persistence_tag_str2struct, persistence_tag_struct2str, NULL, 0, 0);
2105         ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "expires", "",
2106                 persistence_expires_str2struct, persistence_expires_struct2str, NULL, 0, 0);
2107
2108         if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
2109                 ast_sip_push_task(NULL, subscription_persistence_load, NULL);
2110         } else {
2111                 stasis_subscribe(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
2112         }
2113
2114         ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM,
2115                                  ami_show_subscriptions_inbound);
2116         ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND, EVENT_FLAG_SYSTEM,
2117                                  ami_show_subscriptions_outbound);
2118
2119         return AST_MODULE_LOAD_SUCCESS;
2120 }
2121
2122 static int unload_module(void)
2123 {
2124         ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND);
2125         ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_INBOUND);
2126
2127         if (sched) {
2128                 ast_sched_context_destroy(sched);
2129         }
2130
2131         return 0;
2132 }
2133
2134 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP event resource",
2135                 .load = load_module,
2136                 .unload = unload_module,
2137                 .load_pri = AST_MODPRI_CHANNEL_DEPEND,
2138 );