security_events: Fix error caused by DTD validation error
[asterisk/asterisk.git] / res / res_pjsip_pubsub.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 /*!
19  * \brief Opaque structure representing an RFC 3265 SIP subscription
20  */
21
22 /*** MODULEINFO
23         <depend>pjproject</depend>
24         <depend>res_pjsip</depend>
25         <support_level>core</support_level>
26  ***/
27
28 #include "asterisk.h"
29
30 #include <pjsip.h>
31 #include <pjsip_simple.h>
32 #include <pjlib.h>
33
34 #include "asterisk/res_pjsip_pubsub.h"
35 #include "asterisk/module.h"
36 #include "asterisk/linkedlists.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/datastore.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/taskprocessor.h"
41 #include "asterisk/sched.h"
42 #include "asterisk/res_pjsip.h"
43 #include "asterisk/callerid.h"
44 #include "asterisk/manager.h"
45 #include "res_pjsip/include/res_pjsip_private.h"
46
47 /*** DOCUMENTATION
48         <manager name="PJSIPShowSubscriptionsInbound" language="en_US">
49                 <synopsis>
50                         Lists subscriptions.
51                 </synopsis>
52                 <syntax />
53                 <description>
54                         <para>
55                         Provides a listing of all inbound subscriptions.  An event <literal>InboundSubscriptionDetail</literal>
56                         is issued for each subscription object.  Once all detail events are completed an
57                         <literal>InboundSubscriptionDetailComplete</literal> event is issued.
58                         </para>
59                 </description>
60         </manager>
61         <manager name="PJSIPShowSubscriptionsOutbound" language="en_US">
62                 <synopsis>
63                         Lists subscriptions.
64                 </synopsis>
65                 <syntax />
66                 <description>
67                         <para>
68                         Provides a listing of all outbound subscriptions.  An event <literal>OutboundSubscriptionDetail</literal>
69                         is issued for each subscription object.  Once all detail events are completed an
70                         <literal>OutboundSubscriptionDetailComplete</literal> event is issued.
71                         </para>
72                 </description>
73         </manager>
74  ***/
75
/* Forward declaration: referenced by the pubsub_module descriptor below. */
static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);

/*!
 * \brief PJSIP module descriptor for this file
 *
 * Installs pubsub_on_rx_request() as the module's on_rx_request callback at
 * application priority. The module's id is also used as the key for
 * per-message and per-evsub module data elsewhere in this file.
 */
static struct pjsip_module pubsub_module = {
        /* pj_str_t initializer: the length (13) must equal strlen("PubSub Module") */
        .name = { "PubSub Module", 13 },
        .priority = PJSIP_MOD_PRIORITY_APPLICATION,
        .on_rx_request = pubsub_on_rx_request,
};
83
/*! \brief Key under which the selected body generator is stored in an rdata's module data */
#define MOD_DATA_BODY_GENERATOR "sub_body_generator"

/*! \brief Name of the SIP header looked up to find the event package (pj_str_t: text, length) */
static const pj_str_t str_event_name = { "Event", 5 };

/*! \brief Scheduler used for automatically expiring publications */
static struct ast_sched_context *sched;

/*! \brief Number of buckets for publications (on a per handler) */
#define PUBLICATIONS_BUCKETS 37

/*! \brief Default expiration time for PUBLISH if one is not specified */
#define DEFAULT_PUBLISH_EXPIRES 3600
96
/*!
 * \brief Defined method for PUBLISH
 *
 * Not declared static, so it has external linkage. The second member is a
 * pj_str_t initializer whose length (7) must equal strlen("PUBLISH").
 */
const pjsip_method pjsip_publish_method =
{
        PJSIP_OTHER_METHOD,
        { "PUBLISH", 7 }
};
103
/*!
 * \brief The types of PUBLISH messages defined in RFC 3903
 *
 * SIP_PUBLISH_UNKNOWN is the first enumerator and therefore has the value 0.
 */
enum sip_publish_type {
        /*!
         * \brief Unknown
         *
         * \details
         * This actually is not defined in RFC 3903. We use this as a constant
         * to indicate that an incoming PUBLISH does not fit into any of the
         * other categories and is thus invalid.
         */
        SIP_PUBLISH_UNKNOWN,

        /*!
         * \brief Initial
         *
         * \details
         * The first PUBLISH sent. This will contain a non-zero Expires header
         * as well as a body that indicates the current state of the endpoint
         * that has sent the message. The initial PUBLISH is the only type
         * of PUBLISH to not contain a Sip-If-Match header in it.
         */
        SIP_PUBLISH_INITIAL,

        /*!
         * \brief Refresh
         *
         * \details
         * Used to keep a published state from expiring. This will contain a
         * non-zero Expires header but no body since its purpose is not to
         * update state.
         */
        SIP_PUBLISH_REFRESH,

        /*!
         * \brief Modify
         *
         * \details
         * Used to change state from its previous value. This will contain
         * a body updating the published state. May or may not contain an
         * Expires header.
         */
        SIP_PUBLISH_MODIFY,

        /*!
         * \brief Remove
         *
         * \details
         * Used to remove published state from an ESC. This will contain
         * an Expires header set to 0 and likely no body.
         */
        SIP_PUBLISH_REMOVE,
};
158
/*!
 * \brief Counter used to create new entity IDs by ESCs.
 *
 * File-scope static, so it starts at 0.
 * NOTE(review): "ESC" presumably means event state compositor (RFC 3903) - confirm.
 */
static int esc_etag_counter;
163
/*!
 * \brief Structure representing a SIP publication
 */
struct ast_sip_publication {
        /*! Publication datastores set up by handlers */
        struct ao2_container *datastores;
        /*!
         * \brief Entity tag for the publication
         *
         * This is the value publication_hash_fn() and publication_cmp_fn()
         * hash and compare on.
         */
        int entity_tag;
        /*! \brief Handler for this publication */
        struct ast_sip_publish_handler *handler;
        /*! \brief The endpoint with which the publication is communicating */
        struct ast_sip_endpoint *endpoint;
        /*! \brief Expiration time of the publication */
        int expires;
        /*! \brief Scheduled item for expiration of publication */
        int sched_id;
};
181
/*!
 * \brief Structure representing a SIP subscription
 */
