Allow Asterisk to compile under GCC 4.10
[asterisk/asterisk.git] / res / res_pjsip_pubsub.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 /*!
19  * \brief Opaque structure representing an RFC 3265 SIP subscription
20  */
21
22 /*** MODULEINFO
23         <depend>pjproject</depend>
24         <depend>res_pjsip</depend>
25         <support_level>core</support_level>
26  ***/
27
28 #include "asterisk.h"
29
30 #include <pjsip.h>
31 #include <pjsip_simple.h>
32 #include <pjlib.h>
33
34 #include "asterisk/res_pjsip_pubsub.h"
35 #include "asterisk/module.h"
36 #include "asterisk/linkedlists.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/datastore.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/taskprocessor.h"
41 #include "asterisk/sched.h"
42 #include "asterisk/res_pjsip.h"
43 #include "asterisk/callerid.h"
44 #include "asterisk/manager.h"
45 #include "asterisk/test.h"
46 #include "res_pjsip/include/res_pjsip_private.h"
47
48 /*** DOCUMENTATION
49         <manager name="PJSIPShowSubscriptionsInbound" language="en_US">
50                 <synopsis>
51                         Lists subscriptions.
52                 </synopsis>
53                 <syntax />
54                 <description>
55                         <para>
56                         Provides a listing of all inbound subscriptions.  An event <literal>InboundSubscriptionDetail</literal>
57                         is issued for each subscription object.  Once all detail events are completed an
58                         <literal>InboundSubscriptionDetailComplete</literal> event is issued.
59                         </para>
60                 </description>
61         </manager>
62         <manager name="PJSIPShowSubscriptionsOutbound" language="en_US">
63                 <synopsis>
64                         Lists subscriptions.
65                 </synopsis>
66                 <syntax />
67                 <description>
68                         <para>
69                         Provides a listing of all outbound subscriptions.  An event <literal>OutboundSubscriptionDetail</literal>
70                         is issued for each subscription object.  Once all detail events are completed an
71                         <literal>OutboundSubscriptionDetailComplete</literal> event is issued.
72                         </para>
73                 </description>
74         </manager>
75  ***/
76
77 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);
78
79 static struct pjsip_module pubsub_module = {
80         .name = { "PubSub Module", 13 },
81         .priority = PJSIP_MOD_PRIORITY_APPLICATION,
82         .on_rx_request = pubsub_on_rx_request,
83 };
84
85 #define MOD_DATA_BODY_GENERATOR "sub_body_generator"
86
87 static const pj_str_t str_event_name = { "Event", 5 };
88
89 /*! \brief Scheduler used for automatically expiring publications */
90 static struct ast_sched_context *sched;
91
92 /*! \brief Number of buckets for publications (on a per handler) */
93 #define PUBLICATIONS_BUCKETS 37
94
95 /*! \brief Default expiration time for PUBLISH if one is not specified */
96 #define DEFAULT_PUBLISH_EXPIRES 3600
97
98 /*! \brief Defined method for PUBLISH */
99 const pjsip_method pjsip_publish_method =
100 {
101         PJSIP_OTHER_METHOD,
102         { "PUBLISH", 7 }
103 };
104
/*!
 * \brief Categories of PUBLISH request described by RFC 3903
 */
enum sip_publish_type {
	/*!
	 * \brief Unknown
	 *
	 * \details
	 * Not an RFC 3903 category. It marks an incoming PUBLISH that matches
	 * none of the categories below and is therefore invalid.
	 */
	SIP_PUBLISH_UNKNOWN = 0,

	/*!
	 * \brief Initial
	 *
	 * \details
	 * The first PUBLISH of a publication. It carries a non-zero Expires
	 * header and a body describing the sender's current state. It is the
	 * only PUBLISH type without a Sip-If-Match header.
	 */
	SIP_PUBLISH_INITIAL,

	/*!
	 * \brief Refresh
	 *
	 * \details
	 * Keeps already-published state from expiring. It carries a non-zero
	 * Expires header and no body, since it does not update state.
	 */
	SIP_PUBLISH_REFRESH,

	/*!
	 * \brief Modify
	 *
	 * \details
	 * Replaces the previously published state. It carries a body with the
	 * new state; an Expires header may or may not be present.
	 */
	SIP_PUBLISH_MODIFY,

	/*!
	 * \brief Remove
	 *
	 * \details
	 * Withdraws published state from an ESC. It carries an Expires header
	 * of 0 and most likely has no body.
	 */
	SIP_PUBLISH_REMOVE,
};
159
/*!
 * \brief Counter used by ESCs when creating new entity IDs
 */
static int esc_etag_counter;
164
/*!
 * \brief State kept for a single SIP publication
 */
