df6155469128d39dcbcf4a823173b59d9998bd70
[asterisk/asterisk.git] / res / res_sip_pubsub.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 /*!
19  * \brief Opaque structure representing an RFC 3265 SIP subscription
20  */
21
22 /*** MODULEINFO
23         <depend>pjproject</depend>
24         <depend>res_sip</depend>
25         <support_level>core</support_level>
26  ***/
27
28 #include "asterisk.h"
29
30 #include <pjsip.h>
31 #include <pjsip_simple.h>
32 #include <pjlib.h>
33
34 #include "asterisk/res_sip_pubsub.h"
35 #include "asterisk/module.h"
36 #include "asterisk/linkedlists.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/datastore.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/taskprocessor.h"
41 #include "asterisk/sched.h"
42 #include "asterisk/res_sip.h"
43
44 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);
45
46 static struct pjsip_module pubsub_module = {
47         .name = { "PubSub Module", 13 },
48         .priority = PJSIP_MOD_PRIORITY_APPLICATION,
49         .on_rx_request = pubsub_on_rx_request,
50 };
51
52 static const pj_str_t str_event_name = { "Event", 5 };
53
54 /*! \brief Scheduler used for automatically expiring publications */
55 static struct ast_sched_context *sched;
56
57 /*! \brief Number of buckets for publications (on a per handler) */
58 #define PUBLICATIONS_BUCKETS 37
59
60 /*! \brief Default expiration time for PUBLISH if one is not specified */
61 #define DEFAULT_PUBLISH_EXPIRES 3600
62
63 /*! \brief Defined method for PUBLISH */
64 const pjsip_method pjsip_publish_method =
65 {
66         PJSIP_OTHER_METHOD,
67         { "PUBLISH", 7 }
68 };
69
70 /*!
71  * \brief The types of PUBLISH messages defined in RFC 3903
72  */
73 enum sip_publish_type {
74         /*!
75          * \brief Unknown
76          *
77          * \details
78          * This actually is not defined in RFC 3903. We use this as a constant
79          * to indicate that an incoming PUBLISH does not fit into any of the
80          * other categories and is thus invalid.
81          */
82         SIP_PUBLISH_UNKNOWN,
83
84         /*!
85          * \brief Initial
86          *
87          * \details
88          * The first PUBLISH sent. This will contain a non-zero Expires header
89          * as well as a body that indicates the current state of the endpoint
90          * that has sent the message. The initial PUBLISH is the only type
91          * of PUBLISH to not contain a Sip-If-Match header in it.
92          */
93         SIP_PUBLISH_INITIAL,
94
95         /*!
96          * \brief Refresh
97          *
98          * \details
99          * Used to keep a published state from expiring. This will contain a
100          * non-zero Expires header but no body since its purpose is not to
101          * update state.
102          */
103         SIP_PUBLISH_REFRESH,
104
105         /*!
106          * \brief Modify
107          *
108          * \details
109          * Used to change state from its previous value. This will contain
110          * a body updating the published state. May or may not contain an
111          * Expires header.
112          */
113         SIP_PUBLISH_MODIFY,
114
115         /*!
116          * \brief Remove
117          *
118          * \details
119          * Used to remove published state from an ESC. This will contain
120          * an Expires header set to 0 and likely no body.
121          */
122         SIP_PUBLISH_REMOVE,
123 };
124
125 /*!
126  * Used to create new entity IDs by ESCs.
127  */
128 static int esc_etag_counter;
129
130 /*!
131  * \brief Structure representing a SIP publication
132  */
133 struct ast_sip_publication {
134         /*! Publication datastores set up by handlers */
135         struct ao2_container *datastores;
136         /*! \brief Entity tag for the publication */
137         int entity_tag;
138         /*! \brief Handler for this publication */
139         struct ast_sip_publish_handler *handler;
140         /*! \brief The endpoint with which the subscription is communicating */
141         struct ast_sip_endpoint *endpoint;
142         /*! \brief Expiration time of the publication */
143         int expires;
144         /*! \brief Scheduled item for expiration of publication */
145         int sched_id;
146 };
147
148 /*!
149  * \brief Structure representing a SIP subscription
150  */
151 struct ast_sip_subscription {
152         /*! Subscription datastores set up by handlers */
153         struct ao2_container *datastores;
154         /*! The endpoint with which the subscription is communicating */
155         struct ast_sip_endpoint *endpoint;
156         /*! Serializer on which to place operations for this subscription */
157         struct ast_taskprocessor *serializer;
158         /*! The handler for this subscription */
159         const struct ast_sip_subscription_handler *handler;
160         /*! The role for this subscription */
161         enum ast_sip_subscription_role role;
162         /*! The underlying PJSIP event subscription structure */
163         pjsip_evsub *evsub;
164         /*! The underlying PJSIP dialog */
165         pjsip_dialog *dlg;
166 };
167
168 #define DATASTORE_BUCKETS 53
169
170 #define DEFAULT_EXPIRES 3600
171
172 static int datastore_hash(const void *obj, int flags)
173 {
174         const struct ast_datastore *datastore = obj;
175         const char *uid = flags & OBJ_KEY ? obj : datastore->uid;
176
177         ast_assert(uid != NULL);
178
179         return ast_str_hash(uid);
180 }
181
182 static int datastore_cmp(void *obj, void *arg, int flags)
183 {
184         const struct ast_datastore *datastore1 = obj;
185         const struct ast_datastore *datastore2 = arg;
186         const char *uid2 = flags & OBJ_KEY ? arg : datastore2->uid;
187
188         ast_assert(datastore1->uid != NULL);
189         ast_assert(uid2 != NULL);
190
191         return strcmp(datastore1->uid, uid2) ? 0 : CMP_MATCH | CMP_STOP;
192 }
193
194 static void subscription_destructor(void *obj)
195 {
196         struct ast_sip_subscription *sub = obj;
197
198         ast_debug(3, "Destroying SIP subscription\n");
199
200         ao2_cleanup(sub->datastores);
201         ao2_cleanup(sub->endpoint);
202
203         if (sub->dlg) {
204                 /* This is why we keep the dialog on the subscription. When the subscription
205                  * is destroyed, there is no guarantee that the underlying dialog is ready
206                  * to be destroyed. Furthermore, there's no guarantee in the opposite direction
207                  * either. The dialog could be destroyed before our subscription is. We fix
208                  * this problem by keeping a reference to the dialog until it is time to
209                  * destroy the subscription. We need to have the dialog available when the
210                  * subscription is destroyed so that we can guarantee that our attempt to
211                  * remove the serializer will be successful.
212                  */
213                 ast_sip_dialog_set_serializer(sub->dlg, NULL);
214                 pjsip_dlg_dec_session(sub->dlg, &pubsub_module);
215         }
216         ast_taskprocessor_unreference(sub->serializer);
217 }
218
219 static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
220 static void pubsub_on_tsx_state(pjsip_evsub *sub, pjsip_transaction *tsx, pjsip_event *event);
221 static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
222                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
223 static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
224                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
225 static void pubsub_on_client_refresh(pjsip_evsub *sub);
226 static void pubsub_on_server_timeout(pjsip_evsub *sub);
227
228
229 static pjsip_evsub_user pubsub_cb = {
230         .on_evsub_state = pubsub_on_evsub_state,
231         .on_tsx_state = pubsub_on_tsx_state,
232         .on_rx_refresh = pubsub_on_rx_refresh,
233         .on_rx_notify = pubsub_on_rx_notify,
234         .on_client_refresh = pubsub_on_client_refresh,
235         .on_server_timeout = pubsub_on_server_timeout,
236 };
237
238 static pjsip_evsub *allocate_evsub(const char *event, enum ast_sip_subscription_role role,
239                 struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, pjsip_dialog *dlg)
240 {
241         pjsip_evsub *evsub;
242         /* PJSIP is kind enough to have some built-in support for certain
243          * events. We need to use the correct initialization function for the
244          * built-in events
245          */
246         if (role == AST_SIP_NOTIFIER) {
247                 if (!strcmp(event, "message-summary")) {
248                         pjsip_mwi_create_uas(dlg, &pubsub_cb, rdata, &evsub);
249                 } else {
250                         pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &evsub);
251                 }
252         } else {
253                 if (!strcmp(event, "message-summary")) {
254                         pjsip_mwi_create_uac(dlg, &pubsub_cb, 0, &evsub);
255                 } else {
256                         pj_str_t pj_event;
257                         pj_cstr(&pj_event, event);
258                         pjsip_evsub_create_uac(dlg, &pubsub_cb, &pj_event, 0, &evsub);
259                 }
260         }
261         return evsub;
262 }
263
264 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
265                 enum ast_sip_subscription_role role, struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
266 {
267         struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub), subscription_destructor);
268         pjsip_dialog *dlg;
269
270         if (!sub) {
271                 return NULL;
272         }
273         sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
274         if (!sub->datastores) {
275                 ao2_ref(sub, -1);
276                 return NULL;
277         }
278         sub->serializer = ast_sip_create_serializer();
279         if (!sub->serializer) {
280                 ao2_ref(sub, -1);
281                 return NULL;
282         }
283         sub->role = role;
284         if (role == AST_SIP_NOTIFIER) {
285                 pjsip_dlg_create_uas(pjsip_ua_instance(), rdata, NULL, &dlg);
286         } else {
287                 RAII_VAR(struct ast_sip_contact *, contact, NULL, ao2_cleanup);
288
289                 contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors);
290                 if (!contact || ast_strlen_zero(contact->uri)) {
291                         ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n",
292                                         ast_sorcery_object_get_id(endpoint));
293                         ao2_ref(sub, -1);
294                         return NULL;
295                 }
296                 dlg = ast_sip_create_dialog(endpoint, contact->uri, NULL);
297         }
298         if (!dlg) {
299                 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
300                 ao2_ref(sub, -1);
301                 return NULL;
302         }
303         sub->evsub = allocate_evsub(handler->event_name, role, endpoint, rdata, dlg);
304         /* We keep a reference to the dialog until our subscription is destroyed. See
305          * the subscription_destructor for more details
306          */
307         pjsip_dlg_inc_session(dlg, &pubsub_module);
308         sub->dlg = dlg;
309         ast_sip_dialog_set_serializer(dlg, sub->serializer);
310         pjsip_evsub_set_mod_data(sub->evsub, pubsub_module.id, sub);
311         ao2_ref(endpoint, +1);
312         sub->endpoint = endpoint;
313         sub->handler = handler;
314         return sub;
315 }
316
317 struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
318 {
319         ast_assert(sub->endpoint != NULL);
320         ao2_ref(sub->endpoint, +1);
321         return sub->endpoint;
322 }
323
324 struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
325 {
326         ast_assert(sub->serializer != NULL);
327         return sub->serializer;
328 }
329
330 pjsip_evsub *ast_sip_subscription_get_evsub(struct ast_sip_subscription *sub)
331 {
332         return sub->evsub;
333 }
334
335 pjsip_dialog *ast_sip_subscription_get_dlg(struct ast_sip_subscription *sub)
336 {
337         return sub->dlg;
338 }
339
340 int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata)
341 {
342         return pjsip_evsub_send_request(ast_sip_subscription_get_evsub(sub),
343                         tdata) == PJ_SUCCESS ? 0 : -1;
344 }
345
346 static void subscription_datastore_destroy(void *obj)
347 {
348         struct ast_datastore *datastore = obj;
349
350         /* Using the destroy function (if present) destroy the data */
351         if (datastore->info->destroy != NULL && datastore->data != NULL) {
352                 datastore->info->destroy(datastore->data);
353                 datastore->data = NULL;
354         }
355
356         ast_free((void *) datastore->uid);
357         datastore->uid = NULL;
358 }
359
360 struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
361 {
362         RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
363         const char *uid_ptr = uid;
364
365         if (!info) {
366                 return NULL;
367         }
368
369         datastore = ao2_alloc(sizeof(*datastore), subscription_datastore_destroy);
370         if (!datastore) {
371                 return NULL;
372         }
373
374         datastore->info = info;
375         if (ast_strlen_zero(uid)) {
376                 /* They didn't provide an ID so we'll provide one ourself */
377                 struct ast_uuid *uuid = ast_uuid_generate();
378                 char uuid_buf[AST_UUID_STR_LEN];
379                 if (!uuid) {
380                         return NULL;
381                 }
382                 uid_ptr = ast_uuid_to_str(uuid, uuid_buf, sizeof(uuid_buf));
383                 ast_free(uuid);
384         }
385
386         datastore->uid = ast_strdup(uid_ptr);
387         if (!datastore->uid) {
388                 return NULL;
389         }
390
391         ao2_ref(datastore, +1);
392         return datastore;
393 }
394
395 int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
396 {
397         ast_assert(datastore != NULL);
398         ast_assert(datastore->info != NULL);
399         ast_assert(!ast_strlen_zero(datastore->uid));
400
401         if (!ao2_link(subscription->datastores, datastore)) {
402                 return -1;
403         }
404         return 0;
405 }
406
407 struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
408 {
409         return ao2_find(subscription->datastores, name, OBJ_KEY);
410 }
411
412 void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
413 {
414         ao2_callback(subscription->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
415 }
416
417 int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
418 {
419         ast_assert(datastore != NULL);
420         ast_assert(datastore->info != NULL);
421         ast_assert(!ast_strlen_zero(datastore->uid));
422
423         if (!ao2_link(publication->datastores, datastore)) {
424                 return -1;
425         }
426         return 0;
427 }
428
429 struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
430 {
431         return ao2_find(publication->datastores, name, OBJ_KEY);
432 }
433
434 void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
435 {
436         ao2_callback(publication->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
437 }
438
439 AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
440
441 static int publication_hash_fn(const void *obj, const int flags)
442 {
443         const struct ast_sip_publication *publication = obj;
444         const int *entity_tag = obj;
445
446         return flags & OBJ_KEY ? *entity_tag : publication->entity_tag;
447 }
448
449 static int publication_cmp_fn(void *obj, void *arg, int flags)
450 {
451         const struct ast_sip_publication *publication1 = obj;
452         const struct ast_sip_publication *publication2 = arg;
453         const int *entity_tag = arg;
454
455         return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ?
456                 CMP_MATCH | CMP_STOP : 0);
457 }
458
459 static void publish_add_handler(struct ast_sip_publish_handler *handler)
460 {
461         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
462         AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
463 }
464
465 int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
466 {
467         if (ast_strlen_zero(handler->event_name)) {
468                 ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
469                 return -1;
470         }
471
472         if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS,
473                 publication_hash_fn, publication_cmp_fn))) {
474                 ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n",
475                         handler->event_name);
476                 return -1;
477         }
478
479         publish_add_handler(handler);
480
481         ast_module_ref(ast_module_info->self);
482
483         return 0;
484 }
485
486 void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
487 {
488         struct ast_sip_publish_handler *iter;
489         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
490         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
491                 if (handler == iter) {
492                         AST_RWLIST_REMOVE_CURRENT(next);
493                         ao2_cleanup(handler->publications);
494                         ast_module_unref(ast_module_info->self);
495                         break;
496                 }
497         }
498         AST_RWLIST_TRAVERSE_SAFE_END;
499 }
500
501 AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
502
503 static void sub_add_handler(struct ast_sip_subscription_handler *handler)
504 {
505         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
506         AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
507         ast_module_ref(ast_module_info->self);
508 }
509
510 static int sub_handler_exists_for_event_name(const char *event_name)
511 {
512         struct ast_sip_subscription_handler *iter;
513         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
514
515         AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
516                 if (!strcmp(iter->event_name, event_name)) {
517                         return 1;
518                 }
519         }
520         return 0;
521 }
522
523 int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler)
524 {
525         pj_str_t accept[AST_SIP_MAX_ACCEPT];
526         int i;
527
528         if (ast_strlen_zero(handler->event_name)) {
529                 ast_log(LOG_ERROR, "No event package specifief for subscription handler. Cannot register\n");
530                 return -1;
531         }
532
533         if (ast_strlen_zero(handler->accept[0])) {
534                 ast_log(LOG_ERROR, "Subscription handler must supply at least one 'Accept' format\n");
535                 return -1;
536         }
537
538         for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) {
539                 pj_cstr(&accept[i], handler->accept[i]);
540         }
541
542         if (!sub_handler_exists_for_event_name(handler->event_name)) {
543                 pj_str_t event;
544
545                 pj_cstr(&event, handler->event_name);
546
547                 if (!strcmp(handler->event_name, "message-summary")) {
548                         pjsip_mwi_init_module(ast_sip_get_pjsip_endpoint(), pjsip_evsub_instance());
549                 } else {
550                         pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
551                 }
552         } else {
553                 pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module, PJSIP_H_ACCEPT, NULL,
554                         i, accept);
555         }
556
557         sub_add_handler(handler);
558         return 0;
559 }
560
561 void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
562 {
563         struct ast_sip_subscription_handler *iter;
564         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
565         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
566                 if (handler == iter) {
567                         AST_RWLIST_REMOVE_CURRENT(next);
568                         ast_module_unref(ast_module_info->self);
569                         break;
570                 }
571         }
572         AST_RWLIST_TRAVERSE_SAFE_END;
573 }
574
575 static struct ast_sip_subscription_handler *find_sub_handler(const char *event, char accept[AST_SIP_MAX_ACCEPT][64], size_t num_accept)
576 {
577         struct ast_sip_subscription_handler *iter;
578         int match = 0;
579         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
580         AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
581                 int i;
582                 int j;
583                 if (strcmp(event, iter->event_name)) {
584                         ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
585                         continue;
586                 }
587                 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
588                 for (i = 0; i < num_accept; ++i) {
589                         for (j = 0; j < num_accept; ++j) {
590                                 if (ast_strlen_zero(iter->accept[i])) {
591                                         ast_debug(3, "Breaking because subscription handler has run out of 'accept' types\n");
592                                         break;
593                                 }
594                                 if (!strcmp(accept[j], iter->accept[i])) {
595                                         ast_debug(3, "Accept headers match: %s = %s\n", accept[j], iter->accept[i]);
596                                         match = 1;
597                                         break;
598                                 }
599                                 ast_debug(3, "Accept %s does not match %s\n", accept[j], iter->accept[i]);
600                         }
601                         if (match) {
602                                 break;
603                         }
604                 }
605                 if (match) {
606                         break;
607                 }
608         }
609
610         return iter;
611 }
612
613 static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
614 {
615         char event[32];
616         char accept[AST_SIP_MAX_ACCEPT][64];
617         pjsip_accept_hdr *accept_header;
618         pjsip_event_hdr *event_header;
619         struct ast_sip_subscription_handler *handler;
620         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
621         struct ast_sip_subscription *sub;
622         int i;
623
624         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
625         ast_assert(endpoint != NULL);
626
627         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
628         if (!event_header) {
629                 ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
630                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
631                 return PJ_TRUE;
632         }
633
634         accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next);
635         if (!accept_header) {
636                 ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Accept header\n");
637                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
638                 return PJ_TRUE;
639         }
640
641         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
642         for (i = 0; i < accept_header->count; ++i) {
643                 ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
644         }
645
646         handler = find_sub_handler(event, accept, accept_header->count);
647         if (!handler) {
648                 ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
649                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
650                 return PJ_TRUE;
651         }
652         sub = handler->new_subscribe(endpoint, rdata);
653         if (!sub) {
654                 pjsip_transaction *trans = pjsip_rdata_get_tsx(rdata);
655
656                 if (trans) {
657                         pjsip_dialog *dlg = pjsip_rdata_get_dlg(rdata);
658                         pjsip_tx_data *tdata;
659
660                         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, &tdata) != PJ_SUCCESS) {
661                                 return PJ_TRUE;
662                         }
663                         pjsip_dlg_send_response(dlg, trans, tdata);
664                 } else {
665                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
666                 }
667         }
668         return PJ_TRUE;
669 }
670
671 static struct ast_sip_publish_handler *find_pub_handler(const char *event)
672 {
673         struct ast_sip_publish_handler *iter = NULL;
674         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
675
676         AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) {
677                 if (strcmp(event, iter->event_name)) {
678                         ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
679                         continue;
680                 }
681                 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
682                 break;
683         }
684
685         return iter;
686 }
687
688 static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata,
689         pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id)
690 {
691         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
692
693         if (etag_hdr) {
694                 char etag[pj_strlen(&etag_hdr->hvalue) + 1];
695
696                 ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag));
697
698                 if (sscanf(etag, "%30d", entity_id) != 1) {
699                         return SIP_PUBLISH_UNKNOWN;
700                 }
701         }
702
703         *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
704
705         if (!(*expires)) {
706                 return SIP_PUBLISH_REMOVE;
707         } else if (!etag_hdr && rdata->msg_info.msg->body) {
708                 return SIP_PUBLISH_INITIAL;
709         } else if (etag_hdr && !rdata->msg_info.msg->body) {
710                 return SIP_PUBLISH_REFRESH;
711         } else if (etag_hdr && rdata->msg_info.msg->body) {
712                 return SIP_PUBLISH_MODIFY;
713         }
714
715         return SIP_PUBLISH_UNKNOWN;
716 }
717
718 static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
719         struct ast_sip_publish_handler *handler)
720 {
721         struct ast_sip_publication *publication = handler->new_publication(endpoint, rdata);
722
723         if (!publication) {
724                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
725                 return NULL;
726         }
727
728         publication->handler = handler;
729
730         return publication;
731 }
732
733 static int publish_expire_callback(void *data)
734 {
735         RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup);
736
737         publication->handler->publish_expire(publication);
738
739         return 0;
740 }
741
742 static int publish_expire(const void *data)
743 {
744         struct ast_sip_publication *publication = (struct ast_sip_publication*)data;
745
746         ao2_unlink(publication->handler->publications, publication);
747         publication->sched_id = -1;
748
749         if (ast_sip_push_task(NULL, publish_expire_callback, publication)) {
750                 ao2_cleanup(publication);
751         }
752
753         return 0;
754 }
755
756 static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata)
757 {
758         pjsip_event_hdr *event_header;
759         struct ast_sip_publish_handler *handler;
760         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
761         char event[32];
762         static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 };
763         pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL);
764         enum sip_publish_type publish_type;
765         RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup);
766         int expires = 0, entity_id;
767
768         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
769         ast_assert(endpoint != NULL);
770
771         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
772         if (!event_header) {
773                 ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n");
774                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
775                 return PJ_TRUE;
776         }
777         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
778
779         handler = find_pub_handler(event);
780         if (!handler) {
781                 ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event);
782                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
783                 return PJ_TRUE;
784         }
785
786         publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id);
787
788         /* If this is not an initial publish ensure that a publication is present */
789         if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) {
790                 if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) {
791                         static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 };
792
793                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed,
794                                 NULL, NULL);
795                         return PJ_TRUE;
796                 }
797
798                 /* Per the RFC every response has to have a new entity tag */
799                 publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
800
801                 /* Update the expires here so that the created responses will contain the correct value */
802                 publication->expires = expires;
803         }
804
805         switch (publish_type) {
806                 case SIP_PUBLISH_INITIAL:
807                         publication = publish_request_initial(endpoint, rdata, handler);
808                         break;
809                 case SIP_PUBLISH_REFRESH:
810                 case SIP_PUBLISH_MODIFY:
811                         if (handler->publish_refresh(publication, rdata)) {
812                                 /* If an error occurs we want to terminate the publication */
813                                 expires = 0;
814                         }
815                         break;
816                 case SIP_PUBLISH_REMOVE:
817                         handler->publish_termination(publication, rdata);
818                         break;
819                 case SIP_PUBLISH_UNKNOWN:
820                 default:
821                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
822                         break;
823         }
824
825         if (publication) {
826                 if (expires) {
827                         ao2_link(handler->publications, publication);
828
829                         AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication,
830                                 ao2_ref(publication, -1), NULL, ao2_ref(publication, +1));
831                 } else {
832                         AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1));
833                 }
834         }
835
836         return PJ_TRUE;
837 }
838
839 /*! \brief Internal destructor for publications */
840 static void publication_destroy_fn(void *obj)
841 {
842         struct ast_sip_publication *publication = obj;
843
844         ast_debug(3, "Destroying SIP publication\n");
845
846         ao2_cleanup(publication->datastores);
847         ao2_cleanup(publication->endpoint);
848 }
849
850 struct ast_sip_publication *ast_sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
851 {
852         struct ast_sip_publication *publication;
853         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
854
855         ast_assert(endpoint != NULL);
856
857         if (!(publication = ao2_alloc(sizeof(*publication), publication_destroy_fn))) {
858                 return NULL;
859         }
860
861         if (!(publication->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp))) {
862                 ao2_ref(publication, -1);
863                 return NULL;
864         }
865
866         publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
867         ao2_ref(endpoint, +1);
868         publication->endpoint = endpoint;
869         publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
870         publication->sched_id = -1;
871
872         return publication;
873 }
874
875 struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub)
876 {
877         return pub->endpoint;
878 }
879
880 int ast_sip_publication_create_response(struct ast_sip_publication *pub, int status_code, pjsip_rx_data *rdata,
881         pjsip_tx_data **tdata)
882 {
883         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, tdata) != PJ_SUCCESS) {
884                 return -1;
885         }
886
887         if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) {
888                 RAII_VAR(char *, entity_tag, NULL, ast_free_ptr);
889                 RAII_VAR(char *, expires, NULL, ast_free_ptr);
890
891                 if ((ast_asprintf(&entity_tag, "%d", pub->entity_tag) < 0) ||
892                         (ast_asprintf(&expires, "%d", pub->expires) < 0)) {
893                         pjsip_tx_data_dec_ref(*tdata);
894                         return -1;
895                 }
896
897                 ast_sip_add_header(*tdata, "SIP-ETag", entity_tag);
898                 ast_sip_add_header(*tdata, "Expires", expires);
899         }
900
901         return 0;
902 }
903
904 pj_status_t ast_sip_publication_send_response(struct ast_sip_publication *pub, pjsip_rx_data *rdata,
905         pjsip_tx_data *tdata)
906 {
907         pj_status_t status;
908         pjsip_transaction *tsx;
909
910         if ((status = pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx)) != PJ_SUCCESS) {
911                 return status;
912         }
913
914         pjsip_tsx_recv_msg(tsx, rdata);
915
916         return pjsip_tsx_send_msg(tsx, tdata);
917 }
918
919 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata)
920 {
921         if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) {
922                 return pubsub_on_rx_subscribe_request(rdata);
923         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) {
924                 return pubsub_on_rx_publish_request(rdata);
925         }
926
927         return PJ_FALSE;
928 }
929
930 static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
931 {
932         struct ast_sip_subscription *sub;
933         if (pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) {
934                 return;
935         }
936
937         sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
938         if (!sub) {
939                 return;
940         }
941
942         if (event->type == PJSIP_EVENT_RX_MSG) {
943                 sub->handler->subscription_terminated(sub, event->body.rx_msg.rdata);
944         }
945
946         if (event->type == PJSIP_EVENT_TSX_STATE &&
947                         event->body.tsx_state.type == PJSIP_EVENT_RX_MSG) {
948                 sub->handler->subscription_terminated(sub, event->body.tsx_state.src.rdata);
949         }
950
951         if (sub->handler->subscription_shutdown) {
952                 sub->handler->subscription_shutdown(sub);
953         }
954         pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL);
955 }
956
957 static void pubsub_on_tsx_state(pjsip_evsub *evsub, pjsip_transaction *tsx, pjsip_event *event)
958 {
959         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
960
961         if (!sub) {
962                 return;
963         }
964
965         if (sub->handler->notify_response && tsx->role == PJSIP_ROLE_UAC &&
966             event->body.tsx_state.type == PJSIP_EVENT_RX_MSG) {
967                 sub->handler->notify_response(sub, event->body.tsx_state.src.rdata);
968         }
969 }
970
971 static void set_parameters_from_response_data(pj_pool_t *pool, int *p_st_code,
972                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body,
973                 struct ast_sip_subscription_response_data *response_data)
974 {
975         ast_assert(response_data->status_code >= 200 && response_data->status_code <= 699);
976         *p_st_code = response_data->status_code;
977
978         if (!ast_strlen_zero(response_data->status_text)) {
979                 pj_strdup2(pool, *p_st_text, response_data->status_text);
980         }
981
982         if (response_data->headers) {
983                 struct ast_variable *iter;
984                 for (iter = response_data->headers; iter; iter = iter->next) {
985                         pj_str_t header_name;
986                         pj_str_t header_value;
987                         pjsip_generic_string_hdr *hdr;
988
989                         pj_cstr(&header_name, iter->name);
990                         pj_cstr(&header_value, iter->value);
991                         hdr = pjsip_generic_string_hdr_create(pool, &header_name, &header_value);
992                         pj_list_insert_before(res_hdr, hdr);
993                 }
994         }
995
996         if (response_data->body) {
997                 pj_str_t type;
998                 pj_str_t subtype;
999                 pj_str_t body_text;
1000
1001                 pj_cstr(&type, response_data->body->type);
1002                 pj_cstr(&subtype, response_data->body->subtype);
1003                 pj_cstr(&body_text, response_data->body->body_text);
1004
1005                 *p_body = pjsip_msg_body_create(pool, &type, &subtype, &body_text);
1006         }
1007 }
1008
1009 static int response_data_changed(struct ast_sip_subscription_response_data *response_data)
1010 {
1011         if (response_data->status_code != 200 ||
1012                         !ast_strlen_zero(response_data->status_text) ||
1013                         response_data->headers ||
1014                         response_data->body) {
1015                 return 1;
1016         }
1017         return 0;
1018 }
1019
1020 static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
1021                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
1022 {
1023         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1024         struct ast_sip_subscription_response_data response_data = {
1025                 .status_code = 200,
1026         };
1027
1028         if (!sub) {
1029                 return;
1030         }
1031
1032         sub->handler->resubscribe(sub, rdata, &response_data);
1033
1034         if (!response_data_changed(&response_data)) {
1035                 return;
1036         }
1037
1038         set_parameters_from_response_data(rdata->tp_info.pool, p_st_code, p_st_text,
1039                         res_hdr, p_body, &response_data);
1040 }
1041
1042 static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code,
1043                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
1044 {
1045         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1046         struct ast_sip_subscription_response_data response_data = {
1047                 .status_code = 200,
1048         };
1049
1050         if (!sub || !sub->handler->notify_request) {
1051                 return;
1052         }
1053
1054         sub->handler->notify_request(sub, rdata, &response_data);
1055
1056         if (!response_data_changed(&response_data)) {
1057                 return;
1058         }
1059
1060         set_parameters_from_response_data(rdata->tp_info.pool, p_st_code, p_st_text,
1061                         res_hdr, p_body, &response_data);
1062 }
1063
1064 static int serialized_pubsub_on_client_refresh(void *userdata)
1065 {
1066         struct ast_sip_subscription *sub = userdata;
1067
1068         sub->handler->refresh_subscription(sub);
1069         ao2_cleanup(sub);
1070         return 0;
1071 }
1072
1073 static void pubsub_on_client_refresh(pjsip_evsub *evsub)
1074 {
1075         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1076
1077         ao2_ref(sub, +1);
1078         ast_sip_push_task(sub->serializer, serialized_pubsub_on_client_refresh, sub);
1079 }
1080
1081 static int serialized_pubsub_on_server_timeout(void *userdata)
1082 {
1083         struct ast_sip_subscription *sub = userdata;
1084
1085         sub->handler->subscription_timeout(sub);
1086         ao2_cleanup(sub);
1087         return 0;
1088 }
1089
1090 static void pubsub_on_server_timeout(pjsip_evsub *evsub)
1091 {
1092         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1093
1094         ao2_ref(sub, +1);
1095         ast_sip_push_task(sub->serializer, serialized_pubsub_on_server_timeout, sub);
1096 }
1097
1098 static int load_module(void)
1099 {
1100         static const pj_str_t str_PUBLISH = { "PUBLISH", 7 };
1101
1102         pjsip_evsub_init_module(ast_sip_get_pjsip_endpoint());
1103
1104         if (!(sched = ast_sched_context_create())) {
1105                 ast_log(LOG_ERROR, "Could not create scheduler for publication expiration\n");
1106                 return AST_MODULE_LOAD_FAILURE;
1107         }
1108
1109         if (ast_sched_start_thread(sched)) {
1110                 ast_log(LOG_ERROR, "Could not start scheduler thread for publication expiration\n");
1111                 ast_sched_context_destroy(sched);
1112                 return AST_MODULE_LOAD_FAILURE;
1113         }
1114
1115         pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), NULL, PJSIP_H_ALLOW, NULL, 1, &str_PUBLISH);
1116
1117         if (ast_sip_register_service(&pubsub_module)) {
1118                 ast_log(LOG_ERROR, "Could not register pubsub service\n");
1119                 ast_sched_context_destroy(sched);
1120                 return AST_MODULE_LOAD_FAILURE;
1121         }
1122
1123         return AST_MODULE_LOAD_SUCCESS;
1124 }
1125
1126 static int unload_module(void)
1127 {
1128         if (sched) {
1129                 ast_sched_context_destroy(sched);
1130         }
1131
1132         return 0;
1133 }
1134
1135 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "SIP event resource",
1136                 .load = load_module,
1137                 .unload = unload_module,
1138                 .load_pri = AST_MODPRI_CHANNEL_DEPEND,
1139 );