struct ast_sip_subscription {
        /*! Subscription datastores set up by handlers */
        struct ao2_container *datastores;
        /*! The endpoint with which the subscription is communicating */
        struct ast_sip_endpoint *endpoint;
        /*! Serializer on which to place operations for this subscription */
        struct ast_taskprocessor *serializer;
        /*! The handler for this subscription */
        const struct ast_sip_subscription_handler *handler;
        /*! The role for this subscription (also indexes sip_subscription_roles_map) */
        enum ast_sip_subscription_role role;
        /*! The underlying PJSIP event subscription structure */
        pjsip_evsub *evsub;
        /*!
         * The underlying PJSIP dialog. A session reference is taken on it when
         * the subscription is created and dropped again by
         * subscription_remove_serializer().
         */
        pjsip_dialog *dlg;
        /*! Body generator for NOTIFYs */
        struct ast_sip_pubsub_body_generator *body_generator;
        /*! Next item in the list */
        AST_LIST_ENTRY(ast_sip_subscription) next;
};
205
/*! \brief Printable names for subscription roles, indexed by enum ast_sip_subscription_role */
static const char *sip_subscription_roles_map[] = {
        [AST_SIP_SUBSCRIBER] = "Subscriber",
        [AST_SIP_NOTIFIER] = "Notifier"
};

/*! \brief List of subscriptions; filled by add_subscription(), emptied by remove_subscription() */
AST_RWLIST_HEAD_STATIC(subscriptions, ast_sip_subscription);

/*! \brief List of body generators, searched by content type and subtype */
AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
/*! \brief List of body supplements */
AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
215
216 static void add_subscription(struct ast_sip_subscription *obj)
217 {
218         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
219         AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
220         ast_module_ref(ast_module_info->self);
221 }
222
223 static void remove_subscription(struct ast_sip_subscription *obj)
224 {
225         struct ast_sip_subscription *i;
226         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
227         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
228                 if (i == obj) {
229                         AST_RWLIST_REMOVE_CURRENT(next);
230                         ast_module_unref(ast_module_info->self);
231                         break;
232                 }
233         }
234         AST_RWLIST_TRAVERSE_SAFE_END;
235 }
236
237 typedef int (*on_subscription_t)(struct ast_sip_subscription *sub, void *arg);
238
239 static int for_each_subscription(on_subscription_t on_subscription, void *arg)
240 {
241         int num = 0;
242         struct ast_sip_subscription *i;
243         SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
244
245         if (!on_subscription) {
246                 return num;
247         }
248
249         AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
250                 if (on_subscription(i, arg)) {
251                         break;
252                 }
253                 ++num;
254         }
255         return num;
256 }
257
258 static void sip_subscription_to_ami(struct ast_sip_subscription *sub,
259                                     struct ast_str **buf)
260 {
261         char str[256];
262         struct ast_sip_endpoint_id_configuration *id = &sub->endpoint->id;
263
264         ast_str_append(buf, 0, "Role: %s\r\n",
265                        sip_subscription_roles_map[sub->role]);
266         ast_str_append(buf, 0, "Endpoint: %s\r\n",
267                        ast_sorcery_object_get_id(sub->endpoint));
268
269         ast_copy_pj_str(str, &sub->dlg->call_id->id, sizeof(str));
270         ast_str_append(buf, 0, "Callid: %s\r\n", str);
271
272         ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(
273                                ast_sip_subscription_get_evsub(sub)));
274
275         ast_callerid_merge(str, sizeof(str),
276                            S_COR(id->self.name.valid, id->self.name.str, NULL),
277                            S_COR(id->self.number.valid, id->self.number.str, NULL),
278                            "Unknown");
279
280         ast_str_append(buf, 0, "Callerid: %s\r\n", str);
281
282         if (sub->handler->to_ami) {
283                 sub->handler->to_ami(sub, buf);
284         }
285 }
286
/*! \brief Number of buckets in each subscription's datastore container */
#define DATASTORE_BUCKETS 53

/*! \brief Expiration value handed to PJSIP when an event package is registered */
#define DEFAULT_EXPIRES 3600
290
291 static int datastore_hash(const void *obj, int flags)
292 {
293         const struct ast_datastore *datastore = obj;
294         const char *uid = flags & OBJ_KEY ? obj : datastore->uid;
295
296         ast_assert(uid != NULL);
297
298         return ast_str_hash(uid);
299 }
300
301 static int datastore_cmp(void *obj, void *arg, int flags)
302 {
303         const struct ast_datastore *datastore1 = obj;
304         const struct ast_datastore *datastore2 = arg;
305         const char *uid2 = flags & OBJ_KEY ? arg : datastore2->uid;
306
307         ast_assert(datastore1->uid != NULL);
308         ast_assert(uid2 != NULL);
309
310         return strcmp(datastore1->uid, uid2) ? 0 : CMP_MATCH | CMP_STOP;
311 }
312
313 static int subscription_remove_serializer(void *obj)
314 {
315         struct ast_sip_subscription *sub = obj;
316
317         /* This is why we keep the dialog on the subscription. When the subscription
318          * is destroyed, there is no guarantee that the underlying dialog is ready
319          * to be destroyed. Furthermore, there's no guarantee in the opposite direction
320          * either. The dialog could be destroyed before our subscription is. We fix
321          * this problem by keeping a reference to the dialog until it is time to
322          * destroy the subscription. We need to have the dialog available when the
323          * subscription is destroyed so that we can guarantee that our attempt to
324          * remove the serializer will be successful.
325          */
326         ast_sip_dialog_set_serializer(sub->dlg, NULL);
327         pjsip_dlg_dec_session(sub->dlg, &pubsub_module);
328
329         return 0;
330 }
331
332 static void subscription_destructor(void *obj)
333 {
334         struct ast_sip_subscription *sub = obj;
335
336         ast_debug(3, "Destroying SIP subscription\n");
337         remove_subscription(sub);
338
339         ao2_cleanup(sub->datastores);
340         ao2_cleanup(sub->endpoint);
341
342         if (sub->dlg) {
343                 ast_sip_push_task_synchronous(NULL, subscription_remove_serializer, sub);
344         }
345         ast_taskprocessor_unreference(sub->serializer);
346 }
347
/* Forward declarations of the evsub callbacks collected in pubsub_cb below. */
static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
static void pubsub_on_tsx_state(pjsip_evsub *sub, pjsip_transaction *tsx, pjsip_event *event);
static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
                int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
                pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