struct ast_sip_publication {
	/*! \brief Container of datastores attached to the publication by handlers */
	struct ao2_container *datastores;
	/*! \brief Entity tag identifying the publication */
	int entity_tag;
	/*! \brief Publish handler responsible for this publication */
	struct ast_sip_publish_handler *handler;
	/*! \brief Endpoint the publication is communicating with */
	struct ast_sip_endpoint *endpoint;
	/*! \brief Expiration time of the publication */
	int expires;
	/*! \brief Scheduler id of the item that expires the publication */
	int sched_id;
};
182
183 /*!
184  * \brief Structure representing a SIP subscription
185  */
186 struct ast_sip_subscription {
187         /*! Subscription datastores set up by handlers */
188         struct ao2_container *datastores;
189         /*! The endpoint with which the subscription is communicating */
190         struct ast_sip_endpoint *endpoint;
191         /*! Serializer on which to place operations for this subscription */
192         struct ast_taskprocessor *serializer;
193         /*! The handler for this subscription */
194         const struct ast_sip_subscription_handler *handler;
195         /*! The role for this subscription */
196         enum ast_sip_subscription_role role;
197         /*! The underlying PJSIP event subscription structure */
198         pjsip_evsub *evsub;
199         /*! The underlying PJSIP dialog */
200         pjsip_dialog *dlg;
201         /*! Body generaator for NOTIFYs */
202         struct ast_sip_pubsub_body_generator *body_generator;
203         /*! Next item in the list */
204         AST_LIST_ENTRY(ast_sip_subscription) next;
205 };
206
207 static const char *sip_subscription_roles_map[] = {
208         [AST_SIP_SUBSCRIBER] = "Subscriber",
209         [AST_SIP_NOTIFIER] = "Notifier"
210 };
211
212 AST_RWLIST_HEAD_STATIC(subscriptions, ast_sip_subscription);
213
214 AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
215 AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
216
217 static void add_subscription(struct ast_sip_subscription *obj)
218 {
219         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
220         AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
221         ast_module_ref(ast_module_info->self);
222 }
223
224 static void remove_subscription(struct ast_sip_subscription *obj)
225 {
226         struct ast_sip_subscription *i;
227         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
228         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
229                 if (i == obj) {
230                         AST_RWLIST_REMOVE_CURRENT(next);
231                         ast_module_unref(ast_module_info->self);
232                         break;
233                 }
234         }
235         AST_RWLIST_TRAVERSE_SAFE_END;
236 }
237
238 typedef int (*on_subscription_t)(struct ast_sip_subscription *sub, void *arg);
239
240 static int for_each_subscription(on_subscription_t on_subscription, void *arg)
241 {
242         int num = 0;
243         struct ast_sip_subscription *i;
244         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
245
246         if (!on_subscription) {
247                 return num;
248         }
249
250         AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
251                 if (on_subscription(i, arg)) {
252                         break;
253                 }
254                 ++num;
255         }
256         return num;
257 }
258
259 static void sip_subscription_to_ami(struct ast_sip_subscription *sub,
260                                     struct ast_str **buf)
261 {
262         char str[256];
263         struct ast_sip_endpoint_id_configuration *id = &sub->endpoint->id;
264
265         ast_str_append(buf, 0, "Role: %s\r\n",
266                        sip_subscription_roles_map[sub->role]);
267         ast_str_append(buf, 0, "Endpoint: %s\r\n",
268                        ast_sorcery_object_get_id(sub->endpoint));
269
270         ast_copy_pj_str(str, &sub->dlg->call_id->id, sizeof(str));
271         ast_str_append(buf, 0, "Callid: %s\r\n", str);
272
273         ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(
274                                ast_sip_subscription_get_evsub(sub)));
275
276         ast_callerid_merge(str, sizeof(str),
277                            S_COR(id->self.name.valid, id->self.name.str, NULL),
278                            S_COR(id->self.number.valid, id->self.number.str, NULL),
279                            "Unknown");
280
281         ast_str_append(buf, 0, "Callerid: %s\r\n", str);
282
283         if (sub->handler->to_ami) {
284                 sub->handler->to_ami(sub, buf);
285         }
286 }
287
/*! \brief Bucket count of a subscription's datastores container */
#define DATASTORE_BUCKETS 53

