X-Git-Url: http://git.asterisk.org/gitweb/?p=asterisk%2Fasterisk.git;a=blobdiff_plain;f=res%2Fres_pjsip_pubsub.c;h=a1f3f246257e4cde0a08d6e40031fdfcb9d9077e;hp=9032bd3f62e90529bf05400ed73e0d3a738fda3d;hb=48d047ad5a6cbd24368a003027c5a9c5cc43ef26;hpb=15dcaeef82ce4fc013668392838f8ff9e0ec8075 diff --git a/res/res_pjsip_pubsub.c b/res/res_pjsip_pubsub.c index 9032bd3..a1f3f24 100644 --- a/res/res_pjsip_pubsub.c +++ b/res/res_pjsip_pubsub.c @@ -42,8 +42,10 @@ #include "asterisk/res_pjsip.h" #include "asterisk/callerid.h" #include "asterisk/manager.h" +#include "asterisk/cli.h" #include "asterisk/test.h" #include "res_pjsip/include/res_pjsip_private.h" +#include "asterisk/res_pjsip_presence_xml.h" /*** DOCUMENTATION @@ -72,6 +74,20 @@ + + + Displays settings for configured resource lists. + + + + + Provides a listing of all resource lists. An event ResourceListDetail + is issued for each resource list object. Once all detail events are completed a + ResourceListDetailComplete event is issued. + + + + Module that implements publish and subscribe support. @@ -107,6 +123,91 @@ The time at which the subscription expires + + The Contact URI of the dialog for the subscription + + + + Resource list configuration parameters. + + This configuration object allows for RFC 4662 resource list subscriptions + to be specified. This can be useful to decrease the amount of subscription traffic + that a server has to process. + + Current limitations limit the size of SIP NOTIFY requests that Asterisk sends + to 64000 bytes. If your resource list notifications are larger than this maximum, you + will need to make adjustments. + + + + Must be of type 'resource_list' + + + The SIP event package that the list resource belong to. + + The SIP event package describes the types of resources that Asterisk reports + the state of. + + + + Device state and presence reporting. + + + This is identical to presence. + + + Message-waiting indication (MWI) reporting. + + + + + + The name of a resource to report state on + + In general Asterisk looks up list items in the following way: + 1. Check if the list item refers to another configured resource list. + 2. Pass the name of the resource off to event-package-specific handlers + to find the specified resource. + The second part means that the way the list item is specified depends + on what type of list this is. For instance, if you have the event + set to presence, then list items should be in the form of + dialplan_extension@dialplan_context. For message-summary mailbox + names should be listed. + + + + Indicates if the entire list's state should be sent out. + + If this option is enabled, and a resource changes state, then Asterisk will construct + a notification that contains the state of all resources in the list. If the option is + disabled, Asterisk will construct a notification that only contains the states of + resources that have changed. + + Even with this option disabled, there are certain situations where Asterisk is forced + to send a notification with the states of all resources in the list. When a subscriber + renews or terminates its subscription to the list, Asterisk MUST send a full state + notification. + + + + + Time Asterisk should wait, in milliseconds, before sending notifications. + + When a resource's state changes, it may be desired to wait a certain amount before Asterisk + sends a notification to subscribers. This allows for other state changes to accumulate, so that + Asterisk can communicate multiple state changes in a single notification instead of rapidly sending + many notifications. + + + + + The configuration for inbound publications + + Optional name of an endpoint that is only allowed to publish to this resource + + + Must be of type 'inbound-publication'. + @@ -134,6 +235,12 @@ static struct ast_sched_context *sched; /*! \brief Default expiration time for PUBLISH if one is not specified */ #define DEFAULT_PUBLISH_EXPIRES 3600 +/*! \brief Number of buckets for subscription datastore */ +#define DATASTORE_BUCKETS 53 + +/*! \brief Default expiration for subscriptions */ +#define DEFAULT_EXPIRES 3600 + /*! \brief Defined method for PUBLISH */ const pjsip_method pjsip_publish_method = { @@ -197,6 +304,26 @@ enum sip_publish_type { }; /*! + * \brief A vector of strings commonly used throughout this module + */ +AST_VECTOR(resources, const char *); + +/*! + * \brief Resource list configuration item + */ +struct resource_list { + SORCERY_OBJECT(details); + /*! SIP event package the list uses. */ + char event[32]; + /*! Strings representing resources in the list. */ + struct resources items; + /*! Indicates if Asterisk sends full or partial state on notifications. */ + unsigned int full_state; + /*! Time, in milliseconds Asterisk waits before sending a batched notification.*/ + unsigned int notification_batch_interval; +}; + +/*! * Used to create new entity IDs by ESCs. */ static int esc_etag_counter; @@ -217,6 +344,12 @@ struct ast_sip_publication { int expires; /*! \brief Scheduled item for expiration of publication */ int sched_id; + /*! \brief The resource the publication is to */ + char *resource; + /*! \brief The name of the event type configuration */ + char *event_configuration_name; + /*! \brief Data containing the above */ + char data[0]; }; @@ -226,7 +359,7 @@ struct ast_sip_publication { struct subscription_persistence { /*! Sorcery object details */ SORCERY_OBJECT(details); - /*! The name of the endpoint involved in the subscrption */ + /*! The name of the endpoint involved in the subscription */ char *endpoint; /*! SIP message that creates the subscription */ char packet[PJSIP_MAX_PKT_LEN]; @@ -246,108 +379,171 @@ struct subscription_persistence { char *tag; /*! When this subscription expires */ struct timeval expires; + /*! Contact URI */ + char contact_uri[PJSIP_MAX_URL_SIZE]; }; /*! - * \brief Real subscription details - * - * A real subscription is one that has a direct link to a - * PJSIP subscription and dialog. + * \brief The state of the subscription tree */ -struct ast_sip_real_subscription { - /*! The underlying PJSIP event subscription structure */ - pjsip_evsub *evsub; - /*! The underlying PJSIP dialog */ - pjsip_dialog *dlg; +enum sip_subscription_tree_state { + /*! Normal operation */ + SIP_SUB_TREE_NORMAL = 0, + /*! A terminate has been requested by Asterisk, the client, or pjproject */ + SIP_SUB_TREE_TERMINATE_PENDING, + /*! The terminate is in progress */ + SIP_SUB_TREE_TERMINATE_IN_PROGRESS, + /*! The terminate process has finished and the subscription tree is no longer valid */ + SIP_SUB_TREE_TERMINATED, }; -/*! - * \brief Virtual subscription details - * - * A virtual subscription is one that does not have a direct - * link to a PJSIP subscription. Instead, it is a descendent - * of an ast_sip_subscription. Following the ancestry will - * eventually lead to a real subscription. - */ -struct ast_sip_virtual_subscription { - struct ast_sip_subscription *parent; +static char *sub_tree_state_description[] = { + "Normal", + "TerminatePending", + "TerminateInProgress", + "Terminated" }; /*! - * \brief Discriminator between real and virtual subscriptions + * \brief A tree of SIP subscriptions + * + * Because of the ability to subscribe to resource lists, a SIP + * subscription can result in a tree of subscriptions being created. + * This structure represents the information relevant to the subscription + * as a whole, to include the underlying PJSIP structure for the + * subscription. */ -enum sip_subscription_type { - /*! - * \brief a "real" subscription. - * - * Real subscriptions are at the root of a tree of subscriptions. - * A real subscription has a corresponding SIP subscription in the - * PJSIP stack. - */ - SIP_SUBSCRIPTION_REAL, - /*! - * \brief a "virtual" subscription. - * - * Virtual subscriptions are the descendents of real subscriptions - * in a tree of subscriptions. Virtual subscriptions do not have - * a corresponding SIP subscription in the PJSIP stack. Instead, - * when a state change happens on a virtual subscription, the - * state change is indicated to the virtual subscription's parent. +struct sip_subscription_tree { + /*! The endpoint with which the subscription is communicating */ + struct ast_sip_endpoint *endpoint; + /*! Serializer on which to place operations for this subscription */ + struct ast_taskprocessor *serializer; + /*! The role for this subscription */ + enum ast_sip_subscription_role role; + /*! Persistence information */ + struct subscription_persistence *persistence; + /*! The underlying PJSIP event subscription structure */ + pjsip_evsub *evsub; + /*! The underlying PJSIP dialog */ + pjsip_dialog *dlg; + /*! Interval to use for batching notifications */ + unsigned int notification_batch_interval; + /*! Scheduler ID for batched notification */ + int notify_sched_id; + /*! Indicator if scheduled batched notification should be sent */ + unsigned int send_scheduled_notify; + /*! The root of the subscription tree */ + struct ast_sip_subscription *root; + /*! Is this subscription to a list? */ + int is_list; + /*! Next item in the list */ + AST_LIST_ENTRY(sip_subscription_tree) next; + /*! Subscription tree state */ + enum sip_subscription_tree_state state; + /*! On asterisk restart, this is the task data used + * to restart the expiration timer if pjproject isn't + * capable of restarting the timer. */ - SIP_SUBSCRIPTION_VIRTUAL, + struct ast_sip_sched_task *expiration_task; }; /*! - * \brief Structure representing a SIP subscription + * \brief Structure representing a "virtual" SIP subscription. + * + * This structure serves a dual purpose. Structurally, it is + * the constructed tree of subscriptions based on the resources + * being subscribed to. API-wise, this serves as the handle that + * subscription handlers use in order to interact with the pubsub API. */ struct ast_sip_subscription { /*! Subscription datastores set up by handlers */ struct ao2_container *datastores; - /*! The endpoint with which the subscription is communicating */ - struct ast_sip_endpoint *endpoint; - /*! Serializer on which to place operations for this subscription */ - struct ast_taskprocessor *serializer; /*! The handler for this subscription */ const struct ast_sip_subscription_handler *handler; - /*! The role for this subscription */ - enum ast_sip_subscription_role role; - /*! Indicator of real or virtual subscription */ - enum sip_subscription_type type; - /*! Real and virtual components of the subscription */ - union { - struct ast_sip_real_subscription real; - struct ast_sip_virtual_subscription virtual; - } reality; + /*! Pointer to the base of the tree */ + struct sip_subscription_tree *tree; /*! Body generaator for NOTIFYs */ struct ast_sip_pubsub_body_generator *body_generator; - /*! Persistence information */ - struct subscription_persistence *persistence; - /*! Next item in the list */ - AST_LIST_ENTRY(ast_sip_subscription) next; - /*! List of child subscriptions */ - AST_LIST_HEAD_NOLOCK(,ast_sip_subscription) children; + /*! Vector of child subscriptions */ + AST_VECTOR(, struct ast_sip_subscription *) children; + /*! Saved NOTIFY body text for this subscription */ + struct ast_str *body_text; + /*! Indicator that the body text has changed since the last notification */ + int body_changed; + /*! The current state of the subscription */ + pjsip_evsub_state subscription_state; + /*! For lists, the current version to place in the RLMI body */ + unsigned int version; + /*! For lists, indicates if full state should always be communicated. */ + unsigned int full_state; + /*! URI associated with the subscription */ + pjsip_sip_uri *uri; /*! Name of resource being subscribed to */ char resource[0]; }; +/*! + * \brief Structure representing a publication resource + */ +struct ast_sip_publication_resource { + /*! \brief Sorcery object details */ + SORCERY_OBJECT(details); + /*! \brief Optional name of an endpoint that is only allowed to publish to this resource */ + char *endpoint; + /*! \brief Mapping for event types to configuration */ + struct ast_variable *events; +}; + static const char *sip_subscription_roles_map[] = { [AST_SIP_SUBSCRIBER] = "Subscriber", [AST_SIP_NOTIFIER] = "Notifier" }; -AST_RWLIST_HEAD_STATIC(subscriptions, ast_sip_subscription); +enum sip_persistence_update_type { + /*! Called from send request */ + SUBSCRIPTION_PERSISTENCE_SEND_REQUEST = 0, + /*! Subscription created from initial client request */ + SUBSCRIPTION_PERSISTENCE_CREATED, + /*! Subscription recreated by asterisk on startup */ + SUBSCRIPTION_PERSISTENCE_RECREATED, + /*! Subscription created from client refresh */ + SUBSCRIPTION_PERSISTENCE_REFRESHED, +}; + +AST_RWLIST_HEAD_STATIC(subscriptions, sip_subscription_tree); AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator); AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement); -static pjsip_evsub *sip_subscription_get_evsub(const struct ast_sip_subscription *sub) +static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event); +static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata, + int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body); +static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code, + pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body); +static void pubsub_on_client_refresh(pjsip_evsub *sub); +static void pubsub_on_server_timeout(pjsip_evsub *sub); + +static pjsip_evsub_user pubsub_cb = { + .on_evsub_state = pubsub_on_evsub_state, + .on_rx_refresh = pubsub_on_rx_refresh, + .on_rx_notify = pubsub_on_rx_notify, + .on_client_refresh = pubsub_on_client_refresh, + .on_server_timeout = pubsub_on_server_timeout, +}; + +/*! \brief Destructor for publication resource */ +static void publication_resource_destroy(void *obj) { - return sub->reality.real.evsub; + struct ast_sip_publication_resource *resource = obj; + + ast_free(resource->endpoint); + ast_variables_destroy(resource->events); } -static pjsip_dialog *sip_subscription_get_dlg(const struct ast_sip_subscription *sub) +/*! \brief Allocator for publication resource */ +static void *publication_resource_alloc(const char *name) { - return sub->reality.real.dlg; + return ast_sorcery_generic_alloc(sizeof(struct ast_sip_publication_resource), publication_resource_destroy); } /*! \brief Destructor for subscription persistence */ @@ -366,7 +562,7 @@ static void *subscription_persistence_alloc(const char *name) } /*! \brief Function which creates initial persistence information of a subscription in sorcery */ -static struct subscription_persistence *subscription_persistence_create(struct ast_sip_subscription *sub) +static struct subscription_persistence *subscription_persistence_create(struct sip_subscription_tree *sub_tree) { char tag[PJ_GUID_STRING_LENGTH + 1]; @@ -376,13 +572,13 @@ static struct subscription_persistence *subscription_persistence_create(struct a struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(), "subscription_persistence", NULL); - pjsip_dialog *dlg = sip_subscription_get_dlg(sub); + pjsip_dialog *dlg = sub_tree->dlg; if (!persistence) { return NULL; } - persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub->endpoint)); + persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub_tree->endpoint)); ast_copy_pj_str(tag, &dlg->local.info->tag, sizeof(tag)); persistence->tag = ast_strdup(tag); @@ -391,53 +587,77 @@ static struct subscription_persistence *subscription_persistence_create(struct a } /*! \brief Function which updates persistence information of a subscription in sorcery */ -static void subscription_persistence_update(struct ast_sip_subscription *sub, - pjsip_rx_data *rdata) +static void subscription_persistence_update(struct sip_subscription_tree *sub_tree, + pjsip_rx_data *rdata, enum sip_persistence_update_type type) { pjsip_dialog *dlg; - if (!sub->persistence) { + if (!sub_tree->persistence) { return; } - dlg = sip_subscription_get_dlg(sub); - sub->persistence->cseq = dlg->local.cseq; + ast_debug(3, "Updating persistence for '%s->%s'\n", sub_tree->persistence->endpoint, + sub_tree->root->resource); + + dlg = sub_tree->dlg; + sub_tree->persistence->cseq = dlg->local.cseq; if (rdata) { int expires; pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL); + pjsip_contact_hdr *contact_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_CONTACT, NULL); expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES; - sub->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1)); + sub_tree->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1)); + + pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, contact_hdr->uri, + sub_tree->persistence->contact_uri, sizeof(sub_tree->persistence->contact_uri)); - ast_copy_string(sub->persistence->packet, rdata->pkt_info.packet, sizeof(sub->persistence->packet)); - ast_copy_string(sub->persistence->src_name, rdata->pkt_info.src_name, sizeof(sub->persistence->src_name)); - sub->persistence->src_port = rdata->pkt_info.src_port; - ast_copy_string(sub->persistence->transport_key, rdata->tp_info.transport->type_name, - sizeof(sub->persistence->transport_key)); - ast_copy_pj_str(sub->persistence->local_name, &rdata->tp_info.transport->local_name.host, - sizeof(sub->persistence->local_name)); - sub->persistence->local_port = rdata->tp_info.transport->local_name.port; + /* When receiving a packet on an streaming transport, it's possible to receive more than one SIP + * message at a time into the rdata->pkt_info.packet buffer. However, the rdata->msg_info.msg_buf + * will always point to the proper SIP message that is to be processed. When updating subscription + * persistence that is pulled from persistent storage, though, the rdata->pkt_info.packet will + * only ever have a single SIP message on it, and so we base persistence on that. + */ + if (type == SUBSCRIPTION_PERSISTENCE_CREATED + || type == SUBSCRIPTION_PERSISTENCE_RECREATED) { + if (rdata->msg_info.msg_buf) { + ast_copy_string(sub_tree->persistence->packet, rdata->msg_info.msg_buf, + MIN(sizeof(sub_tree->persistence->packet), rdata->msg_info.len)); + } else { + ast_copy_string(sub_tree->persistence->packet, rdata->pkt_info.packet, + sizeof(sub_tree->persistence->packet)); + } + } + ast_copy_string(sub_tree->persistence->src_name, rdata->pkt_info.src_name, + sizeof(sub_tree->persistence->src_name)); + sub_tree->persistence->src_port = rdata->pkt_info.src_port; + ast_copy_string(sub_tree->persistence->transport_key, rdata->tp_info.transport->type_name, + sizeof(sub_tree->persistence->transport_key)); + ast_copy_pj_str(sub_tree->persistence->local_name, &rdata->tp_info.transport->local_name.host, + sizeof(sub_tree->persistence->local_name)); + sub_tree->persistence->local_port = rdata->tp_info.transport->local_name.port; } - ast_sorcery_update(ast_sip_get_sorcery(), sub->persistence); + ast_sorcery_update(ast_sip_get_sorcery(), sub_tree->persistence); } /*! \brief Function which removes persistence of a subscription from sorcery */ -static void subscription_persistence_remove(struct ast_sip_subscription *sub) +static void subscription_persistence_remove(struct sip_subscription_tree *sub_tree) { - if (!sub->persistence) { + if (!sub_tree->persistence) { return; } - ast_sorcery_delete(ast_sip_get_sorcery(), sub->persistence); - ao2_ref(sub->persistence, -1); + ast_sorcery_delete(ast_sip_get_sorcery(), sub_tree->persistence); + ao2_ref(sub_tree->persistence, -1); + sub_tree->persistence = NULL; } static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name); static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64], - size_t num_accept); + size_t num_accept, const char *body_type); /*! \brief Retrieve a handler using the Event header of an rdata message */ static struct ast_sip_subscription_handler *subscription_get_handler_from_rdata(pjsip_rx_data *rdata) @@ -461,23 +681,62 @@ static struct ast_sip_subscription_handler *subscription_get_handler_from_rdata( return handler; } +/*! + * \brief Accept headers that are exceptions to the rule + * + * Typically, when a SUBSCRIBE arrives, we attempt to find a + * body generator that matches one of the Accept headers in + * the request. When subscribing to a single resource, this works + * great. However, when subscribing to a list, things work + * differently. Most Accept header values are fine, but there + * are a couple that are endemic to resource lists that need + * to be ignored when searching for a body generator to use + * for the individual resources of the subscription. + */ +const char *accept_exceptions[] = { + "multipart/related", + "application/rlmi+xml", +}; + +/*! + * \brief Is the Accept header from the SUBSCRIBE in the list of exceptions? + * + * \retval 1 This Accept header value is an exception to the rule. + * \retval 0 This Accept header is not an exception to the rule. + */ +static int exceptional_accept(const pj_str_t *accept) +{ + int i; + + for (i = 0; i < ARRAY_LEN(accept_exceptions); ++i) { + if (!pj_strcmp2(accept, accept_exceptions[i])) { + return 1; + } + } + + return 0; +} + /*! \brief Retrieve a body generator using the Accept header of an rdata message */ static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata, const struct ast_sip_subscription_handler *handler) { - pjsip_accept_hdr *accept_header; + pjsip_accept_hdr *accept_header = (pjsip_accept_hdr *) &rdata->msg_info.msg->hdr; char accept[AST_SIP_MAX_ACCEPT][64]; - size_t num_accept_headers; + size_t num_accept_headers = 0; - accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next); - if (accept_header) { + while ((accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, accept_header->next))) { int i; for (i = 0; i < accept_header->count; ++i) { - ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i])); + if (!exceptional_accept(&accept_header->values[i])) { + ast_copy_pj_str(accept[num_accept_headers], &accept_header->values[i], sizeof(accept[num_accept_headers])); + ++num_accept_headers; + } } - num_accept_headers = accept_header->count; - } else { + } + + if (num_accept_headers == 0) { /* If a SUBSCRIBE contains no Accept headers, then we must assume that * the default accept type for the event package is to be used. */ @@ -485,247 +744,480 @@ static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rda num_accept_headers = 1; } - return find_body_generator(accept, num_accept_headers); + return find_body_generator(accept, num_accept_headers, handler->body_type); } -static struct ast_sip_subscription *notifier_create_subscription(const struct ast_sip_subscription_handler *handler, - struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource, - struct ast_sip_pubsub_body_generator *generator); - -/*! \brief Callback function to perform the actual recreation of a subscription */ -static int subscription_persistence_recreate(void *obj, void *arg, int flags) +/*! \brief Check if the rdata has a Supported header containing 'eventlist' + * + * \retval 1 rdata has an eventlist containing supported header + * \retval 0 rdata doesn't have an eventlist containing supported header + */ +static int ast_sip_pubsub_has_eventlist_support(pjsip_rx_data *rdata) { - struct subscription_persistence *persistence = obj; - pj_pool_t *pool = arg; - pjsip_rx_data rdata = { { 0, }, }; - pjsip_expires_hdr *expires_header; - struct ast_sip_subscription_handler *handler; - RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup); - struct ast_sip_subscription *sub; - struct ast_sip_pubsub_body_generator *generator; - int resp; - char *resource; - size_t resource_size; - pjsip_sip_uri *request_uri; + pjsip_supported_hdr *supported_header = (pjsip_supported_hdr *) &rdata->msg_info.msg->hdr; - /* If this subscription has already expired remove it */ - if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) { - ast_sorcery_delete(ast_sip_get_sorcery(), persistence); - return 0; - } + while ((supported_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_SUPPORTED, supported_header->next))) { + int i; - endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint); - if (!endpoint) { - ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n", - persistence->endpoint); - ast_sorcery_delete(ast_sip_get_sorcery(), persistence); - return 0; + for (i = 0; i < supported_header->count; i++) { + if (!pj_stricmp2(&supported_header->values[i], "eventlist")) { + return 1; + } + } } - pj_pool_reset(pool); - rdata.tp_info.pool = pool; - - if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port, - persistence->transport_key, persistence->local_name, persistence->local_port)) { - ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n", - persistence->endpoint); - ast_sorcery_delete(ast_sip_get_sorcery(), persistence); - return 0; - } + return 0; +} - request_uri = pjsip_uri_get_uri(rdata.msg_info.msg->line.req.uri); - resource_size = pj_strlen(&request_uri->user) + 1; - resource = alloca(resource_size); - ast_copy_pj_str(resource, &request_uri->user, resource_size); +struct resource_tree; - /* Update the expiration header with the new expiration */ - expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next); - if (!expires_header) { - expires_header = pjsip_expires_hdr_create(pool, 0); - if (!expires_header) { - ast_sorcery_delete(ast_sip_get_sorcery(), persistence); - return 0; - } - pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header); - } - expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000); +/*! + * \brief A node for a resource tree. + */ +struct tree_node { + AST_VECTOR(, struct tree_node *) children; + unsigned int full_state; + char resource[0]; +}; - handler = subscription_get_handler_from_rdata(&rdata); - if (!handler || !handler->notifier) { - ast_sorcery_delete(ast_sip_get_sorcery(), persistence); - return 0; - } +/*! + * \brief Helper function for retrieving a resource list for a given event. + * + * This will retrieve a resource list that corresponds to the resource and event provided. + * + * \param resource The name of the resource list to retrieve + * \param event The expected event name on the resource list + */ +static struct resource_list *retrieve_resource_list(const char *resource, const char *event) +{ + struct resource_list *list; - generator = subscription_get_generator_from_rdata(&rdata, handler); - if (!generator) { - ast_sorcery_delete(ast_sip_get_sorcery(), persistence); - return 0; + list = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "resource_list", resource); + if (!list) { + return NULL; } - ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data, - pubsub_module.id, MOD_DATA_PERSISTENCE, persistence); - - resp = handler->notifier->new_subscribe(endpoint, resource); - if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) { - sub = notifier_create_subscription(handler, endpoint, &rdata, resource, generator); - sub->persistence = ao2_bump(persistence); - subscription_persistence_update(sub, &rdata); - } else { - ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + if (strcmp(list->event, event)) { + ast_log(LOG_WARNING, "Found resource list %s, but its event type (%s) does not match SUBSCRIBE's (%s)\n", + resource, list->event, event); + ao2_cleanup(list); + return NULL; } - return 0; + return list; } -/*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */ -static int subscription_persistence_load(void *data) +/*! + * \brief Allocate a tree node + * + * In addition to allocating and initializing the tree node, the node is also added + * to the vector of visited resources. See \ref build_resource_tree for more information + * on the visited resources. + * + * \param resource The name of the resource for this tree node. + * \param visited The vector of resources that have been visited. + * \param if allocating a list, indicate whether full state is requested in notifications. + * \retval NULL Allocation failure. + * \retval non-NULL The newly-allocated tree_node + */ +static struct tree_node *tree_node_alloc(const char *resource, struct resources *visited, unsigned int full_state) { - struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), - "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL); - pj_pool_t *pool; + struct tree_node *node; - pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN, - PJSIP_POOL_RDATA_INC); - if (!pool) { - ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n"); - return 0; + node = ast_calloc(1, sizeof(*node) + strlen(resource) + 1); + if (!node) { + return NULL; } - ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool); - - pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool); + strcpy(node->resource, resource); + if (AST_VECTOR_INIT(&node->children, 4)) { + ast_free(node); + return NULL; + } + node->full_state = full_state; - ao2_ref(persisted_subscriptions, -1); - return 0; + if (visited) { + AST_VECTOR_APPEND(visited, resource); + } + return node; } -/*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */ -static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message) +/*! + * \brief Destructor for a tree node + * + * This function calls recursively in order to destroy + * all nodes lower in the tree from the given node in + * addition to the node itself. + * + * \param node The node to destroy. + */ +static void tree_node_destroy(struct tree_node *node) { - struct ast_json_payload *payload; - const char *type; - - if (stasis_message_type(message) != ast_manager_get_generic_type()) { + int i; + if (!node) { return; } - payload = stasis_message_data(message); - type = ast_json_string_get(ast_json_object_get(payload->json, "type")); - - /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we - * recreate SIP subscriptions. - */ - if (strcmp(type, "FullyBooted")) { - return; + for (i = 0; i < AST_VECTOR_SIZE(&node->children); ++i) { + tree_node_destroy(AST_VECTOR_GET(&node->children, i)); } - - /* This has to be here so the subscription is recreated when the body generator is available */ - ast_sip_push_task(NULL, subscription_persistence_load, NULL); - - /* Once the system is fully booted we don't care anymore */ - stasis_unsubscribe(sub); + AST_VECTOR_FREE(&node->children); + ast_free(node); } -static void add_subscription(struct ast_sip_subscription *obj) +/*! + * \brief Determine if this resource has been visited already + * + * See \ref build_resource_tree for more information + * + * \param resource The resource currently being visited + * \param visited The resources that have previously been visited + */ +static int have_visited(const char *resource, struct resources *visited) { - SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK); - AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next); - ast_module_ref(ast_module_info->self); -} + int i; -static void remove_subscription(struct ast_sip_subscription *obj) -{ - struct ast_sip_subscription *i; - SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK); - AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) { - if (i == obj) { - AST_RWLIST_REMOVE_CURRENT(next); - ast_module_unref(ast_module_info->self); - break; + for (i = 0; i < AST_VECTOR_SIZE(visited); ++i) { + if (!strcmp(resource, AST_VECTOR_GET(visited, i))) { + return 1; } } - AST_RWLIST_TRAVERSE_SAFE_END; + + return 0; } -typedef int (*on_subscription_t)(struct ast_sip_subscription *sub, void *arg); +/*! + * \brief Build child nodes for a given parent. + * + * This iterates through the items on a resource list and creates tree nodes for each one. The + * tree nodes created are children of the supplied parent node. If an item in the resource + * list is itself a list, then this function is called recursively to provide children for + * the the new node. + * + * If an item in a resource list is not a list, then the supplied subscription handler is + * called into as if a new SUBSCRIBE for the list item were presented. The handler's response + * is used to determine if the node can be added to the tree or not. + * + * If a parent node ends up having no child nodes added under it, then the parent node is + * pruned from the tree. + * + * \param endpoint The endpoint that sent the inbound SUBSCRIBE. + * \param handler The subscription handler for leaf nodes in the tree. + * \param list The configured resource list from which the child node is being built. + * \param parent The parent node for these children. + * \param visited The resources that have already been visited. + */ +static void build_node_children(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler, + struct resource_list *list, struct tree_node *parent, struct resources *visited) +{ + int i; -static int for_each_subscription(on_subscription_t on_subscription, void *arg) + for (i = 0; i < AST_VECTOR_SIZE(&list->items); ++i) { + struct tree_node *current; + struct resource_list *child_list; + const char *resource = AST_VECTOR_GET(&list->items, i); + + if (have_visited(resource, visited)) { + ast_debug(1, "Already visited resource %s. Avoiding duplicate resource or potential loop.\n", resource); + continue; + } + + child_list = retrieve_resource_list(resource, list->event); + if (!child_list) { + int resp = handler->notifier->new_subscribe(endpoint, resource); + if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) { + current = tree_node_alloc(resource, visited, 0); + if (!current) { + ast_debug(1, + "Subscription to leaf resource %s was successful, but encountered allocation error afterwards\n", + resource); + continue; + } + ast_debug(2, "Subscription to leaf resource %s resulted in success. Adding to parent %s\n", + resource, parent->resource); + AST_VECTOR_APPEND(&parent->children, current); + } else { + ast_debug(2, "Subscription to leaf resource %s resulted in error response %d\n", + resource, resp); + } + } else { + ast_debug(2, "Resource %s (child of %s) is a list\n", resource, parent->resource); + current = tree_node_alloc(resource, visited, child_list->full_state); + if (!current) { + ast_debug(1, "Cannot build children of resource %s due to allocation failure\n", resource); + continue; + } + build_node_children(endpoint, handler, child_list, current, visited); + if (AST_VECTOR_SIZE(¤t->children) > 0) { + ast_debug(1, "List %s had no successful children.\n", resource); + AST_VECTOR_APPEND(&parent->children, current); + } else { + ast_debug(2, "List %s had successful children. Adding to parent %s\n", + resource, parent->resource); + tree_node_destroy(current); + } + ao2_cleanup(child_list); + } + } +} + +/*! + * \brief A resource tree + * + * When an inbound SUBSCRIBE arrives, the resource being subscribed to may + * be a resource list. If this is the case, the resource list may contain resources + * that are themselves lists. The structure needed to hold the resources is + * a tree. + * + * Upon receipt of the SUBSCRIBE, the tree is built by determining if subscriptions + * to the individual resources in the tree would be successful or not. Any successful + * subscriptions result in a node in the tree being created. Any unsuccessful subscriptions + * result in no node being created. + * + * This tree can be seen as a bare-bones analog of the tree of ast_sip_subscriptions that + * will end up being created to actually carry out the duties of a SIP SUBSCRIBE dialog. + */ +struct resource_tree { + struct tree_node *root; + unsigned int notification_batch_interval; +}; + +/*! + * \brief Destroy a resource tree. + * + * This function makes no assumptions about how the tree itself was + * allocated and does not attempt to free the tree itself. Callers + * of this function are responsible for freeing the tree. + * + * \param tree The tree to destroy. + */ +static void resource_tree_destroy(struct resource_tree *tree) { - int num = 0; - struct ast_sip_subscription *i; - SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK); + if (tree) { + tree_node_destroy(tree->root); + } +} - if (!on_subscription) { - return num; +/*! + * \brief Build a resource tree + * + * This function builds a resource tree based on the requested resource in a SUBSCRIBE request. + * + * This function also creates a container that has all resources that have been visited during + * creation of the tree, whether those resources resulted in a tree node being created or not. + * Keeping this container of visited resources allows for misconfigurations such as loops in + * the tree or duplicated resources to be detected. + * + * \param endpoint The endpoint that sent the SUBSCRIBE request. + * \param handler The subscription handler for leaf nodes in the tree. + * \param resource The resource requested in the SUBSCRIBE request. + * \param tree The tree that is to be built. + * \param has_eventlist_support + * + * \retval 200-299 Successfully subscribed to at least one resource. + * \retval 300-699 Failure to subscribe to requested resource. + */ +static int build_resource_tree(struct ast_sip_endpoint *endpoint, const struct ast_sip_subscription_handler *handler, + const char *resource, struct resource_tree *tree, int has_eventlist_support) +{ + RAII_VAR(struct resource_list *, list, NULL, ao2_cleanup); + struct resources visited; + + if (!has_eventlist_support || !(list = retrieve_resource_list(resource, handler->event_name))) { + ast_debug(2, "Subscription '%s->%s' is not to a list\n", + ast_sorcery_object_get_id(endpoint), resource); + tree->root = tree_node_alloc(resource, NULL, 0); + if (!tree->root) { + return 500; + } + return handler->notifier->new_subscribe(endpoint, resource); } - AST_RWLIST_TRAVERSE(&subscriptions, i, next) { - if (on_subscription(i, arg)) { + ast_debug(2, "Subscription '%s->%s' is a list\n", + ast_sorcery_object_get_id(endpoint), resource); + if (AST_VECTOR_INIT(&visited, AST_VECTOR_SIZE(&list->items))) { + return 500; + } + + tree->root = tree_node_alloc(resource, &visited, list->full_state); + if (!tree->root) { + AST_VECTOR_FREE(&visited); + return 500; + } + + tree->notification_batch_interval = list->notification_batch_interval; + + build_node_children(endpoint, handler, list, tree->root, &visited); + AST_VECTOR_FREE(&visited); + + if (AST_VECTOR_SIZE(&tree->root->children) > 0) { + return 200; + } else { + return 500; + } +} + +static void add_subscription(struct sip_subscription_tree *obj) +{ + AST_RWLIST_WRLOCK(&subscriptions); + AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next); + AST_RWLIST_UNLOCK(&subscriptions); +} + +static void remove_subscription(struct sip_subscription_tree *obj) +{ + struct sip_subscription_tree *i; + + AST_RWLIST_WRLOCK(&subscriptions); + AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) { + if (i == obj) { + AST_RWLIST_REMOVE_CURRENT(next); + if (i->root) { + ast_debug(2, "Removing subscription '%s->%s' from list of subscriptions\n", + ast_sorcery_object_get_id(i->endpoint), ast_sip_subscription_get_resource_name(i->root)); + } break; } - ++num; } - return num; + AST_RWLIST_TRAVERSE_SAFE_END; + AST_RWLIST_UNLOCK(&subscriptions); } -static void sip_subscription_to_ami(struct ast_sip_subscription *sub, - struct ast_str **buf) +static void destroy_subscription(struct ast_sip_subscription *sub) { - char str[256]; - struct ast_sip_endpoint_id_configuration *id = &sub->endpoint->id; + ast_debug(3, "Destroying SIP subscription from '%s->%s'\n", + ast_sorcery_object_get_id(sub->tree->endpoint), sub->resource); + ast_free(sub->body_text); - ast_str_append(buf, 0, "Role: %s\r\n", - sip_subscription_roles_map[sub->role]); - ast_str_append(buf, 0, "Endpoint: %s\r\n", - ast_sorcery_object_get_id(sub->endpoint)); - - ast_copy_pj_str(str, &sip_subscription_get_dlg(sub)->call_id->id, sizeof(str)); - ast_str_append(buf, 0, "Callid: %s\r\n", str); + AST_VECTOR_FREE(&sub->children); + ao2_cleanup(sub->datastores); + ast_free(sub); +} - ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name( - sip_subscription_get_evsub(sub))); +static void destroy_subscriptions(struct ast_sip_subscription *root) +{ + int i; - ast_callerid_merge(str, sizeof(str), - S_COR(id->self.name.valid, id->self.name.str, NULL), - S_COR(id->self.number.valid, id->self.number.str, NULL), - "Unknown"); + if (!root) { + return; + } - ast_str_append(buf, 0, "Callerid: %s\r\n", str); + for (i = 0; i < AST_VECTOR_SIZE(&root->children); ++i) { + struct ast_sip_subscription *child; - if (sub->handler->to_ami) { - sub->handler->to_ami(sub, buf); + child = AST_VECTOR_GET(&root->children, i); + destroy_subscriptions(child); } + + destroy_subscription(root); } -#define DATASTORE_BUCKETS 53 +static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler, + const char *resource, struct sip_subscription_tree *tree) +{ + struct ast_sip_subscription *sub; + pjsip_sip_uri *contact_uri; -#define DEFAULT_EXPIRES 3600 + sub = ast_calloc(1, sizeof(*sub) + strlen(resource) + 1); + if (!sub) { + return NULL; + } + strcpy(sub->resource, resource); /* Safe */ -static int datastore_hash(const void *obj, int flags) -{ - const struct ast_datastore *datastore = obj; - const char *uid = flags & OBJ_KEY ? obj : datastore->uid; + sub->datastores = ast_datastores_alloc(); + if (!sub->datastores) { + destroy_subscription(sub); + return NULL; + } + + sub->body_text = ast_str_create(128); + if (!sub->body_text) { + destroy_subscription(sub); + return NULL; + } - ast_assert(uid != NULL); + sub->uri = pjsip_sip_uri_create(tree->dlg->pool, PJ_FALSE); + contact_uri = pjsip_uri_get_uri(tree->dlg->local.contact->uri); + pjsip_sip_uri_assign(tree->dlg->pool, sub->uri, contact_uri); + pj_strdup2(tree->dlg->pool, &sub->uri->user, resource); - return ast_str_hash(uid); + sub->handler = handler; + sub->subscription_state = PJSIP_EVSUB_STATE_ACTIVE; + sub->tree = ao2_bump(tree); + + return sub; } -static int datastore_cmp(void *obj, void *arg, int flags) +/*! + * \brief Create a tree of virtual subscriptions based on a resource tree node. + * + * \param handler The handler to supply to leaf subscriptions. + * \param resource The requested resource for this subscription. + * \param generator Body generator to use for leaf subscriptions. + * \param tree The root of the subscription tree. + * \param current The tree node that corresponds to the subscription being created. + */ +static struct ast_sip_subscription *create_virtual_subscriptions(const struct ast_sip_subscription_handler *handler, + const char *resource, struct ast_sip_pubsub_body_generator *generator, + struct sip_subscription_tree *tree, struct tree_node *current) { - const struct ast_datastore *datastore1 = obj; - const struct ast_datastore *datastore2 = arg; - const char *uid2 = flags & OBJ_KEY ? arg : datastore2->uid; + int i; + struct ast_sip_subscription *sub; - ast_assert(datastore1->uid != NULL); - ast_assert(uid2 != NULL); + sub = allocate_subscription(handler, resource, tree); + if (!sub) { + return NULL; + } + + sub->full_state = current->full_state; + sub->body_generator = generator; + AST_VECTOR_INIT(&sub->children, AST_VECTOR_SIZE(¤t->children)); + + for (i = 0; i < AST_VECTOR_SIZE(¤t->children); ++i) { + struct ast_sip_subscription *child; + struct tree_node *child_node = AST_VECTOR_GET(¤t->children, i); + + child = create_virtual_subscriptions(handler, child_node->resource, generator, + tree, child_node); + + if (!child) { + ast_debug(1, "Child subscription to resource %s could not be created\n", + child_node->resource); + continue; + } + + if (AST_VECTOR_APPEND(&sub->children, child)) { + ast_debug(1, "Child subscription to resource %s could not be appended\n", + child_node->resource); + } + } - return strcmp(datastore1->uid, uid2) ? 0 : CMP_MATCH | CMP_STOP; + return sub; } -static int subscription_remove_serializer(void *obj) +static void shutdown_subscriptions(struct ast_sip_subscription *sub) +{ + int i; + + if (!sub) { + return; + } + + if (AST_VECTOR_SIZE(&sub->children) > 0) { + for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) { + shutdown_subscriptions(AST_VECTOR_GET(&sub->children, i)); + } + return; + } + + /* We notify subscription shutdown only on the tree leaves. */ + if (sub->handler->subscription_shutdown) { + sub->handler->subscription_shutdown(sub); + } +} +static int subscription_unreference_dialog(void *obj) { - struct ast_sip_subscription *sub = obj; + struct sip_subscription_tree *sub_tree = obj; /* This is why we keep the dialog on the subscription. When the subscription * is destroyed, there is no guarantee that the underlying dialog is ready @@ -736,107 +1228,131 @@ static int subscription_remove_serializer(void *obj) * subscription is destroyed so that we can guarantee that our attempt to * remove the serializer will be successful. */ - ast_sip_dialog_set_serializer(sip_subscription_get_dlg(sub), NULL); - pjsip_dlg_dec_session(sip_subscription_get_dlg(sub), &pubsub_module); + pjsip_dlg_dec_session(sub_tree->dlg, &pubsub_module); + sub_tree->dlg = NULL; return 0; } -static void subscription_destructor(void *obj) +static void subscription_tree_destructor(void *obj) { - struct ast_sip_subscription *sub = obj; - - ast_debug(3, "Destroying SIP subscription\n"); + struct sip_subscription_tree *sub_tree = obj; - subscription_persistence_remove(sub); + ast_debug(3, "Destroying subscription tree %p '%s->%s'\n", + sub_tree, + sub_tree->endpoint ? ast_sorcery_object_get_id(sub_tree->endpoint) : "Unknown", + sub_tree->root ? sub_tree->root->resource : "Unknown"); - remove_subscription(sub); + ao2_cleanup(sub_tree->endpoint); - ao2_cleanup(sub->datastores); - ao2_cleanup(sub->endpoint); + destroy_subscriptions(sub_tree->root); - if (sip_subscription_get_dlg(sub)) { - ast_sip_push_task_synchronous(NULL, subscription_remove_serializer, sub); + if (sub_tree->dlg) { + ast_sip_push_task_synchronous(sub_tree->serializer, subscription_unreference_dialog, sub_tree); } - ast_taskprocessor_unreference(sub->serializer); -} + ast_taskprocessor_unreference(sub_tree->serializer); + ast_module_unref(ast_module_info->self); +} -static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event); -static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata, - int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body); -static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code, - pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body); -static void pubsub_on_client_refresh(pjsip_evsub *sub); -static void pubsub_on_server_timeout(pjsip_evsub *sub); - +void ast_sip_subscription_destroy(struct ast_sip_subscription *sub) +{ + ast_debug(3, "Removing subscription %p '%s->%s' reference to subscription tree %p\n", + sub, ast_sorcery_object_get_id(sub->tree->endpoint), sub->resource, sub->tree); + ao2_cleanup(sub->tree); +} -static pjsip_evsub_user pubsub_cb = { - .on_evsub_state = pubsub_on_evsub_state, - .on_rx_refresh = pubsub_on_rx_refresh, - .on_rx_notify = pubsub_on_rx_notify, - .on_client_refresh = pubsub_on_client_refresh, - .on_server_timeout = pubsub_on_server_timeout, -}; +static void subscription_setup_dialog(struct sip_subscription_tree *sub_tree, pjsip_dialog *dlg) +{ + sub_tree->dlg = dlg; + ast_sip_dialog_set_serializer(dlg, sub_tree->serializer); + ast_sip_dialog_set_endpoint(dlg, sub_tree->endpoint); + pjsip_evsub_set_mod_data(sub_tree->evsub, pubsub_module.id, sub_tree); + pjsip_dlg_inc_session(dlg, &pubsub_module); +} -static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler, - struct ast_sip_endpoint *endpoint, const char *resource, enum ast_sip_subscription_role role) +static struct sip_subscription_tree *allocate_subscription_tree(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata) { - struct ast_sip_subscription *sub; + struct sip_subscription_tree *sub_tree; - sub = ao2_alloc(sizeof(*sub) + strlen(resource) + 1, subscription_destructor); - if (!sub) { + sub_tree = ao2_alloc(sizeof *sub_tree, subscription_tree_destructor); + if (!sub_tree) { return NULL; } - strcpy(sub->resource, resource); /* Safe */ - sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp); - if (!sub->datastores) { - ao2_ref(sub, -1); - return NULL; + ast_module_ref(ast_module_info->self); + + if (rdata) { + /* + * We must continue using the serializer that the original + * SUBSCRIBE came in on for the dialog. There may be + * retransmissions already enqueued in the original + * serializer that can result in reentrancy and message + * sequencing problems. + */ + sub_tree->serializer = ast_sip_get_distributor_serializer(rdata); + } else { + char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1]; + + /* Create name with seq number appended. */ + ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/pubsub/%s", + ast_sorcery_object_get_id(endpoint)); + + sub_tree->serializer = ast_sip_create_serializer(tps_name); } - sub->serializer = ast_sip_create_serializer(); - if (!sub->serializer) { - ao2_ref(sub, -1); + if (!sub_tree->serializer) { + ao2_ref(sub_tree, -1); return NULL; } - sub->role = role; - sub->type = SIP_SUBSCRIPTION_REAL; - sub->endpoint = ao2_bump(endpoint); - sub->handler = handler; - return sub; -} + sub_tree->endpoint = ao2_bump(endpoint); + sub_tree->notify_sched_id = -1; -static void subscription_setup_dialog(struct ast_sip_subscription *sub, pjsip_dialog *dlg) -{ - /* We keep a reference to the dialog until our subscription is destroyed. See - * the subscription_destructor for more details - */ - pjsip_dlg_inc_session(dlg, &pubsub_module); - sub->reality.real.dlg = dlg; - ast_sip_dialog_set_serializer(dlg, sub->serializer); - pjsip_evsub_set_mod_data(sip_subscription_get_evsub(sub), pubsub_module.id, sub); + return sub_tree; } -static struct ast_sip_subscription *notifier_create_subscription(const struct ast_sip_subscription_handler *handler, +/*! + * \brief Create a subscription tree based on a resource tree. + * + * Using the previously-determined valid resources in the provided resource tree, + * a corresponding tree of ast_sip_subscriptions are created. The root of the + * subscription tree is a real subscription, and the rest in the tree are + * virtual subscriptions. + * + * \param handler The handler to use for leaf subscriptions + * \param endpoint The endpoint that sent the SUBSCRIBE request + * \param rdata The SUBSCRIBE content + * \param resource The requested resource in the SUBSCRIBE request + * \param generator The body generator to use in leaf subscriptions + * \param tree The resource tree on which the subscription tree is based + * \param dlg_status[out] The result of attempting to create a dialog. + * + * \retval NULL Could not create the subscription tree + * \retval non-NULL The root of the created subscription tree + */ + +static struct sip_subscription_tree *create_subscription_tree(const struct ast_sip_subscription_handler *handler, struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource, - struct ast_sip_pubsub_body_generator *generator) + struct ast_sip_pubsub_body_generator *generator, struct resource_tree *tree, + pj_status_t *dlg_status) { - struct ast_sip_subscription *sub; + struct sip_subscription_tree *sub_tree; pjsip_dialog *dlg; struct subscription_persistence *persistence; - sub = allocate_subscription(handler, endpoint, resource, AST_SIP_NOTIFIER); - if (!sub) { + sub_tree = allocate_subscription_tree(endpoint, rdata); + if (!sub_tree) { + *dlg_status = PJ_ENOMEM; return NULL; } + sub_tree->role = AST_SIP_NOTIFIER; - sub->body_generator = generator; - dlg = ast_sip_create_dialog_uas(endpoint, rdata); + dlg = ast_sip_create_dialog_uas(endpoint, rdata, dlg_status); if (!dlg) { - ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n"); - ao2_ref(sub, -1); + if (*dlg_status != PJ_EEXISTS) { + ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n"); + } + ao2_ref(sub_tree, -1); return NULL; } @@ -849,1222 +1365,3873 @@ static struct ast_sip_subscription *notifier_create_subscription(const struct as dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag); pjsip_ua_register_dlg(pjsip_ua_instance(), dlg); dlg->local.cseq = persistence->cseq; - dlg->remote.cseq = persistence->cseq; } - pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &sub->reality.real.evsub); - subscription_setup_dialog(sub, dlg); + pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &sub_tree->evsub); + subscription_setup_dialog(sub_tree, dlg); + +#ifdef HAVE_PJSIP_EVSUB_GRP_LOCK + pjsip_evsub_add_ref(sub_tree->evsub); +#endif ast_sip_mod_data_set(dlg->pool, dlg->mod_data, pubsub_module.id, MOD_DATA_MSG, pjsip_msg_clone(dlg->pool, rdata->msg_info.msg)); - add_subscription(sub); - return sub; -} + sub_tree->notification_batch_interval = tree->notification_batch_interval; -void *ast_sip_subscription_get_header(const struct ast_sip_subscription *sub, const char *header) -{ - pjsip_dialog *dlg = sip_subscription_get_dlg(sub); - pjsip_msg *msg = ast_sip_mod_data_get(dlg->mod_data, pubsub_module.id, MOD_DATA_MSG); - pj_str_t name; + sub_tree->root = create_virtual_subscriptions(handler, resource, generator, sub_tree, tree->root); + if (AST_VECTOR_SIZE(&sub_tree->root->children) > 0) { + sub_tree->is_list = 1; + } - pj_cstr(&name, header); + add_subscription(sub_tree); - return pjsip_msg_find_hdr_by_name(msg, &name, NULL); + return sub_tree; } -struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler, - struct ast_sip_endpoint *endpoint, const char *resource) -{ - struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub) + strlen(resource) + 1, subscription_destructor); - pjsip_dialog *dlg; - struct ast_sip_contact *contact; - pj_str_t event; - pjsip_tx_data *tdata; - pjsip_evsub *evsub; +/*! Wrapper structure for initial_notify_task */ +struct initial_notify_data { + struct sip_subscription_tree *sub_tree; + int expires; +}; - sub = allocate_subscription(handler, endpoint, resource, AST_SIP_SUBSCRIBER); - if (!sub) { - return NULL; - } +static int initial_notify_task(void *obj); +static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state); - contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors); - if (!contact || ast_strlen_zero(contact->uri)) { - ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n", - ast_sorcery_object_get_id(endpoint)); - ao2_ref(sub, -1); - ao2_cleanup(contact); - return NULL; - } +/*! Persistent subscription recreation continuation under distributor serializer data */ +struct persistence_recreate_data { + struct subscription_persistence *persistence; + pjsip_rx_data *rdata; +}; - dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL); - ao2_cleanup(contact); - if (!dlg) { - ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n"); - ao2_ref(sub, -1); - return NULL; - } +/*! + * \internal + * \brief subscription_persistence_recreate continuation under distributor serializer. + * \since 13.10.0 + * + * \retval 0 on success. + * \retval -1 on error. + */ +static int sub_persistence_recreate(void *obj) +{ + struct persistence_recreate_data *recreate_data = obj; + struct subscription_persistence *persistence = recreate_data->persistence; + pjsip_rx_data *rdata = recreate_data->rdata; + struct ast_sip_endpoint *endpoint; + struct sip_subscription_tree *sub_tree; + struct ast_sip_pubsub_body_generator *generator; + struct ast_sip_subscription_handler *handler; + char *resource; + pjsip_sip_uri *request_uri; + size_t resource_size; + int resp; + struct resource_tree tree; + pjsip_expires_hdr *expires_header; - pj_cstr(&event, handler->event_name); - pjsip_evsub_create_uac(dlg, &pubsub_cb, &event, 0, &sub->reality.real.evsub); - subscription_setup_dialog(sub, dlg); + request_uri = pjsip_uri_get_uri(rdata->msg_info.msg->line.req.uri); + resource_size = pj_strlen(&request_uri->user) + 1; + resource = ast_alloca(resource_size); + ast_copy_pj_str(resource, &request_uri->user, resource_size); - add_subscription(sub); + /* + * We may want to match without any user options getting + * in the way. + */ + AST_SIP_USER_OPTIONS_TRUNCATE_CHECK(resource); - evsub = sip_subscription_get_evsub(sub); + handler = subscription_get_handler_from_rdata(rdata); + if (!handler || !handler->notifier) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not get subscription handler.\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } - if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) { - pjsip_evsub_send_request(evsub, tdata); - } else { - /* pjsip_evsub_terminate will result in pubsub_on_evsub_state, - * being called and terminating the subscription. Therefore, we don't - * need to decrease the reference count of sub here. - */ - pjsip_evsub_terminate(evsub, PJ_TRUE); - return NULL; + generator = subscription_get_generator_from_rdata(rdata, handler); + if (!generator) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Body generator not available.\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; } - return sub; -} + ast_sip_mod_data_set(rdata->tp_info.pool, rdata->endpt_info.mod_data, + pubsub_module.id, MOD_DATA_PERSISTENCE, persistence); -struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub) -{ - ast_assert(sub->endpoint != NULL); - ao2_ref(sub->endpoint, +1); - return sub->endpoint; -} + /* Getting the endpoint may take some time that can affect the expiration. */ + endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", + persistence->endpoint); + if (!endpoint) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: The endpoint was not found\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; + } -struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub) -{ - ast_assert(sub->serializer != NULL); - return sub->serializer; -} + /* Update the expiration header with the new expiration */ + expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, + rdata->msg_info.msg->hdr.next); + if (!expires_header) { + expires_header = pjsip_expires_hdr_create(rdata->tp_info.pool, 0); + if (!expires_header) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not update expires header.\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + ao2_ref(endpoint, -1); + return 0; + } + pjsip_msg_add_hdr(rdata->msg_info.msg, (pjsip_hdr *) expires_header); + } -static int sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata) -{ - struct ast_sip_endpoint *endpoint = ast_sip_subscription_get_endpoint(sub); - int res; + expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000); + if (expires_header->ivalue <= 0) { + /* The subscription expired since we started recreating the subscription. */ + ast_debug(3, "Expired subscription retrived from persistent store '%s' %s\n", + persistence->endpoint, persistence->tag); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + ao2_ref(endpoint, -1); + return 0; + } - ao2_ref(sub, +1); - res = pjsip_evsub_send_request(sip_subscription_get_evsub(sub), - tdata) == PJ_SUCCESS ? 0 : -1; + memset(&tree, 0, sizeof(tree)); + resp = build_resource_tree(endpoint, handler, resource, &tree, + ast_sip_pubsub_has_eventlist_support(rdata)); + if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) { + pj_status_t dlg_status; + + sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator, + &tree, &dlg_status); + if (!sub_tree) { + if (dlg_status != PJ_EEXISTS) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not create subscription tree.\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + } + } else { + struct initial_notify_data *ind = ast_malloc(sizeof(*ind)); - subscription_persistence_update(sub, NULL); + if (!ind) { + pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE); + goto error; + } - ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET", - "StateText: %s\r\n" - "Endpoint: %s\r\n", - pjsip_evsub_get_state_name(sip_subscription_get_evsub(sub)), - ast_sorcery_object_get_id(endpoint)); - ao2_cleanup(sub); - ao2_cleanup(endpoint); + ind->sub_tree = ao2_bump(sub_tree); + ind->expires = expires_header->ivalue; - return res; + sub_tree->persistence = ao2_bump(persistence); + subscription_persistence_update(sub_tree, rdata, SUBSCRIPTION_PERSISTENCE_RECREATED); + if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ind)) { + /* Could not send initial subscribe NOTIFY */ + pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE); + ao2_ref(sub_tree, -1); + ast_free(ind); + } + } + } else { + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + } + +error: + resource_tree_destroy(&tree); + ao2_ref(endpoint, -1); + + return 0; } -int ast_sip_subscription_notify(struct ast_sip_subscription *sub, void *notify_data, - int terminate) +/*! \brief Callback function to perform the actual recreation of a subscription */ +static int subscription_persistence_recreate(void *obj, void *arg, int flags) { - struct ast_sip_body body = { - .type = ast_sip_subscription_get_body_type(sub), - .subtype = ast_sip_subscription_get_body_subtype(sub), - }; - struct ast_str *body_text = ast_str_create(64); - pjsip_evsub *evsub = sip_subscription_get_evsub(sub); - pjsip_tx_data *tdata; - pjsip_evsub_state state; - - if (!body_text) { - return -1; - } + struct subscription_persistence *persistence = obj; + pj_pool_t *pool = arg; + struct ast_taskprocessor *serializer; + pjsip_rx_data rdata; + struct persistence_recreate_data recreate_data; - if (ast_sip_pubsub_generate_body_content(body.type, body.subtype, notify_data, &body_text)) { - ast_free(body_text); - return -1; + /* If this subscription has already expired remove it */ + if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) { + ast_debug(3, "Expired subscription retrived from persistent store '%s' %s\n", + persistence->endpoint, persistence->tag); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; } - body.body_text = ast_str_buffer(body_text); + memset(&rdata, 0, sizeof(rdata)); + pj_pool_reset(pool); + rdata.tp_info.pool = pool; - if (terminate) { - state = PJSIP_EVSUB_STATE_TERMINATED; - } else { - state = pjsip_evsub_get_state(evsub) <= PJSIP_EVSUB_STATE_ACTIVE ? - PJSIP_EVSUB_STATE_ACTIVE : PJSIP_EVSUB_STATE_TERMINATED; + if (ast_sip_create_rdata_with_contact(&rdata, persistence->packet, persistence->src_name, + persistence->src_port, persistence->transport_key, persistence->local_name, + persistence->local_port, persistence->contact_uri)) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: The message could not be parsed\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; } - ast_log_backtrace(); - - if (pjsip_evsub_notify(evsub, state, NULL, NULL, &tdata) != PJ_SUCCESS) { - ast_free(body_text); - return -1; + if (rdata.msg_info.msg->type != PJSIP_REQUEST_MSG) { + ast_log(LOG_NOTICE, "Failed recreating '%s' subscription: Stored a SIP response instead of a request.\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; } - if (ast_sip_add_body(tdata, &body)) { - ast_free(body_text); - pjsip_tx_data_dec_ref(tdata); - return -1; + + /* Continue the remainder in the distributor serializer */ + serializer = ast_sip_get_distributor_serializer(&rdata); + if (!serializer) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not get distributor serializer.\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); + return 0; } - if (sip_subscription_send_request(sub, tdata)) { - ast_free(body_text); - pjsip_tx_data_dec_ref(tdata); - return -1; + recreate_data.persistence = persistence; + recreate_data.rdata = &rdata; + if (ast_sip_push_task_synchronous(serializer, sub_persistence_recreate, &recreate_data)) { + ast_log(LOG_WARNING, "Failed recreating '%s' subscription: Could not continue under distributor serializer.\n", + persistence->endpoint); + ast_sorcery_delete(ast_sip_get_sorcery(), persistence); } + ast_taskprocessor_unreference(serializer); return 0; } -void ast_sip_subscription_get_local_uri(struct ast_sip_subscription *sub, char *buf, size_t size) +/*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */ +static int subscription_persistence_load(void *data) { - pjsip_dialog *dlg = sip_subscription_get_dlg(sub); - ast_copy_pj_str(buf, &dlg->local.info_str, size); + struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), + "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL); + pj_pool_t *pool; + + pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN, + PJSIP_POOL_RDATA_INC); + if (!pool) { + ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n"); + return 0; + } + + ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool); + + pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool); + + ao2_ref(persisted_subscriptions, -1); + return 0; } -void ast_sip_subscription_get_remote_uri(struct ast_sip_subscription *sub, char *buf, size_t size) +/*! \brief Event callback which fires subscription persistence recreation when the system is fully booted */ +static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message) { - pjsip_dialog *dlg = sip_subscription_get_dlg(sub); - ast_copy_pj_str(buf, &dlg->remote.info_str, size); + struct ast_json_payload *payload; + const char *type; + + if (stasis_message_type(message) != ast_manager_get_generic_type()) { + return; + } + + payload = stasis_message_data(message); + type = ast_json_string_get(ast_json_object_get(payload->json, "type")); + + /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we + * recreate SIP subscriptions. + */ + if (strcmp(type, "FullyBooted")) { + return; + } + + /* This has to be here so the subscription is recreated when the body generator is available */ + ast_sip_push_task(NULL, subscription_persistence_load, NULL); + + /* Once the system is fully booted we don't care anymore */ + stasis_unsubscribe(sub); } -const char *ast_sip_subscription_get_resource_name(struct ast_sip_subscription *sub) +typedef int (*on_subscription_t)(struct sip_subscription_tree *sub, void *arg); + +static int for_each_subscription(on_subscription_t on_subscription, void *arg) { - return sub->resource; + int num = 0; + struct sip_subscription_tree *i; + + if (!on_subscription) { + return num; + } + + AST_RWLIST_RDLOCK(&subscriptions); + AST_RWLIST_TRAVERSE(&subscriptions, i, next) { + if (on_subscription(i, arg)) { + break; + } + ++num; + } + AST_RWLIST_UNLOCK(&subscriptions); + return num; } -static int sip_subscription_accept(struct ast_sip_subscription *sub, pjsip_rx_data *rdata, int response) +static void sip_subscription_to_ami(struct sip_subscription_tree *sub_tree, + struct ast_str **buf) { - /* If this is a persistence recreation the subscription has already been accepted */ - if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) { - return 0; + char str[256]; + struct ast_sip_endpoint_id_configuration *id = &sub_tree->endpoint->id; + + ast_str_append(buf, 0, "Role: %s\r\n", + sip_subscription_roles_map[sub_tree->role]); + ast_str_append(buf, 0, "Endpoint: %s\r\n", + ast_sorcery_object_get_id(sub_tree->endpoint)); + + if (sub_tree->dlg) { + ast_copy_pj_str(str, &sub_tree->dlg->call_id->id, sizeof(str)); + } else { + ast_copy_string(str, "", sizeof(str)); } + ast_str_append(buf, 0, "Callid: %s\r\n", str); + + ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(sub_tree->evsub)); - return pjsip_evsub_accept(sip_subscription_get_evsub(sub), rdata, response, NULL) == PJ_SUCCESS ? 0 : -1; + ast_callerid_merge(str, sizeof(str), + S_COR(id->self.name.valid, id->self.name.str, NULL), + S_COR(id->self.number.valid, id->self.number.str, NULL), + "Unknown"); + + ast_str_append(buf, 0, "Callerid: %s\r\n", str); + + /* XXX This needs to be done recursively for lists */ + if (sub_tree->root->handler->to_ami) { + sub_tree->root->handler->to_ami(sub_tree->root, buf); + } } -static void subscription_datastore_destroy(void *obj) + +void *ast_sip_subscription_get_header(const struct ast_sip_subscription *sub, const char *header) { - struct ast_datastore *datastore = obj; + pjsip_dialog *dlg; + pjsip_msg *msg; + pj_str_t name; - /* Using the destroy function (if present) destroy the data */ - if (datastore->info->destroy != NULL && datastore->data != NULL) { - datastore->info->destroy(datastore->data); - datastore->data = NULL; - } + dlg = sub->tree->dlg; + msg = ast_sip_mod_data_get(dlg->mod_data, pubsub_module.id, MOD_DATA_MSG); + pj_cstr(&name, header); - ast_free((void *) datastore->uid); - datastore->uid = NULL; + return pjsip_msg_find_hdr_by_name(msg, &name, NULL); } -struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid) +/* XXX This function is not used. */ +struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler, + struct ast_sip_endpoint *endpoint, const char *resource) { - RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup); - const char *uid_ptr = uid; + struct ast_sip_subscription *sub; + pjsip_dialog *dlg; + struct ast_sip_contact *contact; + pj_str_t event; + pjsip_tx_data *tdata; + pjsip_evsub *evsub; + struct sip_subscription_tree *sub_tree = NULL; - if (!info) { + sub_tree = allocate_subscription_tree(endpoint, NULL); + if (!sub_tree) { return NULL; } - datastore = ao2_alloc(sizeof(*datastore), subscription_datastore_destroy); - if (!datastore) { + sub = allocate_subscription(handler, resource, sub_tree); + if (!sub) { + ao2_cleanup(sub_tree); return NULL; } - datastore->info = info; - if (ast_strlen_zero(uid)) { - /* They didn't provide an ID so we'll provide one ourself */ - struct ast_uuid *uuid = ast_uuid_generate(); - char uuid_buf[AST_UUID_STR_LEN]; - if (!uuid) { - return NULL; - } - uid_ptr = ast_uuid_to_str(uuid, uuid_buf, sizeof(uuid_buf)); - ast_free(uuid); + contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors); + if (!contact || ast_strlen_zero(contact->uri)) { + ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n", + ast_sorcery_object_get_id(endpoint)); + ao2_ref(sub_tree, -1); + ao2_cleanup(contact); + return NULL; } - datastore->uid = ast_strdup(uid_ptr); - if (!datastore->uid) { + dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL); + ao2_cleanup(contact); + if (!dlg) { + ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n"); + ao2_ref(sub_tree, -1); return NULL; } - ao2_ref(datastore, +1); - return datastore; -} + pj_cstr(&event, handler->event_name); + pjsip_evsub_create_uac(dlg, &pubsub_cb, &event, 0, &sub_tree->evsub); + subscription_setup_dialog(sub_tree, dlg); -int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore) -{ - ast_assert(datastore != NULL); - ast_assert(datastore->info != NULL); - ast_assert(!ast_strlen_zero(datastore->uid)); + evsub = sub_tree->evsub; - if (!ao2_link(subscription->datastores, datastore)) { - return -1; + if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) { + pjsip_evsub_send_request(sub_tree->evsub, tdata); + } else { + /* pjsip_evsub_terminate will result in pubsub_on_evsub_state, + * being called and terminating the subscription. Therefore, we don't + * need to decrease the reference count of sub here. + */ + pjsip_evsub_terminate(evsub, PJ_TRUE); + ao2_ref(sub_tree, -1); + return NULL; } - return 0; + + add_subscription(sub_tree); + + return sub; } -struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name) +pjsip_dialog *ast_sip_subscription_get_dialog(struct ast_sip_subscription *sub) { - return ao2_find(subscription->datastores, name, OBJ_KEY); + ast_assert(sub->tree->dlg != NULL); + return sub->tree->dlg; } -void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name) +struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub) { - ao2_callback(subscription->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name); + ast_assert(sub->tree->endpoint != NULL); + return ao2_bump(sub->tree->endpoint); } -int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore) +struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub) +{ + ast_assert(sub->tree->serializer != NULL); + return sub->tree->serializer; +} + +/*! + * \brief Pre-allocate a buffer for the transmission + * + * Typically, we let PJSIP do this step for us when we send a request. PJSIP's buffer + * allocation algorithm is to allocate a buffer of PJSIP_MAX_PKT_LEN bytes and attempt + * to write the packet to the allocated buffer. If the buffer is too small to hold the + * packet, then we get told the message is too long to be sent. + * + * When dealing with SIP NOTIFY, especially with RLS, it is possible to exceed + * PJSIP_MAX_PKT_LEN. Rather than accepting the limitation imposed on us by default, + * we instead take the strategy of pre-allocating the buffer, testing for ourselves + * if the message will fit, and resizing the buffer as required. + * + * RFC 3261 says that a SIP UDP request can be up to 65535 bytes long. We're capping + * it at 64000 for a couple of reasons: + * 1) Allocating more than 64K at a time is hard to justify + * 2) If the message goes through proxies, those proxies will want to add Via and + * Record-Route headers, making the message even larger. Giving some space for + * those headers is a nice thing to do. + * + * RFC 3261 does not place an upper limit on the size of TCP requests, but we are + * going to impose the same 64K limit as a memory savings. + * + * \param tdata The tdata onto which to allocate a buffer + * \retval 0 Success + * \retval -1 The message is too large + */ +static int allocate_tdata_buffer(pjsip_tx_data *tdata) { - ast_assert(datastore != NULL); - ast_assert(datastore->info != NULL); - ast_assert(!ast_strlen_zero(datastore->uid)); + int buf_size; + int size = -1; + char *buf; - if (!ao2_link(publication->datastores, datastore)) { + for (buf_size = PJSIP_MAX_PKT_LEN; size == -1 && buf_size < 64000; buf_size *= 2) { + buf = pj_pool_alloc(tdata->pool, buf_size); + size = pjsip_msg_print(tdata->msg, buf, buf_size); + } + + if (size == -1) { return -1; } + + tdata->buf.start = buf; + tdata->buf.cur = tdata->buf.start; + tdata->buf.end = tdata->buf.start + buf_size; + return 0; } -struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name) +static int sip_subscription_send_request(struct sip_subscription_tree *sub_tree, pjsip_tx_data *tdata) { - return ao2_find(publication->datastores, name, OBJ_KEY); -} +#ifdef TEST_FRAMEWORK + struct ast_sip_endpoint *endpoint = sub_tree->endpoint; + pjsip_evsub *evsub = sub_tree->evsub; +#endif + int res; -void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name) -{ - ao2_callback(publication->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name); -} + if (allocate_tdata_buffer(tdata)) { + ast_log(LOG_ERROR, "SIP request %s is too large to send.\n", tdata->info); + pjsip_tx_data_dec_ref(tdata); + return -1; + } -AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler); + res = pjsip_evsub_send_request(sub_tree->evsub, tdata); -static int publication_hash_fn(const void *obj, const int flags) -{ - const struct ast_sip_publication *publication = obj; - const int *entity_tag = obj; + subscription_persistence_update(sub_tree, NULL, SUBSCRIPTION_PERSISTENCE_SEND_REQUEST); - return flags & OBJ_KEY ? *entity_tag : publication->entity_tag; -} - -static int publication_cmp_fn(void *obj, void *arg, int flags) -{ - const struct ast_sip_publication *publication1 = obj; - const struct ast_sip_publication *publication2 = arg; - const int *entity_tag = arg; + ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET", + "StateText: %s\r\n" + "Endpoint: %s\r\n", + pjsip_evsub_get_state_name(evsub), + ast_sorcery_object_get_id(endpoint)); - return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ? - CMP_MATCH | CMP_STOP : 0); + return (res == PJ_SUCCESS ? 0 : -1); } -static void publish_add_handler(struct ast_sip_publish_handler *handler) +/*! + * \brief Add a resource XML element to an RLMI body + * + * Each resource element represents a subscribed resource in the list. This function currently + * will unconditionally add an instance element to each created resource element. Instance + * elements refer to later parts in the multipart body. + * + * \param pool PJLIB allocation pool + * \param cid Content-ID header of the resource + * \param resource_name Name of the resource + * \param resource_uri URI of the resource + * \param state State of the subscribed resource + */ +static void add_rlmi_resource(pj_pool_t *pool, pj_xml_node *rlmi, const pjsip_generic_string_hdr *cid, + const char *resource_name, const pjsip_sip_uri *resource_uri, pjsip_evsub_state state) { - SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK); - AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next); -} + static pj_str_t cid_name = { "cid", 3 }; + pj_xml_node *resource; + pj_xml_node *name; + pj_xml_node *instance; + pj_xml_attr *cid_attr; + char id[6]; + char uri[PJSIP_MAX_URL_SIZE]; + + /* This creates a string representing the Content-ID without the enclosing < > */ + const pj_str_t cid_stripped = { + .ptr = cid->hvalue.ptr + 1, + .slen = cid->hvalue.slen - 2, + }; -int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler) -{ - if (ast_strlen_zero(handler->event_name)) { - ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n"); - return -1; - } + resource = ast_sip_presence_xml_create_node(pool, rlmi, "resource"); + name = ast_sip_presence_xml_create_node(pool, resource, "name"); + instance = ast_sip_presence_xml_create_node(pool, resource, "instance"); - if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS, - publication_hash_fn, publication_cmp_fn))) { - ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n", - handler->event_name); - return -1; - } + pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, resource_uri, uri, sizeof(uri)); + ast_sip_presence_xml_create_attr(pool, resource, "uri", uri); - publish_add_handler(handler); + pj_strdup2(pool, &name->content, resource_name); - ast_module_ref(ast_module_info->self); + ast_generate_random_string(id, sizeof(id)); - return 0; -} + ast_sip_presence_xml_create_attr(pool, instance, "id", id); + ast_sip_presence_xml_create_attr(pool, instance, "state", + state == PJSIP_EVSUB_STATE_TERMINATED ? "terminated" : "active"); -void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler) -{ - struct ast_sip_publish_handler *iter; - SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK); - AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) { - if (handler == iter) { - AST_RWLIST_REMOVE_CURRENT(next); - ao2_cleanup(handler->publications); - ast_module_unref(ast_module_info->self); - break; - } - } - AST_RWLIST_TRAVERSE_SAFE_END; + /* Use the PJLIB-util XML library directly here since we are using a + * pj_str_t + */ + + cid_attr = pj_xml_attr_new(pool, &cid_name, &cid_stripped); + pj_xml_add_attr(instance, cid_attr); } -AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler); +/*! + * \brief A multipart body part and meta-information + * + * When creating a multipart body part, the end result (the + * pjsip_multipart_part) is hard to inspect without undoing + * a lot of what was done to create it. Therefore, we use this + * structure to store meta-information about the body part. + * + * The main consumer of this is the creator of the RLMI body + * part of a multipart resource list body. + */ +struct body_part { + /*! Content-ID header for the body part */ + pjsip_generic_string_hdr *cid; + /*! Subscribed resource represented in the body part */ + const char *resource; + /*! URI for the subscribed body part */ + pjsip_sip_uri *uri; + /*! Subscription state of the resource represented in the body part */ + pjsip_evsub_state state; + /*! The actual body part that will be present in the multipart body */ + pjsip_multipart_part *part; +}; -static void sub_add_handler(struct ast_sip_subscription_handler *handler) +/*! + * \brief Type declaration for container of body part structures + */ +AST_VECTOR(body_part_list, struct body_part *); + +/*! + * \brief Create a Content-ID header + * + * Content-ID headers are required by RFC2387 for multipart/related + * bodies. They serve as identifiers for each part of the multipart body. + * + * \param pool PJLIB allocation pool + * \param sub Subscription to a resource + */ +static pjsip_generic_string_hdr *generate_content_id_hdr(pj_pool_t *pool, + const struct ast_sip_subscription *sub) { - SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK); - AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next); - ast_module_ref(ast_module_info->self); + static const pj_str_t cid_name = { "Content-ID", 10 }; + pjsip_generic_string_hdr *cid; + char id[6]; + size_t alloc_size; + pj_str_t cid_value; + + /* '<' + '@' + '>' = 3. pj_str_t does not require a null-terminator */ + alloc_size = sizeof(id) + pj_strlen(&sub->uri->host) + 3; + cid_value.ptr = pj_pool_alloc(pool, alloc_size); + cid_value.slen = sprintf(cid_value.ptr, "<%s@%.*s>", + ast_generate_random_string(id, sizeof(id)), + (int) pj_strlen(&sub->uri->host), pj_strbuf(&sub->uri->host)); + cid = pjsip_generic_string_hdr_create(pool, &cid_name, &cid_value); + + return cid; } -static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name) +static int rlmi_print_body(struct pjsip_msg_body *msg_body, char *buf, pj_size_t size) { - struct ast_sip_subscription_handler *iter; - SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK); + int num_printed; + pj_xml_node *rlmi = msg_body->data; - AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) { - if (!strcmp(iter->event_name, event_name)) { - break; - } + num_printed = pj_xml_print(rlmi, buf, size, PJ_TRUE); + if (num_printed <= AST_PJSIP_XML_PROLOG_LEN) { + return -1; } - return iter; + + return num_printed; } -int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler) +static void *rlmi_clone_data(pj_pool_t *pool, const void *data, unsigned len) { - pj_str_t event; - pj_str_t accept[AST_SIP_MAX_ACCEPT] = { {0, }, }; - struct ast_sip_subscription_handler *existing; - int i = 0; + const pj_xml_node *rlmi = data; - if (ast_strlen_zero(handler->event_name)) { - ast_log(LOG_ERROR, "No event package specified for subscription handler. Cannot register\n"); - return -1; - } + return pj_xml_clone(pool, rlmi); +} - existing = find_sub_handler_for_event_name(handler->event_name); - if (existing) { - ast_log(LOG_ERROR, "Unable to register subscription handler for event %s." - "A handler is already registered\n", handler->event_name); - return -1; - } +/*! + * \brief Create an RLMI body part for a multipart resource list body + * + * RLMI (Resource list meta information) is a special body type that lists + * the subscribed resources and tells subscribers the number of subscribed + * resources and what other body parts are in the multipart body. The + * RLMI body also has a version number that a subscriber can use to ensure + * that the locally-stored state corresponds to server state. + * + * \param pool The allocation pool + * \param sub The subscription representing the subscribed resource list + * \param body_parts A container of body parts that RLMI will refer to + * \param full_state Indicates whether this is a full or partial state notification + * \return The multipart part representing the RLMI body + */ +static pjsip_multipart_part *build_rlmi_body(pj_pool_t *pool, struct ast_sip_subscription *sub, + struct body_part_list *body_parts, unsigned int full_state) +{ + static const pj_str_t rlmi_type = { "application", 11 }; + static const pj_str_t rlmi_subtype = { "rlmi+xml", 8 }; + pj_xml_node *rlmi; + pj_xml_node *name; + pjsip_multipart_part *rlmi_part; + char version_str[32]; + char uri[PJSIP_MAX_URL_SIZE]; + pjsip_generic_string_hdr *cid; + int i; - for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) { - pj_cstr(&accept[i], handler->accept[i]); - } + rlmi = ast_sip_presence_xml_create_node(pool, NULL, "list"); + ast_sip_presence_xml_create_attr(pool, rlmi, "xmlns", "urn:ietf:params:xml:ns:rlmi"); - pj_cstr(&event, handler->event_name); + ast_sip_subscription_get_local_uri(sub, uri, sizeof(uri)); + ast_sip_presence_xml_create_attr(pool, rlmi, "uri", uri); - pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept); + snprintf(version_str, sizeof(version_str), "%u", sub->version++); + ast_sip_presence_xml_create_attr(pool, rlmi, "version", version_str); + ast_sip_presence_xml_create_attr(pool, rlmi, "fullState", full_state ? "true" : "false"); - sub_add_handler(handler); + name = ast_sip_presence_xml_create_node(pool, rlmi, "name"); + pj_strdup2(pool, &name->content, ast_sip_subscription_get_resource_name(sub)); - return 0; -} + for (i = 0; i < AST_VECTOR_SIZE(body_parts); ++i) { + const struct body_part *part = AST_VECTOR_GET(body_parts, i); -void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler) -{ - struct ast_sip_subscription_handler *iter; - SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK); - AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) { - if (handler == iter) { - AST_RWLIST_REMOVE_CURRENT(next); - ast_module_unref(ast_module_info->self); - break; - } + add_rlmi_resource(pool, rlmi, part->cid, part->resource, part->uri, part->state); } - AST_RWLIST_TRAVERSE_SAFE_END; + + rlmi_part = pjsip_multipart_create_part(pool); + + rlmi_part->body = PJ_POOL_ZALLOC_T(pool, pjsip_msg_body); + pj_strdup(pool, &rlmi_part->body->content_type.type, &rlmi_type); + pj_strdup(pool, &rlmi_part->body->content_type.subtype, &rlmi_subtype); + pj_list_init(&rlmi_part->body->content_type.param); + + rlmi_part->body->data = pj_xml_clone(pool, rlmi); + rlmi_part->body->clone_data = rlmi_clone_data; + rlmi_part->body->print_body = rlmi_print_body; + + cid = generate_content_id_hdr(pool, sub); + pj_list_insert_before(&rlmi_part->hdr, cid); + + return rlmi_part; } -static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype(const char *content_type, - const char *content_subtype) +static pjsip_msg_body *generate_notify_body(pj_pool_t *pool, struct ast_sip_subscription *root, + unsigned int force_full_state); + +/*! + * \brief Destroy a list of body parts + * + * \param parts The container of parts to destroy + */ +static void free_body_parts(struct body_part_list *parts) { - struct ast_sip_pubsub_body_generator *iter; - SCOPED_LOCK(lock, &body_generators, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK); + int i; - AST_LIST_TRAVERSE(&body_generators, iter, list) { - if (!strcmp(iter->type, content_type) && - !strcmp(iter->subtype, content_subtype)) { - break; - } - }; + for (i = 0; i < AST_VECTOR_SIZE(parts); ++i) { + struct body_part *part = AST_VECTOR_GET(parts, i); + ast_free(part); + } - return iter; + AST_VECTOR_FREE(parts); } -static struct ast_sip_pubsub_body_generator *find_body_generator_accept(const char *accept) +/*! + * \brief Allocate and initialize a body part structure + * + * \param pool PJLIB allocation pool + * \param sub Subscription representing a subscribed resource + */ +static struct body_part *allocate_body_part(pj_pool_t *pool, const struct ast_sip_subscription *sub) { - char *accept_copy = ast_strdupa(accept); - char *subtype = accept_copy; - char *type = strsep(&subtype, "/"); + struct body_part *bp; - if (ast_strlen_zero(type) || ast_strlen_zero(subtype)) { + bp = ast_calloc(1, sizeof(*bp)); + if (!bp) { return NULL; } - return find_body_generator_type_subtype(type, subtype); + bp->cid = generate_content_id_hdr(pool, sub); + bp->resource = sub->resource; + bp->state = sub->subscription_state; + bp->uri = sub->uri; + + return bp; } -static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64], - size_t num_accept) +/*! + * \brief Create a multipart body part for a subscribed resource + * + * \param pool PJLIB allocation pool + * \param sub The subscription representing a subscribed resource + * \param parts A vector of parts to append the created part to. + * \param use_full_state Unused locally, but may be passed to other functions + */ +static void build_body_part(pj_pool_t *pool, struct ast_sip_subscription *sub, + struct body_part_list *parts, unsigned int use_full_state) { - int i; - struct ast_sip_pubsub_body_generator *generator = NULL; + struct body_part *bp; + pjsip_msg_body *body; - for (i = 0; i < num_accept; ++i) { - generator = find_body_generator_accept(accept[i]); - if (generator) { - ast_debug(3, "Body generator %p found for accept type %s\n", generator, accept[i]); - break; - } else { - ast_debug(3, "No body generator found for accept type %s\n", accept[i]); - } + bp = allocate_body_part(pool, sub); + if (!bp) { + return; } - return generator; + body = generate_notify_body(pool, sub, use_full_state); + if (!body) { + /* Partial state was requested and the resource has not changed state */ + ast_free(bp); + return; + } + + bp->part = pjsip_multipart_create_part(pool); + bp->part->body = body; + pj_list_insert_before(&bp->part->hdr, bp->cid); + + AST_VECTOR_APPEND(parts, bp); } -static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata) +/*! + * \brief Create and initialize the PJSIP multipart body structure for a resource list subscription + * + * \param pool + * \return The multipart message body + */ +static pjsip_msg_body *create_multipart_body(pj_pool_t *pool) { - pjsip_expires_hdr *expires_header; - struct ast_sip_subscription_handler *handler; - RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup); - struct ast_sip_subscription *sub; - struct ast_sip_pubsub_body_generator *generator; - char *resource; - pjsip_uri *request_uri; - pjsip_sip_uri *request_uri_sip; - size_t resource_size; - int resp; - - endpoint = ast_pjsip_rdata_get_endpoint(rdata); - ast_assert(endpoint != NULL); + pjsip_media_type media_type; + pjsip_param *media_type_param; + char boundary[6]; + pj_str_t pj_boundary; - if (!endpoint->subscription.allow) { - ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint)); - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL); - return PJ_TRUE; - } + pjsip_media_type_init2(&media_type, "multipart", "related"); - request_uri = rdata->msg_info.msg->line.req.uri; + media_type_param = pj_pool_alloc(pool, sizeof(*media_type_param)); + pj_list_init(media_type_param); - if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) { - char uri_str[PJSIP_MAX_URL_SIZE]; + pj_strdup2(pool, &media_type_param->name, "type"); + pj_strdup2(pool, &media_type_param->value, "\"application/rlmi+xml\""); - pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str)); - ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str); - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL); - return PJ_TRUE; - } + pj_list_insert_before(&media_type.param, media_type_param); - request_uri_sip = pjsip_uri_get_uri(request_uri); - resource_size = pj_strlen(&request_uri_sip->user) + 1; - resource = alloca(resource_size); - ast_copy_pj_str(resource, &request_uri_sip->user, resource_size); + pj_cstr(&pj_boundary, ast_generate_random_string(boundary, sizeof(boundary))); + return pjsip_multipart_create(pool, &media_type, &pj_boundary); +} - expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next); +/*! + * \brief Create a resource list body for NOTIFY requests + * + * Resource list bodies are multipart/related bodies. The first part of the multipart body + * is an RLMI body that describes the rest of the parts to come. The other parts of the body + * convey state of individual subscribed resources. + * + * \param pool PJLIB allocation pool + * \param sub Subscription details from which to generate body + * \param force_full_state If true, ignore resource list settings and send a full state notification + * \return The generated multipart/related body + */ +static pjsip_msg_body *generate_list_body(pj_pool_t *pool, struct ast_sip_subscription *sub, + unsigned int force_full_state) +{ + int i; + pjsip_multipart_part *rlmi_part; + pjsip_msg_body *multipart; + struct body_part_list body_parts; + unsigned int use_full_state = force_full_state ? 1 : sub->full_state; - if (expires_header) { - if (expires_header->ivalue == 0) { - ast_log(LOG_WARNING, "Susbscription request from endpoint %s rejected. Expiration of 0 is invalid\n", - ast_sorcery_object_get_id(endpoint)); - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL); - return PJ_TRUE; - } - if (expires_header->ivalue < endpoint->subscription.minexpiry) { - ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %u\n", - expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry); - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL); - return PJ_TRUE; - } + if (AST_VECTOR_INIT(&body_parts, AST_VECTOR_SIZE(&sub->children))) { + return NULL; } - handler = subscription_get_handler_from_rdata(rdata); - if (!handler) { - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL); - return PJ_TRUE; + for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) { + build_body_part(pool, AST_VECTOR_GET(&sub->children, i), &body_parts, use_full_state); } - generator = subscription_get_generator_from_rdata(rdata, handler); - if (!generator) { - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL); - return PJ_TRUE; + /* This can happen if issuing partial state and no children of the list have changed state */ + if (AST_VECTOR_SIZE(&body_parts) == 0) { + return NULL; } - resp = handler->notifier->new_subscribe(endpoint, resource); - if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) { - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL); - return PJ_TRUE; + multipart = create_multipart_body(pool); + + rlmi_part = build_rlmi_body(pool, sub, &body_parts, use_full_state); + if (!rlmi_part) { + return NULL; } + pjsip_multipart_add_part(pool, multipart, rlmi_part); - sub = notifier_create_subscription(handler, endpoint, rdata, resource, generator); - if (!sub) { - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL); - } else { - sub->persistence = subscription_persistence_create(sub); - subscription_persistence_update(sub, rdata); - sip_subscription_accept(sub, rdata, resp); - if (handler->notifier->notify_required(sub, AST_SIP_SUBSCRIPTION_NOTIFY_REASON_STARTED)) { - pjsip_evsub_terminate(sip_subscription_get_evsub(sub), PJ_TRUE); - } + for (i = 0; i < AST_VECTOR_SIZE(&body_parts); ++i) { + pjsip_multipart_add_part(pool, multipart, AST_VECTOR_GET(&body_parts, i)->part); } - return PJ_TRUE; + free_body_parts(&body_parts); + return multipart; } -static struct ast_sip_publish_handler *find_pub_handler(const char *event) +/*! + * \brief Create the body for a NOTIFY request. + * + * \param pool The pool used for allocations + * \param root The root of the subscription tree + * \param force_full_state If true, ignore resource list settings and send a full state notification + */ +static pjsip_msg_body *generate_notify_body(pj_pool_t *pool, struct ast_sip_subscription *root, + unsigned int force_full_state) { - struct ast_sip_publish_handler *iter = NULL; - SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK); - - AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) { - if (strcmp(event, iter->event_name)) { - ast_debug(3, "Event %s does not match %s\n", event, iter->event_name); - continue; + pjsip_msg_body *body; + + if (AST_VECTOR_SIZE(&root->children) == 0) { + if (force_full_state || root->body_changed) { + /* Not a list. We've already generated the body and saved it on the subscription. + * Use that directly. + */ + pj_str_t type; + pj_str_t subtype; + pj_str_t text; + + pj_cstr(&type, ast_sip_subscription_get_body_type(root)); + pj_cstr(&subtype, ast_sip_subscription_get_body_subtype(root)); + pj_cstr(&text, ast_str_buffer(root->body_text)); + + body = pjsip_msg_body_create(pool, &type, &subtype, &text); + root->body_changed = 0; + } else { + body = NULL; } - ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name); - break; + } else { + body = generate_list_body(pool, root, force_full_state); } - return iter; + return body; } -static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata, - pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id) +/*! + * \brief Shortcut method to create a Require: eventlist header + */ +static pjsip_require_hdr *create_require_eventlist(pj_pool_t *pool) { - pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL); + pjsip_require_hdr *require; - if (etag_hdr) { - char etag[pj_strlen(&etag_hdr->hvalue) + 1]; + require = pjsip_require_hdr_create(pool); + pj_strdup2(pool, &require->values[0], "eventlist"); + require->count = 1; - ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag)); + return require; +} - if (sscanf(etag, "%30d", entity_id) != 1) { - return SIP_PUBLISH_UNKNOWN; - } +/*! + * \brief Send a NOTIFY request to a subscriber + * + * \pre sub_tree->dlg is locked + * + * \param sub_tree The subscription tree representing the subscription + * \param force_full_state If true, ignore resource list settings and send full resource list state. + * \retval 0 Success + * \retval non-zero Failure + */ +static int send_notify(struct sip_subscription_tree *sub_tree, unsigned int force_full_state) +{ + pjsip_evsub *evsub = sub_tree->evsub; + pjsip_tx_data *tdata; + + if (ast_shutdown_final() + && sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED + && sub_tree->persistence) { + return 0; } - *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES; + if (pjsip_evsub_notify(evsub, sub_tree->root->subscription_state, + NULL, NULL, &tdata) != PJ_SUCCESS) { + return -1; + } - if (!(*expires)) { - return SIP_PUBLISH_REMOVE; - } else if (!etag_hdr && rdata->msg_info.msg->body) { - return SIP_PUBLISH_INITIAL; - } else if (etag_hdr && !rdata->msg_info.msg->body) { - return SIP_PUBLISH_REFRESH; - } else if (etag_hdr && rdata->msg_info.msg->body) { - return SIP_PUBLISH_MODIFY; + tdata->msg->body = generate_notify_body(tdata->pool, sub_tree->root, force_full_state); + if (!tdata->msg->body) { + pjsip_tx_data_dec_ref(tdata); + return -1; } - return SIP_PUBLISH_UNKNOWN; -} + if (sub_tree->is_list) { + pjsip_require_hdr *require = create_require_eventlist(tdata->pool); + pjsip_msg_add_hdr(tdata->msg, (pjsip_hdr *) require); + } -/*! \brief Internal destructor for publications */ -static void publication_destroy_fn(void *obj) -{ - struct ast_sip_publication *publication = obj; + if (sip_subscription_send_request(sub_tree, tdata)) { + /* do not call pjsip_tx_data_dec_ref(tdata). The pjsip_dlg_send_request deletes the message on error */ + return -1; + } - ast_debug(3, "Destroying SIP publication\n"); + sub_tree->send_scheduled_notify = 0; - ao2_cleanup(publication->datastores); - ao2_cleanup(publication->endpoint); + return 0; } -static struct ast_sip_publication *sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata) +static int serialized_send_notify(void *userdata) { - struct ast_sip_publication *publication; - pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL); + struct sip_subscription_tree *sub_tree = userdata; + pjsip_dialog *dlg = sub_tree->dlg; - ast_assert(endpoint != NULL); + pjsip_dlg_inc_lock(dlg); - if (!(publication = ao2_alloc(sizeof(*publication), publication_destroy_fn))) { - return NULL; + /* It's possible that between when the notification was scheduled + * and now a new SUBSCRIBE arrived requiring full state to be + * sent out in an immediate NOTIFY. It's also possible that we're + * already processing a terminate. If that has happened, we need to + * bail out here instead of sending the batched NOTIFY. + */ + + if (sub_tree->state >= SIP_SUB_TREE_TERMINATE_IN_PROGRESS + || !sub_tree->send_scheduled_notify) { + pjsip_dlg_dec_lock(dlg); + ao2_cleanup(sub_tree); + return 0; } - if (!(publication->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp))) { - ao2_ref(publication, -1); - return NULL; + if (sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED) { + sub_tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS; } - publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1); - ao2_ref(endpoint, +1); - publication->endpoint = endpoint; - publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES; - publication->sched_id = -1; + send_notify(sub_tree, 0); - return publication; + ast_test_suite_event_notify( + sub_tree->state == SIP_SUB_TREE_TERMINATED + ? "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_STATE_CHANGED", + "Resource: %s", sub_tree->root->resource); + + sub_tree->notify_sched_id = -1; + pjsip_dlg_dec_lock(dlg); + ao2_cleanup(sub_tree); + return 0; } -static int sip_publication_respond(struct ast_sip_publication *pub, int status_code, - pjsip_rx_data *rdata) +static int sched_cb(const void *data) { - pj_status_t status; - pjsip_tx_data *tdata; - pjsip_transaction *tsx; + struct sip_subscription_tree *sub_tree = (struct sip_subscription_tree *) data; - if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, &tdata) != PJ_SUCCESS) { - return -1; + /* We don't need to bump the refcount of sub_tree since we bumped it when scheduling this task */ + if (ast_sip_push_task(sub_tree->serializer, serialized_send_notify, sub_tree)) { + ao2_cleanup(sub_tree); } - if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) { - RAII_VAR(char *, entity_tag, NULL, ast_free_ptr); - RAII_VAR(char *, expires, NULL, ast_free_ptr); - - if ((ast_asprintf(&entity_tag, "%d", pub->entity_tag) < 0) || - (ast_asprintf(&expires, "%d", pub->expires) < 0)) { - pjsip_tx_data_dec_ref(tdata); - return -1; - } - - ast_sip_add_header(tdata, "SIP-ETag", entity_tag); - ast_sip_add_header(tdata, "Expires", expires); - } + return 0; +} - if ((status = pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx)) != PJ_SUCCESS) { - return -1; +static int schedule_notification(struct sip_subscription_tree *sub_tree) +{ + /* There's already a notification scheduled */ + if (sub_tree->notify_sched_id > -1) { + return 0; } - pjsip_tsx_recv_msg(tsx, rdata); - - if (pjsip_tsx_send_msg(tsx, tdata) != PJ_SUCCESS) { + sub_tree->send_scheduled_notify = 1; + sub_tree->notify_sched_id = ast_sched_add(sched, sub_tree->notification_batch_interval, sched_cb, ao2_bump(sub_tree)); + if (sub_tree->notify_sched_id < 0) { + ao2_cleanup(sub_tree); return -1; } return 0; } -static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, - struct ast_sip_publish_handler *handler) +int ast_sip_subscription_notify(struct ast_sip_subscription *sub, struct ast_sip_body_data *notify_data, + int terminate) { - struct ast_sip_publication *publication; - char *resource; - size_t resource_size; - pjsip_uri *request_uri; - pjsip_sip_uri *request_uri_sip; - int resp; - - request_uri = rdata->msg_info.msg->line.req.uri; + int res; + pjsip_dialog *dlg = sub->tree->dlg; - if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) { - char uri_str[PJSIP_MAX_URL_SIZE]; + pjsip_dlg_inc_lock(dlg); - pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str)); - ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str); - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL); - return NULL; + if (sub->tree->state != SIP_SUB_TREE_NORMAL) { + pjsip_dlg_dec_lock(dlg); + return 0; } - request_uri_sip = pjsip_uri_get_uri(request_uri); - resource_size = pj_strlen(&request_uri_sip->user) + 1; - resource = alloca(resource_size); - ast_copy_pj_str(resource, &request_uri_sip->user, resource_size); - - resp = handler->new_publication(endpoint, resource); - - if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) { - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL); - return NULL; + if (ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub), + ast_sip_subscription_get_body_subtype(sub), notify_data, &sub->body_text)) { + pjsip_dlg_dec_lock(dlg); + return -1; } - publication = sip_create_publication(endpoint, rdata); - - if (!publication) { - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL); - return NULL; + sub->body_changed = 1; + if (terminate) { + sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED; + sub->tree->state = SIP_SUB_TREE_TERMINATE_PENDING; } - publication->handler = handler; - if (publication->handler->publication_state_change(publication, rdata->msg_info.msg->body, - AST_SIP_PUBLISH_STATE_INITIALIZED)) { - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL); - ao2_cleanup(publication); - return NULL; + if (sub->tree->notification_batch_interval) { + res = schedule_notification(sub->tree); + } else { + /* See the note in pubsub_on_rx_refresh() for why sub->tree is refbumped here */ + ao2_ref(sub->tree, +1); + if (terminate) { + sub->tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS; + } + res = send_notify(sub->tree, 0); + ast_test_suite_event_notify(terminate ? "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_STATE_CHANGED", + "Resource: %s", + sub->tree->root->resource); + ao2_ref(sub->tree, -1); } - sip_publication_respond(publication, resp, rdata); - - return publication; + pjsip_dlg_dec_lock(dlg); + return res; } -static int publish_expire_callback(void *data) +pjsip_sip_uri *ast_sip_subscription_get_sip_uri(struct ast_sip_subscription *sub) { - RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup); - - publication->handler->publish_expire(publication); - - return 0; + return sub->uri; } -static int publish_expire(const void *data) +void ast_sip_subscription_get_local_uri(struct ast_sip_subscription *sub, char *buf, size_t size) { - struct ast_sip_publication *publication = (struct ast_sip_publication*)data; + pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, sub->uri, buf, size); +} - ao2_unlink(publication->handler->publications, publication); - publication->sched_id = -1; +void ast_sip_subscription_get_remote_uri(struct ast_sip_subscription *sub, char *buf, size_t size) +{ + pjsip_dialog *dlg; - if (ast_sip_push_task(NULL, publish_expire_callback, publication)) { - ao2_cleanup(publication); - } + dlg = sub->tree->dlg; + ast_copy_pj_str(buf, &dlg->remote.info_str, size); +} - return 0; +const char *ast_sip_subscription_get_resource_name(struct ast_sip_subscription *sub) +{ + return sub->resource; } -static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata) +int ast_sip_subscription_is_terminated(const struct ast_sip_subscription *sub) { - pjsip_event_hdr *event_header; - struct ast_sip_publish_handler *handler; - RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup); - char event[32]; - static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 }; - pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL); - enum sip_publish_type publish_type; - RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup); - int expires = 0, entity_id; + return sub->subscription_state == PJSIP_EVSUB_STATE_TERMINATED ? 1 : 0; +} - endpoint = ast_pjsip_rdata_get_endpoint(rdata); - ast_assert(endpoint != NULL); +static int sip_subscription_accept(struct sip_subscription_tree *sub_tree, pjsip_rx_data *rdata, int response) +{ + pjsip_hdr res_hdr; - event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next); - if (!event_header) { - ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n"); - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL); - return PJ_TRUE; + /* If this is a persistence recreation the subscription has already been accepted */ + if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) { + return 0; } - ast_copy_pj_str(event, &event_header->event_type, sizeof(event)); - handler = find_pub_handler(event); - if (!handler) { - ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event); - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL); - return PJ_TRUE; + pj_list_init(&res_hdr); + if (sub_tree->is_list) { + /* If subscribing to a list, our response has to have a Require: eventlist header in it */ + pj_list_insert_before(&res_hdr, create_require_eventlist(rdata->tp_info.pool)); } - publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id); + return pjsip_evsub_accept(sub_tree->evsub, rdata, response, &res_hdr) == PJ_SUCCESS ? 0 : -1; +} - /* If this is not an initial publish ensure that a publication is present */ - if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) { - if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) { - static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 }; +struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid) +{ + return ast_datastores_alloc_datastore(info, uid); +} - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed, - NULL, NULL); - return PJ_TRUE; - } +int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore) +{ + return ast_datastores_add(subscription->datastores, datastore); +} - /* Per the RFC every response has to have a new entity tag */ - publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1); +struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name) +{ + return ast_datastores_find(subscription->datastores, name); +} - /* Update the expires here so that the created responses will contain the correct value */ - publication->expires = expires; - } +void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name) +{ + ast_datastores_remove(subscription->datastores, name); +} - switch (publish_type) { - case SIP_PUBLISH_INITIAL: - publication = publish_request_initial(endpoint, rdata, handler); - break; - case SIP_PUBLISH_REFRESH: - sip_publication_respond(publication, 200, rdata); - case SIP_PUBLISH_MODIFY: - if (handler->publication_state_change(publication, rdata->msg_info.msg->body, - AST_SIP_PUBLISH_STATE_ACTIVE)) { - /* If an error occurs we want to terminate the publication */ - expires = 0; - } - sip_publication_respond(publication, 200, rdata); - break; - case SIP_PUBLISH_REMOVE: - handler->publication_state_change(publication, rdata->msg_info.msg->body, - AST_SIP_PUBLISH_STATE_TERMINATED); - sip_publication_respond(publication, 200, rdata); - break; - case SIP_PUBLISH_UNKNOWN: - default: - pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL); - break; - } +struct ao2_container *ast_sip_subscription_get_datastores(const struct ast_sip_subscription *subscription) +{ + return subscription->datastores; +} - if (publication) { - if (expires) { - ao2_link(handler->publications, publication); +int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore) +{ + return ast_datastores_add(publication->datastores, datastore); +} - AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication, - ao2_ref(publication, -1), ao2_ref(publication, -1), ao2_ref(publication, +1)); - } else { - AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1)); - } - } +struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name) +{ + return ast_datastores_find(publication->datastores, name); +} - return PJ_TRUE; +void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name) +{ + ast_datastores_remove(publication->datastores, name); } -struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub) +struct ao2_container *ast_sip_publication_get_datastores(const struct ast_sip_publication *publication) { - return pub->endpoint; + return publication->datastores; } +AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler); -int ast_sip_pubsub_register_body_generator(struct ast_sip_pubsub_body_generator *generator) +static int publication_hash_fn(const void *obj, const int flags) { - struct ast_sip_pubsub_body_generator *existing; - pj_str_t accept; - pj_size_t accept_len; + const struct ast_sip_publication *publication = obj; + const int *entity_tag = obj; - existing = find_body_generator_type_subtype(generator->type, generator->subtype); - if (existing) { - ast_log(LOG_WARNING, "Cannot register body generator of %s/%s." - "One is already registered.\n", generator->type, generator->subtype); - return -1; - } + return flags & OBJ_KEY ? *entity_tag : publication->entity_tag; +} - AST_RWLIST_WRLOCK(&body_generators); - AST_LIST_INSERT_HEAD(&body_generators, generator, list); - AST_RWLIST_UNLOCK(&body_generators); +static int publication_cmp_fn(void *obj, void *arg, int flags) +{ + const struct ast_sip_publication *publication1 = obj; + const struct ast_sip_publication *publication2 = arg; + const int *entity_tag = arg; - /* Lengths of type and subtype plus space for a slash. pj_str_t is not - * null-terminated, so there is no need to allocate for the extra null - * byte - */ - accept_len = strlen(generator->type) + strlen(generator->subtype) + 1; + return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ? + CMP_MATCH | CMP_STOP : 0); +} - accept.ptr = alloca(accept_len); - accept.slen = accept_len; - /* Safe use of sprintf */ - sprintf(accept.ptr, "%s/%s", generator->type, generator->subtype); - pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module, - PJSIP_H_ACCEPT, NULL, 1, &accept); +static void publish_add_handler(struct ast_sip_publish_handler *handler) +{ + AST_RWLIST_WRLOCK(&publish_handlers); + AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next); + AST_RWLIST_UNLOCK(&publish_handlers); +} + +int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler) +{ + if (ast_strlen_zero(handler->event_name)) { + ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n"); + return -1; + } + + if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS, + publication_hash_fn, publication_cmp_fn))) { + ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n", + handler->event_name); + return -1; + } + + publish_add_handler(handler); + + ast_module_ref(ast_module_info->self); return 0; } -void ast_sip_pubsub_unregister_body_generator(struct ast_sip_pubsub_body_generator *generator) +void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler) { - struct ast_sip_pubsub_body_generator *iter; - SCOPED_LOCK(lock, &body_generators, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK); + struct ast_sip_publish_handler *iter; - AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_generators, iter, list) { - if (iter == generator) { - AST_LIST_REMOVE_CURRENT(list); + AST_RWLIST_WRLOCK(&publish_handlers); + AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) { + if (handler == iter) { + AST_RWLIST_REMOVE_CURRENT(next); + ao2_cleanup(handler->publications); + ast_module_unref(ast_module_info->self); break; } } AST_RWLIST_TRAVERSE_SAFE_END; + AST_RWLIST_UNLOCK(&publish_handlers); } -int ast_sip_pubsub_register_body_supplement(struct ast_sip_pubsub_body_supplement *supplement) +AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler); + +static void sub_add_handler(struct ast_sip_subscription_handler *handler) { - AST_RWLIST_WRLOCK(&body_supplements); - AST_RWLIST_INSERT_TAIL(&body_supplements, supplement, list); - AST_RWLIST_UNLOCK(&body_supplements); + AST_RWLIST_WRLOCK(&subscription_handlers); + AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next); + ast_module_ref(ast_module_info->self); + AST_RWLIST_UNLOCK(&subscription_handlers); +} + +static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name) +{ + struct ast_sip_subscription_handler *iter; + + AST_RWLIST_RDLOCK(&subscription_handlers); + AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) { + if (!strcmp(iter->event_name, event_name)) { + break; + } + } + AST_RWLIST_UNLOCK(&subscription_handlers); + return iter; +} + +int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler) +{ + pj_str_t event; + pj_str_t accept[AST_SIP_MAX_ACCEPT] = { {0, }, }; + struct ast_sip_subscription_handler *existing; + int i = 0; + + if (ast_strlen_zero(handler->event_name)) { + ast_log(LOG_ERROR, "No event package specified for subscription handler. Cannot register\n"); + return -1; + } + + existing = find_sub_handler_for_event_name(handler->event_name); + if (existing) { + ast_log(LOG_ERROR, + "Unable to register subscription handler for event %s. A handler is already registered\n", + handler->event_name); + return -1; + } + + for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) { + pj_cstr(&accept[i], handler->accept[i]); + } + + pj_cstr(&event, handler->event_name); + + pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept); + + sub_add_handler(handler); + + return 0; +} + +void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler) +{ + struct ast_sip_subscription_handler *iter; + + AST_RWLIST_WRLOCK(&subscription_handlers); + AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) { + if (handler == iter) { + AST_RWLIST_REMOVE_CURRENT(next); + ast_module_unref(ast_module_info->self); + break; + } + } + AST_RWLIST_TRAVERSE_SAFE_END; + AST_RWLIST_UNLOCK(&subscription_handlers); +} + +static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype_nolock(const char *type, const char *subtype) +{ + struct ast_sip_pubsub_body_generator *gen; + + AST_LIST_TRAVERSE(&body_generators, gen, list) { + if (!strcmp(gen->type, type) + && !strcmp(gen->subtype, subtype)) { + break; + } + } + + return gen; +} + +static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype(const char *type, const char *subtype) +{ + struct ast_sip_pubsub_body_generator *gen; + + AST_RWLIST_RDLOCK(&body_generators); + gen = find_body_generator_type_subtype_nolock(type, subtype); + AST_RWLIST_UNLOCK(&body_generators); + return gen; +} + +static struct ast_sip_pubsub_body_generator *find_body_generator_accept(const char *accept) +{ + char *accept_copy = ast_strdupa(accept); + char *subtype = accept_copy; + char *type = strsep(&subtype, "/"); + + if (ast_strlen_zero(type) || ast_strlen_zero(subtype)) { + return NULL; + } + + return find_body_generator_type_subtype(type, subtype); +} + +static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64], + size_t num_accept, const char *body_type) +{ + int i; + struct ast_sip_pubsub_body_generator *generator = NULL; + + for (i = 0; i < num_accept; ++i) { + generator = find_body_generator_accept(accept[i]); + if (generator) { + ast_debug(3, "Body generator %p found for accept type %s\n", generator, accept[i]); + if (strcmp(generator->body_type, body_type)) { + ast_log(LOG_WARNING, "Body generator '%s/%s'(%p) does not accept the type of data this event generates\n", + generator->type, generator->subtype, generator); + generator = NULL; + continue; + } + break; + } else { + ast_debug(3, "No body generator found for accept type %s\n", accept[i]); + } + } + + return generator; +} + +static int generate_initial_notify(struct ast_sip_subscription *sub) +{ + void *notify_data; + int res; + struct ast_sip_body_data data = { + .body_type = sub->handler->body_type, + }; + + if (AST_VECTOR_SIZE(&sub->children) > 0) { + int i; + + for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) { + if (generate_initial_notify(AST_VECTOR_GET(&sub->children, i))) { + return -1; + } + } + + return 0; + } + + /* We notify subscription establishment only on the tree leaves. */ + if (sub->handler->notifier->subscription_established(sub)) { + return -1; + } + + notify_data = sub->handler->notifier->get_notify_data(sub); + if (!notify_data) { + return -1; + } + + data.body_data = notify_data; + + res = ast_sip_pubsub_generate_body_content(ast_sip_subscription_get_body_type(sub), + ast_sip_subscription_get_body_subtype(sub), &data, &sub->body_text); + + ao2_cleanup(notify_data); + + return res; +} + +static int pubsub_on_refresh_timeout(void *userdata); + +static int initial_notify_task(void * obj) +{ + struct initial_notify_data *ind = obj; + + if (generate_initial_notify(ind->sub_tree->root)) { + pjsip_evsub_terminate(ind->sub_tree->evsub, PJ_TRUE); + } else { + send_notify(ind->sub_tree, 1); + ast_test_suite_event_notify("SUBSCRIPTION_ESTABLISHED", + "Resource: %s", + ind->sub_tree->root->resource); + } + + if (ind->expires > -1) { + char *name = ast_alloca(strlen("->/ ") + + strlen(ind->sub_tree->persistence->endpoint) + + strlen(ind->sub_tree->root->resource) + + strlen(ind->sub_tree->root->handler->event_name) + + ind->sub_tree->dlg->call_id->id.slen + 1); + + sprintf(name, "%s->%s/%s %.*s", ind->sub_tree->persistence->endpoint, + ind->sub_tree->root->resource, ind->sub_tree->root->handler->event_name, + (int)ind->sub_tree->dlg->call_id->id.slen, ind->sub_tree->dlg->call_id->id.ptr); + + ast_debug(3, "Scheduling timer: %s\n", name); + ind->sub_tree->expiration_task = ast_sip_schedule_task(ind->sub_tree->serializer, + ind->expires * 1000, pubsub_on_refresh_timeout, name, + ind->sub_tree, AST_SIP_SCHED_TASK_FIXED | AST_SIP_SCHED_TASK_DATA_AO2); + if (!ind->sub_tree->expiration_task) { + ast_log(LOG_ERROR, "Unable to create expiration timer of %d seconds for %s\n", + ind->expires, name); + } + } + + ao2_ref(ind->sub_tree, -1); + ast_free(ind); + + return 0; +} + +static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata) +{ + pjsip_expires_hdr *expires_header; + struct ast_sip_subscription_handler *handler; + RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup); + struct sip_subscription_tree *sub_tree; + struct ast_sip_pubsub_body_generator *generator; + char *resource; + pjsip_uri *request_uri; + pjsip_sip_uri *request_uri_sip; + size_t resource_size; + int resp; + struct resource_tree tree; + pj_status_t dlg_status; + + endpoint = ast_pjsip_rdata_get_endpoint(rdata); + ast_assert(endpoint != NULL); + + if (!endpoint->subscription.allow) { + ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint)); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL); + return PJ_TRUE; + } + + request_uri = rdata->msg_info.msg->line.req.uri; + + if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) { + char uri_str[PJSIP_MAX_URL_SIZE]; + + pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str)); + ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL); + return PJ_TRUE; + } + + request_uri_sip = pjsip_uri_get_uri(request_uri); + resource_size = pj_strlen(&request_uri_sip->user) + 1; + resource = ast_alloca(resource_size); + ast_copy_pj_str(resource, &request_uri_sip->user, resource_size); + + /* + * We may want to match without any user options getting + * in the way. + */ + AST_SIP_USER_OPTIONS_TRUNCATE_CHECK(resource); + + expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next); + if (expires_header) { + if (expires_header->ivalue == 0) { + ast_log(LOG_WARNING, "Subscription request from endpoint %s rejected. Expiration of 0 is invalid\n", + ast_sorcery_object_get_id(endpoint)); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL); + return PJ_TRUE; + } + if (expires_header->ivalue < endpoint->subscription.minexpiry) { + ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %u\n", + expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL); + return PJ_TRUE; + } + } + + handler = subscription_get_handler_from_rdata(rdata); + if (!handler) { + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL); + return PJ_TRUE; + } + + generator = subscription_get_generator_from_rdata(rdata, handler); + if (!generator) { + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL); + return PJ_TRUE; + } + + memset(&tree, 0, sizeof(tree)); + resp = build_resource_tree(endpoint, handler, resource, &tree, + ast_sip_pubsub_has_eventlist_support(rdata)); + if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) { + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL); + resource_tree_destroy(&tree); + return PJ_TRUE; + } + + sub_tree = create_subscription_tree(handler, endpoint, rdata, resource, generator, &tree, &dlg_status); + if (!sub_tree) { + if (dlg_status != PJ_EEXISTS) { + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL); + } + } else { + struct initial_notify_data *ind = ast_malloc(sizeof(*ind)); + + if (!ind) { + pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE); + resource_tree_destroy(&tree); + return PJ_TRUE; + } + + ind->sub_tree = ao2_bump(sub_tree); + /* Since this is a normal subscribe, pjproject takes care of the timer */ + ind->expires = -1; + + sub_tree->persistence = subscription_persistence_create(sub_tree); + subscription_persistence_update(sub_tree, rdata, SUBSCRIPTION_PERSISTENCE_CREATED); + sip_subscription_accept(sub_tree, rdata, resp); + if (ast_sip_push_task(sub_tree->serializer, initial_notify_task, ind)) { + pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE); + ao2_ref(sub_tree, -1); + ast_free(ind); + } + } + + resource_tree_destroy(&tree); + return PJ_TRUE; +} + +static struct ast_sip_publish_handler *find_pub_handler(const char *event) +{ + struct ast_sip_publish_handler *iter = NULL; + + AST_RWLIST_RDLOCK(&publish_handlers); + AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) { + if (strcmp(event, iter->event_name)) { + ast_debug(3, "Event %s does not match %s\n", event, iter->event_name); + continue; + } + ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name); + break; + } + AST_RWLIST_UNLOCK(&publish_handlers); + + return iter; +} + +static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata, + pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id) +{ + pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL); + + if (etag_hdr) { + char etag[pj_strlen(&etag_hdr->hvalue) + 1]; + + ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag)); + + if (sscanf(etag, "%30d", entity_id) != 1) { + return SIP_PUBLISH_UNKNOWN; + } + } + + *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES; + + if (!(*expires)) { + return SIP_PUBLISH_REMOVE; + } else if (!etag_hdr && rdata->msg_info.msg->body) { + return SIP_PUBLISH_INITIAL; + } else if (etag_hdr && !rdata->msg_info.msg->body) { + return SIP_PUBLISH_REFRESH; + } else if (etag_hdr && rdata->msg_info.msg->body) { + return SIP_PUBLISH_MODIFY; + } + + return SIP_PUBLISH_UNKNOWN; +} + +/*! \brief Internal destructor for publications */ +static void publication_destroy_fn(void *obj) +{ + struct ast_sip_publication *publication = obj; + + ast_debug(3, "Destroying SIP publication\n"); + + ao2_cleanup(publication->datastores); + ao2_cleanup(publication->endpoint); +} + +static struct ast_sip_publication *sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, + const char *resource, const char *event_configuration_name) +{ + struct ast_sip_publication *publication; + pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL); + size_t resource_len = strlen(resource) + 1, event_configuration_name_len = strlen(event_configuration_name) + 1; + char *dst; + + ast_assert(endpoint != NULL); + + if (!(publication = ao2_alloc(sizeof(*publication) + resource_len + event_configuration_name_len, publication_destroy_fn))) { + return NULL; + } + + if (!(publication->datastores = ast_datastores_alloc())) { + ao2_ref(publication, -1); + return NULL; + } + + publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1); + ao2_ref(endpoint, +1); + publication->endpoint = endpoint; + publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES; + publication->sched_id = -1; + dst = publication->data; + publication->resource = strcpy(dst, resource); + dst += resource_len; + publication->event_configuration_name = strcpy(dst, event_configuration_name); + + return publication; +} + +static int sip_publication_respond(struct ast_sip_publication *pub, int status_code, + pjsip_rx_data *rdata) +{ + pjsip_tx_data *tdata; + pjsip_transaction *tsx; + + if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, &tdata) != PJ_SUCCESS) { + return -1; + } + + if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) { + char buf[30]; + + snprintf(buf, sizeof(buf), "%d", pub->entity_tag); + ast_sip_add_header(tdata, "SIP-ETag", buf); + + snprintf(buf, sizeof(buf), "%d", pub->expires); + ast_sip_add_header(tdata, "Expires", buf); + } + + if (pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx) != PJ_SUCCESS) { + pjsip_tx_data_dec_ref(tdata); + return -1; + } + + pjsip_tsx_recv_msg(tsx, rdata); + + if (pjsip_tsx_send_msg(tsx, tdata) != PJ_SUCCESS) { + pjsip_tx_data_dec_ref(tdata); + return -1; + } + + return 0; +} + +static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, + struct ast_sip_publish_handler *handler) +{ + struct ast_sip_publication *publication; + char *resource_name; + size_t resource_size; + RAII_VAR(struct ast_sip_publication_resource *, resource, NULL, ao2_cleanup); + struct ast_variable *event_configuration_name = NULL; + pjsip_uri *request_uri; + pjsip_sip_uri *request_uri_sip; + int resp; + + request_uri = rdata->msg_info.msg->line.req.uri; + + if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) { + char uri_str[PJSIP_MAX_URL_SIZE]; + + pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str)); + ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL); + return NULL; + } + + request_uri_sip = pjsip_uri_get_uri(request_uri); + resource_size = pj_strlen(&request_uri_sip->user) + 1; + resource_name = ast_alloca(resource_size); + ast_copy_pj_str(resource_name, &request_uri_sip->user, resource_size); + + /* + * We may want to match without any user options getting + * in the way. + */ + AST_SIP_USER_OPTIONS_TRUNCATE_CHECK(resource_name); + + resource = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "inbound-publication", resource_name); + if (!resource) { + ast_debug(1, "No 'inbound-publication' defined for resource '%s'\n", resource_name); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL); + return NULL; + } + + if (!ast_strlen_zero(resource->endpoint) && strcmp(resource->endpoint, ast_sorcery_object_get_id(endpoint))) { + ast_debug(1, "Resource %s has a defined endpoint '%s', but does not match endpoint '%s' that received the request\n", + resource_name, resource->endpoint, ast_sorcery_object_get_id(endpoint)); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL); + return NULL; + } + + for (event_configuration_name = resource->events; event_configuration_name; event_configuration_name = event_configuration_name->next) { + if (!strcmp(event_configuration_name->name, handler->event_name)) { + break; + } + } + + if (!event_configuration_name) { + ast_debug(1, "Event '%s' is not configured for '%s'\n", handler->event_name, resource_name); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL); + return NULL; + } + + resp = handler->new_publication(endpoint, resource_name, event_configuration_name->value); + + if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) { + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL); + return NULL; + } + + publication = sip_create_publication(endpoint, rdata, S_OR(resource_name, ""), event_configuration_name->value); + + if (!publication) { + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL); + return NULL; + } + + publication->handler = handler; + if (publication->handler->publication_state_change(publication, rdata->msg_info.msg->body, + AST_SIP_PUBLISH_STATE_INITIALIZED)) { + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL); + ao2_cleanup(publication); + return NULL; + } + + sip_publication_respond(publication, resp, rdata); + + return publication; +} + +static int publish_expire_callback(void *data) +{ + RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup); + + if (publication->handler->publish_expire) { + publication->handler->publish_expire(publication); + } + + return 0; +} + +static int publish_expire(const void *data) +{ + struct ast_sip_publication *publication = (struct ast_sip_publication*)data; + + ao2_unlink(publication->handler->publications, publication); + publication->sched_id = -1; + + if (ast_sip_push_task(NULL, publish_expire_callback, publication)) { + ao2_cleanup(publication); + } + + return 0; +} + +static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata) +{ + pjsip_event_hdr *event_header; + struct ast_sip_publish_handler *handler; + RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup); + char event[32]; + static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 }; + pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL); + enum sip_publish_type publish_type; + RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup); + int expires = 0, entity_id, response = 0; + + endpoint = ast_pjsip_rdata_get_endpoint(rdata); + ast_assert(endpoint != NULL); + + event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next); + if (!event_header) { + ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n"); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL); + return PJ_TRUE; + } + ast_copy_pj_str(event, &event_header->event_type, sizeof(event)); + + handler = find_pub_handler(event); + if (!handler) { + ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event); + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL); + return PJ_TRUE; + } + + publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id); + + /* If this is not an initial publish ensure that a publication is present */ + if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) { + if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) { + static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 }; + + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed, + NULL, NULL); + return PJ_TRUE; + } + + /* Per the RFC every response has to have a new entity tag */ + publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1); + + /* Update the expires here so that the created responses will contain the correct value */ + publication->expires = expires; + } + + switch (publish_type) { + case SIP_PUBLISH_INITIAL: + publication = publish_request_initial(endpoint, rdata, handler); + break; + case SIP_PUBLISH_REFRESH: + case SIP_PUBLISH_MODIFY: + if (handler->publication_state_change(publication, rdata->msg_info.msg->body, + AST_SIP_PUBLISH_STATE_ACTIVE)) { + /* If an error occurs we want to terminate the publication */ + expires = 0; + } + response = 200; + break; + case SIP_PUBLISH_REMOVE: + handler->publication_state_change(publication, rdata->msg_info.msg->body, + AST_SIP_PUBLISH_STATE_TERMINATED); + response = 200; + break; + case SIP_PUBLISH_UNKNOWN: + default: + pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL); + break; + } + + if (publication) { + if (expires) { + ao2_link(handler->publications, publication); + + AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication, + ao2_ref(publication, -1), ao2_ref(publication, -1), ao2_ref(publication, +1)); + } else { + AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1)); + } + } + + if (response) { + sip_publication_respond(publication, response, rdata); + } + + return PJ_TRUE; +} + +struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub) +{ + return pub->endpoint; +} + +const char *ast_sip_publication_get_resource(const struct ast_sip_publication *pub) +{ + return pub->resource; +} + +const char *ast_sip_publication_get_event_configuration(const struct ast_sip_publication *pub) +{ + return pub->event_configuration_name; +} + +int ast_sip_pubsub_is_body_generator_registered(const char *type, const char *subtype) +{ + return !!find_body_generator_type_subtype(type, subtype); +} + +int ast_sip_pubsub_register_body_generator(struct ast_sip_pubsub_body_generator *generator) +{ + struct ast_sip_pubsub_body_generator *existing; + pj_str_t accept; + pj_size_t accept_len; + + AST_RWLIST_WRLOCK(&body_generators); + existing = find_body_generator_type_subtype_nolock(generator->type, generator->subtype); + if (existing) { + AST_RWLIST_UNLOCK(&body_generators); + ast_log(LOG_WARNING, "A body generator for %s/%s is already registered.\n", + generator->type, generator->subtype); + return -1; + } + AST_LIST_INSERT_HEAD(&body_generators, generator, list); + AST_RWLIST_UNLOCK(&body_generators); + + /* Lengths of type and subtype plus a slash. */ + accept_len = strlen(generator->type) + strlen(generator->subtype) + 1; + + /* Add room for null terminator that sprintf() will set. */ + pj_strset(&accept, ast_alloca(accept_len + 1), accept_len); + sprintf((char *) pj_strbuf(&accept), "%s/%s", generator->type, generator->subtype);/* Safe */ + + pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module, + PJSIP_H_ACCEPT, NULL, 1, &accept); + + return 0; +} + +void ast_sip_pubsub_unregister_body_generator(struct ast_sip_pubsub_body_generator *generator) +{ + struct ast_sip_pubsub_body_generator *iter; + + AST_RWLIST_WRLOCK(&body_generators); + AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_generators, iter, list) { + if (iter == generator) { + AST_LIST_REMOVE_CURRENT(list); + break; + } + } + AST_RWLIST_TRAVERSE_SAFE_END; + AST_RWLIST_UNLOCK(&body_generators); +} + +int ast_sip_pubsub_register_body_supplement(struct ast_sip_pubsub_body_supplement *supplement) +{ + AST_RWLIST_WRLOCK(&body_supplements); + AST_RWLIST_INSERT_TAIL(&body_supplements, supplement, list); + AST_RWLIST_UNLOCK(&body_supplements); + + return 0; +} + +void ast_sip_pubsub_unregister_body_supplement(struct ast_sip_pubsub_body_supplement *supplement) +{ + struct ast_sip_pubsub_body_supplement *iter; + + AST_RWLIST_WRLOCK(&body_supplements); + AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_supplements, iter, list) { + if (iter == supplement) { + AST_LIST_REMOVE_CURRENT(list); + break; + } + } + AST_RWLIST_TRAVERSE_SAFE_END; + AST_RWLIST_UNLOCK(&body_supplements); +} + +const char *ast_sip_subscription_get_body_type(struct ast_sip_subscription *sub) +{ + return sub->body_generator->type; +} + +const char *ast_sip_subscription_get_body_subtype(struct ast_sip_subscription *sub) +{ + return sub->body_generator->subtype; +} + +int ast_sip_pubsub_generate_body_content(const char *type, const char *subtype, + struct ast_sip_body_data *data, struct ast_str **str) +{ + struct ast_sip_pubsub_body_supplement *supplement; + struct ast_sip_pubsub_body_generator *generator; + int res = 0; + void *body; + + generator = find_body_generator_type_subtype(type, subtype); + if (!generator) { + ast_log(LOG_WARNING, "Unable to find a body generator for %s/%s\n", + type, subtype); + return -1; + } + + if (strcmp(data->body_type, generator->body_type)) { + ast_log(LOG_WARNING, "%s/%s body generator does not accept the type of data provided\n", + type, subtype); + return -1; + } + + body = generator->allocate_body(data->body_data); + if (!body) { + ast_log(LOG_WARNING, "%s/%s body generator could not to allocate a body\n", + type, subtype); + return -1; + } + + if (generator->generate_body_content(body, data->body_data)) { + res = -1; + goto end; + } + + AST_RWLIST_RDLOCK(&body_supplements); + AST_RWLIST_TRAVERSE(&body_supplements, supplement, list) { + if (!strcmp(generator->type, supplement->type) && + !strcmp(generator->subtype, supplement->subtype)) { + res = supplement->supplement_body(body, data->body_data); + if (res) { + break; + } + } + } + AST_RWLIST_UNLOCK(&body_supplements); + + if (!res) { + generator->to_string(body, str); + } + +end: + if (generator->destroy_body) { + generator->destroy_body(body); + } + + return res; +} + +static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata) +{ + if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) { + return pubsub_on_rx_subscribe_request(rdata); + } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) { + return pubsub_on_rx_publish_request(rdata); + } + + return PJ_FALSE; +} + +static void set_state_terminated(struct ast_sip_subscription *sub) +{ + int i; + + sub->subscription_state = PJSIP_EVSUB_STATE_TERMINATED; + for (i = 0; i < AST_VECTOR_SIZE(&sub->children); ++i) { + set_state_terminated(AST_VECTOR_GET(&sub->children, i)); + } +} + +/*! + * \brief Callback sequence for subscription terminate: + * + * * Client initiated: + * pjproject receives SUBSCRIBE on the subscription's serializer thread + * calls pubsub_on_rx_refresh with dialog locked + * pubsub_on_rx_refresh sets TERMINATE_PENDING + * pushes serialized_pubsub_on_refresh_timeout + * returns to pjproject + * pjproject calls pubsub_on_evsub_state + * pubsub_evsub_set_state checks state == TERMINATE_IN_PROGRESS (no) + * ignore and return + * pjproject unlocks dialog + * serialized_pubsub_on_refresh_timeout starts (1) + * locks dialog + * checks state == TERMINATE_PENDING + * sets TERMINATE_IN_PROGRESS + * calls send_notify (2) + * send_notify ultimately calls pjsip_evsub_send_request + * pjsip_evsub_send_request calls evsub's set_state + * set_state calls pubsub_evsub_set_state + * pubsub_on_evsub_state checks state == TERMINATE_IN_PROGRESS + * removes the subscriptions + * cleans up references to evsub + * sets state = TERMINATED + * serialized_pubsub_on_refresh_timeout unlocks dialog + * + * * Subscription timer expires: + * pjproject timer expires + * locks dialog + * calls pubsub_on_server_timeout + * pubsub_on_server_timeout checks state == NORMAL + * sets TERMINATE_PENDING + * pushes serialized_pubsub_on_refresh_timeout + * returns to pjproject + * pjproject unlocks dialog + * serialized_pubsub_on_refresh_timeout starts + * See (1) Above + * + * * Transmission failure sending NOTIFY or error response from client + * pjproject transaction timer expires or non OK response + * pjproject locks dialog + * calls pubsub_on_evsub_state with event TSX_STATE + * pubsub_on_evsub_state checks event == TSX_STATE + * removes the subscriptions + * cleans up references to evsub + * sets state = TERMINATED + * pjproject unlocks dialog + * + * * ast_sip_subscription_notify is called + * checks state == NORMAL + * if not batched... + * sets TERMINATE_IN_PROGRESS (if terminate is requested) + * calls send_notify + * See (2) Above + * if batched... + * sets TERMINATE_PENDING + * schedules task + * scheduler runs sched_task + * sched_task pushes serialized_send_notify + * serialized_send_notify starts + * checks state <= TERMINATE_PENDING + * if state == TERMINATE_PENDING set state = TERMINATE_IN_PROGRESS + * call send_notify + * See (2) Above + * + */ + +/*! + * \brief PJSIP callback when underlying SIP subscription changes state + * + * Although this function is called for every state change, we only care + * about the TERMINATED state, and only when we're actually processing the final + * notify (SIP_SUB_TREE_TERMINATE_IN_PROGRESS) OR when a transmission failure + * occurs (PJSIP_EVENT_TSX_STATE). In this case, we do all the subscription tree + * cleanup tasks and decrement the evsub reference. + */ +static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event) +{ + struct sip_subscription_tree *sub_tree = + pjsip_evsub_get_mod_data(evsub, pubsub_module.id); + + ast_debug(3, "evsub %p state %s event %s sub_tree %p sub_tree state %s\n", evsub, + pjsip_evsub_get_state_name(evsub), pjsip_event_str(event->type), sub_tree, + (sub_tree ? sub_tree_state_description[sub_tree->state] : "UNKNOWN")); + + if (!sub_tree || pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) { + return; + } + + /* It's easier to write this as what we WANT to process, then negate it. */ + if (!(sub_tree->state == SIP_SUB_TREE_TERMINATE_IN_PROGRESS + || (event->type == PJSIP_EVENT_TSX_STATE && sub_tree->state == SIP_SUB_TREE_NORMAL) + )) { + ast_debug(3, "Do nothing.\n"); + return; + } + + if (sub_tree->expiration_task) { + char task_name[256]; + + ast_sip_sched_task_get_name(sub_tree->expiration_task, task_name, sizeof(task_name)); + ast_debug(3, "Cancelling timer: %s\n", task_name); + ast_sip_sched_task_cancel(sub_tree->expiration_task); + ao2_cleanup(sub_tree->expiration_task); + sub_tree->expiration_task = NULL; + } + + remove_subscription(sub_tree); + + pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL); + +#ifdef HAVE_PJSIP_EVSUB_GRP_LOCK + pjsip_evsub_dec_ref(sub_tree->evsub); +#endif + + sub_tree->evsub = NULL; + + ast_sip_dialog_set_serializer(sub_tree->dlg, NULL); + ast_sip_dialog_set_endpoint(sub_tree->dlg, NULL); + + subscription_persistence_remove(sub_tree); + shutdown_subscriptions(sub_tree->root); + + sub_tree->state = SIP_SUB_TREE_TERMINATED; + /* Remove evsub's reference to the sub_tree */ + ao2_ref(sub_tree, -1); +} + +static int pubsub_on_refresh_timeout(void *userdata) +{ + struct sip_subscription_tree *sub_tree = userdata; + pjsip_dialog *dlg = sub_tree->dlg; + + ast_debug(3, "sub_tree %p sub_tree state %s\n", sub_tree, + (sub_tree ? sub_tree_state_description[sub_tree->state] : "UNKNOWN")); + + pjsip_dlg_inc_lock(dlg); + if (sub_tree->state >= SIP_SUB_TREE_TERMINATE_IN_PROGRESS) { + pjsip_dlg_dec_lock(dlg); + return 0; + } + + if (sub_tree->state == SIP_SUB_TREE_TERMINATE_PENDING) { + sub_tree->state = SIP_SUB_TREE_TERMINATE_IN_PROGRESS; + set_state_terminated(sub_tree->root); + } + + send_notify(sub_tree, 1); + + ast_test_suite_event_notify(sub_tree->root->subscription_state == PJSIP_EVSUB_STATE_TERMINATED ? + "SUBSCRIPTION_TERMINATED" : "SUBSCRIPTION_REFRESHED", + "Resource: %s", sub_tree->root->resource); + + pjsip_dlg_dec_lock(dlg); + + return 0; +} + +static int serialized_pubsub_on_refresh_timeout(void *userdata) +{ + struct sip_subscription_tree *sub_tree = userdata; + + ast_debug(3, "sub_tree %p sub_tree state %s\n", sub_tree, + (sub_tree ? sub_tree_state_description[sub_tree->state] : "UNKNOWN")); + + pubsub_on_refresh_timeout(userdata); + ao2_cleanup(sub_tree); + + return 0; +} + +/*! + * \brief Called whenever an in-dialog SUBSCRIBE is received + * + * This includes both SUBSCRIBE requests that actually refresh the subscription + * as well as SUBSCRIBE requests that end the subscription. + * + * In either case we push serialized_pubsub_on_refresh_timeout to send an + * appropriate NOTIFY request. + */ +static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata, + int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body) +{ + struct sip_subscription_tree *sub_tree; + + sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); + ast_debug(3, "evsub %p sub_tree %p sub_tree state %s\n", evsub, sub_tree, + (sub_tree ? sub_tree_state_description[sub_tree->state] : "UNKNOWN")); + + if (!sub_tree || sub_tree->state != SIP_SUB_TREE_NORMAL) { + return; + } + + if (sub_tree->expiration_task) { + char task_name[256]; + + ast_sip_sched_task_get_name(sub_tree->expiration_task, task_name, sizeof(task_name)); + ast_debug(3, "Cancelling timer: %s\n", task_name); + ast_sip_sched_task_cancel(sub_tree->expiration_task); + ao2_cleanup(sub_tree->expiration_task); + sub_tree->expiration_task = NULL; + } + + /* PJSIP will set the evsub's state to terminated before calling into this function + * if the Expires value of the incoming SUBSCRIBE is 0. + */ + + if (pjsip_evsub_get_state(sub_tree->evsub) == PJSIP_EVSUB_STATE_TERMINATED) { + sub_tree->state = SIP_SUB_TREE_TERMINATE_PENDING; + } + + subscription_persistence_update(sub_tree, rdata, SUBSCRIPTION_PERSISTENCE_REFRESHED); + + if (ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_refresh_timeout, ao2_bump(sub_tree))) { + /* If we can't push the NOTIFY refreshing task...we'll just go with it. */ + ast_log(LOG_ERROR, "Failed to push task to send NOTIFY.\n"); + sub_tree->state = SIP_SUB_TREE_NORMAL; + ao2_ref(sub_tree, -1); + } + + if (sub_tree->is_list) { + pj_list_insert_before(res_hdr, create_require_eventlist(rdata->tp_info.pool)); + } +} + +static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code, + pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body) +{ + struct ast_sip_subscription *sub; + + if (!(sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id))) { + return; + } + + sub->handler->subscriber->state_change(sub, rdata->msg_info.msg->body, + pjsip_evsub_get_state(evsub)); +} + +static int serialized_pubsub_on_client_refresh(void *userdata) +{ + struct sip_subscription_tree *sub_tree = userdata; + pjsip_tx_data *tdata; + + if (!sub_tree->evsub) { + ao2_cleanup(sub_tree); + return 0; + } + + if (pjsip_evsub_initiate(sub_tree->evsub, NULL, -1, &tdata) == PJ_SUCCESS) { + pjsip_evsub_send_request(sub_tree->evsub, tdata); + } else { + pjsip_evsub_terminate(sub_tree->evsub, PJ_TRUE); + } + + ao2_cleanup(sub_tree); + return 0; +} + +static void pubsub_on_client_refresh(pjsip_evsub *evsub) +{ + struct sip_subscription_tree *sub_tree; + + if (!(sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id))) { + return; + } + + if (ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_client_refresh, ao2_bump(sub_tree))) { + ao2_cleanup(sub_tree); + } +} + +static void pubsub_on_server_timeout(pjsip_evsub *evsub) +{ + struct sip_subscription_tree *sub_tree; + + /* PJSIP does not terminate the server timeout timer when a SUBSCRIBE + * with Expires: 0 arrives to end a subscription, nor does it terminate + * this timer when we send a NOTIFY request in response to receiving such + * a SUBSCRIBE. PJSIP does not stop the server timeout timer until the + * NOTIFY transaction has finished (either through receiving a response + * or through a transaction timeout). + * + * Therefore, it is possible that we can be told that a server timeout + * occurred after we already thought that the subscription had been + * terminated. In such a case, we will have already removed the sub_tree + * from the evsub's mod_data array. + */ + + sub_tree = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); + if (!sub_tree || sub_tree->state != SIP_SUB_TREE_NORMAL) { + return; + } + + sub_tree->state = SIP_SUB_TREE_TERMINATE_PENDING; + if (ast_sip_push_task(sub_tree->serializer, serialized_pubsub_on_refresh_timeout, ao2_bump(sub_tree))) { + sub_tree->state = SIP_SUB_TREE_NORMAL; + ao2_cleanup(sub_tree); + } +} + +static int ami_subscription_detail(struct sip_subscription_tree *sub_tree, + struct ast_sip_ami *ami, + const char *event) +{ + struct ast_str *buf; + + buf = ast_sip_create_ami_event(event, ami); + if (!buf) { + return -1; + } + + sip_subscription_to_ami(sub_tree, &buf); + astman_append(ami->s, "%s\r\n", ast_str_buffer(buf)); + ast_free(buf); + + ++ami->count; + return 0; +} + +static int ami_subscription_detail_inbound(struct sip_subscription_tree *sub_tree, void *arg) +{ + return sub_tree->role == AST_SIP_NOTIFIER ? ami_subscription_detail( + sub_tree, arg, "InboundSubscriptionDetail") : 0; +} + +static int ami_subscription_detail_outbound(struct sip_subscription_tree *sub_tree, void *arg) +{ + return sub_tree->role == AST_SIP_SUBSCRIBER ? ami_subscription_detail( + sub_tree, arg, "OutboundSubscriptionDetail") : 0; +} + +static int ami_show_subscriptions_inbound(struct mansession *s, const struct message *m) +{ + struct ast_sip_ami ami = { .s = s, .m = m, .action_id = astman_get_header(m, "ActionID"), }; + + astman_send_listack(s, m, "Following are Events for each inbound Subscription", + "start"); + + for_each_subscription(ami_subscription_detail_inbound, &ami); + + astman_send_list_complete_start(s, m, "InboundSubscriptionDetailComplete", ami.count); + astman_send_list_complete_end(s); + return 0; +} + +static int ami_show_subscriptions_outbound(struct mansession *s, const struct message *m) +{ + struct ast_sip_ami ami = { .s = s, .m = m, .action_id = astman_get_header(m, "ActionID"), }; + + astman_send_listack(s, m, "Following are Events for each outbound Subscription", + "start"); + + for_each_subscription(ami_subscription_detail_outbound, &ami); + + astman_send_list_complete_start(s, m, "OutboundSubscriptionDetailComplete", ami.count); + astman_send_list_complete_end(s); + return 0; +} + +static int format_ami_resource_lists(void *obj, void *arg, int flags) +{ + struct resource_list *list = obj; + struct ast_sip_ami *ami = arg; + struct ast_str *buf; + + buf = ast_sip_create_ami_event("ResourceListDetail", ami); + if (!buf) { + return CMP_STOP; + } + + if (ast_sip_sorcery_object_to_ami(list, &buf)) { + ast_free(buf); + return CMP_STOP; + } + astman_append(ami->s, "%s\r\n", ast_str_buffer(buf)); + ast_free(buf); + + ++ami->count; + return 0; +} + +static int ami_show_resource_lists(struct mansession *s, const struct message *m) +{ + struct ast_sip_ami ami = { .s = s, .m = m, .action_id = astman_get_header(m, "ActionID"), }; + struct ao2_container *lists; + + lists = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), "resource_list", + AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL); + + if (!lists || !ao2_container_count(lists)) { + astman_send_error(s, m, "No resource lists found\n"); + return 0; + } + + astman_send_listack(s, m, "A listing of resource lists follows, presented as ResourceListDetail events", + "start"); + + ao2_callback(lists, OBJ_NODATA, format_ami_resource_lists, &ami); + + astman_send_list_complete_start(s, m, "ResourceListDetailComplete", ami.count); + astman_send_list_complete_end(s); + return 0; +} + +#define AMI_SHOW_SUBSCRIPTIONS_INBOUND "PJSIPShowSubscriptionsInbound" +#define AMI_SHOW_SUBSCRIPTIONS_OUTBOUND "PJSIPShowSubscriptionsOutbound" + +#define MAX_REGEX_ERROR_LEN 128 + +struct cli_sub_parms { + /*! CLI handler entry e parameter */ + struct ast_cli_entry *e; + /*! CLI handler entry a parameter */ + struct ast_cli_args *a; + /*! CLI subscription entry output line(s) */ + struct ast_str *buf; + /*! Compiled regular expression to select if buf is written to CLI when not NULL. */ + regex_t *like; + int count; +}; + +struct cli_sub_complete_parms { + struct ast_cli_args *a; + /*! Found callid for search position */ + char *callid; + int wordlen; + int which; +}; + +static int cli_complete_subscription_common(struct sip_subscription_tree *sub_tree, struct cli_sub_complete_parms *cli) +{ + pj_str_t *callid; + + if (!sub_tree->dlg) { + return 0; + } + + callid = &sub_tree->dlg->call_id->id; + if (cli->wordlen <= pj_strlen(callid) + && !strncasecmp(cli->a->word, pj_strbuf(callid), cli->wordlen) + && (++cli->which > cli->a->n)) { + cli->callid = ast_malloc(pj_strlen(callid) + 1); + if (cli->callid) { + ast_copy_pj_str(cli->callid, callid, pj_strlen(callid) + 1); + } + return -1; + } + return 0; +} + +static int cli_complete_subscription_inbound(struct sip_subscription_tree *sub_tree, void *arg) +{ + return sub_tree->role == AST_SIP_NOTIFIER + ? cli_complete_subscription_common(sub_tree, arg) : 0; +} + +static int cli_complete_subscription_outbound(struct sip_subscription_tree *sub_tree, void *arg) +{ + return sub_tree->role == AST_SIP_SUBSCRIBER + ? cli_complete_subscription_common(sub_tree, arg) : 0; +} + +static char *cli_complete_subscription_callid(struct ast_cli_args *a) +{ + struct cli_sub_complete_parms cli; + on_subscription_t on_subscription; + + if (a->pos != 4) { + return NULL; + } + + if (!strcasecmp(a->argv[3], "inbound")) { + on_subscription = cli_complete_subscription_inbound; + } else if (!strcasecmp(a->argv[3], "outbound")) { + on_subscription = cli_complete_subscription_outbound; + } else { + /* Should never get here */ + ast_assert(0); + return NULL; + } + + cli.a = a; + cli.callid = NULL; + cli.wordlen = strlen(a->word); + cli.which = 0; + for_each_subscription(on_subscription, &cli); + + return cli.callid; +} + +static int cli_subscription_expiry(struct sip_subscription_tree *sub_tree) +{ + int expiry; + + expiry = sub_tree->persistence + ? ast_tvdiff_ms(sub_tree->persistence->expires, ast_tvnow()) / 1000 + : 0; + if (expiry < 0) { + /* Subscription expired */ + expiry = 0; + } + return expiry; +} + +static int cli_show_subscription_common(struct sip_subscription_tree *sub_tree, struct cli_sub_parms *cli) +{ + const char *callid = (const char *) cli->buf;/* Member repurposed to pass in callid */ + pj_str_t *sub_callid; + struct ast_str *buf; + char *src; + char *dest; + char *key; + char *value; + char *value_end; + int key_len; + int key_filler_width; + int value_len; + + if (!sub_tree->dlg) { + return 0; + } + sub_callid = &sub_tree->dlg->call_id->id; + if (pj_strcmp2(sub_callid, callid)) { + return 0; + } + + buf = ast_str_create(512); + if (!buf) { + return -1; + } + + ast_cli(cli->a->fd, + "%-20s: %s\n" + "===========================================================================\n", + "ParameterName", "ParameterValue"); + + ast_str_append(&buf, 0, "Resource: %s\n", sub_tree->root->resource); + ast_str_append(&buf, 0, "Event: %s\n", sub_tree->root->handler->event_name); + ast_str_append(&buf, 0, "Expiry: %d\n", cli_subscription_expiry(sub_tree)); + + sip_subscription_to_ami(sub_tree, &buf); + + /* Convert AMI \r\n to \n line terminators. */ + src = strchr(ast_str_buffer(buf), '\r'); + if (src) { + dest = src; + ++src; + while (*src) { + if (*src == '\r') { + ++src; + continue; + } + *dest++ = *src++; + } + *dest = '\0'; + ast_str_update(buf); + } + + /* Reformat AMI key value pairs to pretty columns */ + key = ast_str_buffer(buf); + do { + value = strchr(key, ':'); + if (!value) { + break; + } + value_end = strchr(value, '\n'); + if (!value_end) { + break; + } + + /* Calculate field lengths */ + key_len = value - key; + key_filler_width = 20 - key_len; + if (key_filler_width < 0) { + key_filler_width = 0; + } + value_len = value_end - value; + + ast_cli(cli->a->fd, "%.*s%*s%.*s\n", + key_len, key, key_filler_width, "", + value_len, value); + + key = value_end + 1; + } while (*key); + ast_cli(cli->a->fd, "\n"); + + ast_free(buf); + + return -1; +} + +static int cli_show_subscription_inbound(struct sip_subscription_tree *sub_tree, void *arg) +{ + return sub_tree->role == AST_SIP_NOTIFIER + ? cli_show_subscription_common(sub_tree, arg) : 0; +} + +static int cli_show_subscription_outbound(struct sip_subscription_tree *sub_tree, void *arg) +{ + return sub_tree->role == AST_SIP_SUBSCRIBER + ? cli_show_subscription_common(sub_tree, arg) : 0; +} + +static char *cli_show_subscription_inout(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) +{ + on_subscription_t on_subscription; + struct cli_sub_parms cli; + + switch (cmd) { + case CLI_INIT: + e->command = "pjsip show subscription {inbound|outbound}"; + e->usage = "Usage:\n" + " pjsip show subscription inbound \n" + " pjsip show subscription outbound \n" + " Show active subscription with the dialog call-id\n"; + return NULL; + case CLI_GENERATE: + return cli_complete_subscription_callid(a); + } + + if (a->argc != 5) { + return CLI_SHOWUSAGE; + } + + if (!strcasecmp(a->argv[3], "inbound")) { + on_subscription = cli_show_subscription_inbound; + } else if (!strcasecmp(a->argv[3], "outbound")) { + on_subscription = cli_show_subscription_outbound; + } else { + /* Should never get here */ + ast_assert(0); + return NULL; + } + + /* Find the subscription with the specified call-id */ + cli.a = a; + cli.e = e; + cli.buf = (void *) a->argv[4];/* Repurpose the buf member to pass in callid */ + for_each_subscription(on_subscription, &cli); + + return CLI_SUCCESS; +} + +#define CLI_SHOW_SUB_FORMAT_HEADER \ + "Endpoint: \n" \ + "Resource: \n" \ + " Expiry: \n" \ + "===========================================================================\n\n" +#define CLI_SHOW_SUB_FORMAT_ENTRY \ + "Endpoint: %s/%s\n" \ + "Resource: %s/%s\n" \ + " Expiry: %8d %s\n\n" + +static int cli_show_subscriptions_detail(struct sip_subscription_tree *sub_tree, struct cli_sub_parms *cli) +{ + char caller_id[256]; + char callid[256]; + + ast_callerid_merge(caller_id, sizeof(caller_id), + S_COR(sub_tree->endpoint->id.self.name.valid, + sub_tree->endpoint->id.self.name.str, NULL), + S_COR(sub_tree->endpoint->id.self.number.valid, + sub_tree->endpoint->id.self.number.str, NULL), + ""); + + /* Call-id */ + if (sub_tree->dlg) { + ast_copy_pj_str(callid, &sub_tree->dlg->call_id->id, sizeof(callid)); + } else { + ast_copy_string(callid, "", sizeof(callid)); + } + + ast_str_set(&cli->buf, 0, CLI_SHOW_SUB_FORMAT_ENTRY, + ast_sorcery_object_get_id(sub_tree->endpoint), caller_id, + sub_tree->root->resource, sub_tree->root->handler->event_name, + cli_subscription_expiry(sub_tree), callid); + + if (cli->like) { + if (regexec(cli->like, ast_str_buffer(cli->buf), 0, NULL, 0)) { + /* Output line did not match the regex */ + return 0; + } + } + + ast_cli(cli->a->fd, "%s", ast_str_buffer(cli->buf)); + ++cli->count; + + return 0; +} + +static int cli_show_subscriptions_inbound(struct sip_subscription_tree *sub_tree, void *arg) +{ + return sub_tree->role == AST_SIP_NOTIFIER + ? cli_show_subscriptions_detail(sub_tree, arg) : 0; +} + +static int cli_show_subscriptions_outbound(struct sip_subscription_tree *sub_tree, void *arg) +{ + return sub_tree->role == AST_SIP_SUBSCRIBER + ? cli_show_subscriptions_detail(sub_tree, arg) : 0; +} + +static char *cli_show_subscriptions_inout(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) +{ + on_subscription_t on_subscription; + struct cli_sub_parms cli; + regex_t like; + const char *regex; + + switch (cmd) { + case CLI_INIT: + e->command = "pjsip show subscriptions {inbound|outbound} [like]"; + e->usage = "Usage:\n" + " pjsip show subscriptions inbound [like ]\n" + " Show active inbound subscriptions\n" + " pjsip show subscriptions outbound [like ]\n" + " Show active outbound subscriptions\n" + "\n" + " The regex selects a subscriptions output that matches.\n" + " i.e., All output lines for a subscription are checked\n" + " as a block by the regex.\n"; + return NULL; + case CLI_GENERATE: + return NULL; + } + + if (a->argc != 4 && a->argc != 6) { + return CLI_SHOWUSAGE; + } + if (!strcasecmp(a->argv[3], "inbound")) { + on_subscription = cli_show_subscriptions_inbound; + } else if (!strcasecmp(a->argv[3], "outbound")) { + on_subscription = cli_show_subscriptions_outbound; + } else { + /* Should never get here */ + ast_assert(0); + return CLI_SHOWUSAGE; + } + if (a->argc == 6) { + int rc; + + if (strcasecmp(a->argv[4], "like")) { + return CLI_SHOWUSAGE; + } + + /* Setup regular expression */ + memset(&like, 0, sizeof(like)); + cli.like = &like; + regex = a->argv[5]; + rc = regcomp(cli.like, regex, REG_EXTENDED | REG_NOSUB); + if (rc) { + char *regerr = ast_alloca(MAX_REGEX_ERROR_LEN); + + regerror(rc, cli.like, regerr, MAX_REGEX_ERROR_LEN); + ast_cli(a->fd, "Regular expression '%s' failed to compile: %s\n", + regex, regerr); + return CLI_FAILURE; + } + } else { + cli.like = NULL; + regex = NULL; + } + + cli.a = a; + cli.e = e; + cli.count = 0; + cli.buf = ast_str_create(256); + if (!cli.buf) { + if (cli.like) { + regfree(cli.like); + } + return CLI_FAILURE; + } + + ast_cli(a->fd, CLI_SHOW_SUB_FORMAT_HEADER); + for_each_subscription(on_subscription, &cli); + ast_cli(a->fd, "%d active subscriptions%s%s%s\n", + cli.count, + regex ? " matched \"" : "", + regex ?: "", + regex ? "\"" : ""); + + ast_free(cli.buf); + if (cli.like) { + regfree(cli.like); + } + + return CLI_SUCCESS; +} + +#define CLI_LIST_SUB_FORMAT_HEADER "%-30.30s %-30.30s %6.6s %s\n" +#define CLI_LIST_SUB_FORMAT_ENTRY "%-30.30s %-30.30s %6d %s\n" + +static int cli_list_subscriptions_detail(struct sip_subscription_tree *sub_tree, struct cli_sub_parms *cli) +{ + char ep_cid_buf[50]; + char res_evt_buf[50]; + char callid[256]; + + /* Endpoint/CID column */ + snprintf(ep_cid_buf, sizeof(ep_cid_buf), "%s/%s", + ast_sorcery_object_get_id(sub_tree->endpoint), + S_COR(sub_tree->endpoint->id.self.name.valid, sub_tree->endpoint->id.self.name.str, + S_COR(sub_tree->endpoint->id.self.number.valid, + sub_tree->endpoint->id.self.number.str, ""))); + + /* Resource/Event column */ + snprintf(res_evt_buf, sizeof(res_evt_buf), "%s/%s", + sub_tree->root->resource, + sub_tree->root->handler->event_name); + + /* Call-id column */ + if (sub_tree->dlg) { + ast_copy_pj_str(callid, &sub_tree->dlg->call_id->id, sizeof(callid)); + } else { + ast_copy_string(callid, "", sizeof(callid)); + } + + ast_str_set(&cli->buf, 0, CLI_LIST_SUB_FORMAT_ENTRY, + ep_cid_buf, + res_evt_buf, + cli_subscription_expiry(sub_tree), + callid); + + if (cli->like) { + if (regexec(cli->like, ast_str_buffer(cli->buf), 0, NULL, 0)) { + /* Output line did not match the regex */ + return 0; + } + } + + ast_cli(cli->a->fd, "%s", ast_str_buffer(cli->buf)); + ++cli->count; + + return 0; +} + +static int cli_list_subscriptions_inbound(struct sip_subscription_tree *sub_tree, void *arg) +{ + return sub_tree->role == AST_SIP_NOTIFIER + ? cli_list_subscriptions_detail(sub_tree, arg) : 0; +} + +static int cli_list_subscriptions_outbound(struct sip_subscription_tree *sub_tree, void *arg) +{ + return sub_tree->role == AST_SIP_SUBSCRIBER + ? cli_list_subscriptions_detail(sub_tree, arg) : 0; +} + +static char *cli_list_subscriptions_inout(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a) +{ + on_subscription_t on_subscription; + struct cli_sub_parms cli; + regex_t like; + const char *regex; + + switch (cmd) { + case CLI_INIT: + e->command = "pjsip list subscriptions {inbound|outbound} [like]"; + e->usage = "Usage:\n" + " pjsip list subscriptions inbound [like ]\n" + " List active inbound subscriptions\n" + " pjsip list subscriptions outbound [like ]\n" + " List active outbound subscriptions\n" + "\n" + " The regex selects output lines that match.\n"; + return NULL; + case CLI_GENERATE: + return NULL; + } + + if (a->argc != 4 && a->argc != 6) { + return CLI_SHOWUSAGE; + } + if (!strcasecmp(a->argv[3], "inbound")) { + on_subscription = cli_list_subscriptions_inbound; + } else if (!strcasecmp(a->argv[3], "outbound")) { + on_subscription = cli_list_subscriptions_outbound; + } else { + /* Should never get here */ + ast_assert(0); + return CLI_SHOWUSAGE; + } + if (a->argc == 6) { + int rc; + + if (strcasecmp(a->argv[4], "like")) { + return CLI_SHOWUSAGE; + } + + /* Setup regular expression */ + memset(&like, 0, sizeof(like)); + cli.like = &like; + regex = a->argv[5]; + rc = regcomp(cli.like, regex, REG_EXTENDED | REG_NOSUB); + if (rc) { + char *regerr = ast_alloca(MAX_REGEX_ERROR_LEN); + + regerror(rc, cli.like, regerr, MAX_REGEX_ERROR_LEN); + ast_cli(a->fd, "Regular expression '%s' failed to compile: %s\n", + regex, regerr); + return CLI_FAILURE; + } + } else { + cli.like = NULL; + regex = NULL; + } + + cli.a = a; + cli.e = e; + cli.count = 0; + cli.buf = ast_str_create(256); + if (!cli.buf) { + if (cli.like) { + regfree(cli.like); + } + return CLI_FAILURE; + } + + ast_cli(a->fd, CLI_LIST_SUB_FORMAT_HEADER, + "Endpoint/CLI", "Resource/Event", "Expiry", "Call-id"); + for_each_subscription(on_subscription, &cli); + ast_cli(a->fd, "\n%d active subscriptions%s%s%s\n", + cli.count, + regex ? " matched \"" : "", + regex ?: "", + regex ? "\"" : ""); + + ast_free(cli.buf); + if (cli.like) { + regfree(cli.like); + } + + return CLI_SUCCESS; +} + +static struct ast_cli_entry cli_commands[] = { + AST_CLI_DEFINE(cli_list_subscriptions_inout, "List active inbound/outbound subscriptions"), + AST_CLI_DEFINE(cli_show_subscription_inout, "Show active subscription details"), + AST_CLI_DEFINE(cli_show_subscriptions_inout, "Show active inbound/outbound subscriptions"), +}; + +static int persistence_endpoint_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj) +{ + struct subscription_persistence *persistence = obj; + + persistence->endpoint = ast_strdup(var->value); + return 0; +} + +static int persistence_endpoint_struct2str(const void *obj, const intptr_t *args, char **buf) +{ + const struct subscription_persistence *persistence = obj; + + *buf = ast_strdup(persistence->endpoint); + return 0; +} + +static int persistence_tag_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj) +{ + struct subscription_persistence *persistence = obj; + + persistence->tag = ast_strdup(var->value); + return 0; +} + +static int persistence_tag_struct2str(const void *obj, const intptr_t *args, char **buf) +{ + const struct subscription_persistence *persistence = obj; + + *buf = ast_strdup(persistence->tag); + return 0; +} + +static int persistence_expires_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj) +{ + struct subscription_persistence *persistence = obj; + return ast_get_timeval(var->value, &persistence->expires, ast_tv(0, 0), NULL); +} + +static int persistence_expires_struct2str(const void *obj, const intptr_t *args, char **buf) +{ + const struct subscription_persistence *persistence = obj; + return (ast_asprintf(buf, "%ld", persistence->expires.tv_sec) < 0) ? -1 : 0; +} + +#define RESOURCE_LIST_INIT_SIZE 4 + +static void resource_list_destructor(void *obj) +{ + struct resource_list *list = obj; + int i; + + for (i = 0; i < AST_VECTOR_SIZE(&list->items); ++i) { + ast_free((char *) AST_VECTOR_GET(&list->items, i)); + } + + AST_VECTOR_FREE(&list->items); +} + +static void *resource_list_alloc(const char *name) +{ + struct resource_list *list; + + list = ast_sorcery_generic_alloc(sizeof(*list), resource_list_destructor); + if (!list) { + return NULL; + } + + if (AST_VECTOR_INIT(&list->items, RESOURCE_LIST_INIT_SIZE)) { + ao2_cleanup(list); + return NULL; + } + + return list; +} + +static int item_in_vector(const struct resource_list *list, const char *item) +{ + int i; + + for (i = 0; i < AST_VECTOR_SIZE(&list->items); ++i) { + if (!strcmp(item, AST_VECTOR_GET(&list->items, i))) { + return 1; + } + } + + return 0; +} + +static int list_item_handler(const struct aco_option *opt, + struct ast_variable *var, void *obj) +{ + struct resource_list *list = obj; + char *items = ast_strdupa(var->value); + char *item; + + while ((item = ast_strip(strsep(&items, ",")))) { + if (ast_strlen_zero(item)) { + continue; + } + + if (item_in_vector(list, item)) { + ast_log(LOG_WARNING, "Ignoring duplicated list item '%s'\n", item); + continue; + } + if (AST_VECTOR_APPEND(&list->items, ast_strdup(item))) { + return -1; + } + } + + return 0; +} + +static int list_item_to_str(const void *obj, const intptr_t *args, char **buf) +{ + const struct resource_list *list = obj; + int i; + struct ast_str *str = ast_str_create(32); + + for (i = 0; i < AST_VECTOR_SIZE(&list->items); ++i) { + ast_str_append(&str, 0, "%s,", AST_VECTOR_GET(&list->items, i)); + } + + /* Chop off trailing comma */ + ast_str_truncate(str, -1); + *buf = ast_strdup(ast_str_buffer(str)); + ast_free(str); + return 0; +} + +static int resource_list_apply_handler(const struct ast_sorcery *sorcery, void *obj) +{ + struct resource_list *list = obj; + + if (ast_strlen_zero(list->event)) { + ast_log(LOG_WARNING, "Resource list '%s' has no event set\n", + ast_sorcery_object_get_id(list)); + return -1; + } + + if (AST_VECTOR_SIZE(&list->items) == 0) { + ast_log(LOG_WARNING, "Resource list '%s' has no list items\n", + ast_sorcery_object_get_id(list)); + return -1; + } + + return 0; +} + +static int apply_list_configuration(struct ast_sorcery *sorcery) +{ + ast_sorcery_apply_default(sorcery, "resource_list", "config", + "pjsip.conf,criteria=type=resource_list"); + if (ast_sorcery_object_register(sorcery, "resource_list", resource_list_alloc, + NULL, resource_list_apply_handler)) { + return -1; + } + + ast_sorcery_object_field_register(sorcery, "resource_list", "type", "", + OPT_NOOP_T, 0, 0); + ast_sorcery_object_field_register(sorcery, "resource_list", "event", "", + OPT_CHAR_ARRAY_T, 1, CHARFLDSET(struct resource_list, event)); + ast_sorcery_object_field_register(sorcery, "resource_list", "full_state", "no", + OPT_BOOL_T, 1, FLDSET(struct resource_list, full_state)); + ast_sorcery_object_field_register(sorcery, "resource_list", "notification_batch_interval", + "0", OPT_UINT_T, 0, FLDSET(struct resource_list, notification_batch_interval)); + ast_sorcery_object_field_register_custom(sorcery, "resource_list", "list_item", + "", list_item_handler, list_item_to_str, NULL, 0, 0); + + ast_sorcery_reload_object(sorcery, "resource_list"); + + return 0; +} + +#ifdef TEST_FRAMEWORK + +/*! + * \brief "bad" resources + * + * These are resources that the test handler will reject subscriptions to. + */ +const char *bad_resources[] = { + "coconut", + "cilantro", + "olive", + "cheese", +}; + +/*! + * \brief new_subscribe callback for unit tests + * + * Will give a 200 OK response to any resource except the "bad" ones. + */ +static int test_new_subscribe(struct ast_sip_endpoint *endpoint, const char *resource) +{ + int i; + + for (i = 0; i < ARRAY_LEN(bad_resources); ++i) { + if (!strcmp(resource, bad_resources[i])) { + return 400; + } + } + + return 200; +} + +/*! + * \brief Subscription notifier for unit tests. + * + * Since unit tests are only concerned with building a resource tree, + * only the new_subscribe callback needs to be defined. + */ +struct ast_sip_notifier test_notifier = { + .new_subscribe = test_new_subscribe, +}; + +/*! + * \brief Subscription handler for unit tests. + */ +struct ast_sip_subscription_handler test_handler = { + .event_name = "test", + .notifier = &test_notifier, +}; + +/*! + * \brief Set properties on an allocated resource list + * + * \param list The list to set details on. + * \param event The list's event. + * \param resources Array of resources to add to the list. + * \param num_resources Number of resources in the array. + * \retval 0 Success + * \retval non-zero Failure + */ +static int populate_list(struct resource_list *list, const char *event, const char **resources, size_t num_resources) +{ + int i; + + ast_copy_string(list->event, event, sizeof(list->event)); + + for (i = 0; i < num_resources; ++i) { + if (AST_VECTOR_APPEND(&list->items, ast_strdup(resources[i]))) { + return -1; + } + } + return 0; +} + +/*! + * \brief RAII callback to destroy a resource list + */ +static void cleanup_resource_list(struct resource_list *list) +{ + if (!list) { + return; + } + + ast_sorcery_delete(ast_sip_get_sorcery(), list); + ao2_cleanup(list); +} + +/*! + * \brief allocate a resource list, store it in sorcery, and set its details + * + * \param test The unit test. Used for logging status messages. + * \param list_name The name of the list to create. + * \param event The event the list services + * \param resources Array of resources to apply to the list + * \param num_resources The number of resources in the array + * \retval NULL Failed to allocate or populate list + * \retval non-NULL The created list + */ +static struct resource_list *create_resource_list(struct ast_test *test, + const char *list_name, const char *event, const char **resources, size_t num_resources) +{ + struct resource_list *list; + + list = ast_sorcery_alloc(ast_sip_get_sorcery(), "resource_list", list_name); + if (!list) { + ast_test_status_update(test, "Could not allocate resource list in sorcery\n"); + return NULL; + } + + if (ast_sorcery_create(ast_sip_get_sorcery(), list)) { + ast_test_status_update(test, "Could not store the resource list in sorcery\n"); + ao2_cleanup(list); + return NULL; + } - return 0; + if (populate_list(list, event, resources, num_resources)) { + ast_test_status_update(test, "Could not add resources to the resource list\n"); + cleanup_resource_list(list); + return NULL; + } + + return list; } -void ast_sip_pubsub_unregister_body_supplement(struct ast_sip_pubsub_body_supplement *supplement) +/*! + * \brief Check the integrity of a tree node against a set of resources. + * + * The tree node's resources must be in the same order as the resources in + * the supplied resources array. Because of this constraint, tests can misrepresent + * the size of the resources array as being smaller than it really is if resources + * at the end of the array should not be present in the tree node. + * + * \param test The unit test. Used for printing status messages. + * \param node The constructed tree node whose integrity is under question. + * \param resources Array of expected resource values + * \param num_resources The number of resources to check in the array. + */ +static int check_node(struct ast_test *test, struct tree_node *node, + const char **resources, size_t num_resources) { - struct ast_sip_pubsub_body_supplement *iter; - SCOPED_LOCK(lock, &body_supplements, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK); + int i; - AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_supplements, iter, list) { - if (iter == supplement) { - AST_LIST_REMOVE_CURRENT(list); - break; + if (AST_VECTOR_SIZE(&node->children) != num_resources) { + ast_test_status_update(test, "Unexpected number of resources in tree. Expected %zu, got %zu\n", + num_resources, AST_VECTOR_SIZE(&node->children)); + return -1; + } + + for (i = 0; i < num_resources; ++i) { + if (strcmp(resources[i], AST_VECTOR_GET(&node->children, i)->resource)) { + ast_test_status_update(test, "Mismatched resources. Expected '%s' but got '%s'\n", + resources[i], AST_VECTOR_GET(&node->children, i)->resource); + return -1; } } - AST_RWLIST_TRAVERSE_SAFE_END; + + return 0; } -const char *ast_sip_subscription_get_body_type(struct ast_sip_subscription *sub) +/*! + * \brief RAII_VAR callback to destroy an allocated resource tree + */ +static void test_resource_tree_destroy(struct resource_tree *tree) { - return sub->body_generator->type; + resource_tree_destroy(tree); + ast_free(tree); } -const char *ast_sip_subscription_get_body_subtype(struct ast_sip_subscription *sub) +static int ineligible_configuration(void) { - return sub->body_generator->subtype; + struct ast_config *config; + struct ast_flags flags = {0,}; + const char *value; + + config = ast_config_load("sorcery.conf", flags); + if (!config) { + return 1; + } + + value = ast_variable_retrieve(config, "res_pjsip_pubsub", "resource_list"); + if (ast_strlen_zero(value)) { + ast_config_destroy(config); + return 1; + } + + if (strcasecmp(value, "memory") && strcasecmp(value, "astdb")) { + ast_config_destroy(config); + return 1; + } + + return 0; } -int ast_sip_pubsub_generate_body_content(const char *type, const char *subtype, - void *data, struct ast_str **str) +AST_TEST_DEFINE(resource_tree) { - struct ast_sip_pubsub_body_supplement *supplement; - struct ast_sip_pubsub_body_generator *generator; - int res = 0; - void *body; + RAII_VAR(struct resource_list *, list, NULL, cleanup_resource_list); + RAII_VAR(struct resource_tree *, tree, NULL, test_resource_tree_destroy); + const char *resources[] = { + "huey", + "dewey", + "louie", + }; + int resp; - generator = find_body_generator_type_subtype(type, subtype); - if (!generator) { - ast_log(LOG_WARNING, "Unable to find a body generator for %s/%s\n", - type, subtype); - return -1; + switch (cmd) { + case TEST_INIT: + info->name = "resource_tree"; + info->category = "/res/res_pjsip_pubsub/"; + info->summary = "Basic resource tree integrity check"; + info->description = + "Create a resource list and ensure that our attempt to build a tree works as expected."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; } - body = generator->allocate_body(data); - if (!body) { - ast_log(LOG_WARNING, "Unable to allocate a NOTIFY body of type %s/%s\n", - type, subtype); - return -1; + if (ineligible_configuration()) { + ast_test_status_update(test, "Ineligible configuration for this test. Please add a " + "'res_pjsip_pubsub' section to sorcery.conf, and set 'resource_list=memory'\n"); + return AST_TEST_NOT_RUN; } - if (generator->generate_body_content(body, data)) { - res = -1; - goto end; + list = create_resource_list(test, "foo", "test", resources, ARRAY_LEN(resources)); + if (!list) { + return AST_TEST_FAIL; } - AST_RWLIST_RDLOCK(&body_supplements); - AST_RWLIST_TRAVERSE(&body_supplements, supplement, list) { - if (!strcmp(generator->type, supplement->type) && - !strcmp(generator->subtype, supplement->subtype)) { - res = supplement->supplement_body(body, data); - if (res) { - break; - } - } + tree = ast_calloc(1, sizeof(*tree)); + resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1); + if (resp != 200) { + ast_test_status_update(test, "Unexpected response %d when building resource tree\n", resp); + return AST_TEST_FAIL; } - AST_RWLIST_UNLOCK(&body_supplements); - if (!res) { - generator->to_string(body, str); + if (!tree->root) { + ast_test_status_update(test, "Resource tree has no root\n"); + return AST_TEST_FAIL; } -end: - if (generator->destroy_body) { - generator->destroy_body(body); + if (check_node(test, tree->root, resources, ARRAY_LEN(resources))) { + return AST_TEST_FAIL; } - return res; + return AST_TEST_PASS; } -static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata) +AST_TEST_DEFINE(complex_resource_tree) { - if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) { - return pubsub_on_rx_subscribe_request(rdata); - } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) { - return pubsub_on_rx_publish_request(rdata); + RAII_VAR(struct resource_list *, list_1, NULL, cleanup_resource_list); + RAII_VAR(struct resource_list *, list_2, NULL, cleanup_resource_list); + RAII_VAR(struct resource_tree *, tree, NULL, test_resource_tree_destroy); + const char *resources_1[] = { + "huey", + "dewey", + "louie", + "dwarves", + }; + const char *resources_2[] = { + "happy", + "grumpy", + "doc", + "bashful", + "dopey", + "sneezy", + "sleepy", + }; + int resp; + struct tree_node *node; + + switch (cmd) { + case TEST_INIT: + info->name = "complex_resource_tree"; + info->category = "/res/res_pjsip_pubsub/"; + info->summary = "Complex resource tree integrity check"; + info->description = + "Create a complex resource list and ensure that our attempt to build a tree works as expected."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; } - return PJ_FALSE; -} + if (ineligible_configuration()) { + ast_test_status_update(test, "Ineligible configuration for this test. Please add a " + "'res_pjsip_pubsub' section to sorcery.conf, and set 'resource_list=memory'\n"); + return AST_TEST_NOT_RUN; + } -static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event) -{ - struct ast_sip_subscription *sub; - if (pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) { - return; + list_1 = create_resource_list(test, "foo", "test", resources_1, ARRAY_LEN(resources_1)); + if (!list_1) { + return AST_TEST_FAIL; } - sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); - if (!sub) { - return; + list_2 = create_resource_list(test, "dwarves", "test", resources_2, ARRAY_LEN(resources_2)); + if (!list_2) { + return AST_TEST_FAIL; } - if (sub->handler->subscription_shutdown) { - sub->handler->subscription_shutdown(sub); + tree = ast_calloc(1, sizeof(*tree)); + resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1); + if (resp != 200) { + ast_test_status_update(test, "Unexpected response %d when building resource tree\n", resp); + return AST_TEST_FAIL; } - pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL); + + if (!tree->root) { + ast_test_status_update(test, "Resource tree has no root\n"); + return AST_TEST_FAIL; + } + + node = tree->root; + if (check_node(test, node, resources_1, ARRAY_LEN(resources_1))) { + return AST_TEST_FAIL; + } + + /* The embedded list is at index 3 in the root node's children */ + node = AST_VECTOR_GET(&node->children, 3); + if (check_node(test, node, resources_2, ARRAY_LEN(resources_2))) { + return AST_TEST_FAIL; + } + + return AST_TEST_PASS; } -static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata, - int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body) +AST_TEST_DEFINE(bad_resource) { - struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); - enum ast_sip_subscription_notify_reason reason; + RAII_VAR(struct resource_list *, list, NULL, cleanup_resource_list); + RAII_VAR(struct resource_tree *, tree, NULL, test_resource_tree_destroy); + const char *resources[] = { + "huey", + "dewey", + "louie", + "coconut", /* A "bad" resource */ + }; + int resp; - if (!sub) { - return; + switch (cmd) { + case TEST_INIT: + info->name = "bad_resource"; + info->category = "/res/res_pjsip_pubsub/"; + info->summary = "Ensure bad resources do not end up in the tree"; + info->description = + "Create a resource list with a single bad resource. Ensure the bad resource does not end up in the tree."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; } - if (pjsip_evsub_get_state(sip_subscription_get_evsub(sub)) == PJSIP_EVSUB_STATE_TERMINATED) { - reason = AST_SIP_SUBSCRIPTION_NOTIFY_REASON_TERMINATED; - } else { - reason = AST_SIP_SUBSCRIPTION_NOTIFY_REASON_RENEWED; + if (ineligible_configuration()) { + ast_test_status_update(test, "Ineligible configuration for this test. Please add a " + "'res_pjsip_pubsub' section to sorcery.conf, and set 'resource_list=memory'\n"); + return AST_TEST_NOT_RUN; } - if (sub->handler->notifier->notify_required(sub, reason)) { - *p_st_code = 500; + + list = create_resource_list(test, "foo", "test", resources, ARRAY_LEN(resources)); + if (!list) { + return AST_TEST_FAIL; } -} -static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code, - pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body) -{ - struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); + tree = ast_calloc(1, sizeof(*tree)); + resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1); + if (resp != 200) { + ast_test_status_update(test, "Unexpected response %d when building resource tree\n", resp); + return AST_TEST_FAIL; + } - if (!sub) { - return; + if (!tree->root) { + ast_test_status_update(test, "Resource tree has no root\n"); + return AST_TEST_FAIL; } - sub->handler->subscriber->state_change(sub, rdata->msg_info.msg->body, - pjsip_evsub_get_state(evsub)); + /* We check against all but the final resource since we expect it not to be in the tree */ + if (check_node(test, tree->root, resources, ARRAY_LEN(resources) - 1)) { + return AST_TEST_FAIL; + } + + return AST_TEST_PASS; + } -static int serialized_pubsub_on_client_refresh(void *userdata) +AST_TEST_DEFINE(bad_branch) { - struct ast_sip_subscription *sub = userdata; - pjsip_evsub *evsub; - pjsip_tx_data *tdata; + RAII_VAR(struct resource_list *, list_1, NULL, cleanup_resource_list); + RAII_VAR(struct resource_list *, list_2, NULL, cleanup_resource_list); + RAII_VAR(struct resource_tree *, tree, NULL, test_resource_tree_destroy); + const char *resources_1[] = { + "huey", + "dewey", + "louie", + "gross", + }; + /* This list has nothing but bad resources */ + const char *resources_2[] = { + "coconut", + "cilantro", + "olive", + "cheese", + }; + int resp; + + switch (cmd) { + case TEST_INIT: + info->name = "bad_branch"; + info->category = "/res/res_pjsip_pubsub/"; + info->summary = "Ensure bad branches are pruned from the tree"; + info->description = + "Create a resource list that makes a tree with an entire branch of bad resources.\n" + "Ensure the bad branch is pruned from the tree."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } - evsub = sip_subscription_get_evsub(sub); + if (ineligible_configuration()) { + ast_test_status_update(test, "Ineligible configuration for this test. Please add a " + "'res_pjsip_pubsub' section to sorcery.conf, and set 'resource_list=memory'\n"); + return AST_TEST_NOT_RUN; + } - if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) { - pjsip_evsub_send_request(evsub, tdata); - } else { - pjsip_evsub_terminate(evsub, PJ_TRUE); - return 0; + list_1 = create_resource_list(test, "foo", "test", resources_1, ARRAY_LEN(resources_1)); + if (!list_1) { + return AST_TEST_FAIL; + } + list_2 = create_resource_list(test, "gross", "test", resources_2, ARRAY_LEN(resources_2)); + if (!list_2) { + return AST_TEST_FAIL; } - ao2_cleanup(sub); - return 0; -} -static void pubsub_on_client_refresh(pjsip_evsub *evsub) -{ - struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); + tree = ast_calloc(1, sizeof(*tree)); + resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1); + if (resp != 200) { + ast_test_status_update(test, "Unexpected response %d when building resource tree\n", resp); + return AST_TEST_FAIL; + } - ao2_ref(sub, +1); - ast_sip_push_task(sub->serializer, serialized_pubsub_on_client_refresh, sub); -} + if (!tree->root) { + ast_test_status_update(test, "Resource tree has no root\n"); + return AST_TEST_FAIL; + } -static int serialized_pubsub_on_server_timeout(void *userdata) -{ - struct ast_sip_subscription *sub = userdata; + /* We check against all but the final resource of the list since the entire branch should + * be pruned from the tree + */ + if (check_node(test, tree->root, resources_1, ARRAY_LEN(resources_1) - 1)) { + return AST_TEST_FAIL; + } - sub->handler->notifier->notify_required(sub, - AST_SIP_SUBSCRIPTION_NOTIFY_REASON_TERMINATED); + return AST_TEST_PASS; - ao2_cleanup(sub); - return 0; } -static void pubsub_on_server_timeout(pjsip_evsub *evsub) +AST_TEST_DEFINE(duplicate_resource) { - struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id); + RAII_VAR(struct resource_list *, list_1, NULL, cleanup_resource_list); + RAII_VAR(struct resource_list *, list_2, NULL, cleanup_resource_list); + RAII_VAR(struct resource_tree *, tree, NULL, test_resource_tree_destroy); + const char *resources_1[] = { + "huey", + "ducks", + "dewey", + "louie", + }; + const char *resources_2[] = { + "donald", + "daisy", + "scrooge", + "dewey", + "louie", + "huey", + }; + int resp; + struct tree_node *node; + + switch (cmd) { + case TEST_INIT: + info->name = "duplicate_resource"; + info->category = "/res/res_pjsip_pubsub/"; + info->summary = "Ensure duplicated resources do not end up in the tree"; + info->description = + "Create a resource list with a single duplicated resource. Ensure the duplicated resource does not end up in the tree."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } - if (!sub) { - /* if a subscription has been terminated and the subscription - timeout/expires is less than the time it takes for all pending - transactions to end then the subscription timer will not have - been canceled yet and sub will be null, so do nothing since - the subscription has already been terminated. */ - return; + if (ineligible_configuration()) { + ast_test_status_update(test, "Ineligible configuration for this test. Please add a " + "'res_pjsip_pubsub' section to sorcery.conf, and set 'resource_list=memory'\n"); + return AST_TEST_NOT_RUN; } - ao2_ref(sub, +1); - ast_sip_push_task(sub->serializer, serialized_pubsub_on_server_timeout, sub); -} + list_1 = create_resource_list(test, "foo", "test", resources_1, ARRAY_LEN(resources_1)); + if (!list_1) { + return AST_TEST_FAIL; + } -static int ami_subscription_detail(struct ast_sip_subscription *sub, - struct ast_sip_ami *ami, - const char *event) -{ - RAII_VAR(struct ast_str *, buf, - ast_sip_create_ami_event(event, ami), ast_free); + list_2 = create_resource_list(test, "ducks", "test", resources_2, ARRAY_LEN(resources_2)); + if (!list_2) { + return AST_TEST_FAIL; + } - if (!buf) { - return -1; + tree = ast_calloc(1, sizeof(*tree)); + resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1); + if (resp != 200) { + ast_test_status_update(test, "Unexpected response %d when building resource tree\n", resp); + return AST_TEST_FAIL; } - sip_subscription_to_ami(sub, &buf); - astman_append(ami->s, "%s\r\n", ast_str_buffer(buf)); - return 0; -} + if (!tree->root) { + ast_test_status_update(test, "Resource tree has no root\n"); + return AST_TEST_FAIL; + } -static int ami_subscription_detail_inbound(struct ast_sip_subscription *sub, void *arg) -{ - return sub->role == AST_SIP_NOTIFIER ? ami_subscription_detail( - sub, arg, "InboundSubscriptionDetail") : 0; -} + node = tree->root; + /* This node should have "huey" and "ducks". "dewey" and "louie" should not + * be present since they were found in the "ducks" list. + */ + if (check_node(test, node, resources_1, ARRAY_LEN(resources_1) - 2)) { + return AST_TEST_FAIL; + } -static int ami_subscription_detail_outbound(struct ast_sip_subscription *sub, void *arg) -{ - return sub->role == AST_SIP_SUBSCRIBER ? ami_subscription_detail( - sub, arg, "OutboundSubscriptionDetail") : 0; + /* This node should have "donald", "daisy", "scrooge", "dewey", and "louie". + * "huey" is not here since that was already encountered in the parent list + */ + node = AST_VECTOR_GET(&node->children, 1); + if (check_node(test, node, resources_2, ARRAY_LEN(resources_2) - 1)) { + return AST_TEST_FAIL; + } + + return AST_TEST_PASS; } -static int ami_show_subscriptions_inbound(struct mansession *s, const struct message *m) +AST_TEST_DEFINE(loop) { - struct ast_sip_ami ami = { .s = s, .m = m, .action_id = astman_get_header(m, "ActionID"), }; - int num; + RAII_VAR(struct resource_list *, list_1, NULL, cleanup_resource_list); + RAII_VAR(struct resource_list *, list_2, NULL, cleanup_resource_list); + RAII_VAR(struct resource_tree *, tree, NULL, test_resource_tree_destroy); + const char *resources_1[] = { + "derp", + }; + const char *resources_2[] = { + "herp", + }; + int resp; - astman_send_listack(s, m, "Following are Events for " - "each inbound Subscription", "start"); + switch (cmd) { + case TEST_INIT: + info->name = "loop"; + info->category = "/res/res_pjsip_pubsub/"; + info->summary = "Test that loops are properly detected."; + info->description = + "Create two resource lists that refer to each other. Ensure that attempting to build a tree\n" + "results in an empty tree."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } - num = for_each_subscription(ami_subscription_detail_inbound, &ami); + if (ineligible_configuration()) { + ast_test_status_update(test, "Ineligible configuration for this test. Please add a " + "'res_pjsip_pubsub' section to sorcery.conf, and set 'resource_list=memory'\n"); + return AST_TEST_NOT_RUN; + } - astman_append(s, "Event: InboundSubscriptionDetailComplete\r\n"); - if (!ast_strlen_zero(ami.action_id)) { - astman_append(s, "ActionID: %s\r\n", ami.action_id); + list_1 = create_resource_list(test, "herp", "test", resources_1, ARRAY_LEN(resources_1)); + if (!list_1) { + return AST_TEST_FAIL; } - astman_append(s, "EventList: Complete\r\n" - "ListItems: %d\r\n\r\n", num); - return 0; + list_2 = create_resource_list(test, "derp", "test", resources_2, ARRAY_LEN(resources_2)); + if (!list_2) { + return AST_TEST_FAIL; + } + + tree = ast_calloc(1, sizeof(*tree)); + resp = build_resource_tree(NULL, &test_handler, "herp", tree, 1); + if (resp == 200) { + ast_test_status_update(test, "Unexpected response %d when building resource tree\n", resp); + return AST_TEST_FAIL; + } + + return AST_TEST_PASS; } -static int ami_show_subscriptions_outbound(struct mansession *s, const struct message *m) +AST_TEST_DEFINE(bad_event) { - struct ast_sip_ami ami = { .s = s, .m = m, .action_id = astman_get_header(m, "ActionID"), }; - int num; - - astman_send_listack(s, m, "Following are Events for " - "each outbound Subscription", "start"); + RAII_VAR(struct resource_list *, list, NULL, cleanup_resource_list); + RAII_VAR(struct resource_tree *, tree, NULL, test_resource_tree_destroy); + const char *resources[] = { + "huey", + "dewey", + "louie", + }; + int resp; - num = for_each_subscription(ami_subscription_detail_outbound, &ami); + switch (cmd) { + case TEST_INIT: + info->name = "bad_event"; + info->category = "/res/res_pjsip_pubsub/"; + info->summary = "Ensure that list with wrong event specified is not retrieved"; + info->description = + "Create a simple resource list for event 'tsetse'. Ensure that trying to retrieve the list for event 'test' fails."; + return AST_TEST_NOT_RUN; + case TEST_EXECUTE: + break; + } - astman_append(s, "Event: OutboundSubscriptionDetailComplete\r\n"); - if (!ast_strlen_zero(ami.action_id)) { - astman_append(s, "ActionID: %s\r\n", ami.action_id); + if (ineligible_configuration()) { + ast_test_status_update(test, "Ineligible configuration for this test. Please add a " + "'res_pjsip_pubsub' section to sorcery.conf, and set 'resource_list=memory'\n"); + return AST_TEST_NOT_RUN; } - astman_append(s, "EventList: Complete\r\n" - "ListItems: %d\r\n\r\n", num); - return 0; -} -#define AMI_SHOW_SUBSCRIPTIONS_INBOUND "PJSIPShowSubscriptionsInbound" -#define AMI_SHOW_SUBSCRIPTIONS_OUTBOUND "PJSIPShowSubscriptionsOutbound" + list = create_resource_list(test, "foo", "tsetse", resources, ARRAY_LEN(resources)); + if (!list) { + return AST_TEST_FAIL; + } -static int persistence_endpoint_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj) -{ - struct subscription_persistence *persistence = obj; + tree = ast_calloc(1, sizeof(*tree)); + /* Since the test_handler is for event "test", this should not build a list, but + * instead result in a single resource being created, called "foo" + */ + resp = build_resource_tree(NULL, &test_handler, "foo", tree, 1); + if (resp != 200) { + ast_test_status_update(test, "Unexpected response %d when building resource tree\n", resp); + return AST_TEST_FAIL; + } - persistence->endpoint = ast_strdup(var->value); - return 0; -} + if (!tree->root) { + ast_test_status_update(test, "Resource tree has no root\n"); + return AST_TEST_FAIL; + } -static int persistence_endpoint_struct2str(const void *obj, const intptr_t *args, char **buf) -{ - const struct subscription_persistence *persistence = obj; + if (strcmp(tree->root->resource, "foo")) { + ast_test_status_update(test, "Unexpected resource %s found in tree\n", tree->root->resource); + return AST_TEST_FAIL; + } - *buf = ast_strdup(persistence->endpoint); - return 0; + return AST_TEST_PASS; } -static int persistence_tag_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj) +#endif + +static int resource_endpoint_handler(const struct aco_option *opt, struct ast_variable *var, void *obj) { - struct subscription_persistence *persistence = obj; + struct ast_sip_publication_resource *resource = obj; + + ast_free(resource->endpoint); + resource->endpoint = ast_strdup(var->value); - persistence->tag = ast_strdup(var->value); return 0; } -static int persistence_tag_struct2str(const void *obj, const intptr_t *args, char **buf) +static int resource_event_handler(const struct aco_option *opt, struct ast_variable *var, void *obj) { - const struct subscription_persistence *persistence = obj; + struct ast_sip_publication_resource *resource = obj; + /* The event configuration name starts with 'event_' so skip past it to get the real name */ + const char *event = var->name + 6; + struct ast_variable *item; - *buf = ast_strdup(persistence->tag); - return 0; -} + if (ast_strlen_zero(event) || ast_strlen_zero(var->value)) { + return -1; + } -static int persistence_expires_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj) -{ - struct subscription_persistence *persistence = obj; - return ast_get_timeval(var->value, &persistence->expires, ast_tv(0, 0), NULL); -} + item = ast_variable_new(event, var->value, ""); + if (!item) { + return -1; + } -static int persistence_expires_struct2str(const void *obj, const intptr_t *args, char **buf) -{ - const struct subscription_persistence *persistence = obj; - return (ast_asprintf(buf, "%ld", persistence->expires.tv_sec) < 0) ? -1 : 0; + if (resource->events) { + item->next = resource->events; + } + resource->events = item; + + return 0; } static int load_module(void) { static const pj_str_t str_PUBLISH = { "PUBLISH", 7 }; - struct ast_sorcery *sorcery = ast_sip_get_sorcery(); + struct ast_sorcery *sorcery; + + CHECK_PJSIP_MODULE_LOADED(); + + sorcery = ast_sip_get_sorcery(); pjsip_evsub_init_module(ast_sip_get_pjsip_endpoint()); if (!(sched = ast_sched_context_create())) { ast_log(LOG_ERROR, "Could not create scheduler for publication expiration\n"); - return AST_MODULE_LOAD_FAILURE; + return AST_MODULE_LOAD_DECLINE; } if (ast_sched_start_thread(sched)) { ast_log(LOG_ERROR, "Could not start scheduler thread for publication expiration\n"); ast_sched_context_destroy(sched); - return AST_MODULE_LOAD_FAILURE; + return AST_MODULE_LOAD_DECLINE; } pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), NULL, PJSIP_H_ALLOW, NULL, 1, &str_PUBLISH); @@ -2072,7 +5239,7 @@ static int load_module(void) if (ast_sip_register_service(&pubsub_module)) { ast_log(LOG_ERROR, "Could not register pubsub service\n"); ast_sched_context_destroy(sched); - return AST_MODULE_LOAD_FAILURE; + return AST_MODULE_LOAD_DECLINE; } ast_sorcery_apply_config(sorcery, "res_pjsip_pubsub"); @@ -2082,7 +5249,7 @@ static int load_module(void) ast_log(LOG_ERROR, "Could not register subscription persistence object support\n"); ast_sip_unregister_service(&pubsub_module); ast_sched_context_destroy(sched); - return AST_MODULE_LOAD_FAILURE; + return AST_MODULE_LOAD_DECLINE; } ast_sorcery_object_field_register(sorcery, "subscription_persistence", "packet", "", OPT_CHAR_ARRAY_T, 0, CHARFLDSET(struct subscription_persistence, packet)); @@ -2104,26 +5271,72 @@ static int load_module(void) persistence_tag_str2struct, persistence_tag_struct2str, NULL, 0, 0); ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "expires", "", persistence_expires_str2struct, persistence_expires_struct2str, NULL, 0, 0); + ast_sorcery_object_field_register(sorcery, "subscription_persistence", "contact_uri", "", OPT_CHAR_ARRAY_T, 0, + CHARFLDSET(struct subscription_persistence, contact_uri)); + + if (apply_list_configuration(sorcery)) { + ast_sip_unregister_service(&pubsub_module); + ast_sched_context_destroy(sched); + return AST_MODULE_LOAD_DECLINE; + } + + ast_sorcery_apply_default(sorcery, "inbound-publication", "config", "pjsip.conf,criteria=type=inbound-publication"); + if (ast_sorcery_object_register(sorcery, "inbound-publication", publication_resource_alloc, + NULL, NULL)) { + ast_log(LOG_ERROR, "Could not register subscription persistence object support\n"); + ast_sip_unregister_service(&pubsub_module); + ast_sched_context_destroy(sched); + return AST_MODULE_LOAD_DECLINE; + } + ast_sorcery_object_field_register(sorcery, "inbound-publication", "type", "", OPT_NOOP_T, 0, 0); + ast_sorcery_object_field_register_custom(sorcery, "inbound-publication", "endpoint", "", + resource_endpoint_handler, NULL, NULL, 0, 0); + ast_sorcery_object_fields_register(sorcery, "inbound-publication", "^event_", resource_event_handler, NULL); + ast_sorcery_reload_object(sorcery, "inbound-publication"); if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) { ast_sip_push_task(NULL, subscription_persistence_load, NULL); } else { - stasis_subscribe(ast_manager_get_topic(), subscription_persistence_event_cb, NULL); + stasis_subscribe_pool(ast_manager_get_topic(), subscription_persistence_event_cb, NULL); } ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM, ami_show_subscriptions_inbound); ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND, EVENT_FLAG_SYSTEM, ami_show_subscriptions_outbound); + ast_manager_register_xml("PJSIPShowResourceLists", EVENT_FLAG_SYSTEM, + ami_show_resource_lists); + + ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands)); + + AST_TEST_REGISTER(resource_tree); + AST_TEST_REGISTER(complex_resource_tree); + AST_TEST_REGISTER(bad_resource); + AST_TEST_REGISTER(bad_branch); + AST_TEST_REGISTER(duplicate_resource); + AST_TEST_REGISTER(loop); + AST_TEST_REGISTER(bad_event); return AST_MODULE_LOAD_SUCCESS; } static int unload_module(void) { + AST_TEST_UNREGISTER(resource_tree); + AST_TEST_UNREGISTER(complex_resource_tree); + AST_TEST_UNREGISTER(bad_resource); + AST_TEST_UNREGISTER(bad_branch); + AST_TEST_UNREGISTER(duplicate_resource); + AST_TEST_UNREGISTER(loop); + AST_TEST_UNREGISTER(bad_event); + + ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands)); + ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND); ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_INBOUND); + ast_manager_unregister("PJSIPShowResourceLists"); + ast_sip_unregister_service(&pubsub_module); if (sched) { ast_sched_context_destroy(sched); } @@ -2132,7 +5345,8 @@ static int unload_module(void) } AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP event resource", - .load = load_module, - .unload = unload_module, - .load_pri = AST_MODPRI_CHANNEL_DEPEND, + .support_level = AST_MODULE_SUPPORT_CORE, + .load = load_module, + .unload = unload_module, + .load_pri = AST_MODPRI_CHANNEL_DEPEND, );