4daacd42c30ff036b675b7431fb4750b8716e32d
[asterisk/asterisk.git] / res / res_pjsip_pubsub.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 /*!
19  * \brief Opaque structure representing an RFC 3265 SIP subscription
20  */
21
22 /*** MODULEINFO
23         <depend>pjproject</depend>
24         <depend>res_pjsip</depend>
25         <support_level>core</support_level>
26  ***/
27
28 #include "asterisk.h"
29
30 #include <pjsip.h>
31 #include <pjsip_simple.h>
32 #include <pjlib.h>
33
34 #include "asterisk/res_pjsip_pubsub.h"
35 #include "asterisk/module.h"
36 #include "asterisk/linkedlists.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/datastore.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/taskprocessor.h"
41 #include "asterisk/sched.h"
42 #include "asterisk/res_pjsip.h"
43
44 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);
45
46 static struct pjsip_module pubsub_module = {
47         .name = { "PubSub Module", 13 },
48         .priority = PJSIP_MOD_PRIORITY_APPLICATION,
49         .on_rx_request = pubsub_on_rx_request,
50 };
51
52 static const pj_str_t str_event_name = { "Event", 5 };
53
54 /*! \brief Scheduler used for automatically expiring publications */
55 static struct ast_sched_context *sched;
56
57 /*! \brief Number of buckets for publications (on a per handler) */
58 #define PUBLICATIONS_BUCKETS 37
59
60 /*! \brief Default expiration time for PUBLISH if one is not specified */
61 #define DEFAULT_PUBLISH_EXPIRES 3600
62
63 /*! \brief Defined method for PUBLISH */
64 const pjsip_method pjsip_publish_method =
65 {
66         PJSIP_OTHER_METHOD,
67         { "PUBLISH", 7 }
68 };
69
70 /*!
71  * \brief The types of PUBLISH messages defined in RFC 3903
72  */
73 enum sip_publish_type {
74         /*!
75          * \brief Unknown
76          *
77          * \details
78          * This actually is not defined in RFC 3903. We use this as a constant
79          * to indicate that an incoming PUBLISH does not fit into any of the
80          * other categories and is thus invalid.
81          */
82         SIP_PUBLISH_UNKNOWN,
83
84         /*!
85          * \brief Initial
86          *
87          * \details
88          * The first PUBLISH sent. This will contain a non-zero Expires header
89          * as well as a body that indicates the current state of the endpoint
90          * that has sent the message. The initial PUBLISH is the only type
91          * of PUBLISH to not contain a Sip-If-Match header in it.
92          */
93         SIP_PUBLISH_INITIAL,
94
95         /*!
96          * \brief Refresh
97          *
98          * \details
99          * Used to keep a published state from expiring. This will contain a
100          * non-zero Expires header but no body since its purpose is not to
101          * update state.
102          */
103         SIP_PUBLISH_REFRESH,
104
105         /*!
106          * \brief Modify
107          *
108          * \details
109          * Used to change state from its previous value. This will contain
110          * a body updating the published state. May or may not contain an
111          * Expires header.
112          */
113         SIP_PUBLISH_MODIFY,
114
115         /*!
116          * \brief Remove
117          *
118          * \details
119          * Used to remove published state from an ESC. This will contain
120          * an Expires header set to 0 and likely no body.
121          */
122         SIP_PUBLISH_REMOVE,
123 };
124
125 /*!
126  * Used to create new entity IDs by ESCs.
127  */
128 static int esc_etag_counter;
129
130 /*!
131  * \brief Structure representing a SIP publication
132  */
133 struct ast_sip_publication {
134         /*! Publication datastores set up by handlers */
135         struct ao2_container *datastores;
136         /*! \brief Entity tag for the publication */
137         int entity_tag;
138         /*! \brief Handler for this publication */
139         struct ast_sip_publish_handler *handler;
140         /*! \brief The endpoint with which the subscription is communicating */
141         struct ast_sip_endpoint *endpoint;
142         /*! \brief Expiration time of the publication */
143         int expires;
144         /*! \brief Scheduled item for expiration of publication */
145         int sched_id;
146 };
147
148 /*!
149  * \brief Structure representing a SIP subscription
150  */
151 struct ast_sip_subscription {
152         /*! Subscription datastores set up by handlers */
153         struct ao2_container *datastores;
154         /*! The endpoint with which the subscription is communicating */
155         struct ast_sip_endpoint *endpoint;
156         /*! Serializer on which to place operations for this subscription */
157         struct ast_taskprocessor *serializer;
158         /*! The handler for this subscription */
159         const struct ast_sip_subscription_handler *handler;
160         /*! The role for this subscription */
161         enum ast_sip_subscription_role role;
162         /*! The underlying PJSIP event subscription structure */
163         pjsip_evsub *evsub;
164         /*! The underlying PJSIP dialog */
165         pjsip_dialog *dlg;
166 };
167
168 #define DATASTORE_BUCKETS 53
169
170 #define DEFAULT_EXPIRES 3600
171
172 static int datastore_hash(const void *obj, int flags)
173 {
174         const struct ast_datastore *datastore = obj;
175         const char *uid = flags & OBJ_KEY ? obj : datastore->uid;
176
177         ast_assert(uid != NULL);
178
179         return ast_str_hash(uid);
180 }
181
182 static int datastore_cmp(void *obj, void *arg, int flags)
183 {
184         const struct ast_datastore *datastore1 = obj;
185         const struct ast_datastore *datastore2 = arg;
186         const char *uid2 = flags & OBJ_KEY ? arg : datastore2->uid;
187
188         ast_assert(datastore1->uid != NULL);
189         ast_assert(uid2 != NULL);
190
191         return strcmp(datastore1->uid, uid2) ? 0 : CMP_MATCH | CMP_STOP;
192 }
193
194 static void subscription_destructor(void *obj)
195 {
196         struct ast_sip_subscription *sub = obj;
197
198         ast_debug(3, "Destroying SIP subscription\n");
199
200         ao2_cleanup(sub->datastores);
201         ao2_cleanup(sub->endpoint);
202
203         if (sub->dlg) {
204                 /* This is why we keep the dialog on the subscription. When the subscription
205                  * is destroyed, there is no guarantee that the underlying dialog is ready
206                  * to be destroyed. Furthermore, there's no guarantee in the opposite direction
207                  * either. The dialog could be destroyed before our subscription is. We fix
208                  * this problem by keeping a reference to the dialog until it is time to
209                  * destroy the subscription. We need to have the dialog available when the
210                  * subscription is destroyed so that we can guarantee that our attempt to
211                  * remove the serializer will be successful.
212                  */
213                 ast_sip_dialog_set_serializer(sub->dlg, NULL);
214                 pjsip_dlg_dec_session(sub->dlg, &pubsub_module);
215         }
216         ast_taskprocessor_unreference(sub->serializer);
217 }
218
219 static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
220 static void pubsub_on_tsx_state(pjsip_evsub *sub, pjsip_transaction *tsx, pjsip_event *event);
221 static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
222                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
223 static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
224                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
225 static void pubsub_on_client_refresh(pjsip_evsub *sub);
226 static void pubsub_on_server_timeout(pjsip_evsub *sub);
227
228
229 static pjsip_evsub_user pubsub_cb = {
230         .on_evsub_state = pubsub_on_evsub_state,
231         .on_tsx_state = pubsub_on_tsx_state,
232         .on_rx_refresh = pubsub_on_rx_refresh,
233         .on_rx_notify = pubsub_on_rx_notify,
234         .on_client_refresh = pubsub_on_client_refresh,
235         .on_server_timeout = pubsub_on_server_timeout,
236 };
237
238 static pjsip_evsub *allocate_evsub(const char *event, enum ast_sip_subscription_role role,
239                 struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, pjsip_dialog *dlg)
240 {
241         pjsip_evsub *evsub;
242         /* PJSIP is kind enough to have some built-in support for certain
243          * events. We need to use the correct initialization function for the
244          * built-in events
245          */
246         if (role == AST_SIP_NOTIFIER) {
247                 if (!strcmp(event, "message-summary")) {
248                         pjsip_mwi_create_uas(dlg, &pubsub_cb, rdata, &evsub);
249                 } else {
250                         pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &evsub);
251                 }
252         } else {
253                 if (!strcmp(event, "message-summary")) {
254                         pjsip_mwi_create_uac(dlg, &pubsub_cb, 0, &evsub);
255                 } else {
256                         pj_str_t pj_event;
257                         pj_cstr(&pj_event, event);
258                         pjsip_evsub_create_uac(dlg, &pubsub_cb, &pj_event, 0, &evsub);
259                 }
260         }
261         return evsub;
262 }
263
264 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
265                 enum ast_sip_subscription_role role, struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
266 {
267         struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub), subscription_destructor);
268         pjsip_dialog *dlg;
269
270         if (!sub) {
271                 return NULL;
272         }
273         sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
274         if (!sub->datastores) {
275                 ao2_ref(sub, -1);
276                 return NULL;
277         }
278         sub->serializer = ast_sip_create_serializer();
279         if (!sub->serializer) {
280                 ao2_ref(sub, -1);
281                 return NULL;
282         }
283         sub->role = role;
284         if (role == AST_SIP_NOTIFIER) {
285                 dlg = ast_sip_create_dialog_uas(endpoint, rdata);
286         } else {
287                 RAII_VAR(struct ast_sip_contact *, contact, NULL, ao2_cleanup);
288
289                 contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors);
290                 if (!contact || ast_strlen_zero(contact->uri)) {
291                         ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n",
292                                         ast_sorcery_object_get_id(endpoint));
293                         ao2_ref(sub, -1);
294                         return NULL;
295                 }
296                 dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL);
297         }
298         if (!dlg) {
299                 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
300                 ao2_ref(sub, -1);
301                 return NULL;
302         }
303         sub->evsub = allocate_evsub(handler->event_name, role, endpoint, rdata, dlg);
304         /* We keep a reference to the dialog until our subscription is destroyed. See
305          * the subscription_destructor for more details
306          */
307         pjsip_dlg_inc_session(dlg, &pubsub_module);
308         sub->dlg = dlg;
309         ast_sip_dialog_set_serializer(dlg, sub->serializer);
310         pjsip_evsub_set_mod_data(sub->evsub, pubsub_module.id, sub);
311         ao2_ref(endpoint, +1);
312         sub->endpoint = endpoint;
313         sub->handler = handler;
314         return sub;
315 }
316
317 struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
318 {
319         ast_assert(sub->endpoint != NULL);
320         ao2_ref(sub->endpoint, +1);
321         return sub->endpoint;
322 }
323
324 struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
325 {
326         ast_assert(sub->serializer != NULL);
327         return sub->serializer;
328 }
329
330 pjsip_evsub *ast_sip_subscription_get_evsub(struct ast_sip_subscription *sub)
331 {
332         return sub->evsub;
333 }
334
335 pjsip_dialog *ast_sip_subscription_get_dlg(struct ast_sip_subscription *sub)
336 {
337         return sub->dlg;
338 }
339
340 int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata)
341 {
342         return pjsip_evsub_send_request(ast_sip_subscription_get_evsub(sub),
343                         tdata) == PJ_SUCCESS ? 0 : -1;
344 }
345
346 static void subscription_datastore_destroy(void *obj)
347 {
348         struct ast_datastore *datastore = obj;
349
350         /* Using the destroy function (if present) destroy the data */
351         if (datastore->info->destroy != NULL && datastore->data != NULL) {
352                 datastore->info->destroy(datastore->data);
353                 datastore->data = NULL;
354         }
355
356         ast_free((void *) datastore->uid);
357         datastore->uid = NULL;
358 }
359
360 struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
361 {
362         RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
363         const char *uid_ptr = uid;
364
365         if (!info) {
366                 return NULL;
367         }
368
369         datastore = ao2_alloc(sizeof(*datastore), subscription_datastore_destroy);
370         if (!datastore) {
371                 return NULL;
372         }
373
374         datastore->info = info;
375         if (ast_strlen_zero(uid)) {
376                 /* They didn't provide an ID so we'll provide one ourself */
377                 struct ast_uuid *uuid = ast_uuid_generate();
378                 char uuid_buf[AST_UUID_STR_LEN];
379                 if (!uuid) {
380                         return NULL;
381                 }
382                 uid_ptr = ast_uuid_to_str(uuid, uuid_buf, sizeof(uuid_buf));
383                 ast_free(uuid);
384         }
385
386         datastore->uid = ast_strdup(uid_ptr);
387         if (!datastore->uid) {
388                 return NULL;
389         }
390
391         ao2_ref(datastore, +1);
392         return datastore;
393 }
394
395 int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
396 {
397         ast_assert(datastore != NULL);
398         ast_assert(datastore->info != NULL);
399         ast_assert(!ast_strlen_zero(datastore->uid));
400
401         if (!ao2_link(subscription->datastores, datastore)) {
402                 return -1;
403         }
404         return 0;
405 }
406
407 struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
408 {
409         return ao2_find(subscription->datastores, name, OBJ_KEY);
410 }
411
412 void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
413 {
414         ao2_callback(subscription->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
415 }
416
417 int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
418 {
419         ast_assert(datastore != NULL);
420         ast_assert(datastore->info != NULL);
421         ast_assert(!ast_strlen_zero(datastore->uid));
422
423         if (!ao2_link(publication->datastores, datastore)) {
424                 return -1;
425         }
426         return 0;
427 }
428
429 struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
430 {
431         return ao2_find(publication->datastores, name, OBJ_KEY);
432 }
433
434 void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
435 {
436         ao2_callback(publication->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
437 }
438
439 AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
440
441 static int publication_hash_fn(const void *obj, const int flags)
442 {
443         const struct ast_sip_publication *publication = obj;
444         const int *entity_tag = obj;
445
446         return flags & OBJ_KEY ? *entity_tag : publication->entity_tag;
447 }
448
449 static int publication_cmp_fn(void *obj, void *arg, int flags)
450 {
451         const struct ast_sip_publication *publication1 = obj;
452         const struct ast_sip_publication *publication2 = arg;
453         const int *entity_tag = arg;
454
455         return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ?
456                 CMP_MATCH | CMP_STOP : 0);
457 }
458
459 static void publish_add_handler(struct ast_sip_publish_handler *handler)
460 {
461         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
462         AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
463 }
464
465 int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
466 {
467         if (ast_strlen_zero(handler->event_name)) {
468                 ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
469                 return -1;
470         }
471
472         if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS,
473                 publication_hash_fn, publication_cmp_fn))) {
474                 ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n",
475                         handler->event_name);
476                 return -1;
477         }
478
479         publish_add_handler(handler);
480
481         ast_module_ref(ast_module_info->self);
482
483         return 0;
484 }
485
486 void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
487 {
488         struct ast_sip_publish_handler *iter;
489         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
490         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
491                 if (handler == iter) {
492                         AST_RWLIST_REMOVE_CURRENT(next);
493                         ao2_cleanup(handler->publications);
494                         ast_module_unref(ast_module_info->self);
495                         break;
496                 }
497         }
498         AST_RWLIST_TRAVERSE_SAFE_END;
499 }
500
501 AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
502
503 static void sub_add_handler(struct ast_sip_subscription_handler *handler)
504 {
505         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
506         AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
507         ast_module_ref(ast_module_info->self);
508 }
509
510 static int sub_handler_exists_for_event_name(const char *event_name)
511 {
512         struct ast_sip_subscription_handler *iter;
513         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
514
515         AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
516                 if (!strcmp(iter->event_name, event_name)) {
517                         return 1;
518                 }
519         }
520         return 0;
521 }
522
523 int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler)
524 {
525         pj_str_t accept[AST_SIP_MAX_ACCEPT];
526         int i;
527
528         if (ast_strlen_zero(handler->event_name)) {
529                 ast_log(LOG_ERROR, "No event package specifief for subscription handler. Cannot register\n");
530                 return -1;
531         }
532
533         if (ast_strlen_zero(handler->accept[0])) {
534                 ast_log(LOG_ERROR, "Subscription handler must supply at least one 'Accept' format\n");
535                 return -1;
536         }
537
538         for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) {
539                 pj_cstr(&accept[i], handler->accept[i]);
540         }
541
542         if (!sub_handler_exists_for_event_name(handler->event_name)) {
543                 pj_str_t event;
544
545                 pj_cstr(&event, handler->event_name);
546
547                 if (!strcmp(handler->event_name, "message-summary")) {
548                         pjsip_mwi_init_module(ast_sip_get_pjsip_endpoint(), pjsip_evsub_instance());
549                 } else {
550                         pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
551                 }
552         } else {
553                 pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module, PJSIP_H_ACCEPT, NULL,
554                         i, accept);
555         }
556
557         sub_add_handler(handler);
558         return 0;
559 }
560
561 void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
562 {
563         struct ast_sip_subscription_handler *iter;
564         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
565         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
566                 if (handler == iter) {
567                         AST_RWLIST_REMOVE_CURRENT(next);
568                         ast_module_unref(ast_module_info->self);
569                         break;
570                 }
571         }
572         AST_RWLIST_TRAVERSE_SAFE_END;
573 }
574
575 static struct ast_sip_subscription_handler *find_sub_handler(const char *event, char accept[AST_SIP_MAX_ACCEPT][64], size_t num_accept)
576 {
577         struct ast_sip_subscription_handler *iter;
578         int match = 0;
579         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
580         AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
581                 int i;
582                 int j;
583                 if (strcmp(event, iter->event_name)) {
584                         ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
585                         continue;
586                 }
587                 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
588                 if (!num_accept && iter->handles_default_accept) {
589                         /* The SUBSCRIBE contained no Accept headers, and this subscription handler
590                          * provides the default body type, so it's a match!
591                          */
592                         break;
593                 }
594                 for (i = 0; i < num_accept; ++i) {
595                         for (j = 0; j < num_accept; ++j) {
596                                 if (ast_strlen_zero(iter->accept[i])) {
597                                         ast_debug(3, "Breaking because subscription handler has run out of 'accept' types\n");
598                                         break;
599                                 }
600                                 if (!strcmp(accept[j], iter->accept[i])) {
601                                         ast_debug(3, "Accept headers match: %s = %s\n", accept[j], iter->accept[i]);
602                                         match = 1;
603                                         break;
604                                 }
605                                 ast_debug(3, "Accept %s does not match %s\n", accept[j], iter->accept[i]);
606                         }
607                         if (match) {
608                                 break;
609                         }
610                 }
611                 if (match) {
612                         break;
613                 }
614         }
615
616         return iter;
617 }
618
619 static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
620 {
621         char event[32];
622         char accept[AST_SIP_MAX_ACCEPT][64];
623         pjsip_accept_hdr *accept_header;
624         pjsip_event_hdr *event_header;
625         pjsip_expires_hdr *expires_header;
626         struct ast_sip_subscription_handler *handler;
627         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
628         struct ast_sip_subscription *sub;
629         size_t num_accept_headers;
630
631         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
632         ast_assert(endpoint != NULL);
633
634         if (!endpoint->subscription.allow) {
635                 ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint));
636                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL);
637                 return PJ_TRUE;
638         }
639
640         expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next);
641
642         if (expires_header && expires_header->ivalue < endpoint->subscription.minexpiry) {
643                 ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %d\n",
644                                 expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry);
645                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL);
646                 return PJ_TRUE;
647         }
648
649         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
650         if (!event_header) {
651                 ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
652                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
653                 return PJ_TRUE;
654         }
655         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
656
657         accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next);
658         if (accept_header) {
659                 int i;
660
661                 for (i = 0; i < accept_header->count; ++i) {
662                         ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
663                 }
664                 num_accept_headers = accept_header->count;
665         } else {
666                 num_accept_headers = 0;
667         }
668
669         handler = find_sub_handler(event, accept, num_accept_headers);
670         if (!handler) {
671                 ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
672                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
673                 return PJ_TRUE;
674         }
675         sub = handler->new_subscribe(endpoint, rdata);
676         if (!sub) {
677                 pjsip_transaction *trans = pjsip_rdata_get_tsx(rdata);
678
679                 if (trans) {
680                         pjsip_dialog *dlg = pjsip_rdata_get_dlg(rdata);
681                         pjsip_tx_data *tdata;
682
683                         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, &tdata) != PJ_SUCCESS) {
684                                 return PJ_TRUE;
685                         }
686                         pjsip_dlg_send_response(dlg, trans, tdata);
687                 } else {
688                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
689                 }
690         }
691         return PJ_TRUE;
692 }
693
694 static struct ast_sip_publish_handler *find_pub_handler(const char *event)
695 {
696         struct ast_sip_publish_handler *iter = NULL;
697         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
698
699         AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) {
700                 if (strcmp(event, iter->event_name)) {
701                         ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
702                         continue;
703                 }
704                 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
705                 break;
706         }
707
708         return iter;
709 }
710
711 static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata,
712         pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id)
713 {
714         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
715
716         if (etag_hdr) {
717                 char etag[pj_strlen(&etag_hdr->hvalue) + 1];
718
719                 ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag));
720
721                 if (sscanf(etag, "%30d", entity_id) != 1) {
722                         return SIP_PUBLISH_UNKNOWN;
723                 }
724         }
725
726         *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
727
728         if (!(*expires)) {
729                 return SIP_PUBLISH_REMOVE;
730         } else if (!etag_hdr && rdata->msg_info.msg->body) {
731                 return SIP_PUBLISH_INITIAL;
732         } else if (etag_hdr && !rdata->msg_info.msg->body) {
733                 return SIP_PUBLISH_REFRESH;
734         } else if (etag_hdr && rdata->msg_info.msg->body) {
735                 return SIP_PUBLISH_MODIFY;
736         }
737
738         return SIP_PUBLISH_UNKNOWN;
739 }
740
741 static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
742         struct ast_sip_publish_handler *handler)
743 {
744         struct ast_sip_publication *publication = handler->new_publication(endpoint, rdata);
745
746         if (!publication) {
747                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
748                 return NULL;
749         }
750
751         publication->handler = handler;
752
753         return publication;
754 }
755
756 static int publish_expire_callback(void *data)
757 {
758         RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup);
759
760         publication->handler->publish_expire(publication);
761
762         return 0;
763 }
764
765 static int publish_expire(const void *data)
766 {
767         struct ast_sip_publication *publication = (struct ast_sip_publication*)data;
768
769         ao2_unlink(publication->handler->publications, publication);
770         publication->sched_id = -1;
771
772         if (ast_sip_push_task(NULL, publish_expire_callback, publication)) {
773                 ao2_cleanup(publication);
774         }
775
776         return 0;
777 }
778
779 static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata)
780 {
781         pjsip_event_hdr *event_header;
782         struct ast_sip_publish_handler *handler;
783         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
784         char event[32];
785         static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 };
786         pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL);
787         enum sip_publish_type publish_type;
788         RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup);
789         int expires = 0, entity_id;
790
791         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
792         ast_assert(endpoint != NULL);
793
794         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
795         if (!event_header) {
796                 ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n");
797                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
798                 return PJ_TRUE;
799         }
800         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
801
802         handler = find_pub_handler(event);
803         if (!handler) {
804                 ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event);
805                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
806                 return PJ_TRUE;
807         }
808
809         publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id);
810
811         /* If this is not an initial publish ensure that a publication is present */
812         if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) {
813                 if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) {
814                         static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 };
815
816                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed,
817                                 NULL, NULL);
818                         return PJ_TRUE;
819                 }
820
821                 /* Per the RFC every response has to have a new entity tag */
822                 publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
823
824                 /* Update the expires here so that the created responses will contain the correct value */
825                 publication->expires = expires;
826         }
827
828         switch (publish_type) {
829                 case SIP_PUBLISH_INITIAL:
830                         publication = publish_request_initial(endpoint, rdata, handler);
831                         break;
832                 case SIP_PUBLISH_REFRESH:
833                 case SIP_PUBLISH_MODIFY:
834                         if (handler->publish_refresh(publication, rdata)) {
835                                 /* If an error occurs we want to terminate the publication */
836                                 expires = 0;
837                         }
838                         break;
839                 case SIP_PUBLISH_REMOVE:
840                         handler->publish_termination(publication, rdata);
841                         break;
842                 case SIP_PUBLISH_UNKNOWN:
843                 default:
844                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
845                         break;
846         }
847
848         if (publication) {
849                 if (expires) {
850                         ao2_link(handler->publications, publication);
851
852                         AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication,
853                                                 ao2_ref(publication, -1), ao2_ref(publication, -1), ao2_ref(publication, +1));
854                 } else {
855                         AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1));
856                 }
857         }
858
859         return PJ_TRUE;
860 }
861
862 /*! \brief Internal destructor for publications */
863 static void publication_destroy_fn(void *obj)
864 {
865         struct ast_sip_publication *publication = obj;
866
867         ast_debug(3, "Destroying SIP publication\n");
868
869         ao2_cleanup(publication->datastores);
870         ao2_cleanup(publication->endpoint);
871 }
872
873 struct ast_sip_publication *ast_sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
874 {
875         struct ast_sip_publication *publication;
876         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
877
878         ast_assert(endpoint != NULL);
879
880         if (!(publication = ao2_alloc(sizeof(*publication), publication_destroy_fn))) {
881                 return NULL;
882         }
883
884         if (!(publication->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp))) {
885                 ao2_ref(publication, -1);
886                 return NULL;
887         }
888
889         publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
890         ao2_ref(endpoint, +1);
891         publication->endpoint = endpoint;
892         publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
893         publication->sched_id = -1;
894
895         return publication;
896 }
897
898 struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub)
899 {
900         return pub->endpoint;
901 }
902
903 int ast_sip_publication_create_response(struct ast_sip_publication *pub, int status_code, pjsip_rx_data *rdata,
904         pjsip_tx_data **tdata)
905 {
906         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, tdata) != PJ_SUCCESS) {
907                 return -1;
908         }
909
910         if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) {
911                 RAII_VAR(char *, entity_tag, NULL, ast_free_ptr);
912                 RAII_VAR(char *, expires, NULL, ast_free_ptr);
913
914                 if ((ast_asprintf(&entity_tag, "%d", pub->entity_tag) < 0) ||
915                         (ast_asprintf(&expires, "%d", pub->expires) < 0)) {
916                         pjsip_tx_data_dec_ref(*tdata);
917                         return -1;
918                 }
919
920                 ast_sip_add_header(*tdata, "SIP-ETag", entity_tag);
921                 ast_sip_add_header(*tdata, "Expires", expires);
922         }
923
924         return 0;
925 }
926
927 pj_status_t ast_sip_publication_send_response(struct ast_sip_publication *pub, pjsip_rx_data *rdata,
928         pjsip_tx_data *tdata)
929 {
930         pj_status_t status;
931         pjsip_transaction *tsx;
932
933         if ((status = pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx)) != PJ_SUCCESS) {
934                 return status;
935         }
936
937         pjsip_tsx_recv_msg(tsx, rdata);
938
939         return pjsip_tsx_send_msg(tsx, tdata);
940 }
941
942 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata)
943 {
944         if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) {
945                 return pubsub_on_rx_subscribe_request(rdata);
946         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) {
947                 return pubsub_on_rx_publish_request(rdata);
948         }
949
950         return PJ_FALSE;
951 }
952
953 static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
954 {
955         struct ast_sip_subscription *sub;
956         if (pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) {
957                 return;
958         }
959
960         sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
961         if (!sub) {
962                 return;
963         }
964
965         if (sub->handler->subscription_shutdown) {
966                 sub->handler->subscription_shutdown(sub);
967         }
968         pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL);
969 }
970
971 static void pubsub_on_tsx_state(pjsip_evsub *evsub, pjsip_transaction *tsx, pjsip_event *event)
972 {
973         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
974
975         if (!sub) {
976                 return;
977         }
978
979         if (sub->handler->notify_response && tsx->role == PJSIP_ROLE_UAC &&
980             event->body.tsx_state.type == PJSIP_EVENT_RX_MSG) {
981                 sub->handler->notify_response(sub, event->body.tsx_state.src.rdata);
982         }
983 }
984
985 static void set_parameters_from_response_data(pj_pool_t *pool, int *p_st_code,
986                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body,
987                 struct ast_sip_subscription_response_data *response_data)
988 {
989         ast_assert(response_data->status_code >= 200 && response_data->status_code <= 699);
990         *p_st_code = response_data->status_code;
991
992         if (!ast_strlen_zero(response_data->status_text)) {
993                 pj_strdup2(pool, *p_st_text, response_data->status_text);
994         }
995
996         if (response_data->headers) {
997                 struct ast_variable *iter;
998                 for (iter = response_data->headers; iter; iter = iter->next) {
999                         pj_str_t header_name;
1000                         pj_str_t header_value;
1001                         pjsip_generic_string_hdr *hdr;
1002
1003                         pj_cstr(&header_name, iter->name);
1004                         pj_cstr(&header_value, iter->value);
1005                         hdr = pjsip_generic_string_hdr_create(pool, &header_name, &header_value);
1006                         pj_list_insert_before(res_hdr, hdr);
1007                 }
1008         }
1009
1010         if (response_data->body) {
1011                 pj_str_t type;
1012                 pj_str_t subtype;
1013                 pj_str_t body_text;
1014
1015                 pj_cstr(&type, response_data->body->type);
1016                 pj_cstr(&subtype, response_data->body->subtype);
1017                 pj_cstr(&body_text, response_data->body->body_text);
1018
1019                 *p_body = pjsip_msg_body_create(pool, &type, &subtype, &body_text);
1020         }
1021 }
1022
1023 static int response_data_changed(struct ast_sip_subscription_response_data *response_data)
1024 {
1025         if (response_data->status_code != 200 ||
1026                         !ast_strlen_zero(response_data->status_text) ||
1027                         response_data->headers ||
1028                         response_data->body) {
1029                 return 1;
1030         }
1031         return 0;
1032 }
1033
1034 static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
1035                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
1036 {
1037         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1038         struct ast_sip_subscription_response_data response_data = {
1039                 .status_code = 200,
1040         };
1041
1042         if (!sub) {
1043                 return;
1044         }
1045
1046         if (pjsip_evsub_get_state(sub->evsub) == PJSIP_EVSUB_STATE_TERMINATED) {
1047                 sub->handler->subscription_terminated(sub, rdata);
1048                 return;
1049         }
1050
1051         sub->handler->resubscribe(sub, rdata, &response_data);
1052
1053         if (!response_data_changed(&response_data)) {
1054                 return;
1055         }
1056
1057         set_parameters_from_response_data(rdata->tp_info.pool, p_st_code, p_st_text,
1058                         res_hdr, p_body, &response_data);
1059 }
1060
1061 static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code,
1062                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
1063 {
1064         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1065         struct ast_sip_subscription_response_data response_data = {
1066                 .status_code = 200,
1067         };
1068
1069         if (!sub || !sub->handler->notify_request) {
1070                 return;
1071         }
1072
1073         sub->handler->notify_request(sub, rdata, &response_data);
1074
1075         if (!response_data_changed(&response_data)) {
1076                 return;
1077         }
1078
1079         set_parameters_from_response_data(rdata->tp_info.pool, p_st_code, p_st_text,
1080                         res_hdr, p_body, &response_data);
1081 }
1082
1083 static int serialized_pubsub_on_client_refresh(void *userdata)
1084 {
1085         struct ast_sip_subscription *sub = userdata;
1086
1087         sub->handler->refresh_subscription(sub);
1088         ao2_cleanup(sub);
1089         return 0;
1090 }
1091
1092 static void pubsub_on_client_refresh(pjsip_evsub *evsub)
1093 {
1094         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1095
1096         ao2_ref(sub, +1);
1097         ast_sip_push_task(sub->serializer, serialized_pubsub_on_client_refresh, sub);
1098 }
1099
1100 static int serialized_pubsub_on_server_timeout(void *userdata)
1101 {
1102         struct ast_sip_subscription *sub = userdata;
1103
1104         sub->handler->subscription_timeout(sub);
1105         ao2_cleanup(sub);
1106         return 0;
1107 }
1108
1109 static void pubsub_on_server_timeout(pjsip_evsub *evsub)
1110 {
1111         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1112
1113         ao2_ref(sub, +1);
1114         ast_sip_push_task(sub->serializer, serialized_pubsub_on_server_timeout, sub);
1115 }
1116
1117 static int load_module(void)
1118 {
1119         static const pj_str_t str_PUBLISH = { "PUBLISH", 7 };
1120
1121         pjsip_evsub_init_module(ast_sip_get_pjsip_endpoint());
1122
1123         if (!(sched = ast_sched_context_create())) {
1124                 ast_log(LOG_ERROR, "Could not create scheduler for publication expiration\n");
1125                 return AST_MODULE_LOAD_FAILURE;
1126         }
1127
1128         if (ast_sched_start_thread(sched)) {
1129                 ast_log(LOG_ERROR, "Could not start scheduler thread for publication expiration\n");
1130                 ast_sched_context_destroy(sched);
1131                 return AST_MODULE_LOAD_FAILURE;
1132         }
1133
1134         pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), NULL, PJSIP_H_ALLOW, NULL, 1, &str_PUBLISH);
1135
1136         if (ast_sip_register_service(&pubsub_module)) {
1137                 ast_log(LOG_ERROR, "Could not register pubsub service\n");
1138                 ast_sched_context_destroy(sched);
1139                 return AST_MODULE_LOAD_FAILURE;
1140         }
1141
1142         return AST_MODULE_LOAD_SUCCESS;
1143 }
1144
1145 static int unload_module(void)
1146 {
1147         if (sched) {
1148                 ast_sched_context_destroy(sched);
1149         }
1150
1151         return 0;
1152 }
1153
1154 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP event resource",
1155                 .load = load_module,
1156                 .unload = unload_module,
1157                 .load_pri = AST_MODPRI_CHANNEL_DEPEND,
1158 );