/*! \brief Expiration value handed to PJSIP when an event package is registered */
#define DEFAULT_EXPIRES 3600
291
292 static int datastore_hash(const void *obj, int flags)
293 {
294         const struct ast_datastore *datastore = obj;
295         const char *uid = flags & OBJ_KEY ? obj : datastore->uid;
296
297         ast_assert(uid != NULL);
298
299         return ast_str_hash(uid);
300 }
301
302 static int datastore_cmp(void *obj, void *arg, int flags)
303 {
304         const struct ast_datastore *datastore1 = obj;
305         const struct ast_datastore *datastore2 = arg;
306         const char *uid2 = flags & OBJ_KEY ? arg : datastore2->uid;
307
308         ast_assert(datastore1->uid != NULL);
309         ast_assert(uid2 != NULL);
310
311         return strcmp(datastore1->uid, uid2) ? 0 : CMP_MATCH | CMP_STOP;
312 }
313
314 static int subscription_remove_serializer(void *obj)
315 {
316         struct ast_sip_subscription *sub = obj;
317
318         /* This is why we keep the dialog on the subscription. When the subscription
319          * is destroyed, there is no guarantee that the underlying dialog is ready
320          * to be destroyed. Furthermore, there's no guarantee in the opposite direction
321          * either. The dialog could be destroyed before our subscription is. We fix
322          * this problem by keeping a reference to the dialog until it is time to
323          * destroy the subscription. We need to have the dialog available when the
324          * subscription is destroyed so that we can guarantee that our attempt to
325          * remove the serializer will be successful.
326          */
327         ast_sip_dialog_set_serializer(sub->dlg, NULL);
328         pjsip_dlg_dec_session(sub->dlg, &pubsub_module);
329
330         return 0;
331 }
332
333 static void subscription_destructor(void *obj)
334 {
335         struct ast_sip_subscription *sub = obj;
336
337         ast_debug(3, "Destroying SIP subscription\n");
338         remove_subscription(sub);
339
340         ao2_cleanup(sub->datastores);
341         ao2_cleanup(sub->endpoint);
342
343         if (sub->dlg) {
344                 ast_sip_push_task_synchronous(NULL, subscription_remove_serializer, sub);
345         }
346         ast_taskprocessor_unreference(sub->serializer);
347 }
348
349 static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
350 static void pubsub_on_tsx_state(pjsip_evsub *sub, pjsip_transaction *tsx, pjsip_event *event);
351 static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
352                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
353 static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
354                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
355 static void pubsub_on_client_refresh(pjsip_evsub *sub);
356 static void pubsub_on_server_timeout(pjsip_evsub *sub);
357
358
359 static pjsip_evsub_user pubsub_cb = {
360         .on_evsub_state = pubsub_on_evsub_state,
361         .on_tsx_state = pubsub_on_tsx_state,
362         .on_rx_refresh = pubsub_on_rx_refresh,
363         .on_rx_notify = pubsub_on_rx_notify,
364         .on_client_refresh = pubsub_on_client_refresh,
365         .on_server_timeout = pubsub_on_server_timeout,
366 };
367
368 static pjsip_evsub *allocate_evsub(const char *event, enum ast_sip_subscription_role role,
369                 struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, pjsip_dialog *dlg)
370 {
371         pjsip_evsub *evsub;
372         /* PJSIP is kind enough to have some built-in support for certain
373          * events. We need to use the correct initialization function for the
374          * built-in events
375          */
376         if (role == AST_SIP_NOTIFIER) {
377                 pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &evsub);
378         } else {
379                 pj_str_t pj_event;
380                 pj_cstr(&pj_event, event);
381                 pjsip_evsub_create_uac(dlg, &pubsub_cb, &pj_event, 0, &evsub);
382         }
383         return evsub;
384 }
385
386 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
387                 enum ast_sip_subscription_role role, struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
388 {
389         struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub), subscription_destructor);
390         pjsip_dialog *dlg;
391
392         if (!sub) {
393                 return NULL;
394         }
395         sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
396         if (!sub->datastores) {
397                 ao2_ref(sub, -1);
398                 return NULL;
399         }
400         sub->serializer = ast_sip_create_serializer();
401         if (!sub->serializer) {
402                 ao2_ref(sub, -1);
403                 return NULL;
404         }
405         sub->body_generator = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
406                         pubsub_module.id, MOD_DATA_BODY_GENERATOR);
407         sub->role = role;
408         if (role == AST_SIP_NOTIFIER) {
409                 dlg = ast_sip_create_dialog_uas(endpoint, rdata);
410         } else {
411                 RAII_VAR(struct ast_sip_contact *, contact, NULL, ao2_cleanup);
412
413                 contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors);
414                 if (!contact || ast_strlen_zero(contact->uri)) {
415                         ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n",
416                                         ast_sorcery_object_get_id(endpoint));
417                         ao2_ref(sub, -1);
418                         return NULL;
419                 }
420                 dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL);
421         }
422         if (!dlg) {
423                 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
424                 ao2_ref(sub, -1);
425                 return NULL;
426         }
427         sub->evsub = allocate_evsub(handler->event_name, role, endpoint, rdata, dlg);
428         /* We keep a reference to the dialog until our subscription is destroyed. See
429          * the subscription_destructor for more details
430          */
431         pjsip_dlg_inc_session(dlg, &pubsub_module);
432         sub->dlg = dlg;
433         ast_sip_dialog_set_serializer(dlg, sub->serializer);
434         pjsip_evsub_set_mod_data(sub->evsub, pubsub_module.id, sub);
435         ao2_ref(endpoint, +1);
436         sub->endpoint = endpoint;
437         sub->handler = handler;
438
439         add_subscription(sub);
440         return sub;
441 }
442
443 struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
444 {
445         ast_assert(sub->endpoint != NULL);
446         ao2_ref(sub->endpoint, +1);
447         return sub->endpoint;
448 }
449
450 struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
451 {
452         ast_assert(sub->serializer != NULL);
453         return sub->serializer;
454 }
455
456 pjsip_evsub *ast_sip_subscription_get_evsub(struct ast_sip_subscription *sub)
457 {
458         return sub->evsub;
459 }
460
461 pjsip_dialog *ast_sip_subscription_get_dlg(struct ast_sip_subscription *sub)
462 {
463         return sub->dlg;
464 }
465
466 int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata)
467 {
468         struct ast_sip_endpoint *endpoint = ast_sip_subscription_get_endpoint(sub);
469         int res;
470
471         ao2_ref(sub, +1);
472         res = pjsip_evsub_send_request(ast_sip_subscription_get_evsub(sub),
473                         tdata) == PJ_SUCCESS ? 0 : -1;
474
475         ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
476                 "StateText: %s\r\n"
477                 "Endpoint: %s\r\n",
478                 pjsip_evsub_get_state_name(ast_sip_subscription_get_evsub(sub)),
479                 ast_sorcery_object_get_id(endpoint));
480         ao2_cleanup(sub);
481         ao2_cleanup(endpoint);
482
483         return res;
484 }
485
486 static void subscription_datastore_destroy(void *obj)
487 {
488         struct ast_datastore *datastore = obj;
489
490         /* Using the destroy function (if present) destroy the data */
491         if (datastore->info->destroy != NULL && datastore->data != NULL) {
492                 datastore->info->destroy(datastore->data);
493                 datastore->data = NULL;
494         }
495
496         ast_free((void *) datastore->uid);
497         datastore->uid = NULL;
498 }
499
500 struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
501 {
502         RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
503         const char *uid_ptr = uid;
504
505         if (!info) {
506                 return NULL;
507         }
508
509         datastore = ao2_alloc(sizeof(*datastore), subscription_datastore_destroy);
510         if (!datastore) {
511                 return NULL;
512         }
513
514         datastore->info = info;
515         if (ast_strlen_zero(uid)) {
516                 /* They didn't provide an ID so we'll provide one ourself */
517                 struct ast_uuid *uuid = ast_uuid_generate();
518                 char uuid_buf[AST_UUID_STR_LEN];
519                 if (!uuid) {
520                         return NULL;
521                 }
522                 uid_ptr = ast_uuid_to_str(uuid, uuid_buf, sizeof(uuid_buf));
523                 ast_free(uuid);
524         }
525
526         datastore->uid = ast_strdup(uid_ptr);
527         if (!datastore->uid) {
528                 return NULL;
529         }
530
531         ao2_ref(datastore, +1);
532         return datastore;
533 }
534
535 int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
536 {
537         ast_assert(datastore != NULL);
538         ast_assert(datastore->info != NULL);
539         ast_assert(!ast_strlen_zero(datastore->uid));
540
541         if (!ao2_link(subscription->datastores, datastore)) {
542                 return -1;
543         }
544         return 0;
545 }
546
547 struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
548 {
549         return ao2_find(subscription->datastores, name, OBJ_KEY);
550 }
551
552 void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
553 {
554         ao2_callback(subscription->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
555 }
556
557 int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
558 {
559         ast_assert(datastore != NULL);
560         ast_assert(datastore->info != NULL);
561         ast_assert(!ast_strlen_zero(datastore->uid));
562
563         if (!ao2_link(publication->datastores, datastore)) {
564                 return -1;
565         }
566         return 0;
567 }
568
569 struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
570 {
571         return ao2_find(publication->datastores, name, OBJ_KEY);
572 }
573
574 void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
575 {
576         ao2_callback(publication->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
577 }
578
/*! \brief List of registered publish handlers */
AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
580
581 static int publication_hash_fn(const void *obj, const int flags)
582 {
583         const struct ast_sip_publication *publication = obj;
584         const int *entity_tag = obj;
585
586         return flags & OBJ_KEY ? *entity_tag : publication->entity_tag;
587 }
588
589 static int publication_cmp_fn(void *obj, void *arg, int flags)
590 {
591         const struct ast_sip_publication *publication1 = obj;
592         const struct ast_sip_publication *publication2 = arg;
593         const int *entity_tag = arg;
594
595         return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ?
596                 CMP_MATCH | CMP_STOP : 0);
597 }
598
599 static void publish_add_handler(struct ast_sip_publish_handler *handler)
600 {
601         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
602         AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
603 }
604
605 int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
606 {
607         if (ast_strlen_zero(handler->event_name)) {
608                 ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
609                 return -1;
610         }
611
612         if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS,
613                 publication_hash_fn, publication_cmp_fn))) {
614                 ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n",
615                         handler->event_name);
616                 return -1;
617         }
618
619         publish_add_handler(handler);
620
621         ast_module_ref(ast_module_info->self);
622
623         return 0;
624 }
625
626 void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
627 {
628         struct ast_sip_publish_handler *iter;
629         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
630         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
631                 if (handler == iter) {
632                         AST_RWLIST_REMOVE_CURRENT(next);
633                         ao2_cleanup(handler->publications);
634                         ast_module_unref(ast_module_info->self);
635                         break;
636                 }
637         }
638         AST_RWLIST_TRAVERSE_SAFE_END;
639 }
640
/*! \brief List of registered subscription handlers */
AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
642
643 static void sub_add_handler(struct ast_sip_subscription_handler *handler)
644 {
645         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
646         AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
647         ast_module_ref(ast_module_info->self);
648 }
649
650 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name)
651 {
652         struct ast_sip_subscription_handler *iter;
653         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
654
655         AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
656                 if (!strcmp(iter->event_name, event_name)) {
657                         break;
658                 }
659         }
660         return iter;
661 }
662
663 int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler)
664 {
665         pj_str_t event;
666         pj_str_t accept[AST_SIP_MAX_ACCEPT];
667         struct ast_sip_subscription_handler *existing;
668         int i;
669
670         if (ast_strlen_zero(handler->event_name)) {
671                 ast_log(LOG_ERROR, "No event package specified for subscription handler. Cannot register\n");
672                 return -1;
673         }
674
675         existing = find_sub_handler_for_event_name(handler->event_name);
676         if (existing) {
677                 ast_log(LOG_ERROR, "Unable to register subscription handler for event %s."
678                                 "A handler is already registered\n", handler->event_name);
679                 return -1;
680         }
681
682         for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) {
683                 pj_cstr(&accept[i], handler->accept[i]);
684         }
685
686         pj_cstr(&event, handler->event_name);
687
688         pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
689
690         sub_add_handler(handler);
691         return 0;
692 }
693
694 void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
695 {
696         struct ast_sip_subscription_handler *iter;
697         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
698         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
699                 if (handler == iter) {
700                         AST_RWLIST_REMOVE_CURRENT(next);
701                         ast_module_unref(ast_module_info->self);
702                         break;
703                 }
704         }
705         AST_RWLIST_TRAVERSE_SAFE_END;
706 }
707
708 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype(const char *content_type,
709                 const char *content_subtype)
710 {
711         struct ast_sip_pubsub_body_generator *iter;
712         SCOPED_LOCK(lock, &body_generators, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
713
714         AST_LIST_TRAVERSE(&body_generators, iter, list) {
715                 if (!strcmp(iter->type, content_type) &&
716                                 !strcmp(iter->subtype, content_subtype)) {
717                         break;
718                 }
719         };
720
721         return iter;
722 }
723
/*!
 * \brief Find the body generator for a "type/subtype" accept string
 *
 * \param accept The accept value to split at its first '/'
 *
 * \retval NULL Either half of the value is missing or empty, or no
 * generator is registered for it
 * \retval non-NULL The matching generator
 */
static struct ast_sip_pubsub_body_generator *find_body_generator_accept(const char *accept)
{
	/* strsep() cuts the stack copy at the '/', leaving the rest in remainder */
	char *remainder = ast_strdupa(accept);
	char *type = strsep(&remainder, "/");

	if (ast_strlen_zero(type)) {
		return NULL;
	}
	if (ast_strlen_zero(remainder)) {
		return NULL;
	}

	return find_body_generator_type_subtype(type, remainder);
}
736
737 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
738                 size_t num_accept)
739 {
740         int i;
741         struct ast_sip_pubsub_body_generator *generator = NULL;
742
743         for (i = 0; i < num_accept; ++i) {
744                 generator = find_body_generator_accept(accept[i]);
745                 if (generator) {
746                         ast_debug(3, "Body generator %p found for accept type %s\n", generator, accept[i]);
747                         break;
748                 } else {
749                         ast_debug(3, "No body generator found for accept type %s\n", accept[i]);
750                 }
751         }
752
753         return generator;
754 }
755
756 static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
757 {
758         char event[32];
759         char accept[AST_SIP_MAX_ACCEPT][64];
760         pjsip_accept_hdr *accept_header;
761         pjsip_event_hdr *event_header;
762         pjsip_expires_hdr *expires_header;
763         struct ast_sip_subscription_handler *handler;
764         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
765         struct ast_sip_subscription *sub;
766         size_t num_accept_headers;
767         struct ast_sip_pubsub_body_generator *generator;
768
769         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
770         ast_assert(endpoint != NULL);
771
772         if (!endpoint->subscription.allow) {
773                 ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint));
774                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL);
775                 return PJ_TRUE;
776         }
777
778         expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next);
779
780         if (expires_header && expires_header->ivalue < endpoint->subscription.minexpiry) {
781                 ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %u\n",
782                                 expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry);
783                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL);
784                 return PJ_TRUE;
785         }
786
787         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
788         if (!event_header) {
789                 ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
790                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
791                 return PJ_TRUE;
792         }
793         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
794
795         handler = find_sub_handler_for_event_name(event);
796         if (!handler) {
797                 ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
798                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
799                 return PJ_TRUE;
800         }
801
802         accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next);
803         if (accept_header) {
804                 int i;
805
806                 for (i = 0; i < accept_header->count; ++i) {
807                         ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
808                 }
809                 num_accept_headers = accept_header->count;
810         } else {
811                 /* If a SUBSCRIBE contains no Accept headers, then we must assume that
812                  * the default accept type for the event package is to be used.
813                  */
814                 ast_copy_string(accept[0], handler->default_accept, sizeof(accept[0]));
815                 num_accept_headers = 1;
816         }
817
818         generator = find_body_generator(accept, num_accept_headers);
819         if (!generator) {
820                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
821                 return PJ_TRUE;
822         }
823
824         ast_sip_mod_data_set(rdata->tp_info.pool, rdata->endpt_info.mod_data,
825                         pubsub_module.id, MOD_DATA_BODY_GENERATOR, generator);
826
827         sub = handler->new_subscribe(endpoint, rdata);
828         if (!sub) {
829                 pjsip_transaction *trans = pjsip_rdata_get_tsx(rdata);
830
831                 if (trans) {
832                         pjsip_dialog *dlg = pjsip_rdata_get_dlg(rdata);
833                         pjsip_tx_data *tdata;
834
835                         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, &tdata) != PJ_SUCCESS) {
836                                 return PJ_TRUE;
837                         }
838                         pjsip_dlg_send_response(dlg, trans, tdata);
839                 } else {
840                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
841                 }
842         }
843         return PJ_TRUE;
844 }
845
846 static struct ast_sip_publish_handler *find_pub_handler(const char *event)
847 {
848         struct ast_sip_publish_handler *iter = NULL;
849         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
850
851         AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) {
852                 if (strcmp(event, iter->event_name)) {
853                         ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
854                         continue;
855                 }
856                 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
857                 break;
858         }
859
860         return iter;
861 }
862
863 static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata,
864         pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id)
865 {
866         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
867
868         if (etag_hdr) {
869                 char etag[pj_strlen(&etag_hdr->hvalue) + 1];
870
871                 ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag));
872
873                 if (sscanf(etag, "%30d", entity_id) != 1) {
874                         return SIP_PUBLISH_UNKNOWN;
875                 }
876         }
877
878         *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
879
880         if (!(*expires)) {
881                 return SIP_PUBLISH_REMOVE;
882         } else if (!etag_hdr && rdata->msg_info.msg->body) {
883                 return SIP_PUBLISH_INITIAL;
884         } else if (etag_hdr && !rdata->msg_info.msg->body) {
885                 return SIP_PUBLISH_REFRESH;
886         } else if (etag_hdr && rdata->msg_info.msg->body) {
887                 return SIP_PUBLISH_MODIFY;
888         }
889
890         return SIP_PUBLISH_UNKNOWN;
891 }
892
893 static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
894         struct ast_sip_publish_handler *handler)
895 {
896         struct ast_sip_publication *publication = handler->new_publication(endpoint, rdata);
897
898         if (!publication) {
899                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
900                 return NULL;
901         }
902
903         publication->handler = handler;
904
905         return publication;
906 }
907
908 static int publish_expire_callback(void *data)
909 {
910         RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup);
911
912         publication->handler->publish_expire(publication);
913
914         return 0;
915 }
916
917 static int publish_expire(const void *data)
918 {
919         struct ast_sip_publication *publication = (struct ast_sip_publication*)data;
920
921         ao2_unlink(publication->handler->publications, publication);
922         publication->sched_id = -1;
923
924         if (ast_sip_push_task(NULL, publish_expire_callback, publication)) {
925                 ao2_cleanup(publication);
926         }
927
928         return 0;
929 }
930
931 static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata)
932 {
933         pjsip_event_hdr *event_header;
934         struct ast_sip_publish_handler *handler;
935         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
936         char event[32];
937         static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 };
938         pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL);
939         enum sip_publish_type publish_type;
940         RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup);
941         int expires = 0, entity_id;
942
943         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
944         ast_assert(endpoint != NULL);
945
946         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
947         if (!event_header) {
948                 ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n");
949                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
950                 return PJ_TRUE;
951         }
952         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
953
954         handler = find_pub_handler(event);
955         if (!handler) {
956                 ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event);
957                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
958                 return PJ_TRUE;
959         }
960
961         publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id);
962
963         /* If this is not an initial publish ensure that a publication is present */
964         if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) {
965                 if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) {
966                         static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 };
967
968                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed,
969                                 NULL, NULL);
970                         return PJ_TRUE;
971                 }
972
973                 /* Per the RFC every response has to have a new entity tag */
974                 publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
975
976                 /* Update the expires here so that the created responses will contain the correct value */
977                 publication->expires = expires;
978         }
979
980         switch (publish_type) {
981                 case SIP_PUBLISH_INITIAL:
982                         publication = publish_request_initial(endpoint, rdata, handler);
983                         break;
984                 case SIP_PUBLISH_REFRESH:
985                 case SIP_PUBLISH_MODIFY:
986                         if (handler->publish_refresh(publication, rdata)) {
987                                 /* If an error occurs we want to terminate the publication */
988                                 expires = 0;
989                         }
990                         break;
991                 case SIP_PUBLISH_REMOVE:
992                         handler->publish_termination(publication, rdata);
993                         break;
994                 case SIP_PUBLISH_UNKNOWN:
995                 default:
996                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
997                         break;
998         }
999
1000         if (publication) {
1001                 if (expires) {
1002                         ao2_link(handler->publications, publication);
1003
1004                         AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication,
1005                                                 ao2_ref(publication, -1), ao2_ref(publication, -1), ao2_ref(publication, +1));
1006                 } else {
1007                         AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1));
1008                 }
1009         }
1010
1011         return PJ_TRUE;
1012 }
1013
1014 /*! \brief Internal destructor for publications */
1015 static void publication_destroy_fn(void *obj)
1016 {
1017         struct ast_sip_publication *publication = obj;
1018
1019         ast_debug(3, "Destroying SIP publication\n");
1020
1021         ao2_cleanup(publication->datastores);
1022         ao2_cleanup(publication->endpoint);
1023 }
1024
1025 struct ast_sip_publication *ast_sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
1026 {
1027         struct ast_sip_publication *publication;
1028         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
1029
1030         ast_assert(endpoint != NULL);
1031
1032         if (!(publication = ao2_alloc(sizeof(*publication), publication_destroy_fn))) {
1033                 return NULL;
1034         }
1035
1036         if (!(publication->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp))) {
1037                 ao2_ref(publication, -1);
1038                 return NULL;
1039         }
1040
1041         publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
1042         ao2_ref(endpoint, +1);
1043         publication->endpoint = endpoint;
1044         publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
1045         publication->sched_id = -1;
1046
1047         return publication;
1048 }
1049
1050 struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub)
1051 {
1052         return pub->endpoint;
1053 }
1054
1055 int ast_sip_publication_create_response(struct ast_sip_publication *pub, int status_code, pjsip_rx_data *rdata,
1056         pjsip_tx_data **tdata)
1057 {
1058         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, tdata) != PJ_SUCCESS) {
1059                 return -1;
1060         }
1061
1062         if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) {
1063                 RAII_VAR(char *, entity_tag, NULL, ast_free_ptr);
1064                 RAII_VAR(char *, expires, NULL, ast_free_ptr);
1065
1066                 if ((ast_asprintf(&entity_tag, "%d", pub->entity_tag) < 0) ||
1067                         (ast_asprintf(&expires, "%d", pub->expires) < 0)) {
1068                         pjsip_tx_data_dec_ref(*tdata);
1069                         return -1;
1070                 }
1071
1072                 ast_sip_add_header(*tdata, "SIP-ETag", entity_tag);
1073                 ast_sip_add_header(*tdata, "Expires", expires);
1074         }
1075
1076         return 0;
1077 }
1078
1079 pj_status_t ast_sip_publication_send_response(struct ast_sip_publication *pub, pjsip_rx_data *rdata,
1080         pjsip_tx_data *tdata)
1081 {
1082         pj_status_t status;
1083         pjsip_transaction *tsx;
1084
1085         if ((status = pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx)) != PJ_SUCCESS) {
1086                 return status;
1087         }
1088
1089         pjsip_tsx_recv_msg(tsx, rdata);
1090
1091         return pjsip_tsx_send_msg(tsx, tdata);
1092 }
1093
1094 int ast_sip_pubsub_register_body_generator(struct ast_sip_pubsub_body_generator *generator)
1095 {
1096         struct ast_sip_pubsub_body_generator *existing;
1097         pj_str_t accept;
1098         pj_size_t accept_len;
1099
1100         existing = find_body_generator_type_subtype(generator->type, generator->subtype);
1101         if (existing) {
1102                 ast_log(LOG_WARNING, "Cannot register body generator of %s/%s."
1103                                 "One is already registered.\n", generator->type, generator->subtype);
1104                 return -1;
1105         }
1106
1107         AST_RWLIST_WRLOCK(&body_generators);
1108         AST_LIST_INSERT_HEAD(&body_generators, generator, list);
1109         AST_RWLIST_UNLOCK(&body_generators);
1110
1111         /* Lengths of type and subtype plus space for a slash. pj_str_t is not
1112          * null-terminated, so there is no need to allocate for the extra null
1113          * byte
1114          */
1115         accept_len = strlen(generator->type) + strlen(generator->subtype) + 1;
1116
1117         accept.ptr = alloca(accept_len);
1118         accept.slen = accept_len;
1119         /* Safe use of sprintf */
1120         sprintf(accept.ptr, "%s/%s", generator->type, generator->subtype);
1121         pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module,
1122                         PJSIP_H_ACCEPT, NULL, 1, &accept);
1123
1124         return 0;
1125 }
1126
1127 void ast_sip_pubsub_unregister_body_generator(struct ast_sip_pubsub_body_generator *generator)
1128 {
1129         struct ast_sip_pubsub_body_generator *iter;
1130         SCOPED_LOCK(lock, &body_generators, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1131
1132         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_generators, iter, list) {
1133                 if (iter == generator) {
1134                         AST_LIST_REMOVE_CURRENT(list);
1135                         break;
1136                 }
1137         }
1138         AST_RWLIST_TRAVERSE_SAFE_END;
1139 }
1140
1141 int ast_sip_pubsub_register_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
1142 {
1143         AST_RWLIST_WRLOCK(&body_supplements);
1144         AST_RWLIST_INSERT_TAIL(&body_supplements, supplement, list);
1145         AST_RWLIST_UNLOCK(&body_supplements);
1146
1147         return 0;
1148 }
1149
1150 void ast_sip_pubsub_unregister_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
1151 {
1152         struct ast_sip_pubsub_body_supplement *iter;
1153         SCOPED_LOCK(lock, &body_supplements, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1154
1155         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_supplements, iter, list) {
1156                 if (iter == supplement) {
1157                         AST_LIST_REMOVE_CURRENT(list);
1158                         break;
1159                 }
1160         }
1161         AST_RWLIST_TRAVERSE_SAFE_END;
1162 }
1163
1164 const char *ast_sip_subscription_get_body_type(struct ast_sip_subscription *sub)
1165 {
1166         return sub->body_generator->type;
1167 }
1168
1169 const char *ast_sip_subscription_get_body_subtype(struct ast_sip_subscription *sub)
1170 {
1171         return sub->body_generator->subtype;
1172 }
1173
1174 int ast_sip_pubsub_generate_body_content(const char *type, const char *subtype,
1175                 void *data, struct ast_str **str)
1176 {
1177         struct ast_sip_pubsub_body_supplement *supplement;
1178         struct ast_sip_pubsub_body_generator *generator;
1179         int res = 0;
1180         void *body;
1181
1182         generator = find_body_generator_type_subtype(type, subtype);
1183         if (!generator) {
1184                 ast_log(LOG_WARNING, "Unable to find a body generator for %s/%s\n",
1185                                 type, subtype);
1186                 return -1;
1187         }
1188
1189         body = generator->allocate_body(data);
1190         if (!body) {
1191                 ast_log(LOG_WARNING, "Unable to allocate a NOTIFY body of type %s/%s\n",
1192                                 type, subtype);
1193                 return -1;
1194         }
1195
1196         if (generator->generate_body_content(body, data)) {
1197                 res = -1;
1198                 goto end;
1199         }
1200
1201         AST_RWLIST_RDLOCK(&body_supplements);
1202         AST_RWLIST_TRAVERSE(&body_supplements, supplement, list) {
1203                 if (!strcmp(generator->type, supplement->type) &&
1204                                 !strcmp(generator->subtype, supplement->subtype)) {
1205                         res = supplement->supplement_body(body, data);
1206                         if (res) {
1207                                 break;
1208                         }
1209                 }
1210         }
1211         AST_RWLIST_UNLOCK(&body_supplements);
1212
1213         if (!res) {
1214                 generator->to_string(body, str);
1215         }
1216
1217 end:
1218         if (generator->destroy_body) {
1219                 generator->destroy_body(body);
1220         }
1221
1222         return res;
1223 }
1224
1225 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata)
1226 {
1227         if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) {
1228                 return pubsub_on_rx_subscribe_request(rdata);
1229         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) {
1230                 return pubsub_on_rx_publish_request(rdata);
1231         }
1232
1233         return PJ_FALSE;
1234 }
1235
1236 static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
1237 {
1238         struct ast_sip_subscription *sub;
1239         if (pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) {
1240                 return;
1241         }
1242
1243         sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1244         if (!sub) {
1245                 return;
1246         }
1247
1248         if (sub->handler->subscription_shutdown) {
1249                 sub->handler->subscription_shutdown(sub);
1250         }
1251         pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL);
1252 }
1253
1254 static void pubsub_on_tsx_state(pjsip_evsub *evsub, pjsip_transaction *tsx, pjsip_event *event)
1255 {
1256         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1257
1258         if (!sub) {
1259                 return;
1260         }
1261
1262         if (sub->handler->notify_response && tsx->role == PJSIP_ROLE_UAC &&
1263             event->body.tsx_state.type == PJSIP_EVENT_RX_MSG) {
1264                 sub->handler->notify_response(sub, event->body.tsx_state.src.rdata);
1265         }
1266 }
1267
1268 static void set_parameters_from_response_data(pj_pool_t *pool, int *p_st_code,
1269                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body,
1270                 struct ast_sip_subscription_response_data *response_data)
1271 {
1272         ast_assert(response_data->status_code >= 200 && response_data->status_code <= 699);
1273         *p_st_code = response_data->status_code;
1274
1275         if (!ast_strlen_zero(response_data->status_text)) {
1276                 pj_strdup2(pool, *p_st_text, response_data->status_text);
1277         }
1278
1279         if (response_data->headers) {
1280                 struct ast_variable *iter;
1281                 for (iter = response_data->headers; iter; iter = iter->next) {
1282                         pj_str_t header_name;
1283                         pj_str_t header_value;
1284                         pjsip_generic_string_hdr *hdr;
1285
1286                         pj_cstr(&header_name, iter->name);
1287                         pj_cstr(&header_value, iter->value);
1288                         hdr = pjsip_generic_string_hdr_create(pool, &header_name, &header_value);
1289                         pj_list_insert_before(res_hdr, hdr);
1290                 }
1291         }
1292
1293         if (response_data->body) {
1294                 pj_str_t type;
1295                 pj_str_t subtype;
1296                 pj_str_t body_text;
1297
1298                 pj_cstr(&type, response_data->body->type);
1299                 pj_cstr(&subtype, response_data->body->subtype);
1300                 pj_cstr(&body_text, response_data->body->body_text);
1301
1302                 *p_body = pjsip_msg_body_create(pool, &type, &subtype, &body_text);
1303         }
1304 }
1305
1306 static int response_data_changed(struct ast_sip_subscription_response_data *response_data)
1307 {
1308         if (response_data->status_code != 200 ||
1309                         !ast_strlen_zero(response_data->status_text) ||
1310                         response_data->headers ||
1311                         response_data->body) {
1312                 return 1;
1313         }
1314         return 0;
1315 }
1316
1317 static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
1318                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
1319 {
1320         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1321         struct ast_sip_subscription_response_data response_data = {
1322                 .status_code = 200,
1323         };
1324
1325         if (!sub) {
1326                 return;
1327         }
1328
1329         if (pjsip_evsub_get_state(sub->evsub) == PJSIP_EVSUB_STATE_TERMINATED) {
1330                 sub->handler->subscription_terminated(sub, rdata);
1331                 return;
1332         }
1333
1334         sub->handler->resubscribe(sub, rdata, &response_data);
1335
1336         if (!response_data_changed(&response_data)) {
1337                 return;
1338         }
1339
1340         set_parameters_from_response_data(rdata->tp_info.pool, p_st_code, p_st_text,
1341                         res_hdr, p_body, &response_data);
1342 }
1343
1344 static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code,
1345                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
1346 {
1347         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1348         struct ast_sip_subscription_response_data response_data = {
1349                 .status_code = 200,
1350         };
1351
1352         if (!sub || !sub->handler->notify_request) {
1353                 return;
1354         }
1355
1356         sub->handler->notify_request(sub, rdata, &response_data);
1357
1358         if (!response_data_changed(&response_data)) {
1359                 return;
1360         }
1361
1362         set_parameters_from_response_data(rdata->tp_info.pool, p_st_code, p_st_text,
1363                         res_hdr, p_body, &response_data);
1364 }
1365
1366 static int serialized_pubsub_on_client_refresh(void *userdata)
1367 {
1368         struct ast_sip_subscription *sub = userdata;
1369
1370         sub->handler->refresh_subscription(sub);
1371         ao2_cleanup(sub);
1372         return 0;
1373 }
1374
1375 static void pubsub_on_client_refresh(pjsip_evsub *evsub)
1376 {
1377         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1378
1379         ao2_ref(sub, +1);
1380         ast_sip_push_task(sub->serializer, serialized_pubsub_on_client_refresh, sub);
1381 }
1382
1383 static int serialized_pubsub_on_server_timeout(void *userdata)
1384 {
1385         struct ast_sip_subscription *sub = userdata;
1386
1387         sub->handler->subscription_timeout(sub);
1388         ao2_cleanup(sub);
1389         return 0;
1390 }
1391
1392 static void pubsub_on_server_timeout(pjsip_evsub *evsub)
1393 {
1394         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1395
1396         if (!sub) {
1397                 /* if a subscription has been terminated and the subscription
1398                    timeout/expires is less than the time it takes for all pending
1399                    transactions to end then the subscription timer will not have
1400                    been canceled yet and sub will be null, so do nothing since
1401                    the subscription has already been terminated. */
1402                 return;
1403         }
1404
1405         ao2_ref(sub, +1);
1406         ast_sip_push_task(sub->serializer, serialized_pubsub_on_server_timeout, sub);
1407 }
1408
1409 static int ami_subscription_detail(struct ast_sip_subscription *sub,
1410                                    struct ast_sip_ami *ami,
1411                                    const char *event)
1412 {
1413         RAII_VAR(struct ast_str *, buf,
1414                  ast_sip_create_ami_event(event, ami), ast_free);
1415
1416         if (!buf) {
1417                 return -1;
1418         }
1419
1420         sip_subscription_to_ami(sub, &buf);
1421         astman_append(ami->s, "%s\r\n", ast_str_buffer(buf));
1422         return 0;
1423 }
1424
1425 static int ami_subscription_detail_inbound(struct ast_sip_subscription *sub, void *arg)
1426 {
1427         return sub->role == AST_SIP_NOTIFIER ? ami_subscription_detail(
1428                 sub, arg, "InboundSubscriptionDetail") : 0;
1429 }
1430
1431 static int ami_subscription_detail_outbound(struct ast_sip_subscription *sub, void *arg)
1432 {
1433         return sub->role == AST_SIP_SUBSCRIBER ? ami_subscription_detail(
1434                 sub, arg, "OutboundSubscriptionDetail") : 0;
1435 }
1436
1437 static int ami_show_subscriptions_inbound(struct mansession *s, const struct message *m)
1438 {
1439         struct ast_sip_ami ami = { .s = s, .m = m };
1440         int num;
1441
1442         astman_send_listack(s, m, "Following are Events for "
1443                             "each inbound Subscription", "start");
1444
1445         num = for_each_subscription(ami_subscription_detail_inbound, &ami);
1446
1447         astman_append(s,
1448                       "Event: InboundSubscriptionDetailComplete\r\n"
1449                       "EventList: Complete\r\n"
1450                       "ListItems: %d\r\n\r\n", num);
1451         return 0;
1452 }
1453
1454 static int ami_show_subscriptions_outbound(struct mansession *s, const struct message *m)
1455 {
1456         struct ast_sip_ami ami = { .s = s, .m = m };
1457         int num;
1458
1459         astman_send_listack(s, m, "Following are Events for "
1460                             "each outbound Subscription", "start");
1461
1462         num = for_each_subscription(ami_subscription_detail_outbound, &ami);
1463
1464         astman_append(s,
1465                       "Event: OutboundSubscriptionDetailComplete\r\n"
1466                       "EventList: Complete\r\n"
1467                       "ListItems: %d\r\n\r\n", num);
1468         return 0;
1469 }
1470
/* AMI action names: registered in load_module() and unregistered in unload_module() */
#define AMI_SHOW_SUBSCRIPTIONS_INBOUND "PJSIPShowSubscriptionsInbound"
#define AMI_SHOW_SUBSCRIPTIONS_OUTBOUND "PJSIPShowSubscriptionsOutbound"
1473
1474 static int load_module(void)
1475 {
1476         static const pj_str_t str_PUBLISH = { "PUBLISH", 7 };
1477
1478         pjsip_evsub_init_module(ast_sip_get_pjsip_endpoint());
1479
1480         if (!(sched = ast_sched_context_create())) {
1481                 ast_log(LOG_ERROR, "Could not create scheduler for publication expiration\n");
1482                 return AST_MODULE_LOAD_FAILURE;
1483         }
1484
1485         if (ast_sched_start_thread(sched)) {
1486                 ast_log(LOG_ERROR, "Could not start scheduler thread for publication expiration\n");
1487                 ast_sched_context_destroy(sched);
1488                 return AST_MODULE_LOAD_FAILURE;
1489         }
1490
1491         pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), NULL, PJSIP_H_ALLOW, NULL, 1, &str_PUBLISH);
1492
1493         if (ast_sip_register_service(&pubsub_module)) {
1494                 ast_log(LOG_ERROR, "Could not register pubsub service\n");
1495                 ast_sched_context_destroy(sched);
1496                 return AST_MODULE_LOAD_FAILURE;
1497         }
1498
1499         ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM,
1500                                  ami_show_subscriptions_inbound);
1501         ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND, EVENT_FLAG_SYSTEM,
1502                                  ami_show_subscriptions_outbound);
1503
1504         return AST_MODULE_LOAD_SUCCESS;
1505 }
1506
1507 static int unload_module(void)
1508 {
1509         ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND);
1510         ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_INBOUND);
1511
1512         if (sched) {
1513                 ast_sched_context_destroy(sched);
1514         }
1515
1516         return 0;
1517 }
1518
/* Module information: description string, load/unload entry points and load priority */
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP event resource",
		.load = load_module,
		.unload = unload_module,
		.load_pri = AST_MODPRI_CHANNEL_DEPEND,
);