2 * Asterisk -- An open source telephony toolkit.
4 * Copyright (C) 2013, Digium, Inc.
6 * Mark Michelson <mmichelson@digium.com>
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
19 * \brief Opaque structure representing an RFC 3265 SIP subscription
23 <depend>pjproject</depend>
24 <depend>res_pjsip</depend>
25 <support_level>core</support_level>
31 #include <pjsip_simple.h>
34 #include "asterisk/res_pjsip_pubsub.h"
35 #include "asterisk/module.h"
36 #include "asterisk/linkedlists.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/datastore.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/taskprocessor.h"
41 #include "asterisk/sched.h"
42 #include "asterisk/res_pjsip.h"
43 #include "asterisk/callerid.h"
44 #include "asterisk/manager.h"
45 #include "asterisk/test.h"
46 #include "res_pjsip/include/res_pjsip_private.h"
49 <manager name="PJSIPShowSubscriptionsInbound" language="en_US">
56 Provides a listing of all inbound subscriptions. An event <literal>InboundSubscriptionDetail</literal>
57 is issued for each subscription object. Once all detail events are completed an
58 <literal>InboundSubscriptionDetailComplete</literal> event is issued.
62 <manager name="PJSIPShowSubscriptionsOutbound" language="en_US">
69 Provides a listing of all outbound subscriptions. An event <literal>OutboundSubscriptionDetail</literal>
70 is issued for each subscription object. Once all detail events are completed an
71 <literal>OutboundSubscriptionDetailComplete</literal> event is issued.
75 <configInfo name="res_pjsip_pubsub" language="en_US">
76 <synopsis>Module that implements publish and subscribe support.</synopsis>
77 <configFile name="pjsip.conf">
78 <configObject name="subscription_persistence">
79 <synopsis>Persists SIP subscriptions so they survive restarts.</synopsis>
80 <configOption name="packet">
81 <synopsis>Entire SIP SUBSCRIBE packet that created the subscription</synopsis>
83 <configOption name="src_name">
84 <synopsis>The source address of the subscription</synopsis>
86 <configOption name="src_port">
87 <synopsis>The source port of the subscription</synopsis>
89 <configOption name="transport_key">
90 <synopsis>The type of transport the subscription was received on</synopsis>
92 <configOption name="local_name">
93 <synopsis>The local address the subscription was received on</synopsis>
95 <configOption name="local_port">
96 <synopsis>The local port the subscription was received on</synopsis>
98 <configOption name="cseq">
99 <synopsis>The sequence number of the next NOTIFY to be sent</synopsis>
101 <configOption name="tag">
102 <synopsis>The local tag of the dialog for the subscription</synopsis>
104 <configOption name="endpoint">
105 <synopsis>The name of the endpoint that subscribed</synopsis>
107 <configOption name="expires">
108 <synopsis>The time at which the subscription expires</synopsis>
115 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);
117 static struct pjsip_module pubsub_module = {
118 .name = { "PubSub Module", 13 },
119 .priority = PJSIP_MOD_PRIORITY_APPLICATION,
120 .on_rx_request = pubsub_on_rx_request,
123 #define MOD_DATA_BODY_GENERATOR "sub_body_generator"
124 #define MOD_DATA_PERSISTENCE "sub_persistence"
126 static const pj_str_t str_event_name = { "Event", 5 };
128 /*! \brief Scheduler used for automatically expiring publications */
129 static struct ast_sched_context *sched;
131 /*! \brief Number of buckets for publications (on a per handler) */
132 #define PUBLICATIONS_BUCKETS 37
134 /*! \brief Default expiration time for PUBLISH if one is not specified */
135 #define DEFAULT_PUBLISH_EXPIRES 3600
137 /*! \brief Defined method for PUBLISH */
138 const pjsip_method pjsip_publish_method =
145 * \brief The types of PUBLISH messages defined in RFC 3903
147 enum sip_publish_type {
152 * This actually is not defined in RFC 3903. We use this as a constant
153 * to indicate that an incoming PUBLISH does not fit into any of the
154 * other categories and is thus invalid.
162 * The first PUBLISH sent. This will contain a non-zero Expires header
163 * as well as a body that indicates the current state of the endpoint
164 * that has sent the message. The initial PUBLISH is the only type
165 * of PUBLISH to not contain a Sip-If-Match header in it.
173 * Used to keep a published state from expiring. This will contain a
174 * non-zero Expires header but no body since its purpose is not to
183 * Used to change state from its previous value. This will contain
184 * a body updating the published state. May or may not contain an
193 * Used to remove published state from an ESC. This will contain
194 * an Expires header set to 0 and likely no body.
200 * Used to create new entity IDs by ESCs.
202 static int esc_etag_counter;
205 * \brief Structure representing a SIP publication
207 struct ast_sip_publication {
208 /*! Publication datastores set up by handlers */
209 struct ao2_container *datastores;
210 /*! \brief Entity tag for the publication */
212 /*! \brief Handler for this publication */
213 struct ast_sip_publish_handler *handler;
214 /*! \brief The endpoint with which the subscription is communicating */
215 struct ast_sip_endpoint *endpoint;
216 /*! \brief Expiration time of the publication */
218 /*! \brief Scheduled item for expiration of publication */
224 * \brief Structure used for persisting an inbound subscription
226 struct subscription_persistence {
227 /*! Sorcery object details */
228 SORCERY_OBJECT(details);
229 /*! The name of the endpoint involved in the subscrption */
231 /*! SIP message that creates the subscription */
232 char packet[PJSIP_MAX_PKT_LEN];
233 /*! Source address of the message */
234 char src_name[PJ_INET6_ADDRSTRLEN];
235 /*! Source port of the message */
237 /*! Local transport key type */
238 char transport_key[32];
239 /*! Local transport address */
240 char local_name[PJ_INET6_ADDRSTRLEN];
241 /*! Local transport port */
243 /*! Next CSeq to use for message */
245 /*! Local tag of the dialog */
247 /*! When this subscription expires */
248 struct timeval expires;
252 * \brief Structure representing a SIP subscription
254 struct ast_sip_subscription {
255 /*! Subscription datastores set up by handlers */
256 struct ao2_container *datastores;
257 /*! The endpoint with which the subscription is communicating */
258 struct ast_sip_endpoint *endpoint;
259 /*! Serializer on which to place operations for this subscription */
260 struct ast_taskprocessor *serializer;
261 /*! The handler for this subscription */
262 const struct ast_sip_subscription_handler *handler;
263 /*! The role for this subscription */
264 enum ast_sip_subscription_role role;
265 /*! The underlying PJSIP event subscription structure */
267 /*! The underlying PJSIP dialog */
269 /*! Body generaator for NOTIFYs */
270 struct ast_sip_pubsub_body_generator *body_generator;
271 /*! Persistence information */
272 struct subscription_persistence *persistence;
273 /*! Next item in the list */
274 AST_LIST_ENTRY(ast_sip_subscription) next;
277 static const char *sip_subscription_roles_map[] = {
278 [AST_SIP_SUBSCRIBER] = "Subscriber",
279 [AST_SIP_NOTIFIER] = "Notifier"
282 AST_RWLIST_HEAD_STATIC(subscriptions, ast_sip_subscription);
284 AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
285 AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
287 /*! \brief Destructor for subscription persistence */
288 static void subscription_persistence_destroy(void *obj)
290 struct subscription_persistence *persistence = obj;
292 ast_free(persistence->endpoint);
293 ast_free(persistence->tag);
296 /*! \brief Allocator for subscription persistence */
297 static void *subscription_persistence_alloc(const char *name)
299 return ast_sorcery_generic_alloc(sizeof(struct subscription_persistence), subscription_persistence_destroy);
302 /*! \brief Function which creates initial persistence information of a subscription in sorcery */
303 static struct subscription_persistence *subscription_persistence_create(struct ast_sip_subscription *sub)
305 char tag[PJ_GUID_STRING_LENGTH + 1];
307 /* The id of this persistence object doesn't matter as we keep it on the subscription and don't need to
308 * look it up by id at all.
310 struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(),
311 "subscription_persistence", NULL);
317 persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub->endpoint));
318 ast_copy_pj_str(tag, &sub->dlg->local.info->tag, sizeof(tag));
319 persistence->tag = ast_strdup(tag);
321 ast_sorcery_create(ast_sip_get_sorcery(), persistence);
325 /*! \brief Function which updates persistence information of a subscription in sorcery */
326 static void subscription_persistence_update(struct ast_sip_subscription *sub,
327 pjsip_rx_data *rdata)
329 if (!sub->persistence) {
333 sub->persistence->cseq = sub->dlg->local.cseq;
337 pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
339 expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
340 sub->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
342 ast_copy_string(sub->persistence->packet, rdata->pkt_info.packet, sizeof(sub->persistence->packet));
343 ast_copy_string(sub->persistence->src_name, rdata->pkt_info.src_name, sizeof(sub->persistence->src_name));
344 sub->persistence->src_port = rdata->pkt_info.src_port;
345 ast_copy_string(sub->persistence->transport_key, rdata->tp_info.transport->type_name,
346 sizeof(sub->persistence->transport_key));
347 ast_copy_pj_str(sub->persistence->local_name, &rdata->tp_info.transport->local_name.host,
348 sizeof(sub->persistence->local_name));
349 sub->persistence->local_port = rdata->tp_info.transport->local_name.port;
352 ast_sorcery_update(ast_sip_get_sorcery(), sub->persistence);
355 /*! \brief Function which removes persistence of a subscription from sorcery */
356 static void subscription_persistence_remove(struct ast_sip_subscription *sub)
358 if (!sub->persistence) {
362 ast_sorcery_delete(ast_sip_get_sorcery(), sub->persistence);
363 ao2_ref(sub->persistence, -1);
367 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name);
368 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
371 /*! \brief Retrieve a handler using the Event header of an rdata message */
372 static struct ast_sip_subscription_handler *subscription_get_handler_from_rdata(pjsip_rx_data *rdata)
374 pjsip_event_hdr *event_header;
376 struct ast_sip_subscription_handler *handler;
378 event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
380 ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
383 ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
385 handler = find_sub_handler_for_event_name(event);
387 ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
393 /*! \brief Retrieve a body generator using the Accept header of an rdata message */
394 static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata,
395 const struct ast_sip_subscription_handler *handler)
397 pjsip_accept_hdr *accept_header;
398 char accept[AST_SIP_MAX_ACCEPT][64];
399 size_t num_accept_headers;
401 accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next);
405 for (i = 0; i < accept_header->count; ++i) {
406 ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
408 num_accept_headers = accept_header->count;
410 /* If a SUBSCRIBE contains no Accept headers, then we must assume that
411 * the default accept type for the event package is to be used.
413 ast_copy_string(accept[0], handler->default_accept, sizeof(accept[0]));
414 num_accept_headers = 1;
417 return find_body_generator(accept, num_accept_headers);
420 /*! \brief Callback function to perform the actual recreation of a subscription */
421 static int subscription_persistence_recreate(void *obj, void *arg, int flags)
423 struct subscription_persistence *persistence = obj;
424 pj_pool_t *pool = arg;
425 pjsip_rx_data rdata = { { 0, }, };
426 pjsip_expires_hdr *expires_header;
427 struct ast_sip_subscription_handler *handler;
428 RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
429 struct ast_sip_subscription *sub;
430 struct ast_sip_pubsub_body_generator *generator;
432 /* If this subscription has already expired remove it */
433 if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
434 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
438 endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
440 ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
441 persistence->endpoint);
442 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
447 rdata.tp_info.pool = pool;
449 if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
450 persistence->transport_key, persistence->local_name, persistence->local_port)) {
451 ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
452 persistence->endpoint);
453 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
457 /* Update the expiration header with the new expiration */
458 expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
459 if (!expires_header) {
460 expires_header = pjsip_expires_hdr_create(pool, 0);
461 if (!expires_header) {
462 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
465 pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
467 expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
469 handler = subscription_get_handler_from_rdata(&rdata);
471 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
475 generator = subscription_get_generator_from_rdata(&rdata, handler);
477 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
481 ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
482 pubsub_module.id, MOD_DATA_BODY_GENERATOR, generator);
483 ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
484 pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
486 sub = handler->new_subscribe(endpoint, &rdata);
488 sub->persistence = ao2_bump(persistence);
489 subscription_persistence_update(sub, &rdata);
491 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
497 /*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
498 static int subscription_persistence_load(void *data)
500 struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
501 "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
504 pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
505 PJSIP_POOL_RDATA_INC);
507 ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
511 ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
513 pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
515 ao2_ref(persisted_subscriptions, -1);
519 /*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */
520 static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
522 struct ast_json_payload *payload;
525 if (stasis_message_type(message) != ast_manager_get_generic_type()) {
529 payload = stasis_message_data(message);
530 type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
532 /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
533 * recreate SIP subscriptions.
535 if (strcmp(type, "FullyBooted")) {
539 /* This has to be here so the subscription is recreated when the body generator is available */
540 ast_sip_push_task(NULL, subscription_persistence_load, NULL);
542 /* Once the system is fully booted we don't care anymore */
543 stasis_unsubscribe(sub);
546 static void add_subscription(struct ast_sip_subscription *obj)
548 SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
549 AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
550 ast_module_ref(ast_module_info->self);
553 static void remove_subscription(struct ast_sip_subscription *obj)
555 struct ast_sip_subscription *i;
556 SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
557 AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
559 AST_RWLIST_REMOVE_CURRENT(next);
560 ast_module_unref(ast_module_info->self);
564 AST_RWLIST_TRAVERSE_SAFE_END;
567 typedef int (*on_subscription_t)(struct ast_sip_subscription *sub, void *arg);
569 static int for_each_subscription(on_subscription_t on_subscription, void *arg)
572 struct ast_sip_subscription *i;
573 SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
575 if (!on_subscription) {
579 AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
580 if (on_subscription(i, arg)) {
588 static void sip_subscription_to_ami(struct ast_sip_subscription *sub,
589 struct ast_str **buf)
592 struct ast_sip_endpoint_id_configuration *id = &sub->endpoint->id;
594 ast_str_append(buf, 0, "Role: %s\r\n",
595 sip_subscription_roles_map[sub->role]);
596 ast_str_append(buf, 0, "Endpoint: %s\r\n",
597 ast_sorcery_object_get_id(sub->endpoint));
599 ast_copy_pj_str(str, &sub->dlg->call_id->id, sizeof(str));
600 ast_str_append(buf, 0, "Callid: %s\r\n", str);
602 ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(
603 ast_sip_subscription_get_evsub(sub)));
605 ast_callerid_merge(str, sizeof(str),
606 S_COR(id->self.name.valid, id->self.name.str, NULL),
607 S_COR(id->self.number.valid, id->self.number.str, NULL),
610 ast_str_append(buf, 0, "Callerid: %s\r\n", str);
612 if (sub->handler->to_ami) {
613 sub->handler->to_ami(sub, buf);
617 #define DATASTORE_BUCKETS 53
619 #define DEFAULT_EXPIRES 3600
621 static int datastore_hash(const void *obj, int flags)
623 const struct ast_datastore *datastore = obj;
624 const char *uid = flags & OBJ_KEY ? obj : datastore->uid;
626 ast_assert(uid != NULL);
628 return ast_str_hash(uid);
631 static int datastore_cmp(void *obj, void *arg, int flags)
633 const struct ast_datastore *datastore1 = obj;
634 const struct ast_datastore *datastore2 = arg;
635 const char *uid2 = flags & OBJ_KEY ? arg : datastore2->uid;
637 ast_assert(datastore1->uid != NULL);
638 ast_assert(uid2 != NULL);
640 return strcmp(datastore1->uid, uid2) ? 0 : CMP_MATCH | CMP_STOP;
643 static int subscription_remove_serializer(void *obj)
645 struct ast_sip_subscription *sub = obj;
647 /* This is why we keep the dialog on the subscription. When the subscription
648 * is destroyed, there is no guarantee that the underlying dialog is ready
649 * to be destroyed. Furthermore, there's no guarantee in the opposite direction
650 * either. The dialog could be destroyed before our subscription is. We fix
651 * this problem by keeping a reference to the dialog until it is time to
652 * destroy the subscription. We need to have the dialog available when the
653 * subscription is destroyed so that we can guarantee that our attempt to
654 * remove the serializer will be successful.
656 ast_sip_dialog_set_serializer(sub->dlg, NULL);
657 pjsip_dlg_dec_session(sub->dlg, &pubsub_module);
662 static void subscription_destructor(void *obj)
664 struct ast_sip_subscription *sub = obj;
666 ast_debug(3, "Destroying SIP subscription\n");
668 subscription_persistence_remove(sub);
670 remove_subscription(sub);
672 ao2_cleanup(sub->datastores);
673 ao2_cleanup(sub->endpoint);
676 ast_sip_push_task_synchronous(NULL, subscription_remove_serializer, sub);
678 ast_taskprocessor_unreference(sub->serializer);
681 static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
682 static void pubsub_on_tsx_state(pjsip_evsub *sub, pjsip_transaction *tsx, pjsip_event *event);
683 static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
684 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
685 static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
686 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
687 static void pubsub_on_client_refresh(pjsip_evsub *sub);
688 static void pubsub_on_server_timeout(pjsip_evsub *sub);
691 static pjsip_evsub_user pubsub_cb = {
692 .on_evsub_state = pubsub_on_evsub_state,
693 .on_tsx_state = pubsub_on_tsx_state,
694 .on_rx_refresh = pubsub_on_rx_refresh,
695 .on_rx_notify = pubsub_on_rx_notify,
696 .on_client_refresh = pubsub_on_client_refresh,
697 .on_server_timeout = pubsub_on_server_timeout,
700 static pjsip_evsub *allocate_evsub(const char *event, enum ast_sip_subscription_role role,
701 struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, pjsip_dialog *dlg)
704 /* PJSIP is kind enough to have some built-in support for certain
705 * events. We need to use the correct initialization function for the
708 if (role == AST_SIP_NOTIFIER) {
709 pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &evsub);
712 pj_cstr(&pj_event, event);
713 pjsip_evsub_create_uac(dlg, &pubsub_cb, &pj_event, 0, &evsub);
718 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
719 enum ast_sip_subscription_role role, struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
721 struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub), subscription_destructor);
723 struct subscription_persistence *persistence;
728 sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
729 if (!sub->datastores) {
733 sub->serializer = ast_sip_create_serializer();
734 if (!sub->serializer) {
738 sub->body_generator = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
739 pubsub_module.id, MOD_DATA_BODY_GENERATOR);
741 if (role == AST_SIP_NOTIFIER) {
742 dlg = ast_sip_create_dialog_uas(endpoint, rdata);
744 RAII_VAR(struct ast_sip_contact *, contact, NULL, ao2_cleanup);
746 contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors);
747 if (!contact || ast_strlen_zero(contact->uri)) {
748 ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n",
749 ast_sorcery_object_get_id(endpoint));
753 dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL);
756 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
760 persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
761 pubsub_module.id, MOD_DATA_PERSISTENCE);
763 /* Update the created dialog with the persisted information */
764 pjsip_ua_unregister_dlg(pjsip_ua_instance(), dlg);
765 pj_strdup2(dlg->pool, &dlg->local.info->tag, persistence->tag);
766 dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag);
767 pjsip_ua_register_dlg(pjsip_ua_instance(), dlg);
768 dlg->local.cseq = persistence->cseq;
769 dlg->remote.cseq = persistence->cseq;
771 sub->evsub = allocate_evsub(handler->event_name, role, endpoint, rdata, dlg);
772 /* We keep a reference to the dialog until our subscription is destroyed. See
773 * the subscription_destructor for more details
775 pjsip_dlg_inc_session(dlg, &pubsub_module);
777 ast_sip_dialog_set_serializer(dlg, sub->serializer);
778 pjsip_evsub_set_mod_data(sub->evsub, pubsub_module.id, sub);
779 ao2_ref(endpoint, +1);
780 sub->endpoint = endpoint;
781 sub->handler = handler;
783 add_subscription(sub);
787 struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
789 ast_assert(sub->endpoint != NULL);
790 ao2_ref(sub->endpoint, +1);
791 return sub->endpoint;
794 struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
796 ast_assert(sub->serializer != NULL);
797 return sub->serializer;
800 pjsip_evsub *ast_sip_subscription_get_evsub(struct ast_sip_subscription *sub)
805 pjsip_dialog *ast_sip_subscription_get_dlg(struct ast_sip_subscription *sub)
810 int ast_sip_subscription_accept(struct ast_sip_subscription *sub, pjsip_rx_data *rdata, int response)
812 /* If this is a persistence recreation the subscription has already been accepted */
813 if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) {
817 return pjsip_evsub_accept(ast_sip_subscription_get_evsub(sub), rdata, response, NULL) == PJ_SUCCESS ? 0 : -1;
820 int ast_sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata)
822 struct ast_sip_endpoint *endpoint = ast_sip_subscription_get_endpoint(sub);
826 res = pjsip_evsub_send_request(ast_sip_subscription_get_evsub(sub),
827 tdata) == PJ_SUCCESS ? 0 : -1;
829 subscription_persistence_update(sub, NULL);
831 ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
834 pjsip_evsub_get_state_name(ast_sip_subscription_get_evsub(sub)),
835 ast_sorcery_object_get_id(endpoint));
837 ao2_cleanup(endpoint);
842 static void subscription_datastore_destroy(void *obj)
844 struct ast_datastore *datastore = obj;
846 /* Using the destroy function (if present) destroy the data */
847 if (datastore->info->destroy != NULL && datastore->data != NULL) {
848 datastore->info->destroy(datastore->data);
849 datastore->data = NULL;
852 ast_free((void *) datastore->uid);
853 datastore->uid = NULL;
856 struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
858 RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
859 const char *uid_ptr = uid;
865 datastore = ao2_alloc(sizeof(*datastore), subscription_datastore_destroy);
870 datastore->info = info;
871 if (ast_strlen_zero(uid)) {
872 /* They didn't provide an ID so we'll provide one ourself */
873 struct ast_uuid *uuid = ast_uuid_generate();
874 char uuid_buf[AST_UUID_STR_LEN];
878 uid_ptr = ast_uuid_to_str(uuid, uuid_buf, sizeof(uuid_buf));
882 datastore->uid = ast_strdup(uid_ptr);
883 if (!datastore->uid) {
887 ao2_ref(datastore, +1);
891 int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
893 ast_assert(datastore != NULL);
894 ast_assert(datastore->info != NULL);
895 ast_assert(!ast_strlen_zero(datastore->uid));
897 if (!ao2_link(subscription->datastores, datastore)) {
903 struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
905 return ao2_find(subscription->datastores, name, OBJ_KEY);
908 void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
910 ao2_callback(subscription->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
913 int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
915 ast_assert(datastore != NULL);
916 ast_assert(datastore->info != NULL);
917 ast_assert(!ast_strlen_zero(datastore->uid));
919 if (!ao2_link(publication->datastores, datastore)) {
925 struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
927 return ao2_find(publication->datastores, name, OBJ_KEY);
930 void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
932 ao2_callback(publication->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
935 AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
937 static int publication_hash_fn(const void *obj, const int flags)
939 const struct ast_sip_publication *publication = obj;
940 const int *entity_tag = obj;
942 return flags & OBJ_KEY ? *entity_tag : publication->entity_tag;
945 static int publication_cmp_fn(void *obj, void *arg, int flags)
947 const struct ast_sip_publication *publication1 = obj;
948 const struct ast_sip_publication *publication2 = arg;
949 const int *entity_tag = arg;
951 return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ?
952 CMP_MATCH | CMP_STOP : 0);
955 static void publish_add_handler(struct ast_sip_publish_handler *handler)
957 SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
958 AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
961 int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
963 if (ast_strlen_zero(handler->event_name)) {
964 ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
968 if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS,
969 publication_hash_fn, publication_cmp_fn))) {
970 ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n",
971 handler->event_name);
975 publish_add_handler(handler);
977 ast_module_ref(ast_module_info->self);
982 void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
984 struct ast_sip_publish_handler *iter;
985 SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
986 AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
987 if (handler == iter) {
988 AST_RWLIST_REMOVE_CURRENT(next);
989 ao2_cleanup(handler->publications);
990 ast_module_unref(ast_module_info->self);
994 AST_RWLIST_TRAVERSE_SAFE_END;
997 AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
999 static void sub_add_handler(struct ast_sip_subscription_handler *handler)
1001 SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1002 AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
1003 ast_module_ref(ast_module_info->self);
1006 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name)
1008 struct ast_sip_subscription_handler *iter;
1009 SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1011 AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
1012 if (!strcmp(iter->event_name, event_name)) {
1019 int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler)
1022 pj_str_t accept[AST_SIP_MAX_ACCEPT];
1023 struct ast_sip_subscription_handler *existing;
1026 if (ast_strlen_zero(handler->event_name)) {
1027 ast_log(LOG_ERROR, "No event package specified for subscription handler. Cannot register\n");
1031 existing = find_sub_handler_for_event_name(handler->event_name);
1033 ast_log(LOG_ERROR, "Unable to register subscription handler for event %s."
1034 "A handler is already registered\n", handler->event_name);
1038 for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) {
1039 pj_cstr(&accept[i], handler->accept[i]);
1042 pj_cstr(&event, handler->event_name);
1044 pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
1046 sub_add_handler(handler);
1051 void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
1053 struct ast_sip_subscription_handler *iter;
1054 SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1055 AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
1056 if (handler == iter) {
1057 AST_RWLIST_REMOVE_CURRENT(next);
1058 ast_module_unref(ast_module_info->self);
1062 AST_RWLIST_TRAVERSE_SAFE_END;
1065 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype(const char *content_type,
1066 const char *content_subtype)
1068 struct ast_sip_pubsub_body_generator *iter;
1069 SCOPED_LOCK(lock, &body_generators, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1071 AST_LIST_TRAVERSE(&body_generators, iter, list) {
1072 if (!strcmp(iter->type, content_type) &&
1073 !strcmp(iter->subtype, content_subtype)) {
1081 static struct ast_sip_pubsub_body_generator *find_body_generator_accept(const char *accept)
1083 char *accept_copy = ast_strdupa(accept);
1084 char *subtype = accept_copy;
1085 char *type = strsep(&subtype, "/");
1087 if (ast_strlen_zero(type) || ast_strlen_zero(subtype)) {
1091 return find_body_generator_type_subtype(type, subtype);
1094 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
1098 struct ast_sip_pubsub_body_generator *generator = NULL;
1100 for (i = 0; i < num_accept; ++i) {
1101 generator = find_body_generator_accept(accept[i]);
1103 ast_debug(3, "Body generator %p found for accept type %s\n", generator, accept[i]);
1106 ast_debug(3, "No body generator found for accept type %s\n", accept[i]);
1113 static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
1115 pjsip_expires_hdr *expires_header;
1116 struct ast_sip_subscription_handler *handler;
1117 RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
1118 struct ast_sip_subscription *sub;
1119 struct ast_sip_pubsub_body_generator *generator;
1121 endpoint = ast_pjsip_rdata_get_endpoint(rdata);
1122 ast_assert(endpoint != NULL);
1124 if (!endpoint->subscription.allow) {
1125 ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint));
1126 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL);
1130 expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next);
1132 if (expires_header) {
1133 if (expires_header->ivalue == 0) {
1134 ast_log(LOG_WARNING, "Susbscription request from endpoint %s rejected. Expiration of 0 is invalid\n",
1135 ast_sorcery_object_get_id(endpoint));
1136 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
1139 if (expires_header->ivalue < endpoint->subscription.minexpiry) {
1140 ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %u\n",
1141 expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry);
1142 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL);
1147 handler = subscription_get_handler_from_rdata(rdata);
1149 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
1153 generator = subscription_get_generator_from_rdata(rdata, handler);
1155 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
1159 ast_sip_mod_data_set(rdata->tp_info.pool, rdata->endpt_info.mod_data,
1160 pubsub_module.id, MOD_DATA_BODY_GENERATOR, generator);
1162 sub = handler->new_subscribe(endpoint, rdata);
1164 pjsip_transaction *trans = pjsip_rdata_get_tsx(rdata);
1167 pjsip_dialog *dlg = pjsip_rdata_get_dlg(rdata);
1168 pjsip_tx_data *tdata;
1170 if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, &tdata) != PJ_SUCCESS) {
1173 pjsip_dlg_send_response(dlg, trans, tdata);
1175 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
1178 sub->persistence = subscription_persistence_create(sub);
1179 subscription_persistence_update(sub, rdata);
1185 static struct ast_sip_publish_handler *find_pub_handler(const char *event)
1187 struct ast_sip_publish_handler *iter = NULL;
1188 SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1190 AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) {
1191 if (strcmp(event, iter->event_name)) {
1192 ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
1195 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
1202 static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata,
1203 pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id)
1205 pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
1208 char etag[pj_strlen(&etag_hdr->hvalue) + 1];
1210 ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag));
1212 if (sscanf(etag, "%30d", entity_id) != 1) {
1213 return SIP_PUBLISH_UNKNOWN;
1217 *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
1220 return SIP_PUBLISH_REMOVE;
1221 } else if (!etag_hdr && rdata->msg_info.msg->body) {
1222 return SIP_PUBLISH_INITIAL;
1223 } else if (etag_hdr && !rdata->msg_info.msg->body) {
1224 return SIP_PUBLISH_REFRESH;
1225 } else if (etag_hdr && rdata->msg_info.msg->body) {
1226 return SIP_PUBLISH_MODIFY;
1229 return SIP_PUBLISH_UNKNOWN;
1232 static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
1233 struct ast_sip_publish_handler *handler)
1235 struct ast_sip_publication *publication = handler->new_publication(endpoint, rdata);
1238 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
1242 publication->handler = handler;
1247 static int publish_expire_callback(void *data)
1249 RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup);
1251 publication->handler->publish_expire(publication);
1256 static int publish_expire(const void *data)
1258 struct ast_sip_publication *publication = (struct ast_sip_publication*)data;
1260 ao2_unlink(publication->handler->publications, publication);
1261 publication->sched_id = -1;
1263 if (ast_sip_push_task(NULL, publish_expire_callback, publication)) {
1264 ao2_cleanup(publication);
1270 static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata)
1272 pjsip_event_hdr *event_header;
1273 struct ast_sip_publish_handler *handler;
1274 RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
1276 static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 };
1277 pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL);
1278 enum sip_publish_type publish_type;
1279 RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup);
1280 int expires = 0, entity_id;
1282 endpoint = ast_pjsip_rdata_get_endpoint(rdata);
1283 ast_assert(endpoint != NULL);
1285 event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
1286 if (!event_header) {
1287 ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n");
1288 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
1291 ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
1293 handler = find_pub_handler(event);
1295 ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event);
1296 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
1300 publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id);
1302 /* If this is not an initial publish ensure that a publication is present */
1303 if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) {
1304 if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) {
1305 static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 };
1307 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed,
1312 /* Per the RFC every response has to have a new entity tag */
1313 publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
1315 /* Update the expires here so that the created responses will contain the correct value */
1316 publication->expires = expires;
1319 switch (publish_type) {
1320 case SIP_PUBLISH_INITIAL:
1321 publication = publish_request_initial(endpoint, rdata, handler);
1323 case SIP_PUBLISH_REFRESH:
1324 case SIP_PUBLISH_MODIFY:
1325 if (handler->publish_refresh(publication, rdata)) {
1326 /* If an error occurs we want to terminate the publication */
1330 case SIP_PUBLISH_REMOVE:
1331 handler->publish_termination(publication, rdata);
1333 case SIP_PUBLISH_UNKNOWN:
1335 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
1341 ao2_link(handler->publications, publication);
1343 AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication,
1344 ao2_ref(publication, -1), ao2_ref(publication, -1), ao2_ref(publication, +1));
1346 AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1));
1353 /*! \brief Internal destructor for publications */
1354 static void publication_destroy_fn(void *obj)
1356 struct ast_sip_publication *publication = obj;
1358 ast_debug(3, "Destroying SIP publication\n");
1360 ao2_cleanup(publication->datastores);
1361 ao2_cleanup(publication->endpoint);
1364 struct ast_sip_publication *ast_sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata)
1366 struct ast_sip_publication *publication;
1367 pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
1369 ast_assert(endpoint != NULL);
1371 if (!(publication = ao2_alloc(sizeof(*publication), publication_destroy_fn))) {
1375 if (!(publication->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp))) {
1376 ao2_ref(publication, -1);
1380 publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
1381 ao2_ref(endpoint, +1);
1382 publication->endpoint = endpoint;
1383 publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
1384 publication->sched_id = -1;
1389 struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub)
1391 return pub->endpoint;
1394 int ast_sip_publication_create_response(struct ast_sip_publication *pub, int status_code, pjsip_rx_data *rdata,
1395 pjsip_tx_data **tdata)
1397 if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, tdata) != PJ_SUCCESS) {
1401 if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) {
1402 RAII_VAR(char *, entity_tag, NULL, ast_free_ptr);
1403 RAII_VAR(char *, expires, NULL, ast_free_ptr);
1405 if ((ast_asprintf(&entity_tag, "%d", pub->entity_tag) < 0) ||
1406 (ast_asprintf(&expires, "%d", pub->expires) < 0)) {
1407 pjsip_tx_data_dec_ref(*tdata);
1411 ast_sip_add_header(*tdata, "SIP-ETag", entity_tag);
1412 ast_sip_add_header(*tdata, "Expires", expires);
1418 pj_status_t ast_sip_publication_send_response(struct ast_sip_publication *pub, pjsip_rx_data *rdata,
1419 pjsip_tx_data *tdata)
1422 pjsip_transaction *tsx;
1424 if ((status = pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx)) != PJ_SUCCESS) {
1428 pjsip_tsx_recv_msg(tsx, rdata);
1430 return pjsip_tsx_send_msg(tsx, tdata);
1433 int ast_sip_pubsub_register_body_generator(struct ast_sip_pubsub_body_generator *generator)
1435 struct ast_sip_pubsub_body_generator *existing;
1437 pj_size_t accept_len;
1439 existing = find_body_generator_type_subtype(generator->type, generator->subtype);
1441 ast_log(LOG_WARNING, "Cannot register body generator of %s/%s."
1442 "One is already registered.\n", generator->type, generator->subtype);
1446 AST_RWLIST_WRLOCK(&body_generators);
1447 AST_LIST_INSERT_HEAD(&body_generators, generator, list);
1448 AST_RWLIST_UNLOCK(&body_generators);
1450 /* Lengths of type and subtype plus space for a slash. pj_str_t is not
1451 * null-terminated, so there is no need to allocate for the extra null
1454 accept_len = strlen(generator->type) + strlen(generator->subtype) + 1;
1456 accept.ptr = alloca(accept_len);
1457 accept.slen = accept_len;
1458 /* Safe use of sprintf */
1459 sprintf(accept.ptr, "%s/%s", generator->type, generator->subtype);
1460 pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module,
1461 PJSIP_H_ACCEPT, NULL, 1, &accept);
1466 void ast_sip_pubsub_unregister_body_generator(struct ast_sip_pubsub_body_generator *generator)
1468 struct ast_sip_pubsub_body_generator *iter;
1469 SCOPED_LOCK(lock, &body_generators, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1471 AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_generators, iter, list) {
1472 if (iter == generator) {
1473 AST_LIST_REMOVE_CURRENT(list);
1477 AST_RWLIST_TRAVERSE_SAFE_END;
1480 int ast_sip_pubsub_register_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
1482 AST_RWLIST_WRLOCK(&body_supplements);
1483 AST_RWLIST_INSERT_TAIL(&body_supplements, supplement, list);
1484 AST_RWLIST_UNLOCK(&body_supplements);
1489 void ast_sip_pubsub_unregister_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
1491 struct ast_sip_pubsub_body_supplement *iter;
1492 SCOPED_LOCK(lock, &body_supplements, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1494 AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_supplements, iter, list) {
1495 if (iter == supplement) {
1496 AST_LIST_REMOVE_CURRENT(list);
1500 AST_RWLIST_TRAVERSE_SAFE_END;
1503 const char *ast_sip_subscription_get_body_type(struct ast_sip_subscription *sub)
1505 return sub->body_generator->type;
1508 const char *ast_sip_subscription_get_body_subtype(struct ast_sip_subscription *sub)
1510 return sub->body_generator->subtype;
1513 int ast_sip_pubsub_generate_body_content(const char *type, const char *subtype,
1514 void *data, struct ast_str **str)
1516 struct ast_sip_pubsub_body_supplement *supplement;
1517 struct ast_sip_pubsub_body_generator *generator;
1521 generator = find_body_generator_type_subtype(type, subtype);
1523 ast_log(LOG_WARNING, "Unable to find a body generator for %s/%s\n",
1528 body = generator->allocate_body(data);
1530 ast_log(LOG_WARNING, "Unable to allocate a NOTIFY body of type %s/%s\n",
1535 if (generator->generate_body_content(body, data)) {
1540 AST_RWLIST_RDLOCK(&body_supplements);
1541 AST_RWLIST_TRAVERSE(&body_supplements, supplement, list) {
1542 if (!strcmp(generator->type, supplement->type) &&
1543 !strcmp(generator->subtype, supplement->subtype)) {
1544 res = supplement->supplement_body(body, data);
1550 AST_RWLIST_UNLOCK(&body_supplements);
1553 generator->to_string(body, str);
1557 if (generator->destroy_body) {
1558 generator->destroy_body(body);
1564 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata)
1566 if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) {
1567 return pubsub_on_rx_subscribe_request(rdata);
1568 } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) {
1569 return pubsub_on_rx_publish_request(rdata);
1575 static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
1577 struct ast_sip_subscription *sub;
1578 if (pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) {
1582 sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1587 if (sub->handler->subscription_shutdown) {
1588 sub->handler->subscription_shutdown(sub);
1590 pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL);
1593 static void pubsub_on_tsx_state(pjsip_evsub *evsub, pjsip_transaction *tsx, pjsip_event *event)
1595 struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1601 if (sub->handler->notify_response && tsx->role == PJSIP_ROLE_UAC &&
1602 event->body.tsx_state.type == PJSIP_EVENT_RX_MSG) {
1603 sub->handler->notify_response(sub, event->body.tsx_state.src.rdata);
1607 static void set_parameters_from_response_data(pj_pool_t *pool, int *p_st_code,
1608 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body,
1609 struct ast_sip_subscription_response_data *response_data)
1611 ast_assert(response_data->status_code >= 200 && response_data->status_code <= 699);
1612 *p_st_code = response_data->status_code;
1614 if (!ast_strlen_zero(response_data->status_text)) {
1615 pj_strdup2(pool, *p_st_text, response_data->status_text);
1618 if (response_data->headers) {
1619 struct ast_variable *iter;
1620 for (iter = response_data->headers; iter; iter = iter->next) {
1621 pj_str_t header_name;
1622 pj_str_t header_value;
1623 pjsip_generic_string_hdr *hdr;
1625 pj_cstr(&header_name, iter->name);
1626 pj_cstr(&header_value, iter->value);
1627 hdr = pjsip_generic_string_hdr_create(pool, &header_name, &header_value);
1628 pj_list_insert_before(res_hdr, hdr);
1632 if (response_data->body) {
1637 pj_cstr(&type, response_data->body->type);
1638 pj_cstr(&subtype, response_data->body->subtype);
1639 pj_cstr(&body_text, response_data->body->body_text);
1641 *p_body = pjsip_msg_body_create(pool, &type, &subtype, &body_text);
1645 static int response_data_changed(struct ast_sip_subscription_response_data *response_data)
1647 if (response_data->status_code != 200 ||
1648 !ast_strlen_zero(response_data->status_text) ||
1649 response_data->headers ||
1650 response_data->body) {
1656 static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
1657 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
1659 struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1660 struct ast_sip_subscription_response_data response_data = {
1668 if (pjsip_evsub_get_state(sub->evsub) == PJSIP_EVSUB_STATE_TERMINATED) {
1669 sub->handler->subscription_terminated(sub, rdata);
1673 sub->handler->resubscribe(sub, rdata, &response_data);
1675 if (!response_data_changed(&response_data)) {
1679 set_parameters_from_response_data(rdata->tp_info.pool, p_st_code, p_st_text,
1680 res_hdr, p_body, &response_data);
1683 static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code,
1684 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
1686 struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1687 struct ast_sip_subscription_response_data response_data = {
1691 if (!sub || !sub->handler->notify_request) {
1695 sub->handler->notify_request(sub, rdata, &response_data);
1697 if (!response_data_changed(&response_data)) {
1701 set_parameters_from_response_data(rdata->tp_info.pool, p_st_code, p_st_text,
1702 res_hdr, p_body, &response_data);
1705 static int serialized_pubsub_on_client_refresh(void *userdata)
1707 struct ast_sip_subscription *sub = userdata;
1709 sub->handler->refresh_subscription(sub);
1714 static void pubsub_on_client_refresh(pjsip_evsub *evsub)
1716 struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1719 ast_sip_push_task(sub->serializer, serialized_pubsub_on_client_refresh, sub);
1722 static int serialized_pubsub_on_server_timeout(void *userdata)
1724 struct ast_sip_subscription *sub = userdata;
1726 sub->handler->subscription_timeout(sub);
1731 static void pubsub_on_server_timeout(pjsip_evsub *evsub)
1733 struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1736 /* if a subscription has been terminated and the subscription
1737 timeout/expires is less than the time it takes for all pending
1738 transactions to end then the subscription timer will not have
1739 been canceled yet and sub will be null, so do nothing since
1740 the subscription has already been terminated. */
1745 ast_sip_push_task(sub->serializer, serialized_pubsub_on_server_timeout, sub);
1748 static int ami_subscription_detail(struct ast_sip_subscription *sub,
1749 struct ast_sip_ami *ami,
1752 RAII_VAR(struct ast_str *, buf,
1753 ast_sip_create_ami_event(event, ami), ast_free);
1759 sip_subscription_to_ami(sub, &buf);
1760 astman_append(ami->s, "%s\r\n", ast_str_buffer(buf));
1764 static int ami_subscription_detail_inbound(struct ast_sip_subscription *sub, void *arg)
1766 return sub->role == AST_SIP_NOTIFIER ? ami_subscription_detail(
1767 sub, arg, "InboundSubscriptionDetail") : 0;
1770 static int ami_subscription_detail_outbound(struct ast_sip_subscription *sub, void *arg)
1772 return sub->role == AST_SIP_SUBSCRIBER ? ami_subscription_detail(
1773 sub, arg, "OutboundSubscriptionDetail") : 0;
1776 static int ami_show_subscriptions_inbound(struct mansession *s, const struct message *m)
1778 struct ast_sip_ami ami = { .s = s, .m = m };
1781 astman_send_listack(s, m, "Following are Events for "
1782 "each inbound Subscription", "start");
1784 num = for_each_subscription(ami_subscription_detail_inbound, &ami);
1787 "Event: InboundSubscriptionDetailComplete\r\n"
1788 "EventList: Complete\r\n"
1789 "ListItems: %d\r\n\r\n", num);
1793 static int ami_show_subscriptions_outbound(struct mansession *s, const struct message *m)
1795 struct ast_sip_ami ami = { .s = s, .m = m };
1798 astman_send_listack(s, m, "Following are Events for "
1799 "each outbound Subscription", "start");
1801 num = for_each_subscription(ami_subscription_detail_outbound, &ami);
1804 "Event: OutboundSubscriptionDetailComplete\r\n"
1805 "EventList: Complete\r\n"
1806 "ListItems: %d\r\n\r\n", num);
1810 #define AMI_SHOW_SUBSCRIPTIONS_INBOUND "PJSIPShowSubscriptionsInbound"
1811 #define AMI_SHOW_SUBSCRIPTIONS_OUTBOUND "PJSIPShowSubscriptionsOutbound"
1813 static int persistence_endpoint_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
1815 struct subscription_persistence *persistence = obj;
1817 persistence->endpoint = ast_strdup(var->value);
1821 static int persistence_endpoint_struct2str(const void *obj, const intptr_t *args, char **buf)
1823 const struct subscription_persistence *persistence = obj;
1825 *buf = ast_strdup(persistence->endpoint);
1829 static int persistence_tag_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
1831 struct subscription_persistence *persistence = obj;
1833 persistence->tag = ast_strdup(var->value);
1837 static int persistence_tag_struct2str(const void *obj, const intptr_t *args, char **buf)
1839 const struct subscription_persistence *persistence = obj;
1841 *buf = ast_strdup(persistence->tag);
1845 static int persistence_expires_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
1847 struct subscription_persistence *persistence = obj;
1848 return ast_get_timeval(var->value, &persistence->expires, ast_tv(0, 0), NULL);
1851 static int persistence_expires_struct2str(const void *obj, const intptr_t *args, char **buf)
1853 const struct subscription_persistence *persistence = obj;
1854 return (ast_asprintf(buf, "%ld", persistence->expires.tv_sec) < 0) ? -1 : 0;
1857 static int load_module(void)
1859 static const pj_str_t str_PUBLISH = { "PUBLISH", 7 };
1860 struct ast_sorcery *sorcery = ast_sip_get_sorcery();
1862 pjsip_evsub_init_module(ast_sip_get_pjsip_endpoint());
1864 if (!(sched = ast_sched_context_create())) {
1865 ast_log(LOG_ERROR, "Could not create scheduler for publication expiration\n");
1866 return AST_MODULE_LOAD_FAILURE;
1869 if (ast_sched_start_thread(sched)) {
1870 ast_log(LOG_ERROR, "Could not start scheduler thread for publication expiration\n");
1871 ast_sched_context_destroy(sched);
1872 return AST_MODULE_LOAD_FAILURE;
1875 pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), NULL, PJSIP_H_ALLOW, NULL, 1, &str_PUBLISH);
1877 if (ast_sip_register_service(&pubsub_module)) {
1878 ast_log(LOG_ERROR, "Could not register pubsub service\n");
1879 ast_sched_context_destroy(sched);
1880 return AST_MODULE_LOAD_FAILURE;
1883 ast_sorcery_apply_config(sorcery, "res_pjsip_pubsub");
1884 ast_sorcery_apply_default(sorcery, "subscription_persistence", "astdb", "subscription_persistence");
1885 if (ast_sorcery_object_register(sorcery, "subscription_persistence", subscription_persistence_alloc,
1887 ast_log(LOG_ERROR, "Could not register subscription persistence object support\n");
1888 ast_sip_unregister_service(&pubsub_module);
1889 ast_sched_context_destroy(sched);
1890 return AST_MODULE_LOAD_FAILURE;
1892 ast_sorcery_object_field_register(sorcery, "subscription_persistence", "packet", "", OPT_CHAR_ARRAY_T, 0,
1893 CHARFLDSET(struct subscription_persistence, packet));
1894 ast_sorcery_object_field_register(sorcery, "subscription_persistence", "src_name", "", OPT_CHAR_ARRAY_T, 0,
1895 CHARFLDSET(struct subscription_persistence, src_name));
1896 ast_sorcery_object_field_register(sorcery, "subscription_persistence", "src_port", "0", OPT_UINT_T, 0,
1897 FLDSET(struct subscription_persistence, src_port));
1898 ast_sorcery_object_field_register(sorcery, "subscription_persistence", "transport_key", "0", OPT_CHAR_ARRAY_T, 0,
1899 CHARFLDSET(struct subscription_persistence, transport_key));
1900 ast_sorcery_object_field_register(sorcery, "subscription_persistence", "local_name", "", OPT_CHAR_ARRAY_T, 0,
1901 CHARFLDSET(struct subscription_persistence, local_name));
1902 ast_sorcery_object_field_register(sorcery, "subscription_persistence", "local_port", "0", OPT_UINT_T, 0,
1903 FLDSET(struct subscription_persistence, local_port));
1904 ast_sorcery_object_field_register(sorcery, "subscription_persistence", "cseq", "0", OPT_UINT_T, 0,
1905 FLDSET(struct subscription_persistence, cseq));
1906 ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "endpoint", "",
1907 persistence_endpoint_str2struct, persistence_endpoint_struct2str, NULL, 0, 0);
1908 ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "tag", "",
1909 persistence_tag_str2struct, persistence_tag_struct2str, NULL, 0, 0);
1910 ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "expires", "",
1911 persistence_expires_str2struct, persistence_expires_struct2str, NULL, 0, 0);
1913 if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
1914 ast_sip_push_task(NULL, subscription_persistence_load, NULL);
1916 stasis_subscribe(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
1919 ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM,
1920 ami_show_subscriptions_inbound);
1921 ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND, EVENT_FLAG_SYSTEM,
1922 ami_show_subscriptions_outbound);
1924 return AST_MODULE_LOAD_SUCCESS;
1927 static int unload_module(void)
1929 ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND);
1930 ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_INBOUND);
1933 ast_sched_context_destroy(sched);
1939 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP event resource",
1940 .load = load_module,
1941 .unload = unload_module,
1942 .load_pri = AST_MODPRI_CHANNEL_DEPEND,