Handle default body types for SIP event packages in res_pjsip_pubsub
[asterisk/asterisk.git] / res / res_pjsip_pubsub.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18 /*!
19  * \brief Opaque structure representing an RFC 3265 SIP subscription
20  */
21
22 /*** MODULEINFO
23         <depend>pjproject</depend>
24         <depend>res_pjsip</depend>
25         <support_level>core</support_level>
26  ***/
27
28 #include "asterisk.h"
29
30 #include <pjsip.h>
31 #include <pjsip_simple.h>
32 #include <pjlib.h>
33
34 #include "asterisk/res_pjsip_pubsub.h"
35 #include "asterisk/module.h"
36 #include "asterisk/linkedlists.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/datastore.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/taskprocessor.h"
41 #include "asterisk/sched.h"
42 #include "asterisk/res_pjsip.h"
43
44 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);
45
46 static struct pjsip_module pubsub_module = {
47         .name = { "PubSub Module", 13 },
48         .priority = PJSIP_MOD_PRIORITY_APPLICATION,
49         .on_rx_request = pubsub_on_rx_request,
50 };
51
52 static const pj_str_t str_event_name = { "Event", 5 };
53
54 /*! \brief Scheduler used for automatically expiring publications */
55 static struct ast_sched_context *sched;
56
57 /*! \brief Number of buckets for publications (on a per handler) */
58 #define PUBLICATIONS_BUCKETS 37
59
60 /*! \brief Default expiration time for PUBLISH if one is not specified */
61 #define DEFAULT_PUBLISH_EXPIRES 3600
62
63 /*! \brief Defined method for PUBLISH */
64 const pjsip_method pjsip_publish_method =
65 {
66         PJSIP_OTHER_METHOD,
67         { "PUBLISH", 7 }
68 };
69
70 /*!
71  * \brief The types of PUBLISH messages defined in RFC 3903
72  */
73 enum sip_publish_type {
74         /*!
75          * \brief Unknown
76          *
77          * \details
78          * This actually is not defined in RFC 3903. We use this as a constant
79          * to indicate that an incoming PUBLISH does not fit into any of the
80          * other categories and is thus invalid.
81          */
82         SIP_PUBLISH_UNKNOWN,
83
84         /*!
85          * \brief Initial
86          *
87          * \details
88          * The first PUBLISH sent. This will contain a non-zero Expires header
89          * as well as a body that indicates the current state of the endpoint
90          * that has sent the message. The initial PUBLISH is the only type
91          * of PUBLISH to not contain a Sip-If-Match header in it.
92          */
93         SIP_PUBLISH_INITIAL,
94
95         /*!
96          * \brief Refresh
97          *
98          * \details
99          * Used to keep a published state from expiring. This will contain a
100          * non-zero Expires header but no body since its purpose is not to
101          * update state.
102          */
103         SIP_PUBLISH_REFRESH,
104
105         /*!
106          * \brief Modify
107          *
108          * \details
109          * Used to change state from its previous value. This will contain
110          * a body updating the published state. May or may not contain an
111          * Expires header.
112          */
113         SIP_PUBLISH_MODIFY,
114
115         /*!
116          * \brief Remove
117          *
118          * \details
119          * Used to remove published state from an ESC. This will contain
120          * an Expires header set to 0 and likely no body.
121          */
122         SIP_PUBLISH_REMOVE,
123 };
124
125 /*!
126  * Used to create new entity IDs by ESCs.
127  */
128 static int esc_etag_counter;
129
130 /*!
131  * \brief Structure representing a SIP publication
132  */
133 struct ast_sip_publication {
134         /*! Publication datastores set up by handlers */
135         struct ao2_container *datastores;
136         /*! \brief Entity tag for the publication */
137         int entity_tag;
138         /*! \brief Handler for this publication */
139         struct ast_sip_publish_handler *handler;
140         /*! \brief The endpoint with which the subscription is communicating */
141         struct ast_sip_endpoint *endpoint;
142         /*! \brief Expiration time of the publication */
143         int expires;
144         /*! \brief Scheduled item for expiration of publication */
145         int sched_id;
146 };
147
148 /*!
149  * \brief Structure representing a SIP subscription
150  */
151 struct ast_sip_subscription {
152         /*! Subscription datastores set up by handlers */
153         struct ao2_container *datastores;
154         /*! The endpoint with which the subscription is communicating */
155         struct ast_sip_endpoint *endpoint;
156         /*! Serializer on which to place operations for this subscription */
157         struct ast_taskprocessor *serializer;
158         /*! The handler for this subscription */
159         const struct ast_sip_subscription_handler *handler;
160         /*! The role for this subscription */
161         enum ast_sip_subscription_role role;
162         /*! The underlying PJSIP event subscription structure */
163         pjsip_evsub *evsub;
164         /*! The underlying PJSIP dialog */
165         pjsip_dialog *dlg;
166 };
167
168 #define DATASTORE_BUCKETS 53
169
170 #define DEFAULT_EXPIRES 3600
171
172 static int datastore_hash(const void *obj, int flags)
173 {
174         const struct ast_datastore *datastore = obj;
175         const char *uid = flags & OBJ_KEY ? obj : datastore->uid;
176
177         ast_assert(uid != NULL);
178
179         return ast_str_hash(uid);
180 }
181
182 static int datastore_cmp(void *obj, void *arg, int flags)
183 {
184         const struct ast_datastore *datastore1 = obj;
185         const struct ast_datastore *datastore2 = arg;
186         const char *uid2 = flags & OBJ_KEY ? arg : datastore2->uid;
187
188         ast_assert(datastore1->uid != NULL);
189         ast_assert(uid2 != NULL);
190
191         return strcmp(datastore1->uid, uid2) ? 0 : CMP_MATCH | CMP_STOP;
192 }
193
194 static void subscription_destructor(void *obj)
195 {
196         struct ast_sip_subscription *sub = obj;
197
198         ast_debug(3, "Destroying SIP subscription\n");
199
200         ao2_cleanup(sub->datastores);
201         ao2_cleanup(sub->endpoint);
202
203         if (sub->dlg) {
204                 /* This is why we keep the dialog on the subscription. When the subscription
205                  * is destroyed, there is no guarantee that the underlying dialog is ready
206                  * to be destroyed. Furthermore, there's no guarantee in the opposite direction
207                  * either. The dialog could be destroyed before our subscription is. We fix
208                  * this problem by keeping a reference to the dialog until it is time to
209                  * destroy the subscription. We need to have the dialog available when the
210                  * subscription is destroyed so that we can guarantee that our attempt to
211                  * remove the serializer will be successful.
212                  */
213                 ast_sip_dialog_set_serializer(sub->dlg, NULL);
214                 pjsip_dlg_dec_session(sub->dlg, &pubsub_module);
215         }
216         ast_taskprocessor_unreference(sub->serializer);
217 }
218
219 static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
220 static void pubsub_on_tsx_state(pjsip_evsub *sub, pjsip_transaction *tsx, pjsip_event *event);
221 static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
222                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
223 static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
224                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
225 static void pubsub_on_client_refresh(pjsip_evsub *sub);
226 static void pubsub_on_server_timeout(pjsip_evsub *sub);
227
228
229 static pjsip_evsub_user pubsub_cb = {
230         .on_evsub_state = pubsub_on_evsub_state,
231         .on_tsx_state = pubsub_on_tsx_state,
232         .on_rx_refresh = pubsub_on_rx_refresh,
233         .on_rx_notify = pubsub_on_rx_notify,
234         .on_client_refresh = pubsub_on_client_refresh,
235         .on_server_timeout = pubsub_on_server_timeout,
236 };
237
238 static pjsip_evsub *allocate_evsub(const char *event, enum ast_sip_subscription_role role,
239                 struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, pjsip_dialog *dlg)
240 {
241         pjsip_evsub *evsub;
242         /* PJSIP is kind enough to have some built-in support for certain
243          * events. We need to use the correct initialization function for the
244          * built-in events
245          */
246         if (role == AST_SIP_NOTIFIER) {
247                 if (!strcmp(event, "message-summary")) {
248                         pjsip_mwi_create_uas(dlg, &pubsub_cb, rdata, &evsub);
249                 } else {
250                         pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &evsub);
251                 }
252         } else {
253                 if (!strcmp(event, "message-summary")) {
254                         pjsip_mwi_create_uac(dlg, &pubsub_cb, 0, &evsub);
255                 } else {
256                         pj_str_t pj_event;
257                         pj_cstr(&pj_event, event);
258                         pjsip_evsub_create_uac(dlg, &pubsub_cb, &pj_event, 0, &evsub);
259                 }
260         }
261         return evsub;
262 }
263
264 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
265                 enum ast_sip_subscription_role role, struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
266 {
267         struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub), subscription_destructor);
268         pjsip_dialog *dlg;
269
270         if (!sub) {
271                 return NULL;
272         }
273         sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
274         if (!sub->datastores) {
275                 ao2_ref(sub, -1);
276                 return NULL;
277         }
278         sub->serializer = ast_sip_create_serializer();
279         if (!sub->serializer) {
280                 ao2_ref(sub, -1);
281                 return NULL;
282         }
283         sub->role = role;
284         if (role == AST_SIP_NOTIFIER) {
285                 pjsip_dlg_create_uas(pjsip_ua_instance(), rdata, NULL, &dlg);
286         } else {
287                 RAII_VAR(struct ast_sip_contact *, contact, NULL, ao2_cleanup);
288
289                 contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors);
290                 if (!contact || ast_strlen_zero(contact->uri)) {
291                         ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n",
292                                         ast_sorcery_object_get_id(endpoint));
293                         ao2_ref(sub, -1);
294                         return NULL;
295                 }
296                 dlg = ast_sip_create_dialog(endpoint, contact->uri, NULL);
297         }
298         if (!dlg) {
299                 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
300                 ao2_ref(sub, -1);
301                 return NULL;
302         }
303         sub->evsub = allocate_evsub(handler->event_name, role, endpoint, rdata, dlg);
304         /* We keep a reference to the dialog until our subscription is destroyed. See
305          * the subscription_destructor for more details
306          */
307         pjsip_dlg_inc_session(dlg, &pubsub_module);
308         sub->dlg = dlg;
309         ast_sip_dialog_set_serializer(dlg, sub->serializer);
310         pjsip_evsub_set_mod_data(sub->evsub, pubsub_module.id, sub);
311         ao2_ref(endpoint, +1);
312         sub->endpoint = endpoint;
313         sub->handler = handler;
314         return sub;
315 }
316
317 struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
318 {
319         ast_assert(sub->endpoint != NULL);
320         ao2_ref(sub->endpoint, +1);
321         return sub->endpoint;
322 }
323
324 struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
325 {
326         ast_assert(sub->serializer != NULL);
327         return sub->serializer;
328 }
329
330 pjsip_evsub *ast_sip_subscription_get_evsub(struct ast_sip_subscription *sub)
331 {
332         return sub->evsub;
333 }
334
335 pjsip_dialog *ast_sip_subscription_get_dlg(struct ast_sip_subscription *sub)
336 {
337         return sub->dlg;
338 }
339
340 int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata)
341 {
342         return pjsip_evsub_send_request(ast_sip_subscription_get_evsub(sub),
343                         tdata) == PJ_SUCCESS ? 0 : -1;
344 }
345
346 static void subscription_datastore_destroy(void *obj)
347 {
348         struct ast_datastore *datastore = obj;
349
350         /* Using the destroy function (if present) destroy the data */
351         if (datastore->info->destroy != NULL && datastore->data != NULL) {
352                 datastore->info->destroy(datastore->data);
353                 datastore->data = NULL;
354         }
355
356         ast_free((void *) datastore->uid);
357         datastore->uid = NULL;
358 }
359
360 struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
361 {
362         RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
363         const char *uid_ptr = uid;
364
365         if (!info) {
366                 return NULL;
367         }
368
369         datastore = ao2_alloc(sizeof(*datastore), subscription_datastore_destroy);
370         if (!datastore) {
371                 return NULL;
372         }
373
374         datastore->info = info;
375         if (ast_strlen_zero(uid)) {
376                 /* They didn't provide an ID so we'll provide one ourself */
377                 struct ast_uuid *uuid = ast_uuid_generate();
378                 char uuid_buf[AST_UUID_STR_LEN];
379                 if (!uuid) {
380                         return NULL;
381                 }
382                 uid_ptr = ast_uuid_to_str(uuid, uuid_buf, sizeof(uuid_buf));
383                 ast_free(uuid);
384         }
385
386         datastore->uid = ast_strdup(uid_ptr);
387         if (!datastore->uid) {
388                 return NULL;
389         }
390
391         ao2_ref(datastore, +1);
392         return datastore;
393 }
394
395 int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
396 {
397         ast_assert(datastore != NULL);
398         ast_assert(datastore->info != NULL);
399         ast_assert(!ast_strlen_zero(datastore->uid));
400
401         if (!ao2_link(subscription->datastores, datastore)) {
402                 return -1;
403         }
404         return 0;
405 }
406
407 struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
408 {
409         return ao2_find(subscription->datastores, name, OBJ_KEY);
410 }
411
412 void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
413 {
414         ao2_callback(subscription->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
415 }
416
417 int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
418 {
419         ast_assert(datastore != NULL);
420         ast_assert(datastore->info != NULL);
421         ast_assert(!ast_strlen_zero(datastore->uid));
422
423         if (!ao2_link(publication->datastores, datastore)) {
424                 return -1;
425         }
426         return 0;
427 }
428
429 struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
430 {
431         return ao2_find(publication->datastores, name, OBJ_KEY);
432 }
433
434 void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
435 {
436         ao2_callback(publication->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
437 }
438
439 AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
440
441 static int publication_hash_fn(const void *obj, const int flags)
442 {
443         const struct ast_sip_publication *publication = obj;
444         const int *entity_tag = obj;
445
446         return flags & OBJ_KEY ? *entity_tag : publication->entity_tag;
447 }
448
449 static int publication_cmp_fn(void *obj, void *arg, int flags)
450 {
451         const struct ast_sip_publication *publication1 = obj;
452         const struct ast_sip_publication *publication2 = arg;
453         const int *entity_tag = arg;
454
455         return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ?
456                 CMP_MATCH | CMP_STOP : 0);
457 }
458
459 static void publish_add_handler(struct ast_sip_publish_handler *handler)
460 {
461         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
462         AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
463 }
464
465 int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
466 {
467         if (ast_strlen_zero(handler->event_name)) {
468                 ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
469                 return -1;
470         }
471
472         if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS,
473                 publication_hash_fn, publication_cmp_fn))) {
474                 ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n",
475                         handler->event_name);
476                 return -1;
477         }
478
479         publish_add_handler(handler);
480
481         ast_module_ref(ast_module_info->self);
482
483         return 0;
484 }
485
486 void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
487 {
488         struct ast_sip_publish_handler *iter;
489         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
490         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
491                 if (handler == iter) {
492                         AST_RWLIST_REMOVE_CURRENT(next);
493                         ao2_cleanup(handler->publications);
494                         ast_module_unref(ast_module_info->self);
495                         break;
496                 }
497         }
498         AST_RWLIST_TRAVERSE_SAFE_END;
499 }
500
501 AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
502
503 static void sub_add_handler(struct ast_sip_subscription_handler *handler)
504 {
505         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
506         AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
507         ast_module_ref(ast_module_info->self);
508 }
509
510 static int sub_handler_exists_for_event_name(const char *event_name)
511 {
512         struct ast_sip_subscription_handler *iter;
513         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
514
515         AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
516                 if (!strcmp(iter->event_name, event_name)) {
517                         return 1;
518                 }
519         }
520         return 0;
521 }
522
523 int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler)
524 {
525         pj_str_t accept[AST_SIP_MAX_ACCEPT];
526         int i;
527
528         if (ast_strlen_zero(handler->event_name)) {
529                 ast_log(LOG_ERROR, "No event package specifief for subscription handler. Cannot register\n");
530                 return -1;
531         }
532
533         if (ast_strlen_zero(handler->accept[0])) {
534                 ast_log(LOG_ERROR, "Subscription handler must supply at least one 'Accept' format\n");
535                 return -1;
536         }
537
538         for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) {
539                 pj_cstr(&accept[i], handler->accept[i]);
540         }
541
542         if (!sub_handler_exists_for_event_name(handler->event_name)) {
543                 pj_str_t event;
544
545                 pj_cstr(&event, handler->event_name);
546
547                 if (!strcmp(handler->event_name, "message-summary")) {
548                         pjsip_mwi_init_module(ast_sip_get_pjsip_endpoint(), pjsip_evsub_instance());
549                 } else {
550                         pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
551                 }
552         } else {
553                 pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module, PJSIP_H_ACCEPT, NULL,
554                         i, accept);
555         }
556
557         sub_add_handler(handler);
558         return 0;
559 }
560
561 void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
562 {
563         struct ast_sip_subscription_handler *iter;
564         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
565         AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
566                 if (handler == iter) {
567                         AST_RWLIST_REMOVE_CURRENT(next);
568                         ast_module_unref(ast_module_info->self);
569                         break;
570                 }
571         }
572         AST_RWLIST_TRAVERSE_SAFE_END;
573 }
574
575 static struct ast_sip_subscription_handler *find_sub_handler(const char *event, char accept[AST_SIP_MAX_ACCEPT][64], size_t num_accept)
576 {
577         struct ast_sip_subscription_handler *iter;
578         int match = 0;
579         SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
580         AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
581                 int i;
582                 int j;
583                 if (strcmp(event, iter->event_name)) {
584                         ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
585                         continue;
586                 }
587                 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
588                 if (!num_accept && iter->handles_default_accept) {
589                         /* The SUBSCRIBE contained no Accept headers, and this subscription handler
590                          * provides the default body type, so it's a match!
591                          */
592                         break;
593                 }
594                 for (i = 0; i < num_accept; ++i) {
595                         for (j = 0; j < num_accept; ++j) {
596                                 if (ast_strlen_zero(iter->accept[i])) {
597                                         ast_debug(3, "Breaking because subscription handler has run out of 'accept' types\n");
598                                         break;
599                                 }
600                                 if (!strcmp(accept[j], iter->accept[i])) {
601                                         ast_debug(3, "Accept headers match: %s = %s\n", accept[j], iter->accept[i]);
602                                         match = 1;
603                                         break;
604                                 }
605                                 ast_debug(3, "Accept %s does not match %s\n", accept[j], iter->accept[i]);
606                         }
607                         if (match) {
608                                 break;
609                         }
610                 }
611                 if (match) {
612                         break;
613                 }
614         }
615
616         return iter;
617 }
618
619 static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
620 {
621         char event[32];
622         char accept[AST_SIP_MAX_ACCEPT][64];
623         pjsip_accept_hdr *accept_header;
624         pjsip_event_hdr *event_header;
625         pjsip_expires_hdr *expires_header;
626         struct ast_sip_subscription_handler *handler;
627         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
628         struct ast_sip_subscription *sub;
629         size_t num_accept_headers;
630
631         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
632         ast_assert(endpoint != NULL);
633
634         if (!endpoint->subscription.allow) {
635                 ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint));
636                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL);
637                 return PJ_TRUE;
638         }
639
640         expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next);
641
642         if (expires_header && expires_header->ivalue < endpoint->subscription.minexpiry) {
643                 ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %d\n",
644                                 expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry);
645                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL);
646                 return PJ_TRUE;
647         }
648
649         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
650         if (!event_header) {
651                 ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
652                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
653                 return PJ_TRUE;
654         }
655         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
656
657         accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next);
658         if (accept_header) {
659                 int i;
660
661                 for (i = 0; i < accept_header->count; ++i) {
662                         ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
663                 }
664                 num_accept_headers = accept_header->count;
665         } else {
666                 num_accept_headers = 0;
667         }
668
669         handler = find_sub_handler(event, accept, num_accept_headers);
670         if (!handler) {
671                 ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
672                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
673                 return PJ_TRUE;
674         }
675         sub = handler->new_subscribe(endpoint, rdata);
676         if (!sub) {
677                 pjsip_transaction *trans = pjsip_rdata_get_tsx(rdata);
678
679                 if (trans) {
680                         pjsip_dialog *dlg = pjsip_rdata_get_dlg(rdata);
681                         pjsip_tx_data *tdata;
682
683                         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, &tdata) != PJ_SUCCESS) {
684                                 return PJ_TRUE;
685                         }
686                         pjsip_dlg_send_response(dlg, trans, tdata);
687                 } else {
688                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
689                 }
690         }
691         return PJ_TRUE;
692 }
693
694 static struct ast_sip_publish_handler *find_pub_handler(const char *event)
695 {
696         struct ast_sip_publish_handler *iter = NULL;
697         SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
698
699         AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) {
700                 if (strcmp(event, iter->event_name)) {
701                         ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
702                         continue;
703                 }
704                 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
705                 break;
706         }
707
708         return iter;
709 }
710
711 static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata,
712         pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id)
713 {
714         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
715
716         if (etag_hdr) {
717                 char etag[pj_strlen(&etag_hdr->hvalue) + 1];
718
719                 ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag));
720
721                 if (sscanf(etag, "%30d", entity_id) != 1) {
722                         return SIP_PUBLISH_UNKNOWN;
723                 }
724         }
725
726         *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
727
728         if (!(*expires)) {
729                 return SIP_PUBLISH_REMOVE;
730         } else if (!etag_hdr && rdata->msg_info.msg->body) {
731                 return SIP_PUBLISH_INITIAL;
732         } else if (etag_hdr && !rdata->msg_info.msg->body) {
733                 return SIP_PUBLISH_REFRESH;
734         } else if (etag_hdr && rdata->msg_info.msg->body) {
735                 return SIP_PUBLISH_MODIFY;
736         }
737
738         return SIP_PUBLISH_UNKNOWN;
739 }
740
741 static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
742         struct ast_sip_publish_handler *handler)
743 {
744         struct ast_sip_publication *publication = handler->new_publication(endpoint, rdata);
745
746         if (!publication) {
747                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
748                 return NULL;
749         }
750
751         publication->handler = handler;
752
753         return publication;
754 }
755
756 static int publish_expire_callback(void *data)
757 {
758         RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup);
759
760         publication->handler->publish_expire(publication);
761
762         return 0;
763 }
764
765 static int publish_expire(const void *data)
766 {
767         struct ast_sip_publication *publication = (struct ast_sip_publication*)data;
768
769         ao2_unlink(publication->handler->publications, publication);
770         publication->sched_id = -1;
771
772         if (ast_sip_push_task(NULL, publish_expire_callback, publication)) {
773                 ao2_cleanup(publication);
774         }
775
776         return 0;
777 }
778
779 static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata)
780 {
781         pjsip_event_hdr *event_header;
782         struct ast_sip_publish_handler *handler;
783         RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
784         char event[32];
785         static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 };
786         pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL);
787         enum sip_publish_type publish_type;
788         RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup);
789         int expires = 0, entity_id;
790
791         endpoint = ast_pjsip_rdata_get_endpoint(rdata);
792         ast_assert(endpoint != NULL);
793
794         event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
795         if (!event_header) {
796                 ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n");
797                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
798                 return PJ_TRUE;
799         }
800         ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
801
802         handler = find_pub_handler(event);
803         if (!handler) {
804                 ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event);
805                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
806                 return PJ_TRUE;
807         }
808
809         publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id);
810
811         /* If this is not an initial publish ensure that a publication is present */
812         if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) {
813                 if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) {
814                         static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 };
815
816                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed,
817                                 NULL, NULL);
818                         return PJ_TRUE;
819                 }
820
821                 /* Per the RFC every response has to have a new entity tag */
822                 publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
823
824                 /* Update the expires here so that the created responses will contain the correct value */
825                 publication->expires = expires;
826         }
827
828         switch (publish_type) {
829                 case SIP_PUBLISH_INITIAL:
830                         publication = publish_request_initial(endpoint, rdata, handler);
831                         break;
832                 case SIP_PUBLISH_REFRESH:
833                 case SIP_PUBLISH_MODIFY:
834                         if (handler->publish_refresh(publication, rdata)) {
835                                 /* If an error occurs we want to terminate the publication */
836                                 expires = 0;
837                         }
838                         break;
839                 case SIP_PUBLISH_REMOVE:
840                         handler->publish_termination(publication, rdata);
841                         break;
842                 case SIP_PUBLISH_UNKNOWN:
843                 default:
844                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
845                         break;
846         }
847
848         if (publication) {
849                 if (expires) {
850                         ao2_link(handler->publications, publication);
851
852                         AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication,
853                                                 ao2_ref(publication, -1), ao2_ref(publication, -1), ao2_ref(publication, +1));
854                 } else {
855                         AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1));
856                 }
857         }
858
859         return PJ_TRUE;
860 }
861
862 /*! \brief Internal destructor for publications */
863 static void publication_destroy_fn(void *obj)
864 {
865         struct ast_sip_publication *publication = obj;
866
867         ast_debug(3, "Destroying SIP publication\n");
868
869         ao2_cleanup(publication->datastores);
870         ao2_cleanup(publication->endpoint);
871 }
872
873 struct ast_sip_publication *ast_sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
874 {
875         struct ast_sip_publication *publication;
876         pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
877
878         ast_assert(endpoint != NULL);
879
880         if (!(publication = ao2_alloc(sizeof(*publication), publication_destroy_fn))) {
881                 return NULL;
882         }
883
884         if (!(publication->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp))) {
885                 ao2_ref(publication, -1);
886                 return NULL;
887         }
888
889         publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
890         ao2_ref(endpoint, +1);
891         publication->endpoint = endpoint;
892         publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
893         publication->sched_id = -1;
894
895         return publication;
896 }
897
898 struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub)
899 {
900         return pub->endpoint;
901 }
902
903 int ast_sip_publication_create_response(struct ast_sip_publication *pub, int status_code, pjsip_rx_data *rdata,
904         pjsip_tx_data **tdata)
905 {
906         if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, tdata) != PJ_SUCCESS) {
907                 return -1;
908         }
909
910         if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) {
911                 RAII_VAR(char *, entity_tag, NULL, ast_free_ptr);
912                 RAII_VAR(char *, expires, NULL, ast_free_ptr);
913
914                 if ((ast_asprintf(&entity_tag, "%d", pub->entity_tag) < 0) ||
915                         (ast_asprintf(&expires, "%d", pub->expires) < 0)) {
916                         pjsip_tx_data_dec_ref(*tdata);
917                         return -1;
918                 }
919
920                 ast_sip_add_header(*tdata, "SIP-ETag", entity_tag);
921                 ast_sip_add_header(*tdata, "Expires", expires);
922         }
923
924         return 0;
925 }
926
927 pj_status_t ast_sip_publication_send_response(struct ast_sip_publication *pub, pjsip_rx_data *rdata,
928         pjsip_tx_data *tdata)
929 {
930         pj_status_t status;
931         pjsip_transaction *tsx;
932
933         if ((status = pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx)) != PJ_SUCCESS) {
934                 return status;
935         }
936
937         pjsip_tsx_recv_msg(tsx, rdata);
938
939         return pjsip_tsx_send_msg(tsx, tdata);
940 }
941
942 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata)
943 {
944         if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) {
945                 return pubsub_on_rx_subscribe_request(rdata);
946         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) {
947                 return pubsub_on_rx_publish_request(rdata);
948         }
949
950         return PJ_FALSE;
951 }
952
953 static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
954 {
955         struct ast_sip_subscription *sub;
956         if (pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) {
957                 return;
958         }
959
960         sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
961         if (!sub) {
962                 return;
963         }
964
965         if (event->type == PJSIP_EVENT_RX_MSG) {
966                 sub->handler->subscription_terminated(sub, event->body.rx_msg.rdata);
967         }
968
969         if (event->type == PJSIP_EVENT_TSX_STATE &&
970                         event->body.tsx_state.type == PJSIP_EVENT_RX_MSG) {
971                 sub->handler->subscription_terminated(sub, event->body.tsx_state.src.rdata);
972         }
973
974         if (sub->handler->subscription_shutdown) {
975                 sub->handler->subscription_shutdown(sub);
976         }
977         pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL);
978 }
979
980 static void pubsub_on_tsx_state(pjsip_evsub *evsub, pjsip_transaction *tsx, pjsip_event *event)
981 {
982         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
983
984         if (!sub) {
985                 return;
986         }
987
988         if (sub->handler->notify_response && tsx->role == PJSIP_ROLE_UAC &&
989             event->body.tsx_state.type == PJSIP_EVENT_RX_MSG) {
990                 sub->handler->notify_response(sub, event->body.tsx_state.src.rdata);
991         }
992 }
993
994 static void set_parameters_from_response_data(pj_pool_t *pool, int *p_st_code,
995                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body,
996                 struct ast_sip_subscription_response_data *response_data)
997 {
998         ast_assert(response_data->status_code >= 200 && response_data->status_code <= 699);
999         *p_st_code = response_data->status_code;
1000
1001         if (!ast_strlen_zero(response_data->status_text)) {
1002                 pj_strdup2(pool, *p_st_text, response_data->status_text);
1003         }
1004
1005         if (response_data->headers) {
1006                 struct ast_variable *iter;
1007                 for (iter = response_data->headers; iter; iter = iter->next) {
1008                         pj_str_t header_name;
1009                         pj_str_t header_value;
1010                         pjsip_generic_string_hdr *hdr;
1011
1012                         pj_cstr(&header_name, iter->name);
1013                         pj_cstr(&header_value, iter->value);
1014                         hdr = pjsip_generic_string_hdr_create(pool, &header_name, &header_value);
1015                         pj_list_insert_before(res_hdr, hdr);
1016                 }
1017         }
1018
1019         if (response_data->body) {
1020                 pj_str_t type;
1021                 pj_str_t subtype;
1022                 pj_str_t body_text;
1023
1024                 pj_cstr(&type, response_data->body->type);
1025                 pj_cstr(&subtype, response_data->body->subtype);
1026                 pj_cstr(&body_text, response_data->body->body_text);
1027
1028                 *p_body = pjsip_msg_body_create(pool, &type, &subtype, &body_text);
1029         }
1030 }
1031
1032 static int response_data_changed(struct ast_sip_subscription_response_data *response_data)
1033 {
1034         if (response_data->status_code != 200 ||
1035                         !ast_strlen_zero(response_data->status_text) ||
1036                         response_data->headers ||
1037                         response_data->body) {
1038                 return 1;
1039         }
1040         return 0;
1041 }
1042
1043 static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
1044                 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
1045 {
1046         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1047         struct ast_sip_subscription_response_data response_data = {
1048                 .status_code = 200,
1049         };
1050
1051         if (!sub) {
1052                 return;
1053         }
1054
1055         sub->handler->resubscribe(sub, rdata, &response_data);
1056
1057         if (!response_data_changed(&response_data)) {
1058                 return;
1059         }
1060
1061         set_parameters_from_response_data(rdata->tp_info.pool, p_st_code, p_st_text,
1062                         res_hdr, p_body, &response_data);
1063 }
1064
1065 static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code,
1066                 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
1067 {
1068         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1069         struct ast_sip_subscription_response_data response_data = {
1070                 .status_code = 200,
1071         };
1072
1073         if (!sub || !sub->handler->notify_request) {
1074                 return;
1075         }
1076
1077         sub->handler->notify_request(sub, rdata, &response_data);
1078
1079         if (!response_data_changed(&response_data)) {
1080                 return;
1081         }
1082
1083         set_parameters_from_response_data(rdata->tp_info.pool, p_st_code, p_st_text,
1084                         res_hdr, p_body, &response_data);
1085 }
1086
1087 static int serialized_pubsub_on_client_refresh(void *userdata)
1088 {
1089         struct ast_sip_subscription *sub = userdata;
1090
1091         sub->handler->refresh_subscription(sub);
1092         ao2_cleanup(sub);
1093         return 0;
1094 }
1095
1096 static void pubsub_on_client_refresh(pjsip_evsub *evsub)
1097 {
1098         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1099
1100         ao2_ref(sub, +1);
1101         ast_sip_push_task(sub->serializer, serialized_pubsub_on_client_refresh, sub);
1102 }
1103
1104 static int serialized_pubsub_on_server_timeout(void *userdata)
1105 {
1106         struct ast_sip_subscription *sub = userdata;
1107
1108         sub->handler->subscription_timeout(sub);
1109         ao2_cleanup(sub);
1110         return 0;
1111 }
1112
1113 static void pubsub_on_server_timeout(pjsip_evsub *evsub)
1114 {
1115         struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1116
1117         ao2_ref(sub, +1);
1118         ast_sip_push_task(sub->serializer, serialized_pubsub_on_server_timeout, sub);
1119 }
1120
1121 static int load_module(void)
1122 {
1123         static const pj_str_t str_PUBLISH = { "PUBLISH", 7 };
1124
1125         pjsip_evsub_init_module(ast_sip_get_pjsip_endpoint());
1126
1127         if (!(sched = ast_sched_context_create())) {
1128                 ast_log(LOG_ERROR, "Could not create scheduler for publication expiration\n");
1129                 return AST_MODULE_LOAD_FAILURE;
1130         }
1131
1132         if (ast_sched_start_thread(sched)) {
1133                 ast_log(LOG_ERROR, "Could not start scheduler thread for publication expiration\n");
1134                 ast_sched_context_destroy(sched);
1135                 return AST_MODULE_LOAD_FAILURE;
1136         }
1137
1138         pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), NULL, PJSIP_H_ALLOW, NULL, 1, &str_PUBLISH);
1139
1140         if (ast_sip_register_service(&pubsub_module)) {
1141                 ast_log(LOG_ERROR, "Could not register pubsub service\n");
1142                 ast_sched_context_destroy(sched);
1143                 return AST_MODULE_LOAD_FAILURE;
1144         }
1145
1146         return AST_MODULE_LOAD_SUCCESS;
1147 }
1148
1149 static int unload_module(void)
1150 {
1151         if (sched) {
1152                 ast_sched_context_destroy(sched);
1153         }
1154
1155         return 0;
1156 }
1157
1158 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP event resource",
1159                 .load = load_module,
1160                 .unload = unload_module,
1161                 .load_pri = AST_MODPRI_CHANNEL_DEPEND,
1162 );