Fix build in devmode for GCC 4.10
[asterisk/asterisk.git] / res / res_pjsip_pubsub.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 /*!
19  * \brief Opaque structure representing an RFC 3265 SIP subscription
20  */
21
22 /*** MODULEINFO
23         <depend>pjproject</depend>
24         <depend>res_pjsip</depend>
25         <support_level>core</support_level>
26  ***/
27
28 #include "asterisk.h"
29
30 #include <pjsip.h>
31 #include <pjsip_simple.h>
32 #include <pjlib.h>
33
34 #include "asterisk/res_pjsip_pubsub.h"
35 #include "asterisk/module.h"
36 #include "asterisk/linkedlists.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/datastore.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/taskprocessor.h"
41 #include "asterisk/sched.h"
42 #include "asterisk/res_pjsip.h"
43 #include "asterisk/callerid.h"
44 #include "asterisk/manager.h"
45 #include "asterisk/test.h"
46 #include "res_pjsip/include/res_pjsip_private.h"
47
48 /*** DOCUMENTATION
49         <manager name="PJSIPShowSubscriptionsInbound" language="en_US">
50                 <synopsis>
51                         Lists subscriptions.
52                 </synopsis>
53                 <syntax />
54                 <description>
55                         <para>
56                         Provides a listing of all inbound subscriptions.  An event <literal>InboundSubscriptionDetail</literal>
57                         is issued for each subscription object.  Once all detail events are completed an
58                         <literal>InboundSubscriptionDetailComplete</literal> event is issued.
59                         </para>
60                 </description>
61         </manager>
62         <manager name="PJSIPShowSubscriptionsOutbound" language="en_US">
63                 <synopsis>
64                         Lists subscriptions.
65                 </synopsis>
66                 <syntax />
67                 <description>
68                         <para>
69                         Provides a listing of all outbound subscriptions.  An event <literal>OutboundSubscriptionDetail</literal>
70                         is issued for each subscription object.  Once all detail events are completed an
71                         <literal>OutboundSubscriptionDetailComplete</literal> event is issued.
72                         </para>
73                 </description>
74         </manager>
75         <configInfo name="res_pjsip_pubsub" language="en_US">
76                 <synopsis>Module that implements publish and subscribe support.</synopsis>
77                 <configFile name="pjsip.conf">
78                         <configObject name="subscription_persistence">
79                                 <synopsis>Persists SIP subscriptions so they survive restarts.</synopsis>
80                                 <configOption name="packet">
81                                         <synopsis>Entire SIP SUBSCRIBE packet that created the subscription</synopsis>
82                                 </configOption>
83                                 <configOption name="src_name">
84                                         <synopsis>The source address of the subscription</synopsis>
85                                 </configOption>
86                                 <configOption name="src_port">
87                                         <synopsis>The source port of the subscription</synopsis>
88                                 </configOption>
89                                 <configOption name="transport_key">
90                                         <synopsis>The type of transport the subscription was received on</synopsis>
91                                 </configOption>
92                                 <configOption name="local_name">
93                                         <synopsis>The local address the subscription was received on</synopsis>
94                                 </configOption>
95                                 <configOption name="local_port">
96                                         <synopsis>The local port the subscription was received on</synopsis>
97                                 </configOption>
98                                 <configOption name="cseq">
99                                         <synopsis>The sequence number of the next NOTIFY to be sent</synopsis>
100                                 </configOption>
101                                 <configOption name="tag">
102                                         <synopsis>The local tag of the dialog for the subscription</synopsis>
103                                 </configOption>
104                                 <configOption name="endpoint">
105                                         <synopsis>The name of the endpoint that subscribed</synopsis>
106                                 </configOption>
107                                 <configOption name="expires">
108                                         <synopsis>The time at which the subscription expires</synopsis>
109                                 </configOption>
110                         </configObject>
111                 </configFile>
112         </configInfo>
113  ***/
114
115 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);
116
117 static struct pjsip_module pubsub_module = {
118         .name = { "PubSub Module", 13 },
119         .priority = PJSIP_MOD_PRIORITY_APPLICATION,
120         .on_rx_request = pubsub_on_rx_request,
121 };
122
123 #define MOD_DATA_BODY_GENERATOR "sub_body_generator"
124 #define MOD_DATA_PERSISTENCE "sub_persistence"
125
126 static const pj_str_t str_event_name = { "Event", 5 };
127
128 /*! \brief Scheduler used for automatically expiring publications */
129 static struct ast_sched_context *sched;
130
131 /*! \brief Number of buckets for publications (on a per handler) */
132 #define PUBLICATIONS_BUCKETS 37
133
134 /*! \brief Default expiration time for PUBLISH if one is not specified */
135 #define DEFAULT_PUBLISH_EXPIRES 3600
136
137 /*! \brief Defined method for PUBLISH */
138 const pjsip_method pjsip_publish_method =
139 {
140         PJSIP_OTHER_METHOD,
141         { "PUBLISH", 7 }
142 };
143
144 /*!
145  * \brief The types of PUBLISH messages defined in RFC 3903
146  */
147 enum sip_publish_type {
148         /*!
149          * \brief Unknown
150          *
151          * \details
152          * This actually is not defined in RFC 3903. We use this as a constant
153          * to indicate that an incoming PUBLISH does not fit into any of the
154          * other categories and is thus invalid.
155          */
156         SIP_PUBLISH_UNKNOWN,
157
158         /*!
159          * \brief Initial
160          *
161          * \details
162          * The first PUBLISH sent. This will contain a non-zero Expires header
163          * as well as a body that indicates the current state of the endpoint
164          * that has sent the message. The initial PUBLISH is the only type
165          * of PUBLISH to not contain a Sip-If-Match header in it.
166          */
167         SIP_PUBLISH_INITIAL,
168
169         /*!
170          * \brief Refresh
171          *
172          * \details
173          * Used to keep a published state from expiring. This will contain a
174          * non-zero Expires header but no body since its purpose is not to
175          * update state.
176          */
177         SIP_PUBLISH_REFRESH,
178
179         /*!
180          * \brief Modify
181          *
182          * \details
183          * Used to change state from its previous value. This will contain
184          * a body updating the published state. May or may not contain an
185          * Expires header.
186          */
187         SIP_PUBLISH_MODIFY,
188
189         /*!
190          * \brief Remove
191          *
192          * \details
193          * Used to remove published state from an ESC. This will contain
194          * an Expires header set to 0 and likely no body.
195          */
196         SIP_PUBLISH_REMOVE,
197 };
198
199 /*!
200  * Used to create new entity IDs by ESCs.
201  */
202 static int esc_etag_counter;
203
204 /*!
205  * \brief Structure representing a SIP publication
206  */
207 struct ast_sip_publication {
208         /*! Publication datastores set up by handlers */
209         struct ao2_container *datastores;
210         /*! \brief Entity tag for the publication */
211         int entity_tag;
212         /*! \brief Handler for this publication */
213         struct ast_sip_publish_handler *handler;
214         /*! \brief The endpoint with which the subscription is communicating */
215         struct ast_sip_endpoint *endpoint;
216         /*! \brief Expiration time of the publication */
217         int expires;
218         /*! \brief Scheduled item for expiration of publication */
219         int sched_id;
220 };
221
222
223 /*!
224  * \brief Structure used for persisting an inbound subscription
225  */
226 struct subscription_persistence {
227         /*! Sorcery object details */
228         SORCERY_OBJECT(details);
229         /*! The name of the endpoint involved in the subscrption */
230         char *endpoint;
231         /*! SIP message that creates the subscription */
232         char packet[PJSIP_MAX_PKT_LEN];
233         /*! Source address of the message */
234         char src_name[PJ_INET6_ADDRSTRLEN];
235         /*! Source port of the message */
236         int src_port;
237         /*! Local transport key type */
238         char transport_key[32];
239         /*! Local transport address */
240         char local_name[PJ_INET6_ADDRSTRLEN];
241         /*! Local transport port */
242         int local_port;
243         /*! Next CSeq to use for message */
244         unsigned int cseq;
245         /*! Local tag of the dialog */
246         char *tag;
247         /*! When this subscription expires */
248         struct timeval expires;
249 };
250
251 /*!
252  * \brief Structure representing a SIP subscription
253  */
254 struct ast_sip_subscription {
255         /*! Subscription datastores set up by handlers */
256         struct ao2_container *datastores;
257         /*! The endpoint with which the subscription is communicating */
258         struct ast_sip_endpoint *endpoint;
259         /*! Serializer on which to place operations for this subscription */
260         struct ast_taskprocessor *serializer;
261         /*! The handler for this subscription */
262         const struct ast_sip_subscription_handler *handler;
263         /*! The role for this subscription */
264         enum ast_sip_subscription_role role;
265         /*! The underlying PJSIP event subscription structure */
266         pjsip_evsub *evsub;
267         /*! The underlying PJSIP dialog */
268         pjsip_dialog *dlg;
269         /*! Body generaator for NOTIFYs */
270         struct ast_sip_pubsub_body_generator *body_generator;
271         /*! Persistence information */
272         struct subscription_persistence *persistence;
273         /*! Next item in the list */
274         AST_LIST_ENTRY(ast_sip_subscription) next;
275 };
276
277 static const char *sip_subscription_roles_map[] = {
278         [AST_SIP_SUBSCRIBER] = "Subscriber",
279         [AST_SIP_NOTIFIER] = "Notifier"
280 };
281
282 AST_RWLIST_HEAD_STATIC(subscriptions, ast_sip_subscription);
283
284 AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
285 AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
286
287 /*! \brief Destructor for subscription persistence */
288 static void subscription_persistence_destroy(void *obj)
289 {
290         struct subscription_persistence *persistence = obj;
291
292         ast_free(persistence->endpoint);
293         ast_free(persistence->tag);
294 }
295
296 /*! \brief Allocator for subscription persistence */
297 static void *subscription_persistence_alloc(const char *name)
298 {
299         return ast_sorcery_generic_alloc(sizeof(struct subscription_persistence), subscription_persistence_destroy);
300 }
301
302 /*! \brief Function which creates initial persistence information of a subscription in sorcery */
303 static struct subscription_persistence *subscription_persistence_create(struct ast_sip_subscription *sub)
304 {
305         char tag[PJ_GUID_STRING_LENGTH + 1];
306
307         /* The id of this persistence object doesn't matter as we keep it on the subscription and don't need to
308          * look it up by id at all.
309          */
310         struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(),
311                 "subscription_persistence", NULL);
312
313         if (!persistence) {
314                 return NULL;
315         }
316
317         persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub->endpoint));
318         ast_copy_pj_str(tag, &sub->dlg->local.info->tag, sizeof(tag));
319         persistence->tag = ast_strdup(tag);
320
321         ast_sorcery_create(ast_sip_get_sorcery(), persistence);
322         return persistence;
323 }
324
325 /*! \brief Function which updates persistence information of a subscription in sorcery */
326 static void subscription_persistence_update(struct ast_sip_subscription *sub,
327         pjsip_rx_data *rdata)
328 {
329         if (!sub->persistence) {
330                 return;
331         }
332
333         sub->persistence->cseq = sub->dlg->local.cseq;
334
335         if (rdata) {
336                 int expires;
337                 pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
338
339                 expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
340                 sub->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
341
342                 ast_copy_string(sub->persistence->packet, rdata->pkt_info.packet, sizeof(sub->persistence->packet));
343                 ast_copy_string(sub->persistence->src_name, rdata->pkt_info.src_name, sizeof(sub->persistence->src_name));
344                 sub->persistence->src_port = rdata->pkt_info.src_port;
345                 ast_copy_string(sub->persistence->transport_key, rdata->tp_info.transport->type_name,
346                         sizeof(sub->persistence->transport_key));
347                 ast_copy_pj_str(sub->persistence->local_name, &rdata->tp_info.transport->local_name.host,
348                         sizeof(sub->persistence->local_name));
349                 sub->persistence->local_port = rdata->tp_info.transport->local_name.port;
350         }
351
352         ast_sorcery_update(ast_sip_get_sorcery(), sub->persistence);
353 }
354
355 /*! \brief Function which removes persistence of a subscription from sorcery */
356 static void subscription_persistence_remove(struct ast_sip_subscription *sub)
357 {
358         if (!sub->persistence) {
359                 return;
360         }
361
362         ast_sorcery_delete(ast_sip_get_sorcery(), sub->persistence);
363         ao2_ref(sub->persistence, -1);
364 }
365
366
367 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name);
368 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
369                 size_t num_accept);
370
371 /*! \brief Retrieve a handler using the Event header of an rdata message */
372 static struct ast_sip_subscription_handler *subscription_get_handler_from_rdata(pjsip_rx_data *rdata)
373 {
374         pjsip_event_hdr *event_header;
375         char event[32];
376         struct ast_sip_subscription_handler *handler;
377
378         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
379         if (!event_header) {
380                 ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
381                 return NULL;
382         }
383         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
384
385         handler = find_sub_handler_for_event_name(event);
386         if (!handler) {
387                 ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
388         }
389
390         return handler;
391 }
392
393 /*! \brief Retrieve a body generator using the Accept header of an rdata message */
394 static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata,
395         const struct ast_sip_subscription_handler *handler)
396 {
397         pjsip_accept_hdr *accept_header;
398         char accept[AST_SIP_MAX_ACCEPT][64];
399         size_t num_accept_headers;
400
401         accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next);
402         if (accept_header) {
403                 int i;
404
405                 for (i = 0; i < accept_header->count; ++i) {
406                         ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
407                 }
408                 num_accept_headers = accept_header->count;
409         } else {
410                 /* If a SUBSCRIBE contains no Accept headers, then we must assume that
411                  * the default accept type for the event package is to be used.
412                  */
413                 ast_copy_string(accept[0], handler->default_accept, sizeof(accept[0]));
414                 num_accept_headers = 1;
415         }
416
417         return find_body_generator(accept, num_accept_headers);
418 }
419
420 /*! \brief Callback function to perform the actual recreation of a subscription */
421 static int subscription_persistence_recreate(void *obj, void *arg, int flags)
422 {
423         struct subscription_persistence *persistence = obj;
424         pj_pool_t *pool = arg;
425         pjsip_rx_data rdata = { { 0, }, };
426         pjsip_expires_hdr *expires_header;
427         struct ast_sip_subscription_handler *handler;
428         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
429         struct ast_sip_subscription *sub;
430         struct ast_sip_pubsub_body_generator *generator;
431
432         /* If this subscription has already expired remove it */
433         if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
434                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
435                 return 0;
436         }
437
438         endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
439         if (!endpoint) {
440                 ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
441                         persistence->endpoint);
442                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
443                 return 0;
444         }
445
446         pj_pool_reset(pool);
447         rdata.tp_info.pool = pool;
448
449         if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
450                 persistence->transport_key, persistence->local_name, persistence->local_port)) {
451                 ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
452                         persistence->endpoint);
453                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
454                 return 0;
455         }
456
457         /* Update the expiration header with the new expiration */
458         expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
459         if (!expires_header) {
460                 expires_header = pjsip_expires_hdr_create(pool, 0);
461                 if (!expires_header) {
462                         ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
463                         return 0;
464                 }
465                 pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
466         }
467         expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
468
469         handler = subscription_get_handler_from_rdata(&rdata);
470         if (!handler) {
471                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
472                 return 0;
473         }
474
475         generator = subscription_get_generator_from_rdata(&rdata, handler);
476         if (!generator) {
477                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
478                 return 0;
479         }
480
481         ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
482                         pubsub_module.id, MOD_DATA_BODY_GENERATOR, generator);
483         ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
484                         pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
485
486         sub = handler->new_subscribe(endpoint, &rdata);
487         if (sub) {
488                 sub->persistence = ao2_bump(persistence);
489                 subscription_persistence_update(sub, &rdata);
490         } else {
491                 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
492         }
493
494         return 0;
495 }
496
497 /*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
498 static int subscription_persistence_load(void *data)
499 {
500         struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
501                 "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
502         pj_pool_t *pool;
503
504         pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
505                 PJSIP_POOL_RDATA_INC);
506         if (!pool) {
507                 ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
508                 return 0;
509         }
510
511         ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
512
513         pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
514
515         ao2_ref(persisted_subscriptions, -1);
516         return 0;
517 }
518
519 /*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
520 static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
521 {
522         struct ast_json_payload *payload;
523         const char *type;
524
525         if (stasis_message_type(message) != ast_manager_get_generic_type()) {
526                 return;
527         }
528
529         payload = stasis_message_data(message);
530         type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
531
532         /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
533          * recreate SIP subscriptions.
534          */
535         if (strcmp(type, "FullyBooted")) {
536                 return;
537         }
538
539         /* This has to be here so the subscription is recreated when the body generator is available */
540         ast_sip_push_task(NULL, subscription_persistence_load, NULL);
541
542         /* Once the system is fully booted we don't care anymore */
543         stasis_unsubscribe(sub);
544 }
545
546 static void add_subscription(struct ast_sip_subscription *obj)
547 {
548         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
549         AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
550         ast_module_ref(ast_module_info->self);
551 }
552
553 static void remove_subscription(struct ast_sip_subscription *obj)
554 {
555         struct ast_sip_subscription *i;
556         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
557         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
558                 if (i == obj) {
559                         AST_RWLIST_REMOVE_CURRENT(next);
560                         ast_module_unref(ast_module_info->self);
561                         break;
562                 }
563         }
564         AST_RWLIST_TRAVERSE_SAFE_END;
565 }
566
567 typedef int (*on_subscription_t)(struct ast_sip_subscription *sub, void *arg);
568
569 static int for_each_subscription(on_subscription_t on_subscription, void *arg)
570 {
571         int num = 0;
572         struct ast_sip_subscription *i;
573         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
574
575         if (!on_subscription) {
576                 return num;
577         }
578
579         AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
580                 if (on_subscription(i, arg)) {
581                         break;
582                 }
583                 ++num;
584         }
585         return num;
586 }
587
588 static void sip_subscription_to_ami(struct ast_sip_subscription *sub,
589                                     struct ast_str **buf)
590 {
591         char str[256];
592         struct ast_sip_endpoint_id_configuration *id = &sub->endpoint->id;
593
594         ast_str_append(buf, 0, "Role: %s\r\n",
595                        sip_subscription_roles_map[sub->role]);
596         ast_str_append(buf, 0, "Endpoint: %s\r\n",
597                        ast_sorcery_object_get_id(sub->endpoint));
598
599         ast_copy_pj_str(str, &sub->dlg->call_id->id, sizeof(str));
600         ast_str_append(buf, 0, "Callid: %s\r\n", str);
601
602         ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(
603                                ast_sip_subscription_get_evsub(sub)));
604
605         ast_callerid_merge(str, sizeof(str),
606                            S_COR(id->self.name.valid, id->self.name.str, NULL),
607                            S_COR(id->self.number.valid, id->self.number.str, NULL),
608                            "Unknown");
609
610         ast_str_append(buf, 0, "Callerid: %s\r\n", str);
611
612         if (sub->handler->to_ami) {
613                 sub->handler->to_ami(sub, buf);
614         }
615 }
616
617 #define DATASTORE_BUCKETS 53
618
619 #define DEFAULT_EXPIRES 3600
620
621 static int datastore_hash(const void *obj, int flags)
622 {
623         const struct ast_datastore *datastore = obj;
624         const char *uid = flags & OBJ_KEY ? obj : datastore->uid;
625
626         ast_assert(uid != NULL);
627
628         return ast_str_hash(uid);
629 }
630
631 static int datastore_cmp(void *obj, void *arg, int flags)
632 {
633         const struct ast_datastore *datastore1 = obj;
634         const struct ast_datastore *datastore2 = arg;
635         const char *uid2 = flags & OBJ_KEY ? arg : datastore2->uid;
636
637         ast_assert(datastore1->uid != NULL);
638         ast_assert(uid2 != NULL);
639
640         return strcmp(datastore1->uid, uid2) ? 0 : CMP_MATCH | CMP_STOP;
641 }
642
643 static int subscription_remove_serializer(void *obj)
644 {
645         struct ast_sip_subscription *sub = obj;
646
647         /* This is why we keep the dialog on the subscription. When the subscription
648          * is destroyed, there is no guarantee that the underlying dialog is ready
649          * to be destroyed. Furthermore, there's no guarantee in the opposite direction
650          * either. The dialog could be destroyed before our subscription is. We fix
651          * this problem by keeping a reference to the dialog until it is time to
652          * destroy the subscription. We need to have the dialog available when the
653          * subscription is destroyed so that we can guarantee that our attempt to
654          * remove the serializer will be successful.
655          */
656         ast_sip_dialog_set_serializer(sub->dlg, NULL);
657         pjsip_dlg_dec_session(sub->dlg, &pubsub_module);
658
659         return 0;
660 }
661
662 static void subscription_destructor(void *obj)
663 {
664         struct ast_sip_subscription *sub = obj;
665
666         ast_debug(3, "Destroying SIP subscription\n");
667
668         subscription_persistence_remove(sub);
669
670         remove_subscription(sub);
671
672         ao2_cleanup(sub->datastores);
673         ao2_cleanup(sub->endpoint);
674
675         if (sub->dlg) {
676                 ast_sip_push_task_synchronous(NULL, subscription_remove_serializer, sub);
677         }
678         ast_taskprocessor_unreference(sub->serializer);
679 }
680
681 static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
682 static void pubsub_on_tsx_state(pjsip_evsub *sub, pjsip_transaction *tsx, pjsip_event *event);
683 static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
684                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
685 static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
686                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
687 static void pubsub_on_client_refresh(pjsip_evsub *sub);
688 static void pubsub_on_server_timeout(pjsip_evsub *sub);
689
690
691 static pjsip_evsub_user pubsub_cb = {
692         .on_evsub_state = pubsub_on_evsub_state,
693         .on_tsx_state = pubsub_on_tsx_state,
694         .on_rx_refresh = pubsub_on_rx_refresh,
695         .on_rx_notify = pubsub_on_rx_notify,
696         .on_client_refresh = pubsub_on_client_refresh,
697         .on_server_timeout = pubsub_on_server_timeout,
698 };
699
700 static pjsip_evsub *allocate_evsub(const char *event, enum ast_sip_subscription_role role,
701                 struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, pjsip_dialog *dlg)
702 {
703         pjsip_evsub *evsub;
704         /* PJSIP is kind enough to have some built-in support for certain
705          * events. We need to use the correct initialization function for the
706          * built-in events
707          */
708         if (role == AST_SIP_NOTIFIER) {
709                 pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &evsub);
710         } else {
711                 pj_str_t pj_event;
712                 pj_cstr(&pj_event, event);
713                 pjsip_evsub_create_uac(dlg, &pubsub_cb, &pj_event, 0, &evsub);
714         }
715         return evsub;
716 }
717
718 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
719                 enum ast_sip_subscription_role role, struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
720 {
721         struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub), subscription_destructor);
722         pjsip_dialog *dlg;
723         struct subscription_persistence *persistence;
724
725         if (!sub) {
726                 return NULL;
727         }
728         sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
729         if (!sub->datastores) {
730                 ao2_ref(sub, -1);
731                 return NULL;
732         }
733         sub->serializer = ast_sip_create_serializer();
734         if (!sub->serializer) {
735                 ao2_ref(sub, -1);
736                 return NULL;
737         }
738         sub->body_generator = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
739                         pubsub_module.id, MOD_DATA_BODY_GENERATOR);
740         sub->role = role;
741         if (role == AST_SIP_NOTIFIER) {
742                 dlg = ast_sip_create_dialog_uas(endpoint, rdata);
743         } else {
744                 RAII_VAR(struct ast_sip_contact *, contact, NULL, ao2_cleanup);
745
746                 contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors);
747                 if (!contact || ast_strlen_zero(contact->uri)) {
748                         ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n",
749                                         ast_sorcery_object_get_id(endpoint));
750                         ao2_ref(sub, -1);
751                         return NULL;
752                 }
753                 dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL);
754         }
755         if (!dlg) {
756                 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
757                 ao2_ref(sub, -1);
758                 return NULL;
759         }
760         persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
761                         pubsub_module.id, MOD_DATA_PERSISTENCE);
762         if (persistence) {
763                 /* Update the created dialog with the persisted information */
764                 pjsip_ua_unregister_dlg(pjsip_ua_instance(), dlg);
765                 pj_strdup2(dlg->pool, &dlg->local.info->tag, persistence->tag);
766                 dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag);
767                 pjsip_ua_register_dlg(pjsip_ua_instance(), dlg);
768                 dlg->local.cseq = persistence->cseq;
769                 dlg->remote.cseq = persistence->cseq;
770         }
771         sub->evsub = allocate_evsub(handler->event_name, role, endpoint, rdata, dlg);
772         /* We keep a reference to the dialog until our subscription is destroyed. See
773          * the subscription_destructor for more details
774          */
775         pjsip_dlg_inc_session(dlg, &pubsub_module);
776         sub->dlg = dlg;
777         ast_sip_dialog_set_serializer(dlg, sub->serializer);
778         pjsip_evsub_set_mod_data(sub->evsub, pubsub_module.id, sub);
779         ao2_ref(endpoint, +1);
780         sub->endpoint = endpoint;
781         sub->handler = handler;
782
783         add_subscription(sub);
784         return sub;
785 }
786
787 struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
788 {
789         ast_assert(sub->endpoint != NULL);
790         ao2_ref(sub->endpoint, +1);
791         return sub->endpoint;
792 }
793
794 struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
795 {
796         ast_assert(sub->serializer != NULL);
797         return sub->serializer;
798 }
799
800 pjsip_evsub *ast_sip_subscription_get_evsub(struct ast_sip_subscription *sub)
801 {
802         return sub->evsub;
803 }
804
805 pjsip_dialog *ast_sip_subscription_get_dlg(struct ast_sip_subscription *sub)
806 {
807         return sub->dlg;
808 }
809
810 int ast_sip_subscription_accept(struct ast_sip_subscription *sub, pjsip_rx_data *rdata, int response)
811 {
812         /* If this is a persistence recreation the subscription has already been accepted */
813         if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) {
814                 return 0;
815         }
816
817         return pjsip_evsub_accept(ast_sip_subscription_get_evsub(sub), rdata, response, NULL) == PJ_SUCCESS ? 0 : -1;
818 }
819
820 int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata)
821 {
822         struct ast_sip_endpoint *endpoint = ast_sip_subscription_get_endpoint(sub);
823         int res;
824
825         ao2_ref(sub, +1);
826         res = pjsip_evsub_send_request(ast_sip_subscription_get_evsub(sub),
827                         tdata) == PJ_SUCCESS ? 0 : -1;
828
829         subscription_persistence_update(sub, NULL);
830
831         ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
832                 "StateText: %s\r\n"
833                 "Endpoint: %s\r\n",
834                 pjsip_evsub_get_state_name(ast_sip_subscription_get_evsub(sub)),
835                 ast_sorcery_object_get_id(endpoint));
836         ao2_cleanup(sub);
837         ao2_cleanup(endpoint);
838
839         return res;
840 }
841
842 static void subscription_datastore_destroy(void *obj)
843 {
844         struct ast_datastore *datastore = obj;
845
846         /* Using the destroy function (if present) destroy the data */
847         if (datastore->info->destroy != NULL && datastore->data != NULL) {
848                 datastore->info->destroy(datastore->data);
849                 datastore->data = NULL;
850         }
851
852         ast_free((void *) datastore->uid);
853         datastore->uid = NULL;
854 }
855
856 struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
857 {
858         RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
859         const char *uid_ptr = uid;
860
861         if (!info) {
862                 return NULL;
863         }
864
865         datastore = ao2_alloc(sizeof(*datastore), subscription_datastore_destroy);
866         if (!datastore) {
867                 return NULL;
868         }
869
870         datastore->info = info;
871         if (ast_strlen_zero(uid)) {
872                 /* They didn't provide an ID so we'll provide one ourself */
873                 struct ast_uuid *uuid = ast_uuid_generate();
874                 char uuid_buf[AST_UUID_STR_LEN];
875                 if (!uuid) {
876                         return NULL;
877                 }
878                 uid_ptr = ast_uuid_to_str(uuid, uuid_buf, sizeof(uuid_buf));
879                 ast_free(uuid);
880         }
881
882         datastore->uid = ast_strdup(uid_ptr);
883         if (!datastore->uid) {
884                 return NULL;
885         }
886
887         ao2_ref(datastore, +1);
888         return datastore;
889 }
890
891 int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
892 {
893         ast_assert(datastore != NULL);
894         ast_assert(datastore->info != NULL);
895         ast_assert(!ast_strlen_zero(datastore->uid));
896
897         if (!ao2_link(subscription->datastores, datastore)) {
898                 return -1;
899         }
900         return 0;
901 }
902
903 struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
904 {
905         return ao2_find(subscription->datastores, name, OBJ_KEY);
906 }
907
908 void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
909 {
910         ao2_callback(subscription->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
911 }
912
913 int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
914 {
915         ast_assert(datastore != NULL);
916         ast_assert(datastore->info != NULL);
917         ast_assert(!ast_strlen_zero(datastore->uid));
918
919         if (!ao2_link(publication->datastores, datastore)) {
920                 return -1;
921         }
922         return 0;
923 }
924
925 struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
926 {
927         return ao2_find(publication->datastores, name, OBJ_KEY);
928 }
929
930 void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
931 {
932         ao2_callback(publication->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
933 }
934
935 AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
936
937 static int publication_hash_fn(const void *obj, const int flags)
938 {
939         const struct ast_sip_publication *publication = obj;
940         const int *entity_tag = obj;
941
942         return flags & OBJ_KEY ? *entity_tag : publication->entity_tag;
943 }
944
945 static int publication_cmp_fn(void *obj, void *arg, int flags)
946 {
947         const struct ast_sip_publication *publication1 = obj;
948         const struct ast_sip_publication *publication2 = arg;
949         const int *entity_tag = arg;
950
951         return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ?
952                 CMP_MATCH | CMP_STOP : 0);
953 }
954
955 static void publish_add_handler(struct ast_sip_publish_handler *handler)
956 {
957         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
958         AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
959 }
960
961 int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
962 {
963         if (ast_strlen_zero(handler->event_name)) {
964                 ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
965                 return -1;
966         }
967
968         if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS,
969                 publication_hash_fn, publication_cmp_fn))) {
970                 ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n",
971                         handler->event_name);
972                 return -1;
973         }
974
975         publish_add_handler(handler);
976
977         ast_module_ref(ast_module_info->self);
978
979         return 0;
980 }
981
982 void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
983 {
984         struct ast_sip_publish_handler *iter;
985         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
986         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
987                 if (handler == iter) {
988                         AST_RWLIST_REMOVE_CURRENT(next);
989                         ao2_cleanup(handler->publications);
990                         ast_module_unref(ast_module_info->self);
991                         break;
992                 }
993         }
994         AST_RWLIST_TRAVERSE_SAFE_END;
995 }
996
997 AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
998
999 static void sub_add_handler(struct ast_sip_subscription_handler *handler)
1000 {
1001         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1002         AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
1003         ast_module_ref(ast_module_info->self);
1004 }
1005
1006 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name)
1007 {
1008         struct ast_sip_subscription_handler *iter;
1009         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1010
1011         AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
1012                 if (!strcmp(iter->event_name, event_name)) {
1013                         break;
1014                 }
1015         }
1016         return iter;
1017 }
1018
1019 int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler)
1020 {
1021         pj_str_t event;
1022         pj_str_t accept[AST_SIP_MAX_ACCEPT];
1023         struct ast_sip_subscription_handler *existing;
1024         int i;
1025
1026         if (ast_strlen_zero(handler->event_name)) {
1027                 ast_log(LOG_ERROR, "No event package specified for subscription handler. Cannot register\n");
1028                 return -1;
1029         }
1030
1031         existing = find_sub_handler_for_event_name(handler->event_name);
1032         if (existing) {
1033                 ast_log(LOG_ERROR, "Unable to register subscription handler for event %s."
1034                                 "A handler is already registered\n", handler->event_name);
1035                 return -1;
1036         }
1037
1038         for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) {
1039                 pj_cstr(&accept[i], handler->accept[i]);
1040         }
1041
1042         pj_cstr(&event, handler->event_name);
1043
1044         pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
1045
1046         sub_add_handler(handler);
1047
1048         return 0;
1049 }
1050
1051 void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
1052 {
1053         struct ast_sip_subscription_handler *iter;
1054         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1055         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
1056                 if (handler == iter) {
1057                         AST_RWLIST_REMOVE_CURRENT(next);
1058                         ast_module_unref(ast_module_info->self);
1059                         break;
1060                 }
1061         }
1062         AST_RWLIST_TRAVERSE_SAFE_END;
1063 }
1064
1065 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype(const char *content_type,
1066                 const char *content_subtype)
1067 {
1068         struct ast_sip_pubsub_body_generator *iter;
1069         SCOPED_LOCK(lock, &body_generators, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1070
1071         AST_LIST_TRAVERSE(&body_generators, iter, list) {
1072                 if (!strcmp(iter->type, content_type) &&
1073                                 !strcmp(iter->subtype, content_subtype)) {
1074                         break;
1075                 }
1076         };
1077
1078         return iter;
1079 }
1080
1081 static struct ast_sip_pubsub_body_generator *find_body_generator_accept(const char *accept)
1082 {
1083         char *accept_copy = ast_strdupa(accept);
1084         char *subtype = accept_copy;
1085         char *type = strsep(&subtype, "/");
1086
1087         if (ast_strlen_zero(type) || ast_strlen_zero(subtype)) {
1088                 return NULL;
1089         }
1090
1091         return find_body_generator_type_subtype(type, subtype);
1092 }
1093
1094 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
1095                 size_t num_accept)
1096 {
1097         int i;
1098         struct ast_sip_pubsub_body_generator *generator = NULL;
1099
1100         for (i = 0; i < num_accept; ++i) {
1101                 generator = find_body_generator_accept(accept[i]);
1102                 if (generator) {
1103                         ast_debug(3, "Body generator %p found for accept type %s\n", generator, accept[i]);
1104                         break;
1105                 } else {
1106                         ast_debug(3, "No body generator found for accept type %s\n", accept[i]);
1107                 }
1108         }
1109
1110         return generator;
1111 }
1112
1113 static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
1114 {
1115         pjsip_expires_hdr *expires_header;
1116         struct ast_sip_subscription_handler *handler;
1117         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
1118         struct ast_sip_subscription *sub;
1119         struct ast_sip_pubsub_body_generator *generator;
1120
1121         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
1122         ast_assert(endpoint != NULL);
1123
1124         if (!endpoint->subscription.allow) {
1125                 ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint));
1126                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL);
1127                 return PJ_TRUE;
1128         }
1129
1130         expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next);
1131
1132         if (expires_header) {
1133                 if (expires_header->ivalue == 0) {
1134                         ast_log(LOG_WARNING, "Susbscription request from endpoint %s rejected. Expiration of 0 is invalid\n",
1135                                 ast_sorcery_object_get_id(endpoint));
1136                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
1137                                 return PJ_TRUE;
1138                 }
1139                 if (expires_header->ivalue < endpoint->subscription.minexpiry) {
1140                         ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %u\n",
1141                                 expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry);
1142                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL);
1143                         return PJ_TRUE;
1144                 }
1145         }
1146
1147         handler = subscription_get_handler_from_rdata(rdata);
1148         if (!handler) {
1149                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
1150                 return PJ_TRUE;
1151         }
1152
1153         generator = subscription_get_generator_from_rdata(rdata, handler);
1154         if (!generator) {
1155                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
1156                 return PJ_TRUE;
1157         }
1158
1159         ast_sip_mod_data_set(rdata->tp_info.pool, rdata->endpt_info.mod_data,
1160                         pubsub_module.id, MOD_DATA_BODY_GENERATOR, generator);
1161
1162         sub = handler->new_subscribe(endpoint, rdata);
1163         if (!sub) {
1164                 pjsip_transaction *trans = pjsip_rdata_get_tsx(rdata);
1165
1166                 if (trans) {
1167                         pjsip_dialog *dlg = pjsip_rdata_get_dlg(rdata);
1168                         pjsip_tx_data *tdata;
1169
1170                         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, &tdata) != PJ_SUCCESS) {
1171                                 return PJ_TRUE;
1172                         }
1173                         pjsip_dlg_send_response(dlg, trans, tdata);
1174                 } else {
1175                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
1176                 }
1177         } else {
1178                 sub->persistence = subscription_persistence_create(sub);
1179                 subscription_persistence_update(sub, rdata);
1180         }
1181
1182         return PJ_TRUE;
1183 }
1184
1185 static struct ast_sip_publish_handler *find_pub_handler(const char *event)
1186 {
1187         struct ast_sip_publish_handler *iter = NULL;
1188         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1189
1190         AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) {
1191                 if (strcmp(event, iter->event_name)) {
1192                         ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
1193                         continue;
1194                 }
1195                 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
1196                 break;
1197         }
1198
1199         return iter;
1200 }
1201
1202 static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata,
1203         pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id)
1204 {
1205         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
1206
1207         if (etag_hdr) {
1208                 char etag[pj_strlen(&etag_hdr->hvalue) + 1];
1209
1210                 ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag));
1211
1212                 if (sscanf(etag, "%30d", entity_id) != 1) {
1213                         return SIP_PUBLISH_UNKNOWN;
1214                 }
1215         }
1216
1217         *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
1218
1219         if (!(*expires)) {
1220                 return SIP_PUBLISH_REMOVE;
1221         } else if (!etag_hdr && rdata->msg_info.msg->body) {
1222                 return SIP_PUBLISH_INITIAL;
1223         } else if (etag_hdr && !rdata->msg_info.msg->body) {
1224                 return SIP_PUBLISH_REFRESH;
1225         } else if (etag_hdr && rdata->msg_info.msg->body) {
1226                 return SIP_PUBLISH_MODIFY;
1227         }
1228
1229         return SIP_PUBLISH_UNKNOWN;
1230 }
1231
1232 static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
1233         struct ast_sip_publish_handler *handler)
1234 {
1235         struct ast_sip_publication *publication = handler->new_publication(endpoint, rdata);
1236
1237         if (!publication) {
1238                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
1239                 return NULL;
1240         }
1241
1242         publication->handler = handler;
1243
1244         return publication;
1245 }
1246
1247 static int publish_expire_callback(void *data)
1248 {
1249         RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup);
1250
1251         publication->handler->publish_expire(publication);
1252
1253         return 0;
1254 }
1255
1256 static int publish_expire(const void *data)
1257 {
1258         struct ast_sip_publication *publication = (struct ast_sip_publication*)data;
1259
1260         ao2_unlink(publication->handler->publications, publication);
1261         publication->sched_id = -1;
1262
1263         if (ast_sip_push_task(NULL, publish_expire_callback, publication)) {
1264                 ao2_cleanup(publication);
1265         }
1266
1267         return 0;
1268 }
1269
1270 static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata)
1271 {
1272         pjsip_event_hdr *event_header;
1273         struct ast_sip_publish_handler *handler;
1274         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
1275         char event[32];
1276         static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 };
1277         pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL);
1278         enum sip_publish_type publish_type;
1279         RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup);
1280         int expires = 0, entity_id;
1281
1282         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
1283         ast_assert(endpoint != NULL);
1284
1285         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
1286         if (!event_header) {
1287                 ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n");
1288                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
1289                 return PJ_TRUE;
1290         }
1291         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
1292
1293         handler = find_pub_handler(event);
1294         if (!handler) {
1295                 ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event);
1296                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
1297                 return PJ_TRUE;
1298         }
1299
1300         publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id);
1301
1302         /* If this is not an initial publish ensure that a publication is present */
1303         if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) {
1304                 if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) {
1305                         static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 };
1306
1307                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed,
1308                                 NULL, NULL);
1309                         return PJ_TRUE;
1310                 }
1311
1312                 /* Per the RFC every response has to have a new entity tag */
1313                 publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
1314
1315                 /* Update the expires here so that the created responses will contain the correct value */
1316                 publication->expires = expires;
1317         }
1318
1319         switch (publish_type) {
1320                 case SIP_PUBLISH_INITIAL:
1321                         publication = publish_request_initial(endpoint, rdata, handler);
1322                         break;
1323                 case SIP_PUBLISH_REFRESH:
1324                 case SIP_PUBLISH_MODIFY:
1325                         if (handler->publish_refresh(publication, rdata)) {
1326                                 /* If an error occurs we want to terminate the publication */
1327                                 expires = 0;
1328                         }
1329                         break;
1330                 case SIP_PUBLISH_REMOVE:
1331                         handler->publish_termination(publication, rdata);
1332                         break;
1333                 case SIP_PUBLISH_UNKNOWN:
1334                 default:
1335                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
1336                         break;
1337         }
1338
1339         if (publication) {
1340                 if (expires) {
1341                         ao2_link(handler->publications, publication);
1342
1343                         AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication,
1344                                                 ao2_ref(publication, -1), ao2_ref(publication, -1), ao2_ref(publication, +1));
1345                 } else {
1346                         AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1));
1347                 }
1348         }
1349
1350         return PJ_TRUE;
1351 }
1352
1353 /*! \brief Internal destructor for publications */
1354 static void publication_destroy_fn(void *obj)
1355 {
1356         struct ast_sip_publication *publication = obj;
1357
1358         ast_debug(3, "Destroying SIP publication\n");
1359
1360         ao2_cleanup(publication->datastores);
1361         ao2_cleanup(publication->endpoint);
1362 }
1363
1364 struct ast_sip_publication *ast_sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
1365 {
1366         struct ast_sip_publication *publication;
1367         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
1368
1369         ast_assert(endpoint != NULL);
1370
1371         if (!(publication = ao2_alloc(sizeof(*publication), publication_destroy_fn))) {
1372                 return NULL;
1373         }
1374
1375         if (!(publication->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp))) {
1376                 ao2_ref(publication, -1);
1377                 return NULL;
1378         }
1379
1380         publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
1381         ao2_ref(endpoint, +1);
1382         publication->endpoint = endpoint;
1383         publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
1384         publication->sched_id = -1;
1385
1386         return publication;
1387 }
1388
1389 struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub)
1390 {
1391         return pub->endpoint;
1392 }
1393
1394 int ast_sip_publication_create_response(struct ast_sip_publication *pub, int status_code, pjsip_rx_data *rdata,
1395         pjsip_tx_data **tdata)
1396 {
1397         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, tdata) != PJ_SUCCESS) {
1398                 return -1;
1399         }
1400
1401         if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) {
1402                 RAII_VAR(char *, entity_tag, NULL, ast_free_ptr);
1403                 RAII_VAR(char *, expires, NULL, ast_free_ptr);
1404
1405                 if ((ast_asprintf(&entity_tag, "%d", pub->entity_tag) < 0) ||
1406                         (ast_asprintf(&expires, "%d", pub->expires) < 0)) {
1407                         pjsip_tx_data_dec_ref(*tdata);
1408                         return -1;
1409                 }
1410
1411                 ast_sip_add_header(*tdata, "SIP-ETag", entity_tag);
1412                 ast_sip_add_header(*tdata, "Expires", expires);
1413         }
1414
1415         return 0;
1416 }
1417
1418 pj_status_t ast_sip_publication_send_response(struct ast_sip_publication *pub, pjsip_rx_data *rdata,
1419         pjsip_tx_data *tdata)
1420 {
1421         pj_status_t status;
1422         pjsip_transaction *tsx;
1423
1424         if ((status = pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx)) != PJ_SUCCESS) {
1425                 return status;
1426         }
1427
1428         pjsip_tsx_recv_msg(tsx, rdata);
1429
1430         return pjsip_tsx_send_msg(tsx, tdata);
1431 }
1432
1433 int ast_sip_pubsub_register_body_generator(struct ast_sip_pubsub_body_generator *generator)
1434 {
1435         struct ast_sip_pubsub_body_generator *existing;
1436         pj_str_t accept;
1437         pj_size_t accept_len;
1438
1439         existing = find_body_generator_type_subtype(generator->type, generator->subtype);
1440         if (existing) {
1441                 ast_log(LOG_WARNING, "Cannot register body generator of %s/%s."
1442                                 "One is already registered.\n", generator->type, generator->subtype);
1443                 return -1;
1444         }
1445
1446         AST_RWLIST_WRLOCK(&body_generators);
1447         AST_LIST_INSERT_HEAD(&body_generators, generator, list);
1448         AST_RWLIST_UNLOCK(&body_generators);
1449
1450         /* Lengths of type and subtype plus space for a slash. pj_str_t is not
1451          * null-terminated, so there is no need to allocate for the extra null
1452          * byte
1453          */
1454         accept_len = strlen(generator->type) + strlen(generator->subtype) + 1;
1455
1456         accept.ptr = alloca(accept_len);
1457         accept.slen = accept_len;
1458         /* Safe use of sprintf */
1459         sprintf(accept.ptr, "%s/%s", generator->type, generator->subtype);
1460         pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module,
1461                         PJSIP_H_ACCEPT, NULL, 1, &accept);
1462
1463         return 0;
1464 }
1465
1466 void ast_sip_pubsub_unregister_body_generator(struct ast_sip_pubsub_body_generator *generator)
1467 {
1468         struct ast_sip_pubsub_body_generator *iter;
1469         SCOPED_LOCK(lock, &body_generators, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1470
1471         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_generators, iter, list) {
1472                 if (iter == generator) {
1473                         AST_LIST_REMOVE_CURRENT(list);
1474                         break;
1475                 }
1476         }
1477         AST_RWLIST_TRAVERSE_SAFE_END;
1478 }
1479
1480 int ast_sip_pubsub_register_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
1481 {
1482         AST_RWLIST_WRLOCK(&body_supplements);
1483         AST_RWLIST_INSERT_TAIL(&body_supplements, supplement, list);
1484         AST_RWLIST_UNLOCK(&body_supplements);
1485
1486         return 0;
1487 }
1488
1489 void ast_sip_pubsub_unregister_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
1490 {
1491         struct ast_sip_pubsub_body_supplement *iter;
1492         SCOPED_LOCK(lock, &body_supplements, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1493
1494         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_supplements, iter, list) {
1495                 if (iter == supplement) {
1496                         AST_LIST_REMOVE_CURRENT(list);
1497                         break;
1498                 }
1499         }
1500         AST_RWLIST_TRAVERSE_SAFE_END;
1501 }
1502
1503 const char *ast_sip_subscription_get_body_type(struct ast_sip_subscription *sub)
1504 {
1505         return sub->body_generator->type;
1506 }
1507
1508 const char *ast_sip_subscription_get_body_subtype(struct ast_sip_subscription *sub)
1509 {
1510         return sub->body_generator->subtype;
1511 }
1512
1513 int ast_sip_pubsub_generate_body_content(const char *type, const char *subtype,
1514                 void *data, struct ast_str **str)
1515 {
1516         struct ast_sip_pubsub_body_supplement *supplement;
1517         struct ast_sip_pubsub_body_generator *generator;
1518         int res = 0;
1519         void *body;
1520
1521         generator = find_body_generator_type_subtype(type, subtype);
1522         if (!generator) {
1523                 ast_log(LOG_WARNING, "Unable to find a body generator for %s/%s\n",
1524                                 type, subtype);
1525                 return -1;
1526         }
1527
1528         body = generator->allocate_body(data);
1529         if (!body) {
1530                 ast_log(LOG_WARNING, "Unable to allocate a NOTIFY body of type %s/%s\n",
1531                                 type, subtype);
1532                 return -1;
1533         }
1534
1535         if (generator->generate_body_content(body, data)) {
1536                 res = -1;
1537                 goto end;
1538         }
1539
1540         AST_RWLIST_RDLOCK(&body_supplements);
1541         AST_RWLIST_TRAVERSE(&body_supplements, supplement, list) {
1542                 if (!strcmp(generator->type, supplement->type) &&
1543                                 !strcmp(generator->subtype, supplement->subtype)) {
1544                         res = supplement->supplement_body(body, data);
1545                         if (res) {
1546                                 break;
1547                         }
1548                 }
1549         }
1550         AST_RWLIST_UNLOCK(&body_supplements);
1551
1552         if (!res) {
1553                 generator->to_string(body, str);
1554         }
1555
1556 end:
1557         if (generator->destroy_body) {
1558                 generator->destroy_body(body);
1559         }
1560
1561         return res;
1562 }
1563
1564 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata)
1565 {
1566         if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) {
1567                 return pubsub_on_rx_subscribe_request(rdata);
1568         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) {
1569                 return pubsub_on_rx_publish_request(rdata);
1570         }
1571
1572         return PJ_FALSE;
1573 }
1574
1575 static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
1576 {
1577         struct ast_sip_subscription *sub;
1578         if (pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) {
1579                 return;
1580         }
1581
1582         sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1583         if (!sub) {
1584                 return;
1585         }
1586
1587         if (sub->handler->subscription_shutdown) {
1588                 sub->handler->subscription_shutdown(sub);
1589         }
1590         pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL);
1591 }
1592
1593 static void pubsub_on_tsx_state(pjsip_evsub *evsub, pjsip_transaction *tsx, pjsip_event *event)
1594 {
1595         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1596
1597         if (!sub) {
1598                 return;
1599         }
1600
1601         if (sub->handler->notify_response && tsx->role == PJSIP_ROLE_UAC &&
1602             event->body.tsx_state.type == PJSIP_EVENT_RX_MSG) {
1603                 sub->handler->notify_response(sub, event->body.tsx_state.src.rdata);
1604         }
1605 }
1606
1607 static void set_parameters_from_response_data(pj_pool_t *pool, int *p_st_code,
1608                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body,
1609                 struct ast_sip_subscription_response_data *response_data)
1610 {
1611         ast_assert(response_data->status_code >= 200 && response_data->status_code <= 699);
1612         *p_st_code = response_data->status_code;
1613
1614         if (!ast_strlen_zero(response_data->status_text)) {
1615                 pj_strdup2(pool, *p_st_text, response_data->status_text);
1616         }
1617
1618         if (response_data->headers) {
1619                 struct ast_variable *iter;
1620                 for (iter = response_data->headers; iter; iter = iter->next) {
1621                         pj_str_t header_name;
1622                         pj_str_t header_value;
1623                         pjsip_generic_string_hdr *hdr;
1624
1625                         pj_cstr(&header_name, iter->name);
1626                         pj_cstr(&header_value, iter->value);
1627                         hdr = pjsip_generic_string_hdr_create(pool, &header_name, &header_value);
1628                         pj_list_insert_before(res_hdr, hdr);
1629                 }
1630         }
1631
1632         if (response_data->body) {
1633                 pj_str_t type;
1634                 pj_str_t subtype;
1635                 pj_str_t body_text;
1636
1637                 pj_cstr(&type, response_data->body->type);
1638                 pj_cstr(&subtype, response_data->body->subtype);
1639                 pj_cstr(&body_text, response_data->body->body_text);
1640
1641                 *p_body = pjsip_msg_body_create(pool, &type, &subtype, &body_text);
1642         }
1643 }
1644
1645 static int response_data_changed(struct ast_sip_subscription_response_data *response_data)
1646 {
1647         if (response_data->status_code != 200 ||
1648                         !ast_strlen_zero(response_data->status_text) ||
1649                         response_data->headers ||
1650                         response_data->body) {
1651                 return 1;
1652         }
1653         return 0;
1654 }
1655
1656 static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
1657                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
1658 {
1659         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1660         struct ast_sip_subscription_response_data response_data = {
1661                 .status_code = 200,
1662         };
1663
1664         if (!sub) {
1665                 return;
1666         }
1667
1668         if (pjsip_evsub_get_state(sub->evsub) == PJSIP_EVSUB_STATE_TERMINATED) {
1669                 sub->handler->subscription_terminated(sub, rdata);
1670                 return;
1671         }
1672
1673         sub->handler->resubscribe(sub, rdata, &response_data);
1674
1675         if (!response_data_changed(&response_data)) {
1676                 return;
1677         }
1678
1679         set_parameters_from_response_data(rdata->tp_info.pool, p_st_code, p_st_text,
1680                         res_hdr, p_body, &response_data);
1681 }
1682
1683 static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code,
1684                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
1685 {
1686         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1687         struct ast_sip_subscription_response_data response_data = {
1688                 .status_code = 200,
1689         };
1690
1691         if (!sub || !sub->handler->notify_request) {
1692                 return;
1693         }
1694
1695         sub->handler->notify_request(sub, rdata, &response_data);
1696
1697         if (!response_data_changed(&response_data)) {
1698                 return;
1699         }
1700
1701         set_parameters_from_response_data(rdata->tp_info.pool, p_st_code, p_st_text,
1702                         res_hdr, p_body, &response_data);
1703 }
1704
1705 static int serialized_pubsub_on_client_refresh(void *userdata)
1706 {
1707         struct ast_sip_subscription *sub = userdata;
1708
1709         sub->handler->refresh_subscription(sub);
1710         ao2_cleanup(sub);
1711         return 0;
1712 }
1713
1714 static void pubsub_on_client_refresh(pjsip_evsub *evsub)
1715 {
1716         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1717
1718         ao2_ref(sub, +1);
1719         ast_sip_push_task(sub->serializer, serialized_pubsub_on_client_refresh, sub);
1720 }
1721
1722 static int serialized_pubsub_on_server_timeout(void *userdata)
1723 {
1724         struct ast_sip_subscription *sub = userdata;
1725
1726         sub->handler->subscription_timeout(sub);
1727         ao2_cleanup(sub);
1728         return 0;
1729 }
1730
1731 static void pubsub_on_server_timeout(pjsip_evsub *evsub)
1732 {
1733         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1734
1735         if (!sub) {
1736                 /* if a subscription has been terminated and the subscription
1737                    timeout/expires is less than the time it takes for all pending
1738                    transactions to end then the subscription timer will not have
1739                    been canceled yet and sub will be null, so do nothing since
1740                    the subscription has already been terminated. */
1741                 return;
1742         }
1743
1744         ao2_ref(sub, +1);
1745         ast_sip_push_task(sub->serializer, serialized_pubsub_on_server_timeout, sub);
1746 }
1747
1748 static int ami_subscription_detail(struct ast_sip_subscription *sub,
1749                                    struct ast_sip_ami *ami,
1750                                    const char *event)
1751 {
1752         RAII_VAR(struct ast_str *, buf,
1753                  ast_sip_create_ami_event(event, ami), ast_free);
1754
1755         if (!buf) {
1756                 return -1;
1757         }
1758
1759         sip_subscription_to_ami(sub, &buf);
1760         astman_append(ami->s, "%s\r\n", ast_str_buffer(buf));
1761         return 0;
1762 }
1763
1764 static int ami_subscription_detail_inbound(struct ast_sip_subscription *sub, void *arg)
1765 {
1766         return sub->role == AST_SIP_NOTIFIER ? ami_subscription_detail(
1767                 sub, arg, "InboundSubscriptionDetail") : 0;
1768 }
1769
1770 static int ami_subscription_detail_outbound(struct ast_sip_subscription *sub, void *arg)
1771 {
1772         return sub->role == AST_SIP_SUBSCRIBER ? ami_subscription_detail(
1773                 sub, arg, "OutboundSubscriptionDetail") : 0;
1774 }
1775
1776 static int ami_show_subscriptions_inbound(struct mansession *s, const struct message *m)
1777 {
1778         struct ast_sip_ami ami = { .s = s, .m = m };
1779         int num;
1780
1781         astman_send_listack(s, m, "Following are Events for "
1782                             "each inbound Subscription", "start");
1783
1784         num = for_each_subscription(ami_subscription_detail_inbound, &ami);
1785
1786         astman_append(s,
1787                       "Event: InboundSubscriptionDetailComplete\r\n"
1788                       "EventList: Complete\r\n"
1789                       "ListItems: %d\r\n\r\n", num);
1790         return 0;
1791 }
1792
1793 static int ami_show_subscriptions_outbound(struct mansession *s, const struct message *m)
1794 {
1795         struct ast_sip_ami ami = { .s = s, .m = m };
1796         int num;
1797
1798         astman_send_listack(s, m, "Following are Events for "
1799                             "each outbound Subscription", "start");
1800
1801         num = for_each_subscription(ami_subscription_detail_outbound, &ami);
1802
1803         astman_append(s,
1804                       "Event: OutboundSubscriptionDetailComplete\r\n"
1805                       "EventList: Complete\r\n"
1806                       "ListItems: %d\r\n\r\n", num);
1807         return 0;
1808 }
1809
1810 #define AMI_SHOW_SUBSCRIPTIONS_INBOUND "PJSIPShowSubscriptionsInbound"
1811 #define AMI_SHOW_SUBSCRIPTIONS_OUTBOUND "PJSIPShowSubscriptionsOutbound"
1812
1813 static int persistence_endpoint_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
1814 {
1815         struct subscription_persistence *persistence = obj;
1816
1817         persistence->endpoint = ast_strdup(var->value);
1818         return 0;
1819 }
1820
1821 static int persistence_endpoint_struct2str(const void *obj, const intptr_t *args, char **buf)
1822 {
1823         const struct subscription_persistence *persistence = obj;
1824
1825         *buf = ast_strdup(persistence->endpoint);
1826         return 0;
1827 }
1828
1829 static int persistence_tag_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
1830 {
1831         struct subscription_persistence *persistence = obj;
1832
1833         persistence->tag = ast_strdup(var->value);
1834         return 0;
1835 }
1836
1837 static int persistence_tag_struct2str(const void *obj, const intptr_t *args, char **buf)
1838 {
1839         const struct subscription_persistence *persistence = obj;
1840
1841         *buf = ast_strdup(persistence->tag);
1842         return 0;
1843 }
1844
1845 static int persistence_expires_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
1846 {
1847         struct subscription_persistence *persistence = obj;
1848         return ast_get_timeval(var->value, &persistence->expires, ast_tv(0, 0), NULL);
1849 }
1850
1851 static int persistence_expires_struct2str(const void *obj, const intptr_t *args, char **buf)
1852 {
1853         const struct subscription_persistence *persistence = obj;
1854         return (ast_asprintf(buf, "%ld", persistence->expires.tv_sec) < 0) ? -1 : 0;
1855 }
1856
1857 static int load_module(void)
1858 {
1859         static const pj_str_t str_PUBLISH = { "PUBLISH", 7 };
1860         struct ast_sorcery *sorcery = ast_sip_get_sorcery();
1861
1862         pjsip_evsub_init_module(ast_sip_get_pjsip_endpoint());
1863
1864         if (!(sched = ast_sched_context_create())) {
1865                 ast_log(LOG_ERROR, "Could not create scheduler for publication expiration\n");
1866                 return AST_MODULE_LOAD_FAILURE;
1867         }
1868
1869         if (ast_sched_start_thread(sched)) {
1870                 ast_log(LOG_ERROR, "Could not start scheduler thread for publication expiration\n");
1871                 ast_sched_context_destroy(sched);
1872                 return AST_MODULE_LOAD_FAILURE;
1873         }
1874
1875         pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), NULL, PJSIP_H_ALLOW, NULL, 1, &str_PUBLISH);
1876
1877         if (ast_sip_register_service(&pubsub_module)) {
1878                 ast_log(LOG_ERROR, "Could not register pubsub service\n");
1879                 ast_sched_context_destroy(sched);
1880                 return AST_MODULE_LOAD_FAILURE;
1881         }
1882
1883         ast_sorcery_apply_config(sorcery, "res_pjsip_pubsub");
1884         ast_sorcery_apply_default(sorcery, "subscription_persistence", "astdb", "subscription_persistence");
1885         if (ast_sorcery_object_register(sorcery, "subscription_persistence", subscription_persistence_alloc,
1886                 NULL, NULL)) {
1887                 ast_log(LOG_ERROR, "Could not register subscription persistence object support\n");
1888                 ast_sip_unregister_service(&pubsub_module);
1889                 ast_sched_context_destroy(sched);
1890                 return AST_MODULE_LOAD_FAILURE;
1891         }
1892         ast_sorcery_object_field_register(sorcery, "subscription_persistence", "packet", "", OPT_CHAR_ARRAY_T, 0,
1893                 CHARFLDSET(struct subscription_persistence, packet));
1894         ast_sorcery_object_field_register(sorcery, "subscription_persistence", "src_name", "", OPT_CHAR_ARRAY_T, 0,
1895                 CHARFLDSET(struct subscription_persistence, src_name));
1896         ast_sorcery_object_field_register(sorcery, "subscription_persistence", "src_port", "0", OPT_UINT_T, 0,
1897                 FLDSET(struct subscription_persistence, src_port));
1898         ast_sorcery_object_field_register(sorcery, "subscription_persistence", "transport_key", "0", OPT_CHAR_ARRAY_T, 0,
1899                 CHARFLDSET(struct subscription_persistence, transport_key));
1900         ast_sorcery_object_field_register(sorcery, "subscription_persistence", "local_name", "", OPT_CHAR_ARRAY_T, 0,
1901                 CHARFLDSET(struct subscription_persistence, local_name));
1902         ast_sorcery_object_field_register(sorcery, "subscription_persistence", "local_port", "0", OPT_UINT_T, 0,
1903                 FLDSET(struct subscription_persistence, local_port));
1904         ast_sorcery_object_field_register(sorcery, "subscription_persistence", "cseq", "0", OPT_UINT_T, 0,
1905                 FLDSET(struct subscription_persistence, cseq));
1906         ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "endpoint", "",
1907                 persistence_endpoint_str2struct, persistence_endpoint_struct2str, NULL, 0, 0);
1908         ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "tag", "",
1909                 persistence_tag_str2struct, persistence_tag_struct2str, NULL, 0, 0);
1910         ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "expires", "",
1911                 persistence_expires_str2struct, persistence_expires_struct2str, NULL, 0, 0);
1912
1913         if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
1914                 ast_sip_push_task(NULL, subscription_persistence_load, NULL);
1915         } else {
1916                 stasis_subscribe(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
1917         }
1918
1919         ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM,
1920                                  ami_show_subscriptions_inbound);
1921         ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND, EVENT_FLAG_SYSTEM,
1922                                  ami_show_subscriptions_outbound);
1923
1924         return AST_MODULE_LOAD_SUCCESS;
1925 }
1926
1927 static int unload_module(void)
1928 {
1929         ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND);
1930         ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_INBOUND);
1931
1932         if (sched) {
1933                 ast_sched_context_destroy(sched);
1934         }
1935
1936         return 0;
1937 }
1938
1939 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP event resource",
1940                 .load = load_module,
1941                 .unload = unload_module,
1942                 .load_pri = AST_MODPRI_CHANNEL_DEPEND,
1943 );