static void pubsub_on_client_refresh(pjsip_evsub *sub);
static void pubsub_on_server_timeout(pjsip_evsub *sub);


/*!
 * \brief Callback table passed to every PJSIP evsub creation call in allocate_evsub()
 */
static pjsip_evsub_user pubsub_cb = {
        .on_evsub_state = pubsub_on_evsub_state,
        .on_tsx_state = pubsub_on_tsx_state,
        .on_rx_refresh = pubsub_on_rx_refresh,
        .on_rx_notify = pubsub_on_rx_notify,
        .on_client_refresh = pubsub_on_client_refresh,
        .on_server_timeout = pubsub_on_server_timeout,
};
366
367 static pjsip_evsub *allocate_evsub(const char *event, enum ast_sip_subscription_role role,
368                 struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, pjsip_dialog *dlg)
369 {
370         pjsip_evsub *evsub;
371         /* PJSIP is kind enough to have some built-in support for certain
372          * events. We need to use the correct initialization function for the
373          * built-in events
374          */
375         if (role == AST_SIP_NOTIFIER) {
376                 if (!strcmp(event, "message-summary")) {
377                         pjsip_mwi_create_uas(dlg, &pubsub_cb, rdata, &evsub);
378                 } else {
379                         pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &evsub);
380                 }
381         } else {
382                 if (!strcmp(event, "message-summary")) {
383                         pjsip_mwi_create_uac(dlg, &pubsub_cb, 0, &evsub);
384                 } else {
385                         pj_str_t pj_event;
386                         pj_cstr(&pj_event, event);
387                         pjsip_evsub_create_uac(dlg, &pubsub_cb, &pj_event, 0, &evsub);
388                 }
389         }
390         return evsub;
391 }
392
393 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
394                 enum ast_sip_subscription_role role, struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
395 {
396         struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub), subscription_destructor);
397         pjsip_dialog *dlg;
398
399         if (!sub) {
400                 return NULL;
401         }
402         sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
403         if (!sub->datastores) {
404                 ao2_ref(sub, -1);
405                 return NULL;
406         }
407         sub->serializer = ast_sip_create_serializer();
408         if (!sub->serializer) {
409                 ao2_ref(sub, -1);
410                 return NULL;
411         }
412         sub->body_generator = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
413                         pubsub_module.id, MOD_DATA_BODY_GENERATOR);
414         sub->role = role;
415         if (role == AST_SIP_NOTIFIER) {
416                 dlg = ast_sip_create_dialog_uas(endpoint, rdata);
417         } else {
418                 RAII_VAR(struct ast_sip_contact *, contact, NULL, ao2_cleanup);
419
420                 contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors);
421                 if (!contact || ast_strlen_zero(contact->uri)) {
422                         ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n",
423                                         ast_sorcery_object_get_id(endpoint));
424                         ao2_ref(sub, -1);
425                         return NULL;
426                 }
427                 dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL);
428         }
429         if (!dlg) {
430                 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
431                 ao2_ref(sub, -1);
432                 return NULL;
433         }
434         sub->evsub = allocate_evsub(handler->event_name, role, endpoint, rdata, dlg);
435         /* We keep a reference to the dialog until our subscription is destroyed. See
436          * the subscription_destructor for more details
437          */
438         pjsip_dlg_inc_session(dlg, &pubsub_module);
439         sub->dlg = dlg;
440         ast_sip_dialog_set_serializer(dlg, sub->serializer);
441         pjsip_evsub_set_mod_data(sub->evsub, pubsub_module.id, sub);
442         ao2_ref(endpoint, +1);
443         sub->endpoint = endpoint;
444         sub->handler = handler;
445
446         add_subscription(sub);
447         return sub;
448 }
449
450 struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
451 {
452         ast_assert(sub->endpoint != NULL);
453         ao2_ref(sub->endpoint, +1);
454         return sub->endpoint;
455 }
456
457 struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
458 {
459         ast_assert(sub->serializer != NULL);
460         return sub->serializer;
461 }
462
463 pjsip_evsub *ast_sip_subscription_get_evsub(struct ast_sip_subscription *sub)
464 {
465         return sub->evsub;
466 }
467
468 pjsip_dialog *ast_sip_subscription_get_dlg(struct ast_sip_subscription *sub)
469 {
470         return sub->dlg;
471 }
472
473 int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata)
474 {
475         return pjsip_evsub_send_request(ast_sip_subscription_get_evsub(sub),
476                         tdata) == PJ_SUCCESS ? 0 : -1;
477 }
478
479 static void subscription_datastore_destroy(void *obj)
480 {
481         struct ast_datastore *datastore = obj;
482
483         /* Using the destroy function (if present) destroy the data */
484         if (datastore->info->destroy != NULL && datastore->data != NULL) {
485                 datastore->info->destroy(datastore->data);
486                 datastore->data = NULL;
487         }
488
489         ast_free((void *) datastore->uid);
490         datastore->uid = NULL;
491 }
492
493 struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
494 {
495         RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
496         const char *uid_ptr = uid;
497
498         if (!info) {
499                 return NULL;
500         }
501
502         datastore = ao2_alloc(sizeof(*datastore), subscription_datastore_destroy);
503         if (!datastore) {
504                 return NULL;
505         }
506
507         datastore->info = info;
508         if (ast_strlen_zero(uid)) {
509                 /* They didn't provide an ID so we'll provide one ourself */
510                 struct ast_uuid *uuid = ast_uuid_generate();
511                 char uuid_buf[AST_UUID_STR_LEN];
512                 if (!uuid) {
513                         return NULL;
514                 }
515                 uid_ptr = ast_uuid_to_str(uuid, uuid_buf, sizeof(uuid_buf));
516                 ast_free(uuid);
517         }
518
519         datastore->uid = ast_strdup(uid_ptr);
520         if (!datastore->uid) {
521                 return NULL;
522         }
523
524         ao2_ref(datastore, +1);
525         return datastore;
526 }
527
528 int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
529 {
530         ast_assert(datastore != NULL);
531         ast_assert(datastore->info != NULL);
532         ast_assert(!ast_strlen_zero(datastore->uid));
533
534         if (!ao2_link(subscription->datastores, datastore)) {
535                 return -1;
536         }
537         return 0;
538 }
539
540 struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
541 {
542         return ao2_find(subscription->datastores, name, OBJ_KEY);
543 }
544
545 void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
546 {
547         ao2_callback(subscription->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
548 }
549
550 int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
551 {
552         ast_assert(datastore != NULL);
553         ast_assert(datastore->info != NULL);
554         ast_assert(!ast_strlen_zero(datastore->uid));
555
556         if (!ao2_link(publication->datastores, datastore)) {
557                 return -1;
558         }
559         return 0;
560 }
561
562 struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
563 {
564         return ao2_find(publication->datastores, name, OBJ_KEY);
565 }
566
567 void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
568 {
569         ao2_callback(publication->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
570 }
571
/*! \brief List of publish handlers added by ast_sip_register_publish_handler() */
AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
573
574 static int publication_hash_fn(const void *obj, const int flags)
575 {
576         const struct ast_sip_publication *publication = obj;
577         const int *entity_tag = obj;
578
579         return flags & OBJ_KEY ? *entity_tag : publication->entity_tag;
580 }
581
582 static int publication_cmp_fn(void *obj, void *arg, int flags)
583 {
584         const struct ast_sip_publication *publication1 = obj;
585         const struct ast_sip_publication *publication2 = arg;
586         const int *entity_tag = arg;
587
588         return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ?
589                 CMP_MATCH | CMP_STOP : 0);
590 }
591
592 static void publish_add_handler(struct ast_sip_publish_handler *handler)
593 {
594         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
595         AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
596 }
597
598 int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
599 {
600         if (ast_strlen_zero(handler->event_name)) {
601                 ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
602                 return -1;
603         }
604
605         if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS,
606                 publication_hash_fn, publication_cmp_fn))) {
607                 ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n",
608                         handler->event_name);
609                 return -1;
610         }
611
612         publish_add_handler(handler);
613
614         ast_module_ref(ast_module_info->self);
615
616         return 0;
617 }
618
619 void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
620 {
621         struct ast_sip_publish_handler *iter;
622         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
623         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
624                 if (handler == iter) {
625                         AST_RWLIST_REMOVE_CURRENT(next);
626                         ao2_cleanup(handler->publications);
627                         ast_module_unref(ast_module_info->self);
628                         break;
629                 }
630         }
631         AST_RWLIST_TRAVERSE_SAFE_END;
632 }
633
/*! \brief List of subscription handlers, looked up by event name */
AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
635
636 static void sub_add_handler(struct ast_sip_subscription_handler *handler)
637 {
638         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
639         AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
640         ast_module_ref(ast_module_info->self);
641 }
642
643 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name)
644 {
645         struct ast_sip_subscription_handler *iter;
646         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
647
648         AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
649                 if (!strcmp(iter->event_name, event_name)) {
650                         break;
651                 }
652         }
653         return iter;
654 }
655
656 int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler)
657 {
658         pj_str_t event;
659         pj_str_t accept[AST_SIP_MAX_ACCEPT];
660         struct ast_sip_subscription_handler *existing;
661         int i;
662
663         if (ast_strlen_zero(handler->event_name)) {
664                 ast_log(LOG_ERROR, "No event package specified for subscription handler. Cannot register\n");
665                 return -1;
666         }
667
668         existing = find_sub_handler_for_event_name(handler->event_name);
669         if (existing) {
670                 ast_log(LOG_ERROR, "Unable to register subscription handler for event %s."
671                                 "A handler is already registered\n", handler->event_name);
672                 return -1;
673         }
674
675         for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) {
676                 pj_cstr(&accept[i], handler->accept[i]);
677         }
678
679         pj_cstr(&event, handler->event_name);
680
681         if (!strcmp(handler->event_name, "message-summary")) {
682                 pjsip_mwi_init_module(ast_sip_get_pjsip_endpoint(), pjsip_evsub_instance());
683         } else {
684                 pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
685         }
686
687         sub_add_handler(handler);
688         return 0;
689 }
690
691 void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
692 {
693         struct ast_sip_subscription_handler *iter;
694         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
695         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
696                 if (handler == iter) {
697                         AST_RWLIST_REMOVE_CURRENT(next);
698                         ast_module_unref(ast_module_info->self);
699                         break;
700                 }
701         }
702         AST_RWLIST_TRAVERSE_SAFE_END;
703 }
704
705 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype(const char *content_type,
706                 const char *content_subtype)
707 {
708         struct ast_sip_pubsub_body_generator *iter;
709         SCOPED_LOCK(lock, &body_generators, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
710
711         AST_LIST_TRAVERSE(&body_generators, iter, list) {
712                 if (!strcmp(iter->type, content_type) &&
713                                 !strcmp(iter->subtype, content_subtype)) {
714                         break;
715                 }
716         };
717
718         return iter;
719 }
720
/*!
 * \brief Find a body generator for a "type/subtype" accept string
 *
 * \param accept Accept value; split at the first "/"
 *
 * \return The matching body generator, or NULL if either half of the accept
 *         string is empty or no generator matches.
 */
static struct ast_sip_pubsub_body_generator *find_body_generator_accept(const char *accept)
{
        /* Work on a stack copy: strsep() writes a terminator into its input */
        char *remainder = ast_strdupa(accept);
        char *type = strsep(&remainder, "/");

        /* After strsep(), remainder is the subtype (or NULL without a "/") */
        if (ast_strlen_zero(type) || ast_strlen_zero(remainder)) {
                return NULL;
        }

        return find_body_generator_type_subtype(type, remainder);
}
733
734 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
735                 size_t num_accept)
736 {
737         int i;
738         struct ast_sip_pubsub_body_generator *generator = NULL;
739
740         for (i = 0; i < num_accept; ++i) {
741                 generator = find_body_generator_accept(accept[i]);
742                 if (generator) {
743                         ast_debug(3, "Body generator %p found for accept type %s\n", generator, accept[i]);
744                         break;
745                 } else {
746                         ast_debug(3, "No body generator found for accept type %s\n", accept[i]);
747                 }
748         }
749
750         return generator;
751 }
752
753 static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
754 {
755         char event[32];
756         char accept[AST_SIP_MAX_ACCEPT][64];
757         pjsip_accept_hdr *accept_header;
758         pjsip_event_hdr *event_header;
759         pjsip_expires_hdr *expires_header;
760         struct ast_sip_subscription_handler *handler;
761         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
762         struct ast_sip_subscription *sub;
763         size_t num_accept_headers;
764         struct ast_sip_pubsub_body_generator *generator;
765
766         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
767         ast_assert(endpoint != NULL);
768
769         if (!endpoint->subscription.allow) {
770                 ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint));
771                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL);
772                 return PJ_TRUE;
773         }
774
775         expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next);
776
777         if (expires_header && expires_header->ivalue < endpoint->subscription.minexpiry) {
778                 ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %d\n",
779                                 expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry);
780                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL);
781                 return PJ_TRUE;
782         }
783
784         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
785         if (!event_header) {
786                 ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
787                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
788                 return PJ_TRUE;
789         }
790         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
791
792         handler = find_sub_handler_for_event_name(event);
793         if (!handler) {
794                 ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
795                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
796                 return PJ_TRUE;
797         }
798
799         accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next);
800         if (accept_header) {
801                 int i;
802
803                 for (i = 0; i < accept_header->count; ++i) {
804                         ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
805                 }
806                 num_accept_headers = accept_header->count;
807         } else {
808                 /* If a SUBSCRIBE contains no Accept headers, then we must assume that
809                  * the default accept type for the event package is to be used.
810                  */
811                 ast_copy_string(accept[0], handler->default_accept, sizeof(accept[0]));
812                 num_accept_headers = 1;
813         }
814
815         generator = find_body_generator(accept, num_accept_headers);
816         if (!generator) {
817                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
818                 return PJ_TRUE;
819         }
820
821         ast_sip_mod_data_set(rdata->tp_info.pool, rdata->endpt_info.mod_data,
822                         pubsub_module.id, MOD_DATA_BODY_GENERATOR, generator);
823
824         sub = handler->new_subscribe(endpoint, rdata);
825         if (!sub) {
826                 pjsip_transaction *trans = pjsip_rdata_get_tsx(rdata);
827
828                 if (trans) {
829                         pjsip_dialog *dlg = pjsip_rdata_get_dlg(rdata);
830                         pjsip_tx_data *tdata;
831
832                         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, &tdata) != PJ_SUCCESS) {
833                                 return PJ_TRUE;
834                         }
835                         pjsip_dlg_send_response(dlg, trans, tdata);
836                 } else {
837                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
838                 }
839         }
840         return PJ_TRUE;
841 }
842
843 static struct ast_sip_publish_handler *find_pub_handler(const char *event)
844 {
845         struct ast_sip_publish_handler *iter = NULL;
846         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
847
848         AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) {
849                 if (strcmp(event, iter->event_name)) {
850                         ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
851                         continue;
852                 }
853                 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
854                 break;
855         }
856
857         return iter;
858 }
859
860 static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata,
861         pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id)
862 {
863         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
864
865         if (etag_hdr) {
866                 char etag[pj_strlen(&etag_hdr->hvalue) + 1];
867
868                 ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag));
869
870                 if (sscanf(etag, "%30d", entity_id) != 1) {
871                         return SIP_PUBLISH_UNKNOWN;
872                 }
873         }
874
875         *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
876
877         if (!(*expires)) {
878                 return SIP_PUBLISH_REMOVE;
879         } else if (!etag_hdr && rdata->msg_info.msg->body) {
880                 return SIP_PUBLISH_INITIAL;
881         } else if (etag_hdr && !rdata->msg_info.msg->body) {
882                 return SIP_PUBLISH_REFRESH;
883         } else if (etag_hdr && rdata->msg_info.msg->body) {
884                 return SIP_PUBLISH_MODIFY;
885         }
886
887         return SIP_PUBLISH_UNKNOWN;
888 }
889
890 static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
891         struct ast_sip_publish_handler *handler)
892 {
893         struct ast_sip_publication *publication = handler->new_publication(endpoint, rdata);
894
895         if (!publication) {
896                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
897                 return NULL;
898         }
899
900         publication->handler = handler;
901
902         return publication;
903 }
904
905 static int publish_expire_callback(void *data)
906 {
907         RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup);
908
909         publication->handler->publish_expire(publication);
910
911         return 0;
912 }
913
914 static int publish_expire(const void *data)
915 {
916         struct ast_sip_publication *publication = (struct ast_sip_publication*)data;
917
918         ao2_unlink(publication->handler->publications, publication);
919         publication->sched_id = -1;
920
921         if (ast_sip_push_task(NULL, publish_expire_callback, publication)) {
922                 ao2_cleanup(publication);
923         }
924
925         return 0;
926 }
927
928 static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata)
929 {
930         pjsip_event_hdr *event_header;
931         struct ast_sip_publish_handler *handler;
932         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
933         char event[32];
934         static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 };
935         pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL);
936         enum sip_publish_type publish_type;
937         RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup);
938         int expires = 0, entity_id;
939
940         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
941         ast_assert(endpoint != NULL);
942
943         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
944         if (!event_header) {
945                 ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n");
946                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
947                 return PJ_TRUE;
948         }
949         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
950
951         handler = find_pub_handler(event);
952         if (!handler) {
953                 ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event);
954                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
955                 return PJ_TRUE;
956         }
957
958         publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id);
959
960         /* If this is not an initial publish ensure that a publication is present */
961         if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) {
962                 if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) {
963                         static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 };
964
965                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed,
966                                 NULL, NULL);
967                         return PJ_TRUE;
968                 }
969
970                 /* Per the RFC every response has to have a new entity tag */
971                 publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
972
973                 /* Update the expires here so that the created responses will contain the correct value */
974                 publication->expires = expires;
975         }
976
977         switch (publish_type) {
978                 case SIP_PUBLISH_INITIAL:
979                         publication = publish_request_initial(endpoint, rdata, handler);
980                         break;
981                 case SIP_PUBLISH_REFRESH:
982                 case SIP_PUBLISH_MODIFY:
983                         if (handler->publish_refresh(publication, rdata)) {
984                                 /* If an error occurs we want to terminate the publication */
985                                 expires = 0;
986                         }
987                         break;
988                 case SIP_PUBLISH_REMOVE:
989                         handler->publish_termination(publication, rdata);
990                         break;
991                 case SIP_PUBLISH_UNKNOWN:
992                 default:
993                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
994                         break;
995         }
996
997         if (publication) {
998                 if (expires) {
999                         ao2_link(handler->publications, publication);
1000
1001                         AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication,
1002                                                 ao2_ref(publication, -1), ao2_ref(publication, -1), ao2_ref(publication, +1));
1003                 } else {
1004                         AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1));
1005                 }
1006         }
1007
1008         return PJ_TRUE;
1009 }
1010
1011 /*! \brief Internal destructor for publications */
1012 static void publication_destroy_fn(void *obj)
1013 {
1014         struct ast_sip_publication *publication = obj;
1015
1016         ast_debug(3, "Destroying SIP publication\n");
1017
1018         ao2_cleanup(publication->datastores);
1019         ao2_cleanup(publication->endpoint);
1020 }
1021
1022 struct ast_sip_publication *ast_sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
1023 {
1024         struct ast_sip_publication *publication;
1025         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
1026
1027         ast_assert(endpoint != NULL);
1028
1029         if (!(publication = ao2_alloc(sizeof(*publication), publication_destroy_fn))) {
1030                 return NULL;
1031         }
1032
1033         if (!(publication->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp))) {
1034                 ao2_ref(publication, -1);
1035                 return NULL;
1036         }
1037
1038         publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
1039         ao2_ref(endpoint, +1);
1040         publication->endpoint = endpoint;
1041         publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
1042         publication->sched_id = -1;
1043
1044         return publication;
1045 }
1046
1047 struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub)
1048 {
1049         return pub->endpoint;
1050 }
1051
1052 int ast_sip_publication_create_response(struct ast_sip_publication *pub, int status_code, pjsip_rx_data *rdata,
1053         pjsip_tx_data **tdata)
1054 {
1055         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, tdata) != PJ_SUCCESS) {
1056                 return -1;
1057         }
1058
1059         if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) {
1060                 RAII_VAR(char *, entity_tag, NULL, ast_free_ptr);
1061                 RAII_VAR(char *, expires, NULL, ast_free_ptr);
1062
1063                 if ((ast_asprintf(&entity_tag, "%d", pub->entity_tag) < 0) ||
1064                         (ast_asprintf(&expires, "%d", pub->expires) < 0)) {
1065                         pjsip_tx_data_dec_ref(*tdata);
1066                         return -1;
1067                 }
1068
1069                 ast_sip_add_header(*tdata, "SIP-ETag", entity_tag);
1070                 ast_sip_add_header(*tdata, "Expires", expires);
1071         }
1072
1073         return 0;
1074 }
1075
1076 pj_status_t ast_sip_publication_send_response(struct ast_sip_publication *pub, pjsip_rx_data *rdata,
1077         pjsip_tx_data *tdata)
1078 {
1079         pj_status_t status;
1080         pjsip_transaction *tsx;
1081
1082         if ((status = pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx)) != PJ_SUCCESS) {
1083                 return status;
1084         }
1085
1086         pjsip_tsx_recv_msg(tsx, rdata);
1087
1088         return pjsip_tsx_send_msg(tsx, tdata);
1089 }
1090
1091 int ast_sip_pubsub_register_body_generator(struct ast_sip_pubsub_body_generator *generator)
1092 {
1093         struct ast_sip_pubsub_body_generator *existing;
1094         pj_str_t accept;
1095         pj_size_t accept_len;
1096
1097         existing = find_body_generator_type_subtype(generator->type, generator->subtype);
1098         if (existing) {
1099                 ast_log(LOG_WARNING, "Cannot register body generator of %s/%s."
1100                                 "One is already registered.\n", generator->type, generator->subtype);
1101                 return -1;
1102         }
1103
1104         AST_RWLIST_WRLOCK(&body_generators);
1105         AST_LIST_INSERT_HEAD(&body_generators, generator, list);
1106         AST_RWLIST_UNLOCK(&body_generators);
1107
1108         /* Lengths of type and subtype plus space for a slash. pj_str_t is not
1109          * null-terminated, so there is no need to allocate for the extra null
1110          * byte
1111          */
1112         accept_len = strlen(generator->type) + strlen(generator->subtype) + 1;
1113
1114         accept.ptr = alloca(accept_len);
1115         accept.slen = accept_len;
1116         /* Safe use of sprintf */
1117         sprintf(accept.ptr, "%s/%s", generator->type, generator->subtype);
1118         pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module,
1119                         PJSIP_H_ACCEPT, NULL, 1, &accept);
1120
1121         return 0;
1122 }
1123
1124 void ast_sip_pubsub_unregister_body_generator(struct ast_sip_pubsub_body_generator *generator)
1125 {
1126         struct ast_sip_pubsub_body_generator *iter;
1127         SCOPED_LOCK(lock, &body_generators, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1128
1129         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_generators, iter, list) {
1130                 if (iter == generator) {
1131                         AST_LIST_REMOVE_CURRENT(list);
1132                         break;
1133                 }
1134         }
1135         AST_RWLIST_TRAVERSE_SAFE_END;
1136 }
1137
1138 int ast_sip_pubsub_register_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
1139 {
1140         AST_RWLIST_WRLOCK(&body_supplements);
1141         AST_RWLIST_INSERT_TAIL(&body_supplements, supplement, list);
1142         AST_RWLIST_UNLOCK(&body_supplements);
1143
1144         return 0;
1145 }
1146
1147 void ast_sip_pubsub_unregister_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
1148 {
1149         struct ast_sip_pubsub_body_supplement *iter;
1150         SCOPED_LOCK(lock, &body_supplements, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1151
1152         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_supplements, iter, list) {
1153                 if (iter == supplement) {
1154                         AST_LIST_REMOVE_CURRENT(list);
1155                         break;
1156                 }
1157         }
1158         AST_RWLIST_TRAVERSE_SAFE_END;
1159 }
1160
1161 const char *ast_sip_subscription_get_body_type(struct ast_sip_subscription *sub)
1162 {
1163         return sub->body_generator->type;
1164 }
1165
1166 const char *ast_sip_subscription_get_body_subtype(struct ast_sip_subscription *sub)
1167 {
1168         return sub->body_generator->subtype;
1169 }
1170
1171 int ast_sip_pubsub_generate_body_content(const char *type, const char *subtype,
1172                 void *data, struct ast_str **str)
1173 {
1174         struct ast_sip_pubsub_body_supplement *supplement;
1175         struct ast_sip_pubsub_body_generator *generator;
1176         int res;
1177         void *body;
1178
1179         generator = find_body_generator_type_subtype(type, subtype);
1180         if (!generator) {
1181                 ast_log(LOG_WARNING, "Unable to find a body generator for %s/%s\n",
1182                                 type, subtype);
1183                 return -1;
1184         }
1185
1186         body = generator->allocate_body(data);
1187         if (!body) {
1188                 ast_log(LOG_WARNING, "Unable to allocate a NOTIFY body of type %s/%s\n",
1189                                 type, subtype);
1190                 return -1;
1191         }
1192
1193         if (generator->generate_body_content(body, data)) {
1194                 res = -1;
1195                 goto end;
1196         }
1197
1198         AST_RWLIST_RDLOCK(&body_supplements);
1199         AST_RWLIST_TRAVERSE(&body_supplements, supplement, list) {
1200                 if (!strcmp(generator->type, supplement->type) &&
1201                                 !strcmp(generator->subtype, supplement->subtype)) {
1202                         res = supplement->supplement_body(body, data);
1203                         if (res) {
1204                                 break;
1205                         }
1206                 }
1207         }
1208         AST_RWLIST_UNLOCK(&body_supplements);
1209
1210         if (!res) {
1211                 generator->to_string(body, str);
1212         }
1213
1214 end:
1215         if (generator->destroy_body) {
1216                 generator->destroy_body(body);
1217         }
1218
1219         return res;
1220 }
1221
1222 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata)
1223 {
1224         if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) {
1225                 return pubsub_on_rx_subscribe_request(rdata);
1226         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) {
1227                 return pubsub_on_rx_publish_request(rdata);
1228         }
1229
1230         return PJ_FALSE;
1231 }
1232
1233 static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
1234 {
1235         struct ast_sip_subscription *sub;
1236         if (pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) {
1237                 return;
1238         }
1239
1240         sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1241         if (!sub) {
1242                 return;
1243         }
1244
1245         if (sub->handler->subscription_shutdown) {
1246                 sub->handler->subscription_shutdown(sub);
1247         }
1248         pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL);
1249 }
1250
1251 static void pubsub_on_tsx_state(pjsip_evsub *evsub, pjsip_transaction *tsx, pjsip_event *event)
1252 {
1253         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1254
1255         if (!sub) {
1256                 return;
1257         }
1258
1259         if (sub->handler->notify_response && tsx->role == PJSIP_ROLE_UAC &&
1260             event->body.tsx_state.type == PJSIP_EVENT_RX_MSG) {
1261                 sub->handler->notify_response(sub, event->body.tsx_state.src.rdata);
1262         }
1263 }
1264
1265 static void set_parameters_from_response_data(pj_pool_t *pool, int *p_st_code,
1266                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body,
1267                 struct ast_sip_subscription_response_data *response_data)
1268 {
1269         ast_assert(response_data->status_code >= 200 && response_data->status_code <= 699);
1270         *p_st_code = response_data->status_code;
1271
1272         if (!ast_strlen_zero(response_data->status_text)) {
1273                 pj_strdup2(pool, *p_st_text, response_data->status_text);
1274         }
1275
1276         if (response_data->headers) {
1277                 struct ast_variable *iter;
1278                 for (iter = response_data->headers; iter; iter = iter->next) {
1279                         pj_str_t header_name;
1280                         pj_str_t header_value;
1281                         pjsip_generic_string_hdr *hdr;
1282
1283                         pj_cstr(&header_name, iter->name);
1284                         pj_cstr(&header_value, iter->value);
1285                         hdr = pjsip_generic_string_hdr_create(pool, &header_name, &header_value);
1286                         pj_list_insert_before(res_hdr, hdr);
1287                 }
1288         }
1289
1290         if (response_data->body) {
1291                 pj_str_t type;
1292                 pj_str_t subtype;
1293                 pj_str_t body_text;
1294
1295                 pj_cstr(&type, response_data->body->type);
1296                 pj_cstr(&subtype, response_data->body->subtype);
1297                 pj_cstr(&body_text, response_data->body->body_text);
1298
1299                 *p_body = pjsip_msg_body_create(pool, &type, &subtype, &body_text);
1300         }
1301 }
1302
1303 static int response_data_changed(struct ast_sip_subscription_response_data *response_data)
1304 {
1305         if (response_data->status_code != 200 ||
1306                         !ast_strlen_zero(response_data->status_text) ||
1307                         response_data->headers ||
1308                         response_data->body) {
1309                 return 1;
1310         }
1311         return 0;
1312 }
1313
1314 static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
1315                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
1316 {
1317         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1318         struct ast_sip_subscription_response_data response_data = {
1319                 .status_code = 200,
1320         };
1321
1322         if (!sub) {
1323                 return;
1324         }
1325
1326         if (pjsip_evsub_get_state(sub->evsub) == PJSIP_EVSUB_STATE_TERMINATED) {
1327                 sub->handler->subscription_terminated(sub, rdata);
1328                 return;
1329         }
1330
1331         sub->handler->resubscribe(sub, rdata, &response_data);
1332
1333         if (!response_data_changed(&response_data)) {
1334                 return;
1335         }
1336
1337         set_parameters_from_response_data(rdata->tp_info.pool, p_st_code, p_st_text,
1338                         res_hdr, p_body, &response_data);
1339 }
1340
1341 static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code,
1342                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
1343 {
1344         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1345         struct ast_sip_subscription_response_data response_data = {
1346                 .status_code = 200,
1347         };
1348
1349         if (!sub || !sub->handler->notify_request) {
1350                 return;
1351         }
1352
1353         sub->handler->notify_request(sub, rdata, &response_data);
1354
1355         if (!response_data_changed(&response_data)) {
1356                 return;
1357         }
1358
1359         set_parameters_from_response_data(rdata->tp_info.pool, p_st_code, p_st_text,
1360                         res_hdr, p_body, &response_data);
1361 }
1362
1363 static int serialized_pubsub_on_client_refresh(void *userdata)
1364 {
1365         struct ast_sip_subscription *sub = userdata;
1366
1367         sub->handler->refresh_subscription(sub);
1368         ao2_cleanup(sub);
1369         return 0;
1370 }
1371
1372 static void pubsub_on_client_refresh(pjsip_evsub *evsub)
1373 {
1374         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1375
1376         ao2_ref(sub, +1);
1377         ast_sip_push_task(sub->serializer, serialized_pubsub_on_client_refresh, sub);
1378 }
1379
1380 static int serialized_pubsub_on_server_timeout(void *userdata)
1381 {
1382         struct ast_sip_subscription *sub = userdata;
1383
1384         sub->handler->subscription_timeout(sub);
1385         ao2_cleanup(sub);
1386         return 0;
1387 }
1388
1389 static void pubsub_on_server_timeout(pjsip_evsub *evsub)
1390 {
1391         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1392
1393         if (!sub) {
1394                 /* if a subscription has been terminated and the subscription
1395                    timeout/expires is less than the time it takes for all pending
1396                    transactions to end then the subscription timer will not have
1397                    been canceled yet and sub will be null, so do nothing since
1398                    the subscription has already been terminated. */
1399                 return;
1400         }
1401
1402         ao2_ref(sub, +1);
1403         ast_sip_push_task(sub->serializer, serialized_pubsub_on_server_timeout, sub);
1404 }
1405
1406 static int ami_subscription_detail(struct ast_sip_subscription *sub,
1407                                    struct ast_sip_ami *ami,
1408                                    const char *event)
1409 {
1410         RAII_VAR(struct ast_str *, buf,
1411                  ast_sip_create_ami_event(event, ami), ast_free);
1412
1413         if (!buf) {
1414                 return -1;
1415         }
1416
1417         sip_subscription_to_ami(sub, &buf);
1418         astman_append(ami->s, "%s\r\n", ast_str_buffer(buf));
1419         return 0;
1420 }
1421
1422 static int ami_subscription_detail_inbound(struct ast_sip_subscription *sub, void *arg)
1423 {
1424         return sub->role == AST_SIP_NOTIFIER ? ami_subscription_detail(
1425                 sub, arg, "InboundSubscriptionDetail") : 0;
1426 }
1427
1428 static int ami_subscription_detail_outbound(struct ast_sip_subscription *sub, void *arg)
1429 {
1430         return sub->role == AST_SIP_SUBSCRIBER ? ami_subscription_detail(
1431                 sub, arg, "OutboundSubscriptionDetail") : 0;
1432 }
1433
1434 static int ami_show_subscriptions_inbound(struct mansession *s, const struct message *m)
1435 {
1436         struct ast_sip_ami ami = { .s = s, .m = m };
1437         int num;
1438
1439         astman_send_listack(s, m, "Following are Events for "
1440                             "each inbound Subscription", "start");
1441
1442         num = for_each_subscription(ami_subscription_detail_inbound, &ami);
1443
1444         astman_append(s,
1445                       "Event: InboundSubscriptionDetailComplete\r\n"
1446                       "EventList: Complete\r\n"
1447                       "ListItems: %d\r\n\r\n", num);
1448         return 0;
1449 }
1450
1451 static int ami_show_subscriptions_outbound(struct mansession *s, const struct message *m)
1452 {
1453         struct ast_sip_ami ami = { .s = s, .m = m };
1454         int num;
1455
1456         astman_send_listack(s, m, "Following are Events for "
1457                             "each outbound Subscription", "start");
1458
1459         num = for_each_subscription(ami_subscription_detail_outbound, &ami);
1460
1461         astman_append(s,
1462                       "Event: OutboundSubscriptionDetailComplete\r\n"
1463                       "EventList: Complete\r\n"
1464                       "ListItems: %d\r\n\r\n", num);
1465         return 0;
1466 }
1467
/*! \brief Name of the AMI action that lists inbound subscriptions */
#define AMI_SHOW_SUBSCRIPTIONS_INBOUND "PJSIPShowSubscriptionsInbound"
/*! \brief Name of the AMI action that lists outbound subscriptions */
#define AMI_SHOW_SUBSCRIPTIONS_OUTBOUND "PJSIPShowSubscriptionsOutbound"
1470
1471 static int load_module(void)
1472 {
1473         static const pj_str_t str_PUBLISH = { "PUBLISH", 7 };
1474
1475         pjsip_evsub_init_module(ast_sip_get_pjsip_endpoint());
1476
1477         if (!(sched = ast_sched_context_create())) {
1478                 ast_log(LOG_ERROR, "Could not create scheduler for publication expiration\n");
1479                 return AST_MODULE_LOAD_FAILURE;
1480         }
1481
1482         if (ast_sched_start_thread(sched)) {
1483                 ast_log(LOG_ERROR, "Could not start scheduler thread for publication expiration\n");
1484                 ast_sched_context_destroy(sched);
1485                 return AST_MODULE_LOAD_FAILURE;
1486         }
1487
1488         pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), NULL, PJSIP_H_ALLOW, NULL, 1, &str_PUBLISH);
1489
1490         if (ast_sip_register_service(&pubsub_module)) {
1491                 ast_log(LOG_ERROR, "Could not register pubsub service\n");
1492                 ast_sched_context_destroy(sched);
1493                 return AST_MODULE_LOAD_FAILURE;
1494         }
1495
1496         ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM,
1497                                  ami_show_subscriptions_inbound);
1498         ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND, EVENT_FLAG_SYSTEM,
1499                                  ami_show_subscriptions_outbound);
1500
1501         return AST_MODULE_LOAD_SUCCESS;
1502 }
1503
1504 static int unload_module(void)
1505 {
1506         ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND);
1507         ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_INBOUND);
1508
1509         if (sched) {
1510                 ast_sched_context_destroy(sched);
1511         }
1512
1513         return 0;
1514 }
1515
/*
 * Module registration: names the load and unload entry points defined above
 * and sets the load priority to AST_MODPRI_CHANNEL_DEPEND.
 */
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP event resource",
		.load = load_module,
		.unload = unload_module,
		.load_pri = AST_MODPRI_CHANNEL_DEPEND,
);