2 * Asterisk -- An open source telephony toolkit.
4 * Copyright (C) 2013, Digium, Inc.
6 * Mark Michelson <mmichelson@digium.com>
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
19 * \brief Opaque structure representing an RFC 3265 SIP subscription
23 <depend>pjproject</depend>
24 <depend>res_pjsip</depend>
25 <support_level>core</support_level>
31 #include <pjsip_simple.h>
34 #include "asterisk/res_pjsip_pubsub.h"
35 #include "asterisk/module.h"
36 #include "asterisk/linkedlists.h"
37 #include "asterisk/astobj2.h"
38 #include "asterisk/datastore.h"
39 #include "asterisk/uuid.h"
40 #include "asterisk/taskprocessor.h"
41 #include "asterisk/sched.h"
42 #include "asterisk/res_pjsip.h"
43 #include "asterisk/callerid.h"
44 #include "asterisk/manager.h"
45 #include "asterisk/test.h"
46 #include "res_pjsip/include/res_pjsip_private.h"
49 <manager name="PJSIPShowSubscriptionsInbound" language="en_US">
56 Provides a listing of all inbound subscriptions. An event <literal>InboundSubscriptionDetail</literal>
57 is issued for each subscription object. Once all detail events are completed an
58 <literal>InboundSubscriptionDetailComplete</literal> event is issued.
62 <manager name="PJSIPShowSubscriptionsOutbound" language="en_US">
69 Provides a listing of all outbound subscriptions. An event <literal>OutboundSubscriptionDetail</literal>
70 is issued for each subscription object. Once all detail events are completed an
71 <literal>OutboundSubscriptionDetailComplete</literal> event is issued.
75 <configInfo name="res_pjsip_pubsub" language="en_US">
76 <synopsis>Module that implements publish and subscribe support.</synopsis>
77 <configFile name="pjsip.conf">
78 <configObject name="subscription_persistence">
79 <synopsis>Persists SIP subscriptions so they survive restarts.</synopsis>
80 <configOption name="packet">
81 <synopsis>Entire SIP SUBSCRIBE packet that created the subscription</synopsis>
83 <configOption name="src_name">
84 <synopsis>The source address of the subscription</synopsis>
86 <configOption name="src_port">
87 <synopsis>The source port of the subscription</synopsis>
89 <configOption name="transport_key">
90 <synopsis>The type of transport the subscription was received on</synopsis>
92 <configOption name="local_name">
93 <synopsis>The local address the subscription was received on</synopsis>
95 <configOption name="local_port">
96 <synopsis>The local port the subscription was received on</synopsis>
98 <configOption name="cseq">
99 <synopsis>The sequence number of the next NOTIFY to be sent</synopsis>
101 <configOption name="tag">
102 <synopsis>The local tag of the dialog for the subscription</synopsis>
104 <configOption name="endpoint">
105 <synopsis>The name of the endpoint that subscribed</synopsis>
107 <configOption name="expires">
108 <synopsis>The time at which the subscription expires</synopsis>
111 <configObject name="inbound-publication">
112 <synopsis>The configuration for inbound publications</synopsis>
113 <configOption name="endpoint" default="">
114 <synopsis>Optional name of an endpoint that is only allowed to publish to this resource</synopsis>
116 <configOption name="type">
117 <synopsis>Must be of type 'inbound-publication'.</synopsis>
/*! \brief Forward declaration of the request callback referenced by pubsub_module below */
124 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata);
/*! \brief PJSIP module descriptor for pubsub
 *
 * The module runs at application priority and hands incoming requests to
 * pubsub_on_rx_request(). The number paired with the name string is its
 * length in bytes.
 */
126 static struct pjsip_module pubsub_module = {
127 .name = { "PubSub Module", 13 },
128 .priority = PJSIP_MOD_PRIORITY_APPLICATION,
129 .on_rx_request = pubsub_on_rx_request,
/*! \brief Module-data key used to attach a subscription_persistence object to an rdata while a persisted subscription is being recreated */
132 #define MOD_DATA_PERSISTENCE "sub_persistence"
/*! \brief Module-data key under which a clone of the received request message is stored on the subscription's dialog */
133 #define MOD_DATA_MSG "sub_msg"
135 static const pj_str_t str_event_name = { "Event", 5 };
137 /*! \brief Scheduler used for automatically expiring publications */
138 static struct ast_sched_context *sched;
140 /*! \brief Number of buckets for publications (on a per handler) */
141 #define PUBLICATIONS_BUCKETS 37
143 /*! \brief Default expiration time for PUBLISH if one is not specified */
144 #define DEFAULT_PUBLISH_EXPIRES 3600
146 /*! \brief Defined method for PUBLISH */
147 const pjsip_method pjsip_publish_method =
154 * \brief The types of PUBLISH messages defined in RFC 3903
156 enum sip_publish_type {
161 * This actually is not defined in RFC 3903. We use this as a constant
162 * to indicate that an incoming PUBLISH does not fit into any of the
163 * other categories and is thus invalid.
171 * The first PUBLISH sent. This will contain a non-zero Expires header
172 * as well as a body that indicates the current state of the endpoint
173 * that has sent the message. The initial PUBLISH is the only type
174 * of PUBLISH to not contain a Sip-If-Match header in it.
182 * Used to keep a published state from expiring. This will contain a
183 * non-zero Expires header but no body since its purpose is not to
192 * Used to change state from its previous value. This will contain
193 * a body updating the published state. May or may not contain an
202 * Used to remove published state from an ESC. This will contain
203 * an Expires header set to 0 and likely no body.
209 * Used to create new entity IDs by ESCs.
211 static int esc_etag_counter;
214 * \brief Structure representing a SIP publication
216 struct ast_sip_publication {
217 /*! Publication datastores set up by handlers */
218 struct ao2_container *datastores;
219 /*! \brief Entity tag for the publication */
221 /*! \brief Handler for this publication */
222 struct ast_sip_publish_handler *handler;
223 /*! \brief The endpoint with which the subscription is communicating */
224 struct ast_sip_endpoint *endpoint;
225 /*! \brief Expiration time of the publication */
227 /*! \brief Scheduled item for expiration of publication */
229 /*! \brief The resource the publication is to */
231 /*! \brief The name of the event type configuration */
232 char *event_configuration_name;
233 /*! \brief Data containing the above */
239 * \brief Structure used for persisting an inbound subscription
241 struct subscription_persistence {
242 /*! Sorcery object details */
243 SORCERY_OBJECT(details);
244 /*! The name of the endpoint involved in the subscription */
246 /*! SIP message that creates the subscription */
247 char packet[PJSIP_MAX_PKT_LEN];
248 /*! Source address of the message */
249 char src_name[PJ_INET6_ADDRSTRLEN];
250 /*! Source port of the message */
252 /*! Local transport key type */
253 char transport_key[32];
254 /*! Local transport address */
255 char local_name[PJ_INET6_ADDRSTRLEN];
256 /*! Local transport port */
258 /*! Next CSeq to use for message */
260 /*! Local tag of the dialog */
262 /*! When this subscription expires */
263 struct timeval expires;
267 * \brief Real subscription details
269 * A real subscription is one that has a direct link to a
270 * PJSIP subscription and dialog.
272 struct ast_sip_real_subscription {
273 /*! The underlying PJSIP event subscription structure */
275 /*! The underlying PJSIP dialog */
280 * \brief Virtual subscription details
282 * A virtual subscription is one that does not have a direct
283 * link to a PJSIP subscription. Instead, it is a descendent
284 * of an ast_sip_subscription. Following the ancestry will
285 * eventually lead to a real subscription.
287 struct ast_sip_virtual_subscription {
288 struct ast_sip_subscription *parent;
292 * \brief Discriminator between real and virtual subscriptions
294 enum sip_subscription_type {
296 * \brief a "real" subscription.
298 * Real subscriptions are at the root of a tree of subscriptions.
299 * A real subscription has a corresponding SIP subscription in the
302 SIP_SUBSCRIPTION_REAL,
304 * \brief a "virtual" subscription.
306 * Virtual subscriptions are the descendents of real subscriptions
307 * in a tree of subscriptions. Virtual subscriptions do not have
308 * a corresponding SIP subscription in the PJSIP stack. Instead,
309 * when a state change happens on a virtual subscription, the
310 * state change is indicated to the virtual subscription's parent.
312 SIP_SUBSCRIPTION_VIRTUAL,
316 * \brief Structure representing a SIP subscription
318 struct ast_sip_subscription {
319 /*! Subscription datastores set up by handlers */
320 struct ao2_container *datastores;
321 /*! The endpoint with which the subscription is communicating */
322 struct ast_sip_endpoint *endpoint;
323 /*! Serializer on which to place operations for this subscription */
324 struct ast_taskprocessor *serializer;
325 /*! The handler for this subscription */
326 const struct ast_sip_subscription_handler *handler;
327 /*! The role for this subscription */
328 enum ast_sip_subscription_role role;
329 /*! Indicator of real or virtual subscription */
330 enum sip_subscription_type type;
331 /*! Real and virtual components of the subscription */
333 struct ast_sip_real_subscription real;
334 struct ast_sip_virtual_subscription virtual;
336 /*! Body generator for NOTIFYs */
337 struct ast_sip_pubsub_body_generator *body_generator;
338 /*! Persistence information */
339 struct subscription_persistence *persistence;
340 /*! Next item in the list */
341 AST_LIST_ENTRY(ast_sip_subscription) next;
342 /*! List of child subscriptions */
343 AST_LIST_HEAD_NOLOCK(,ast_sip_subscription) children;
344 /*! Name of resource being subscribed to */
349 * \brief Structure representing a publication resource
351 struct ast_sip_publication_resource {
352 /*! \brief Sorcery object details */
353 SORCERY_OBJECT(details);
354 /*! \brief Optional name of an endpoint that is only allowed to publish to this resource */
356 /*! \brief Mapping for event types to configuration */
357 struct ast_variable *events;
/*! \brief Display names for subscription roles, indexed by enum ast_sip_subscription_role
 *
 * Used when a subscription's role is written out in AMI output.
 */
360 static const char *sip_subscription_roles_map[] = {
361 [AST_SIP_SUBSCRIBER] = "Subscriber",
362 [AST_SIP_NOTIFIER] = "Notifier"
/*! \brief Read/write-locked list of subscriptions; entries are inserted by add_subscription() and removed by remove_subscription() */
365 AST_RWLIST_HEAD_STATIC(subscriptions, ast_sip_subscription);
/*! \brief Read/write-locked lists of ast_sip_pubsub_body_generator and ast_sip_pubsub_body_supplement objects */
367 AST_RWLIST_HEAD_STATIC(body_generators, ast_sip_pubsub_body_generator);
368 AST_RWLIST_HEAD_STATIC(body_supplements, ast_sip_pubsub_body_supplement);
/*! \brief Return the PJSIP event subscription stored in the "real" component of a subscription
 *
 * The subscription's type field is not consulted; the real component is read directly.
 */
370 static pjsip_evsub *sip_subscription_get_evsub(const struct ast_sip_subscription *sub)
372 return sub->reality.real.evsub;
/*! \brief Return the PJSIP dialog stored in the "real" component of a subscription
 *
 * The subscription's type field is not consulted; the real component is read directly.
 */
375 static pjsip_dialog *sip_subscription_get_dlg(const struct ast_sip_subscription *sub)
377 return sub->reality.real.dlg;
380 /*! \brief Destructor for publication resource
 *
 * Frees the endpoint name string and destroys the event-type variable list
 * held by the resource.
 *
 * \param obj The ast_sip_publication_resource being destroyed
 */
381 static void publication_resource_destroy(void *obj)
383 struct ast_sip_publication_resource *resource = obj;
385 ast_free(resource->endpoint);
386 ast_variables_destroy(resource->events);
389 /*! \brief Allocator for publication resource
 *
 * Allocates a sorcery object sized for ast_sip_publication_resource with
 * publication_resource_destroy() as its destructor.
 *
 * \param name Not used by the allocation call
 * \return Whatever ast_sorcery_generic_alloc() returns
 */
390 static void *publication_resource_alloc(const char *name)
392 return ast_sorcery_generic_alloc(sizeof(struct ast_sip_publication_resource), publication_resource_destroy);
395 /*! \brief Destructor for subscription persistence
 *
 * Frees the heap-allocated endpoint name and dialog tag strings.
 *
 * \param obj The subscription_persistence being destroyed
 */
396 static void subscription_persistence_destroy(void *obj)
398 struct subscription_persistence *persistence = obj;
400 ast_free(persistence->endpoint);
401 ast_free(persistence->tag);
404 /*! \brief Allocator for subscription persistence
 *
 * Allocates a sorcery object sized for subscription_persistence with
 * subscription_persistence_destroy() as its destructor.
 *
 * \param name Not used by the allocation call
 * \return Whatever ast_sorcery_generic_alloc() returns
 */
405 static void *subscription_persistence_alloc(const char *name)
407 return ast_sorcery_generic_alloc(sizeof(struct subscription_persistence), subscription_persistence_destroy);
410 /*! \brief Function which creates initial persistence information of a subscription in sorcery
 *
 * Allocates a "subscription_persistence" sorcery object without supplying an
 * id, stores a copy of the subscription's endpoint id and of the dialog's
 * local tag in it, and then creates it in sorcery.
 *
 * \param sub The subscription to persist; its dialog and endpoint are read
 *
 * NOTE(review): in the lines visible here the ast_strdup() results and the
 * ast_sorcery_create() result are not checked.
 */
411 static struct subscription_persistence *subscription_persistence_create(struct ast_sip_subscription *sub)
413 char tag[PJ_GUID_STRING_LENGTH + 1];
415 /* The id of this persistence object doesn't matter as we keep it on the subscription and don't need to
416 * look it up by id at all.
418 struct subscription_persistence *persistence = ast_sorcery_alloc(ast_sip_get_sorcery(),
419 "subscription_persistence", NULL);
421 pjsip_dialog *dlg = sip_subscription_get_dlg(sub);
427 persistence->endpoint = ast_strdup(ast_sorcery_object_get_id(sub->endpoint));
428 ast_copy_pj_str(tag, &dlg->local.info->tag, sizeof(tag));
429 persistence->tag = ast_strdup(tag);
431 ast_sorcery_create(ast_sip_get_sorcery(), persistence);
435 /*! \brief Function which updates persistence information of a subscription in sorcery
 *
 * Records the dialog's current local CSeq. From the received message it also
 * stores: the expiration time (now plus the Expires header value, or
 * DEFAULT_PUBLISH_EXPIRES seconds when the header is absent), the raw packet,
 * the source address and port, the transport type name, and the local
 * transport host and port. The object is then written back with
 * ast_sorcery_update().
 *
 * \param sub   The subscription whose persistence object is updated
 * \param rdata The received request supplying the message details.
 *              sip_subscription_send_request() passes NULL here, so the
 *              rdata-dependent statements are presumably guarded by a check
 *              that is not visible in this listing -- TODO confirm.
 */
436 static void subscription_persistence_update(struct ast_sip_subscription *sub,
437 pjsip_rx_data *rdata)
441 if (!sub->persistence) {
445 dlg = sip_subscription_get_dlg(sub);
446 sub->persistence->cseq = dlg->local.cseq;
450 pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
452 expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
453 sub->persistence->expires = ast_tvadd(ast_tvnow(), ast_samp2tv(expires, 1));
455 ast_copy_string(sub->persistence->packet, rdata->pkt_info.packet, sizeof(sub->persistence->packet));
456 ast_copy_string(sub->persistence->src_name, rdata->pkt_info.src_name, sizeof(sub->persistence->src_name));
457 sub->persistence->src_port = rdata->pkt_info.src_port;
458 ast_copy_string(sub->persistence->transport_key, rdata->tp_info.transport->type_name,
459 sizeof(sub->persistence->transport_key));
460 ast_copy_pj_str(sub->persistence->local_name, &rdata->tp_info.transport->local_name.host,
461 sizeof(sub->persistence->local_name));
462 sub->persistence->local_port = rdata->tp_info.transport->local_name.port;
465 ast_sorcery_update(ast_sip_get_sorcery(), sub->persistence);
468 /*! \brief Function which removes persistence of a subscription from sorcery
 *
 * Deletes the subscription's persistence object from sorcery and drops the
 * reference the subscription holds on it. The leading test on
 * sub->persistence handles subscriptions that have no persistence object.
 *
 * \param sub The subscription whose persistence is removed
 */
469 static void subscription_persistence_remove(struct ast_sip_subscription *sub)
471 if (!sub->persistence) {
475 ast_sorcery_delete(ast_sip_get_sorcery(), sub->persistence);
476 ao2_ref(sub->persistence, -1);
480 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name);
481 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
484 /*! \brief Retrieve a handler using the Event header of an rdata message
 *
 * Looks up the "Event" header by name, copies its event type into a local
 * buffer and resolves it with find_sub_handler_for_event_name(). A warning
 * is logged when the header is missing and when no handler is registered
 * for the event.
 *
 * \param rdata The received request to inspect
 */
485 static struct ast_sip_subscription_handler *subscription_get_handler_from_rdata(pjsip_rx_data *rdata)
487 pjsip_event_hdr *event_header;
489 struct ast_sip_subscription_handler *handler;
491 event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
493 ast_log(LOG_WARNING, "Incoming SUBSCRIBE request with no Event header\n");
496 ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
498 handler = find_sub_handler_for_event_name(event);
500 ast_log(LOG_WARNING, "No registered subscribe handler for event %s\n", event);
506 /*! \brief Retrieve a body generator using the Accept header of an rdata message
 *
 * Copies each value of the Accept header into a local table of
 * AST_SIP_MAX_ACCEPT entries (63 characters plus terminator each) and passes
 * the table to find_body_generator(). Without an Accept header the
 * notifier's default_accept is used as the single entry.
 *
 * \param rdata   The received request to inspect
 * \param handler The handler whose notifier supplies the default accept type
 *
 * NOTE(review): the copy loop is bounded by accept_header->count only; this
 * presumably relies on PJSIP never reporting more values than
 * AST_SIP_MAX_ACCEPT -- confirm.
 */
507 static struct ast_sip_pubsub_body_generator *subscription_get_generator_from_rdata(pjsip_rx_data *rdata,
508 const struct ast_sip_subscription_handler *handler)
510 pjsip_accept_hdr *accept_header;
511 char accept[AST_SIP_MAX_ACCEPT][64];
512 size_t num_accept_headers;
514 accept_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_ACCEPT, rdata->msg_info.msg->hdr.next);
518 for (i = 0; i < accept_header->count; ++i) {
519 ast_copy_pj_str(accept[i], &accept_header->values[i], sizeof(accept[i]));
521 num_accept_headers = accept_header->count;
523 /* If a SUBSCRIBE contains no Accept headers, then we must assume that
524 * the default accept type for the event package is to be used.
526 ast_copy_string(accept[0], handler->notifier->default_accept, sizeof(accept[0]));
527 num_accept_headers = 1;
530 return find_body_generator(accept, num_accept_headers);
533 static struct ast_sip_subscription *notifier_create_subscription(const struct ast_sip_subscription_handler *handler,
534 struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
535 struct ast_sip_pubsub_body_generator *generator);
537 /*! \brief Callback function to perform the actual recreation of a subscription
 *
 * Run through ao2_callback() for each persisted subscription.
 *
 * \param obj   The subscription_persistence record to recreate from
 * \param arg   The pj_pool_t used while rebuilding the rdata
 * \param flags Not referenced in the lines visible here
 *
 * Steps, in order: check that the record has not expired, look up the
 * endpoint by name, rebuild an rdata from the stored packet and transport
 * details, take the resource name from the user part of the request URI,
 * set the Expires header (creating one if absent) to the seconds remaining,
 * resolve the handler and body generator, attach the record to the rdata
 * under MOD_DATA_PERSISTENCE, and ask the notifier's new_subscribe()
 * callback for a response code. A 2xx response leads to the subscription
 * being created and its persistence refreshed. Each failure branch and a
 * non-2xx response delete the record from sorcery.
 *
 * NOTE(review): the result of notifier_create_subscription() is
 * dereferenced on the next statement with no NULL check in between; that
 * function has failure paths (it logs a dialog-creation failure), so this
 * looks like a possible NULL dereference -- confirm and guard.
 */
538 static int subscription_persistence_recreate(void *obj, void *arg, int flags)
540 struct subscription_persistence *persistence = obj;
541 pj_pool_t *pool = arg;
542 pjsip_rx_data rdata = { { 0, }, };
543 pjsip_expires_hdr *expires_header;
544 struct ast_sip_subscription_handler *handler;
545 RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
546 struct ast_sip_subscription *sub;
547 struct ast_sip_pubsub_body_generator *generator;
550 size_t resource_size;
551 pjsip_sip_uri *request_uri;
553 /* If this subscription has already expired remove it */
554 if (ast_tvdiff_ms(persistence->expires, ast_tvnow()) <= 0) {
555 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
559 endpoint = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "endpoint", persistence->endpoint);
561 ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the endpoint was not found\n",
562 persistence->endpoint);
563 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
568 rdata.tp_info.pool = pool;
570 if (ast_sip_create_rdata(&rdata, persistence->packet, persistence->src_name, persistence->src_port,
571 persistence->transport_key, persistence->local_name, persistence->local_port)) {
572 ast_log(LOG_WARNING, "A subscription for '%s' could not be recreated as the message could not be parsed\n",
573 persistence->endpoint);
574 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
578 request_uri = pjsip_uri_get_uri(rdata.msg_info.msg->line.req.uri);
579 resource_size = pj_strlen(&request_uri->user) + 1;
580 resource = alloca(resource_size);
581 ast_copy_pj_str(resource, &request_uri->user, resource_size);
583 /* Update the expiration header with the new expiration */
584 expires_header = pjsip_msg_find_hdr(rdata.msg_info.msg, PJSIP_H_EXPIRES, rdata.msg_info.msg->hdr.next);
585 if (!expires_header) {
586 expires_header = pjsip_expires_hdr_create(pool, 0);
587 if (!expires_header) {
588 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
591 pjsip_msg_add_hdr(rdata.msg_info.msg, (pjsip_hdr*)expires_header);
593 expires_header->ivalue = (ast_tvdiff_ms(persistence->expires, ast_tvnow()) / 1000);
595 handler = subscription_get_handler_from_rdata(&rdata);
596 if (!handler || !handler->notifier) {
597 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
601 generator = subscription_get_generator_from_rdata(&rdata, handler);
603 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
607 ast_sip_mod_data_set(rdata.tp_info.pool, rdata.endpt_info.mod_data,
608 pubsub_module.id, MOD_DATA_PERSISTENCE, persistence);
610 resp = handler->notifier->new_subscribe(endpoint, resource);
611 if (PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
612 sub = notifier_create_subscription(handler, endpoint, &rdata, resource, generator);
613 sub->persistence = ao2_bump(persistence);
614 subscription_persistence_update(sub, &rdata);
616 ast_sorcery_delete(ast_sip_get_sorcery(), persistence);
622 /*! \brief Function which loads and recreates persisted subscriptions upon startup when the system is fully booted */
623 static int subscription_persistence_load(void *data)
625 struct ao2_container *persisted_subscriptions = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(),
626 "subscription_persistence", AST_RETRIEVE_FLAG_MULTIPLE | AST_RETRIEVE_FLAG_ALL, NULL);
629 pool = pjsip_endpt_create_pool(ast_sip_get_pjsip_endpoint(), "rtd%p", PJSIP_POOL_RDATA_LEN,
630 PJSIP_POOL_RDATA_INC);
632 ast_log(LOG_WARNING, "Could not create a memory pool for recreating SIP subscriptions\n");
636 ao2_callback(persisted_subscriptions, OBJ_NODATA, subscription_persistence_recreate, pool);
638 pjsip_endpt_release_pool(ast_sip_get_pjsip_endpoint(), pool);
640 ao2_ref(persisted_subscriptions, -1);
644 /*! \brief Event callback which fires subscription persistence recreation when the system is fully booted
 *
 * Only messages of the generic manager type are examined. When the JSON
 * payload's "type" is "FullyBooted", subscription_persistence_load() is
 * pushed as a task and this stasis subscription is then cancelled.
 *
 * \param data    Not referenced in the lines visible here
 * \param sub     The stasis subscription delivering the message
 * \param message The stasis message to inspect
 *
 * NOTE(review): the value returned for "type" is passed straight to
 * strcmp(); this presumably relies on every generic manager message
 * carrying a string "type" -- confirm.
 */
645 static void subscription_persistence_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
647 struct ast_json_payload *payload;
650 if (stasis_message_type(message) != ast_manager_get_generic_type()) {
654 payload = stasis_message_data(message);
655 type = ast_json_string_get(ast_json_object_get(payload->json, "type"));
657 /* This subscription only responds to the FullyBooted event so that all modules have been loaded when we
658 * recreate SIP subscriptions.
660 if (strcmp(type, "FullyBooted")) {
664 /* This has to be here so the subscription is recreated when the body generator is available */
665 ast_sip_push_task(NULL, subscription_persistence_load, NULL);
667 /* Once the system is fully booted we don't care anymore */
668 stasis_unsubscribe(sub);
/*! \brief Append a subscription to the subscriptions list
 *
 * Takes the list write lock for the duration of the call and bumps this
 * module's reference count once per added subscription.
 *
 * \param obj The subscription to add
 */
671 static void add_subscription(struct ast_sip_subscription *obj)
673 SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
674 AST_RWLIST_INSERT_TAIL(&subscriptions, obj, next);
675 ast_module_ref(ast_module_info->self);
/*! \brief Remove a subscription from the subscriptions list
 *
 * Walks the list under the write lock; the removed entry also releases the
 * module reference that add_subscription() took. The test selecting which
 * entry to remove is not visible in this listing (presumably a comparison
 * against obj).
 *
 * \param obj The subscription to remove
 */
678 static void remove_subscription(struct ast_sip_subscription *obj)
680 struct ast_sip_subscription *i;
681 SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
682 AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscriptions, i, next) {
684 AST_RWLIST_REMOVE_CURRENT(next);
685 ast_module_unref(ast_module_info->self);
689 AST_RWLIST_TRAVERSE_SAFE_END;
/*! \brief Callback type for for_each_subscription(); receives each subscription and the caller's argument */
692 typedef int (*on_subscription_t)(struct ast_sip_subscription *sub, void *arg);
/*! \brief Invoke a callback on the entries of the subscriptions list
 *
 * The list is traversed under its read lock. A NULL callback is detected
 * before the traversal, and a non-zero callback result is tested for each
 * entry (presumably to stop the walk early -- the branch body is not
 * visible here).
 *
 * \param on_subscription The callback to run
 * \param arg             Opaque value forwarded to the callback
 */
694 static int for_each_subscription(on_subscription_t on_subscription, void *arg)
697 struct ast_sip_subscription *i;
698 SCOPED_LOCK(lock, &subscriptions, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
700 if (!on_subscription) {
704 AST_RWLIST_TRAVERSE(&subscriptions, i, next) {
705 if (on_subscription(i, arg)) {
/*! \brief Append a subscription's details to an AMI buffer
 *
 * Writes the Role, Endpoint, Callid, State and Callerid fields, each
 * terminated with CRLF, and then lets the handler's optional to_ami
 * callback append its own fields.
 *
 * \param sub The subscription to describe
 * \param buf The string buffer to append to
 */
713 static void sip_subscription_to_ami(struct ast_sip_subscription *sub,
714 struct ast_str **buf)
717 struct ast_sip_endpoint_id_configuration *id = &sub->endpoint->id;
719 ast_str_append(buf, 0, "Role: %s\r\n",
720 sip_subscription_roles_map[sub->role]);
721 ast_str_append(buf, 0, "Endpoint: %s\r\n",
722 ast_sorcery_object_get_id(sub->endpoint));
724 ast_copy_pj_str(str, &sip_subscription_get_dlg(sub)->call_id->id, sizeof(str));
725 ast_str_append(buf, 0, "Callid: %s\r\n", str);
727 ast_str_append(buf, 0, "State: %s\r\n", pjsip_evsub_get_state_name(
728 sip_subscription_get_evsub(sub)));
730 ast_callerid_merge(str, sizeof(str),
731 S_COR(id->self.name.valid, id->self.name.str, NULL),
732 S_COR(id->self.number.valid, id->self.number.str, NULL),
735 ast_str_append(buf, 0, "Callerid: %s\r\n", str);
737 if (sub->handler->to_ami) {
738 sub->handler->to_ami(sub, buf);
742 #define DATASTORE_BUCKETS 53
744 #define DEFAULT_EXPIRES 3600
/*! \brief Hash callback for datastore containers
 *
 * Hashes the datastore uid. With OBJ_KEY set in flags, obj is itself the
 * uid string; otherwise obj is an ast_datastore whose uid is used.
 */
746 static int datastore_hash(const void *obj, int flags)
748 const struct ast_datastore *datastore = obj;
749 const char *uid = flags & OBJ_KEY ? obj : datastore->uid;
751 ast_assert(uid != NULL);
753 return ast_str_hash(uid);
/*! \brief Comparison callback for datastore containers
 *
 * Compares datastore uids with strcmp(). With OBJ_KEY set in flags, arg is
 * itself the uid string; otherwise arg is an ast_datastore.
 *
 * \return CMP_MATCH | CMP_STOP when the uids are equal, 0 otherwise
 */
756 static int datastore_cmp(void *obj, void *arg, int flags)
758 const struct ast_datastore *datastore1 = obj;
759 const struct ast_datastore *datastore2 = arg;
760 const char *uid2 = flags & OBJ_KEY ? arg : datastore2->uid;
762 ast_assert(datastore1->uid != NULL);
763 ast_assert(uid2 != NULL);
765 return strcmp(datastore1->uid, uid2) ? 0 : CMP_MATCH | CMP_STOP;
/*! \brief Task that detaches a subscription from its dialog
 *
 * Clears the serializer set on the dialog and releases the dialog session
 * reference taken in subscription_setup_dialog(). It is run from
 * subscription_destructor() via ast_sip_push_task_synchronous().
 *
 * \param obj The ast_sip_subscription being destroyed
 */
768 static int subscription_remove_serializer(void *obj)
770 struct ast_sip_subscription *sub = obj;
772 /* This is why we keep the dialog on the subscription. When the subscription
773 * is destroyed, there is no guarantee that the underlying dialog is ready
774 * to be destroyed. Furthermore, there's no guarantee in the opposite direction
775 * either. The dialog could be destroyed before our subscription is. We fix
776 * this problem by keeping a reference to the dialog until it is time to
777 * destroy the subscription. We need to have the dialog available when the
778 * subscription is destroyed so that we can guarantee that our attempt to
779 * remove the serializer will be successful.
781 ast_sip_dialog_set_serializer(sip_subscription_get_dlg(sub), NULL);
782 pjsip_dlg_dec_session(sip_subscription_get_dlg(sub), &pubsub_module);
/*! \brief ao2 destructor for ast_sip_subscription
 *
 * In order: removes the persistence record, unlinks the subscription from
 * the subscriptions list, releases the datastores container and the
 * endpoint, synchronously runs subscription_remove_serializer() when a
 * dialog is attached, and finally drops the serializer reference.
 *
 * \param obj The ast_sip_subscription being destroyed
 */
787 static void subscription_destructor(void *obj)
789 struct ast_sip_subscription *sub = obj;
791 ast_debug(3, "Destroying SIP subscription\n");
793 subscription_persistence_remove(sub);
795 remove_subscription(sub);
797 ao2_cleanup(sub->datastores);
798 ao2_cleanup(sub->endpoint);
800 if (sip_subscription_get_dlg(sub)) {
801 ast_sip_push_task_synchronous(NULL, subscription_remove_serializer, sub);
803 ast_taskprocessor_unreference(sub->serializer);
807 static void pubsub_on_evsub_state(pjsip_evsub *sub, pjsip_event *event);
808 static void pubsub_on_rx_refresh(pjsip_evsub *sub, pjsip_rx_data *rdata,
809 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
810 static void pubsub_on_rx_notify(pjsip_evsub *sub, pjsip_rx_data *rdata, int *p_st_code,
811 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body);
812 static void pubsub_on_client_refresh(pjsip_evsub *sub);
813 static void pubsub_on_server_timeout(pjsip_evsub *sub);
/*! \brief Event subscription callback table
 *
 * Passed to pjsip_evsub_create_uas() and pjsip_evsub_create_uac() when this
 * file creates an event subscription.
 */
816 static pjsip_evsub_user pubsub_cb = {
817 .on_evsub_state = pubsub_on_evsub_state,
818 .on_rx_refresh = pubsub_on_rx_refresh,
819 .on_rx_notify = pubsub_on_rx_notify,
820 .on_client_refresh = pubsub_on_client_refresh,
821 .on_server_timeout = pubsub_on_server_timeout,
/*! \brief Allocate and partially initialise a subscription
 *
 * Allocates an ao2 object with room for a copy of the resource name after
 * the structure, copies the name in, creates the datastores container and
 * a serializer, marks the subscription as SIP_SUBSCRIPTION_REAL, and stores
 * a new reference to the endpoint together with the handler pointer.
 *
 * \param handler  Handler to associate with the subscription
 * \param endpoint Endpoint the subscription communicates with (reference is bumped)
 * \param resource Name of the subscribed resource; must not be NULL because strlen() is applied to it
 * \param role     Subscriber or notifier role. NOTE(review): no assignment
 *                 of role is visible in this listing -- confirm it is stored.
 */
824 static struct ast_sip_subscription *allocate_subscription(const struct ast_sip_subscription_handler *handler,
825 struct ast_sip_endpoint *endpoint, const char *resource, enum ast_sip_subscription_role role)
827 struct ast_sip_subscription *sub;
829 sub = ao2_alloc(sizeof(*sub) + strlen(resource) + 1, subscription_destructor);
833 strcpy(sub->resource, resource); /* Safe */
835 sub->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp);
836 if (!sub->datastores) {
840 sub->serializer = ast_sip_create_serializer();
841 if (!sub->serializer) {
846 sub->type = SIP_SUBSCRIPTION_REAL;
847 sub->endpoint = ao2_bump(endpoint);
848 sub->handler = handler;
/*! \brief Bind a subscription to its PJSIP dialog
 *
 * Takes a session reference on the dialog, records the dialog on the
 * subscription, routes the dialog's work through the subscription's
 * serializer, and stores the subscription as this module's data on the
 * event subscription. The event subscription must already be set on sub,
 * since it is read back through sip_subscription_get_evsub().
 *
 * \param sub The subscription being set up
 * \param dlg The dialog to attach
 */
853 static void subscription_setup_dialog(struct ast_sip_subscription *sub, pjsip_dialog *dlg)
855 /* We keep a reference to the dialog until our subscription is destroyed. See
856 * the subscription_destructor for more details
858 pjsip_dlg_inc_session(dlg, &pubsub_module);
859 sub->reality.real.dlg = dlg;
860 ast_sip_dialog_set_serializer(dlg, sub->serializer);
861 pjsip_evsub_set_mod_data(sip_subscription_get_evsub(sub), pubsub_module.id, sub);
/*! \brief Create a notifier-role subscription from a received request
 *
 * Allocates the subscription, records the body generator, and creates a UAS
 * dialog from the request. If a persistence object is attached to the rdata
 * under MOD_DATA_PERSISTENCE, the dialog is re-registered with the persisted
 * local tag and both its local and remote CSeq are set to the persisted
 * value. The UAS event subscription is then created, the dialog is bound
 * with subscription_setup_dialog(), a clone of the request message is stored
 * on the dialog under MOD_DATA_MSG, and the subscription is added to the
 * subscriptions list.
 *
 * \param handler   Handler for the subscription
 * \param endpoint  Endpoint that sent the request
 * \param rdata     The received request
 * \param resource  Name of the subscribed resource
 * \param generator Body generator to use for NOTIFYs
 *
 * NOTE(review): the return value of pjsip_evsub_create_uas() is not
 * examined in the visible code.
 */
864 static struct ast_sip_subscription *notifier_create_subscription(const struct ast_sip_subscription_handler *handler,
865 struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata, const char *resource,
866 struct ast_sip_pubsub_body_generator *generator)
868 struct ast_sip_subscription *sub;
870 struct subscription_persistence *persistence;
872 sub = allocate_subscription(handler, endpoint, resource, AST_SIP_NOTIFIER);
877 sub->body_generator = generator;
878 dlg = ast_sip_create_dialog_uas(endpoint, rdata);
880 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
885 persistence = ast_sip_mod_data_get(rdata->endpt_info.mod_data,
886 pubsub_module.id, MOD_DATA_PERSISTENCE);
888 /* Update the created dialog with the persisted information */
889 pjsip_ua_unregister_dlg(pjsip_ua_instance(), dlg);
890 pj_strdup2(dlg->pool, &dlg->local.info->tag, persistence->tag);
891 dlg->local.tag_hval = pj_hash_calc_tolower(0, NULL, &dlg->local.info->tag);
892 pjsip_ua_register_dlg(pjsip_ua_instance(), dlg);
893 dlg->local.cseq = persistence->cseq;
894 dlg->remote.cseq = persistence->cseq;
897 pjsip_evsub_create_uas(dlg, &pubsub_cb, rdata, 0, &sub->reality.real.evsub);
898 subscription_setup_dialog(sub, dlg);
900 ast_sip_mod_data_set(dlg->pool, dlg->mod_data, pubsub_module.id, MOD_DATA_MSG,
901 pjsip_msg_clone(dlg->pool, rdata->msg_info.msg));
903 add_subscription(sub);
/*! \brief Look up a header by name in the message stored on a subscription's dialog
 *
 * The message searched is the one saved under MOD_DATA_MSG.
 *
 * \param sub    The subscription to query
 * \param header Name of the header to find
 * \return The result of pjsip_msg_find_hdr_by_name()
 *
 * NOTE(review): in the code visible here MOD_DATA_MSG is only stored by
 * notifier_create_subscription(), so the stored message may be absent for
 * subscriber-role subscriptions -- confirm callers.
 */
907 void *ast_sip_subscription_get_header(const struct ast_sip_subscription *sub, const char *header)
909 pjsip_dialog *dlg = sip_subscription_get_dlg(sub);
910 pjsip_msg *msg = ast_sip_mod_data_get(dlg->mod_data, pubsub_module.id, MOD_DATA_MSG);
913 pj_cstr(&name, header);
915 return pjsip_msg_find_hdr_by_name(msg, &name, NULL);
918 struct ast_sip_subscription *ast_sip_create_subscription(const struct ast_sip_subscription_handler *handler,
919 struct ast_sip_endpoint *endpoint, const char *resource)
921 struct ast_sip_subscription *sub = ao2_alloc(sizeof(*sub) + strlen(resource) + 1, subscription_destructor);
923 struct ast_sip_contact *contact;
925 pjsip_tx_data *tdata;
928 sub = allocate_subscription(handler, endpoint, resource, AST_SIP_SUBSCRIBER);
933 contact = ast_sip_location_retrieve_contact_from_aor_list(endpoint->aors);
934 if (!contact || ast_strlen_zero(contact->uri)) {
935 ast_log(LOG_WARNING, "No contacts configured for endpoint %s. Unable to create SIP subsription\n",
936 ast_sorcery_object_get_id(endpoint));
938 ao2_cleanup(contact);
942 dlg = ast_sip_create_dialog_uac(endpoint, contact->uri, NULL);
943 ao2_cleanup(contact);
945 ast_log(LOG_WARNING, "Unable to create dialog for SIP subscription\n");
950 pj_cstr(&event, handler->event_name);
951 pjsip_evsub_create_uac(dlg, &pubsub_cb, &event, 0, &sub->reality.real.evsub);
952 subscription_setup_dialog(sub, dlg);
954 add_subscription(sub);
956 evsub = sip_subscription_get_evsub(sub);
958 if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
959 pjsip_evsub_send_request(evsub, tdata);
961 /* pjsip_evsub_terminate will result in pubsub_on_evsub_state,
962 * being called and terminating the subscription. Therefore, we don't
963 * need to decrease the reference count of sub here.
965 pjsip_evsub_terminate(evsub, PJ_TRUE);
/*! \brief Get the endpoint associated with a subscription
 *
 * The endpoint's reference count is increased before it is returned, so
 * the caller owns a reference and has to release it.
 *
 * \param sub The subscription to query
 */
972 struct ast_sip_endpoint *ast_sip_subscription_get_endpoint(struct ast_sip_subscription *sub)
974 ast_assert(sub->endpoint != NULL);
975 ao2_ref(sub->endpoint, +1);
976 return sub->endpoint;
/*! \brief Get the serializer of a subscription
 *
 * Returns the stored pointer as is; no additional reference is taken.
 *
 * \param sub The subscription to query
 */
979 struct ast_taskprocessor *ast_sip_subscription_get_serializer(struct ast_sip_subscription *sub)
981 ast_assert(sub->serializer != NULL);
982 return sub->serializer;
/*! \brief Send a request on a subscription's event subscription
 *
 * Sends tdata through pjsip_evsub_send_request(), refreshes the persistence
 * record (passing no rdata), and raises a SUBSCRIPTION_STATE_SET test suite
 * event. An endpoint reference is held for the duration of the call and
 * released before returning.
 *
 * \param sub   The subscription to send on
 * \param tdata The request to send
 * \return res, which is 0 when the send reported PJ_SUCCESS and -1 otherwise
 *         (the return statement itself is not visible in this listing)
 */
985 static int sip_subscription_send_request(struct ast_sip_subscription *sub, pjsip_tx_data *tdata)
987 struct ast_sip_endpoint *endpoint = ast_sip_subscription_get_endpoint(sub);
991 res = pjsip_evsub_send_request(sip_subscription_get_evsub(sub),
992 tdata) == PJ_SUCCESS ? 0 : -1;
994 subscription_persistence_update(sub, NULL);
996 ast_test_suite_event_notify("SUBSCRIPTION_STATE_SET",
999 pjsip_evsub_get_state_name(sip_subscription_get_evsub(sub)),
1000 ast_sorcery_object_get_id(endpoint));
1002 ao2_cleanup(endpoint);
1007 int ast_sip_subscription_notify(struct ast_sip_subscription *sub, void *notify_data,
1010 struct ast_sip_body body = {
1011 .type = ast_sip_subscription_get_body_type(sub),
1012 .subtype = ast_sip_subscription_get_body_subtype(sub),
1014 struct ast_str *body_text = ast_str_create(64);
1015 pjsip_evsub *evsub = sip_subscription_get_evsub(sub);
1016 pjsip_tx_data *tdata;
1017 pjsip_evsub_state state;
1023 if (ast_sip_pubsub_generate_body_content(body.type, body.subtype, notify_data, &body_text)) {
1024 ast_free(body_text);
1028 body.body_text = ast_str_buffer(body_text);
1031 state = PJSIP_EVSUB_STATE_TERMINATED;
1033 state = pjsip_evsub_get_state(evsub) <= PJSIP_EVSUB_STATE_ACTIVE ?
1034 PJSIP_EVSUB_STATE_ACTIVE : PJSIP_EVSUB_STATE_TERMINATED;
1037 if (pjsip_evsub_notify(evsub, state, NULL, NULL, &tdata) != PJ_SUCCESS) {
1038 ast_free(body_text);
1041 if (ast_sip_add_body(tdata, &body)) {
1042 ast_free(body_text);
1043 pjsip_tx_data_dec_ref(tdata);
1046 if (sip_subscription_send_request(sub, tdata)) {
1047 ast_free(body_text);
1048 pjsip_tx_data_dec_ref(tdata);
/*! \brief Copy the dialog's local info string into a caller-supplied buffer
 *
 * \param sub  The subscription to query
 * \param buf  Destination buffer
 * \param size Size of buf in bytes
 */
1055 void ast_sip_subscription_get_local_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
1057 pjsip_dialog *dlg = sip_subscription_get_dlg(sub);
1058 ast_copy_pj_str(buf, &dlg->local.info_str, size);
/*! \brief Copy the dialog's remote info string into a caller-supplied buffer
 *
 * \param sub  The subscription to query
 * \param buf  Destination buffer
 * \param size Size of buf in bytes
 */
1061 void ast_sip_subscription_get_remote_uri(struct ast_sip_subscription *sub, char *buf, size_t size)
1063 pjsip_dialog *dlg = sip_subscription_get_dlg(sub);
1064 ast_copy_pj_str(buf, &dlg->remote.info_str, size);
1067 const char *ast_sip_subscription_get_resource_name(struct ast_sip_subscription *sub)
1069 return sub->resource;
/*!
 * \brief Accept an inbound SUBSCRIBE on the subscription's evsub.
 *
 * \param sub Subscription whose evsub accepts the request
 * \param rdata The SUBSCRIBE being answered
 * \param response Status code passed to pjsip_evsub_accept()
 *
 * On the normal path the result is 0 when pjsip_evsub_accept() returns
 * PJ_SUCCESS and -1 otherwise. The body of the persistence branch is not
 * visible in this listing.
 */
1072 static int sip_subscription_accept(struct ast_sip_subscription *sub, pjsip_rx_data *rdata, int response)
1074 /* If this is a persistence recreation the subscription has already been accepted */
1075 if (ast_sip_mod_data_get(rdata->endpt_info.mod_data, pubsub_module.id, MOD_DATA_PERSISTENCE)) {
1079 return pjsip_evsub_accept(sip_subscription_get_evsub(sub), rdata, response, NULL) == PJ_SUCCESS ? 0 : -1;
1082 static void subscription_datastore_destroy(void *obj)
1084 struct ast_datastore *datastore = obj;
1086 /* Using the destroy function (if present) destroy the data */
1087 if (datastore->info->destroy != NULL && datastore->data != NULL) {
1088 datastore->info->destroy(datastore->data);
1089 datastore->data = NULL;
1092 ast_free((void *) datastore->uid);
1093 datastore->uid = NULL;
/*!
 * \brief Allocate an ao2 datastore carrying \a info and a unique id.
 *
 * \param info Datastore info stored on the new datastore
 * \param uid Identifier to duplicate into the datastore; when it is empty a
 *        generated UUID string is used instead
 *
 * NOTE(review): the error-handling and return lines are not visible in this
 * listing; confirm them against the full file.
 */
1096 struct ast_datastore *ast_sip_subscription_alloc_datastore(const struct ast_datastore_info *info, const char *uid)
/* ao2_cleanup() is applied to this variable automatically when it leaves scope */
1098 RAII_VAR(struct ast_datastore *, datastore, NULL, ao2_cleanup);
1099 const char *uid_ptr = uid;
/* subscription_datastore_destroy() is installed as the object's destructor */
1105 datastore = ao2_alloc(sizeof(*datastore), subscription_datastore_destroy);
1110 datastore->info = info;
1111 if (ast_strlen_zero(uid)) {
1112 /* They didn't provide an ID so we'll provide one ourself */
1113 struct ast_uuid *uuid = ast_uuid_generate();
1114 char uuid_buf[AST_UUID_STR_LEN];
1118 uid_ptr = ast_uuid_to_str(uuid, uuid_buf, sizeof(uuid_buf));
/* Keep a heap copy: uid_ptr may point at the block-local uuid_buf */
1122 datastore->uid = ast_strdup(uid_ptr);
1123 if (!datastore->uid) {
/* Extra reference offsets the RAII cleanup - presumably so the datastore can be returned; confirm */
1127 ao2_ref(datastore, +1);
/*!
 * \brief Link a datastore into a subscription's datastores container.
 *
 * \param subscription The subscription receiving the datastore
 * \param datastore The datastore to link; must have \c info and a non-empty \c uid
 *
 * NOTE(review): the return statements are not visible in this listing.
 */
1131 int ast_sip_subscription_add_datastore(struct ast_sip_subscription *subscription, struct ast_datastore *datastore)
/* Programming-error checks only */
1133 ast_assert(datastore != NULL);
1134 ast_assert(datastore->info != NULL);
1135 ast_assert(!ast_strlen_zero(datastore->uid));
1137 if (!ao2_link(subscription->datastores, datastore)) {
1143 struct ast_datastore *ast_sip_subscription_get_datastore(struct ast_sip_subscription *subscription, const char *name)
1145 return ao2_find(subscription->datastores, name, OBJ_KEY);
1148 void ast_sip_subscription_remove_datastore(struct ast_sip_subscription *subscription, const char *name)
1150 ao2_find(subscription->datastores, name, OBJ_SEARCH_KEY | OBJ_UNLINK | OBJ_NODATA);
/*!
 * \brief Link a datastore into a publication's datastores container.
 *
 * \param publication The publication receiving the datastore
 * \param datastore The datastore to link; must have \c info and a non-empty \c uid
 *
 * NOTE(review): the return statements are not visible in this listing.
 */
1153 int ast_sip_publication_add_datastore(struct ast_sip_publication *publication, struct ast_datastore *datastore)
/* Programming-error checks only */
1155 ast_assert(datastore != NULL);
1156 ast_assert(datastore->info != NULL);
1157 ast_assert(!ast_strlen_zero(datastore->uid));
1159 if (!ao2_link(publication->datastores, datastore)) {
1165 struct ast_datastore *ast_sip_publication_get_datastore(struct ast_sip_publication *publication, const char *name)
1167 return ao2_find(publication->datastores, name, OBJ_KEY);
1170 void ast_sip_publication_remove_datastore(struct ast_sip_publication *publication, const char *name)
1172 ao2_callback(publication->datastores, OBJ_KEY | OBJ_UNLINK | OBJ_NODATA, NULL, (void *) name);
/*! \brief Read/write-locked list of registered PUBLISH handlers */
1175 AST_RWLIST_HEAD_STATIC(publish_handlers, ast_sip_publish_handler);
1177 static int publication_hash_fn(const void *obj, const int flags)
1179 const struct ast_sip_publication *publication = obj;
1180 const int *entity_tag = obj;
1182 return flags & OBJ_KEY ? *entity_tag : publication->entity_tag;
1185 static int publication_cmp_fn(void *obj, void *arg, int flags)
1187 const struct ast_sip_publication *publication1 = obj;
1188 const struct ast_sip_publication *publication2 = arg;
1189 const int *entity_tag = arg;
1191 return (publication1->entity_tag == (flags & OBJ_KEY ? *entity_tag : publication2->entity_tag) ?
1192 CMP_MATCH | CMP_STOP : 0);
1195 static void publish_add_handler(struct ast_sip_publish_handler *handler)
1197 SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1198 AST_RWLIST_INSERT_TAIL(&publish_handlers, handler, next);
/*!
 * \brief Register a handler for inbound PUBLISH requests of one event package.
 *
 * Checks that the handler names an event package, allocates the handler's
 * publications container, appends the handler to publish_handlers and takes
 * a reference on this module.
 *
 * \param handler The publish handler to register
 *
 * NOTE(review): the return statements are not visible in this listing.
 */
1201 int ast_sip_register_publish_handler(struct ast_sip_publish_handler *handler)
1203 if (ast_strlen_zero(handler->event_name)) {
1204 ast_log(LOG_ERROR, "No event package specified for publish handler. Cannot register\n");
/* Container hashed and compared by entity tag (publication_hash_fn / publication_cmp_fn) */
1208 if (!(handler->publications = ao2_container_alloc(PUBLICATIONS_BUCKETS,
1209 publication_hash_fn, publication_cmp_fn))) {
1210 ast_log(LOG_ERROR, "Could not allocate publications container for event '%s'\n",
1211 handler->event_name);
1215 publish_add_handler(handler);
/* Matched by the ast_module_unref() in ast_sip_unregister_publish_handler() */
1217 ast_module_ref(ast_module_info->self);
/*!
 * \brief Remove a previously registered PUBLISH handler.
 *
 * Walks publish_handlers under the write lock. When \a handler is found it
 * is removed from the list, its publications container is unreferenced and
 * a module reference is dropped.
 *
 * \param handler The publish handler to unregister
 */
1222 void ast_sip_unregister_publish_handler(struct ast_sip_publish_handler *handler)
1224 struct ast_sip_publish_handler *iter;
1225 SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
/* The SAFE traversal form is required because the current node may be removed */
1226 AST_RWLIST_TRAVERSE_SAFE_BEGIN(&publish_handlers, iter, next) {
/* Handlers are matched by pointer identity */
1227 if (handler == iter) {
1228 AST_RWLIST_REMOVE_CURRENT(next);
1229 ao2_cleanup(handler->publications);
1230 ast_module_unref(ast_module_info->self);
1234 AST_RWLIST_TRAVERSE_SAFE_END;
/*! \brief Read/write-locked list of registered subscription handlers */
1237 AST_RWLIST_HEAD_STATIC(subscription_handlers, ast_sip_subscription_handler);
1239 static void sub_add_handler(struct ast_sip_subscription_handler *handler)
1241 SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
1242 AST_RWLIST_INSERT_TAIL(&subscription_handlers, handler, next);
1243 ast_module_ref(ast_module_info->self);
/*!
 * \brief Look up a subscription handler by exact event package name.
 *
 * Traverses subscription_handlers under the read lock, comparing each
 * handler's event_name to \a event_name with strcmp().
 *
 * \param event_name Event package name to search for
 *
 * NOTE(review): the loop exit and return lines are not visible in this listing.
 */
1246 static struct ast_sip_subscription_handler *find_sub_handler_for_event_name(const char *event_name)
1248 struct ast_sip_subscription_handler *iter;
1249 SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1251 AST_RWLIST_TRAVERSE(&subscription_handlers, iter, next) {
/* Case-sensitive match */
1252 if (!strcmp(iter->event_name, event_name)) {
/*!
 * \brief Register a subscription handler and its event package with PJSIP.
 *
 * Requires a non-empty event name, looks for an existing handler of the same
 * event, converts the handler's accept strings to pj_str_t, registers the
 * package with pjsip_evsub_register_pkg() and appends the handler to the
 * handler list.
 *
 * \param handler The subscription handler to register
 *
 * NOTE(review): the two literals of the "already registered" message are
 * concatenated without a space, so the log reads "...event <name>.A handler
 * is already registered".
 * NOTE(review): some declarations and the return statements are not visible
 * in this listing.
 */
1259 int ast_sip_register_subscription_handler(struct ast_sip_subscription_handler *handler)
1262 pj_str_t accept[AST_SIP_MAX_ACCEPT] = { {0, }, };
1263 struct ast_sip_subscription_handler *existing;
1266 if (ast_strlen_zero(handler->event_name)) {
1267 ast_log(LOG_ERROR, "No event package specified for subscription handler. Cannot register\n");
1271 existing = find_sub_handler_for_event_name(handler->event_name);
1273 ast_log(LOG_ERROR, "Unable to register subscription handler for event %s."
1274 "A handler is already registered\n", handler->event_name);
/* Stops at the first empty accept entry; i ends up as the number of entries converted */
1278 for (i = 0; i < AST_SIP_MAX_ACCEPT && !ast_strlen_zero(handler->accept[i]); ++i) {
1279 pj_cstr(&accept[i], handler->accept[i]);
1282 pj_cstr(&event, handler->event_name);
/* i is passed as the accept count */
1284 pjsip_evsub_register_pkg(&pubsub_module, &event, DEFAULT_EXPIRES, i, accept);
1286 sub_add_handler(handler);
/*!
 * \brief Remove a previously registered subscription handler.
 *
 * Walks subscription_handlers under the write lock. When \a handler is found
 * it is removed from the list and a module reference is dropped.
 *
 * \param handler The subscription handler to unregister
 */
1291 void ast_sip_unregister_subscription_handler(struct ast_sip_subscription_handler *handler)
1293 struct ast_sip_subscription_handler *iter;
1294 SCOPED_LOCK(lock, &subscription_handlers, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
/* The SAFE traversal form is required because the current node may be removed */
1295 AST_RWLIST_TRAVERSE_SAFE_BEGIN(&subscription_handlers, iter, next) {
/* Handlers are matched by pointer identity */
1296 if (handler == iter) {
1297 AST_RWLIST_REMOVE_CURRENT(next);
1298 ast_module_unref(ast_module_info->self);
1302 AST_RWLIST_TRAVERSE_SAFE_END;
/*!
 * \brief Find a registered body generator by content type and subtype.
 *
 * Traverses body_generators under the read lock; both the type and the
 * subtype must compare equal with strcmp().
 *
 * \param content_type Content type, e.g. the part before the slash
 * \param content_subtype Content subtype, e.g. the part after the slash
 *
 * NOTE(review): the loop exit and return lines are not visible in this listing.
 */
1305 static struct ast_sip_pubsub_body_generator *find_body_generator_type_subtype(const char *content_type,
1306 const char *content_subtype)
1308 struct ast_sip_pubsub_body_generator *iter;
1309 SCOPED_LOCK(lock, &body_generators, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1311 AST_LIST_TRAVERSE(&body_generators, iter, list) {
1312 if (!strcmp(iter->type, content_type) &&
1313 !strcmp(iter->subtype, content_subtype)) {
/*!
 * \brief Find a body generator for an Accept value of the form "type/subtype".
 *
 * \param accept The Accept value to split and look up
 *
 * NOTE(review): what is returned when either half is empty is not visible in
 * this listing.
 */
1321 static struct ast_sip_pubsub_body_generator *find_body_generator_accept(const char *accept)
/* Work on a stack copy because strsep() modifies its input */
1323 char *accept_copy = ast_strdupa(accept);
1324 char *subtype = accept_copy;
/* After strsep(): type is the text before the first '/', subtype the text after it */
1325 char *type = strsep(&subtype, "/");
1327 if (ast_strlen_zero(type) || ast_strlen_zero(subtype)) {
1331 return find_body_generator_type_subtype(type, subtype);
/*!
 * \brief Search a list of Accept values for one that has a registered body generator.
 *
 * Each entry of \a accept is tried in order with find_body_generator_accept(),
 * and the outcome of every attempt is logged at debug level 3.
 *
 * NOTE(review): the rest of the parameter list, the branch structure inside
 * the loop and the return are not visible in this listing.
 */
1334 static struct ast_sip_pubsub_body_generator *find_body_generator(char accept[AST_SIP_MAX_ACCEPT][64],
1338 struct ast_sip_pubsub_body_generator *generator = NULL;
1340 for (i = 0; i < num_accept; ++i) {
1341 generator = find_body_generator_accept(accept[i]);
1343 ast_debug(3, "Body generator %p found for accept type %s\n", generator, accept[i]);
1346 ast_debug(3, "No body generator found for accept type %s\n", accept[i]);
/*!
 * \brief Handle an inbound SUBSCRIBE request that would start a new subscription.
 *
 * The request is validated step by step; each visible rejection is answered
 * statelessly with the status code shown on that path. When all checks pass
 * a notifier subscription is created, persisted, accepted and the notifier's
 * notify_required() callback is invoked with the STARTED reason.
 *
 * \param rdata The received SUBSCRIBE
 *
 * NOTE(review): "Susbscription" in the Expires-0 warning is a typo in the
 * runtime string.
 * NOTE(review): several guarding conditions and all return statements are
 * not visible in this listing.
 */
1353 static pj_bool_t pubsub_on_rx_subscribe_request(pjsip_rx_data *rdata)
1355 pjsip_expires_hdr *expires_header;
1356 struct ast_sip_subscription_handler *handler;
/* Endpoint reference is released automatically on scope exit */
1357 RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
1358 struct ast_sip_subscription *sub;
1359 struct ast_sip_pubsub_body_generator *generator;
1361 pjsip_uri *request_uri;
1362 pjsip_sip_uri *request_uri_sip;
1363 size_t resource_size;
1366 endpoint = ast_pjsip_rdata_get_endpoint(rdata);
1367 ast_assert(endpoint != NULL);
/* 603: this endpoint is not allowed to subscribe */
1369 if (!endpoint->subscription.allow) {
1370 ast_log(LOG_WARNING, "Subscriptions not permitted for endpoint %s.\n", ast_sorcery_object_get_id(endpoint));
1371 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 603, NULL, NULL, NULL);
1375 request_uri = rdata->msg_info.msg->line.req.uri;
/* 416: only sip: and sips: request URIs are handled */
1377 if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
1378 char uri_str[PJSIP_MAX_URL_SIZE];
1380 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
1381 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
1382 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
/* The user part of the request URI is used as the resource name (stack copy, NUL-terminated) */
1386 request_uri_sip = pjsip_uri_get_uri(request_uri);
1387 resource_size = pj_strlen(&request_uri_sip->user) + 1;
1388 resource = alloca(resource_size);
1389 ast_copy_pj_str(resource, &request_uri_sip->user, resource_size);
1391 expires_header = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, rdata->msg_info.msg->hdr.next);
1393 if (expires_header) {
/* 400: an Expires of 0 is rejected on this path */
1394 if (expires_header->ivalue == 0) {
1395 ast_log(LOG_WARNING, "Susbscription request from endpoint %s rejected. Expiration of 0 is invalid\n",
1396 ast_sorcery_object_get_id(endpoint));
1397 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
/* 423: requested interval is shorter than the endpoint's configured minimum */
1400 if (expires_header->ivalue < endpoint->subscription.minexpiry) {
1401 ast_log(LOG_WARNING, "Subscription expiration %d is too brief for endpoint %s. Minimum is %u\n",
1402 expires_header->ivalue, ast_sorcery_object_get_id(endpoint), endpoint->subscription.minexpiry);
1403 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 423, NULL, NULL, NULL);
/* 489 responses follow the handler and generator lookups - presumably sent when a lookup fails; confirm */
1408 handler = subscription_get_handler_from_rdata(rdata);
1410 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
1414 generator = subscription_get_generator_from_rdata(rdata, handler);
1416 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
/* The notifier decides the response code; anything outside 2xx is relayed and ends processing here */
1420 resp = handler->notifier->new_subscribe(endpoint, resource);
1421 if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
1422 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
/* 500 follows the creation call - presumably sent when creation fails; confirm */
1426 sub = notifier_create_subscription(handler, endpoint, rdata, resource, generator);
1428 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
/* Persist the new subscription before accepting it with the notifier's response code */
1430 sub->persistence = subscription_persistence_create(sub);
1431 subscription_persistence_update(sub, rdata);
1432 sip_subscription_accept(sub, rdata, resp);
/* A non-zero result from notify_required() terminates the evsub */
1433 if (handler->notifier->notify_required(sub, AST_SIP_SUBSCRIPTION_NOTIFY_REASON_STARTED)) {
1434 pjsip_evsub_terminate(sip_subscription_get_evsub(sub), PJ_TRUE);
/*!
 * \brief Find the registered publish handler for an event package name.
 *
 * Traverses publish_handlers under the read lock, logging each mismatch and
 * the eventual match at debug level 3.
 *
 * \param event Event package name to look up
 *
 * NOTE(review): the loop control and return lines are not visible in this listing.
 */
1441 static struct ast_sip_publish_handler *find_pub_handler(const char *event)
1443 struct ast_sip_publish_handler *iter = NULL;
1444 SCOPED_LOCK(lock, &publish_handlers, AST_RWLIST_RDLOCK, AST_RWLIST_UNLOCK);
1446 AST_RWLIST_TRAVERSE(&publish_handlers, iter, next) {
/* strcmp() is non-zero for a mismatch */
1447 if (strcmp(event, iter->event_name)) {
1448 ast_debug(3, "Event %s does not match %s\n", event, iter->event_name);
1451 ast_debug(3, "Event name match: %s = %s\n", event, iter->event_name);
/*!
 * \brief Classify a PUBLISH request and extract its expiration and entity tag.
 *
 * \param rdata The received PUBLISH
 * \param etag_hdr The SIP-If-Match header, or NULL when absent
 * \param[out] expires Expires header value, or DEFAULT_PUBLISH_EXPIRES when there is no Expires header
 * \param[out] entity_id Integer parsed from the entity tag text
 *
 * Visible classification once the remove check has been passed:
 *  - no etag, body present:  SIP_PUBLISH_INITIAL
 *  - etag, no body:          SIP_PUBLISH_REFRESH
 *  - etag and body:          SIP_PUBLISH_MODIFY
 *  - anything else:          SIP_PUBLISH_UNKNOWN
 *
 * NOTE(review): no line visible here writes \a entity_id other than the
 * sscanf() on the etag text, so callers should not rely on it when
 * \a etag_hdr is NULL - confirm against the full file.
 */
1458 static enum sip_publish_type determine_sip_publish_type(pjsip_rx_data *rdata,
1459 pjsip_generic_string_hdr *etag_hdr, int *expires, int *entity_id)
1461 pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
/* Stack buffer sized to the header value plus a terminator */
1464 char etag[pj_strlen(&etag_hdr->hvalue) + 1];
1466 ast_copy_pj_str(etag, &etag_hdr->hvalue, sizeof(etag));
/* The entity tag must begin with a decimal integer (at most 30 characters are consumed) */
1468 if (sscanf(etag, "%30d", entity_id) != 1) {
1469 return SIP_PUBLISH_UNKNOWN;
1473 *expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
1476 return SIP_PUBLISH_REMOVE;
1477 } else if (!etag_hdr && rdata->msg_info.msg->body) {
1478 return SIP_PUBLISH_INITIAL;
1479 } else if (etag_hdr && !rdata->msg_info.msg->body) {
1480 return SIP_PUBLISH_REFRESH;
1481 } else if (etag_hdr && rdata->msg_info.msg->body) {
1482 return SIP_PUBLISH_MODIFY;
1485 return SIP_PUBLISH_UNKNOWN;
1488 /*! \brief Internal destructor for publications */
1489 static void publication_destroy_fn(void *obj)
1491 struct ast_sip_publication *publication = obj;
1493 ast_debug(3, "Destroying SIP publication\n");
1495 ao2_cleanup(publication->datastores);
1496 ao2_cleanup(publication->endpoint);
/*!
 * \brief Allocate and initialize a publication for an inbound PUBLISH.
 *
 * The resource and event configuration names are stored in the same
 * allocation as the publication itself, directly after the structure.
 *
 * \param endpoint Endpoint that sent the PUBLISH; a reference is taken
 * \param rdata The received PUBLISH (its Expires header, if any, sets the expiration)
 * \param resource Resource name to copy into the publication
 * \param event_configuration_name Event configuration name to copy into the publication
 *
 * NOTE(review): the failure and success return lines are not visible in this listing.
 */
1499 static struct ast_sip_publication *sip_create_publication(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
1500 const char *resource, const char *event_configuration_name)
1502 struct ast_sip_publication *publication;
1503 pjsip_expires_hdr *expires_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_EXPIRES, NULL);
/* Both lengths include the terminating NUL */
1504 size_t resource_len = strlen(resource) + 1, event_configuration_name_len = strlen(event_configuration_name) + 1;
1507 ast_assert(endpoint != NULL);
/* One allocation holds the structure plus both strings */
1509 if (!(publication = ao2_alloc(sizeof(*publication) + resource_len + event_configuration_name_len, publication_destroy_fn))) {
1513 if (!(publication->datastores = ao2_container_alloc(DATASTORE_BUCKETS, datastore_hash, datastore_cmp))) {
1514 ao2_ref(publication, -1);
/* ast_atomic_fetchadd_int() hands back the counter value from before the increment */
1518 publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
/* The publication keeps its own endpoint reference (released in publication_destroy_fn) */
1519 ao2_ref(endpoint, +1);
1520 publication->endpoint = endpoint;
1521 publication->expires = expires_hdr ? expires_hdr->ivalue : DEFAULT_PUBLISH_EXPIRES;
/* -1 marks "no expiration scheduled" */
1522 publication->sched_id = -1;
/* Pack the two strings back to back in the trailing data area; strcpy() is bounded by the lengths computed above */
1523 dst = publication->data;
1524 publication->resource = strcpy(dst, resource);
1525 dst += resource_len;
1526 publication->event_configuration_name = strcpy(dst, event_configuration_name);
1531 static int sip_publication_respond(struct ast_sip_publication *pub, int status_code,
1532 pjsip_rx_data *rdata)
1535 pjsip_tx_data *tdata;
1536 pjsip_transaction *tsx;
1538 if (pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, status_code, NULL, &tdata) != PJ_SUCCESS) {
1542 if (PJSIP_IS_STATUS_IN_CLASS(status_code, 200)) {
1543 RAII_VAR(char *, entity_tag, NULL, ast_free_ptr);
1544 RAII_VAR(char *, expires, NULL, ast_free_ptr);
1546 if ((ast_asprintf(&entity_tag, "%d", pub->entity_tag) < 0) ||
1547 (ast_asprintf(&expires, "%d", pub->expires) < 0)) {
1548 pjsip_tx_data_dec_ref(tdata);
1552 ast_sip_add_header(tdata, "SIP-ETag", entity_tag);
1553 ast_sip_add_header(tdata, "Expires", expires);
1556 if ((status = pjsip_tsx_create_uas(&pubsub_module, rdata, &tsx)) != PJ_SUCCESS) {
1560 pjsip_tsx_recv_msg(tsx, rdata);
1562 if (pjsip_tsx_send_msg(tsx, tdata) != PJ_SUCCESS) {
/*!
 * \brief Handle an initial PUBLISH (one without an entity tag).
 *
 * The request URI's user part names an "inbound-publication" sorcery object.
 * The visible checks reject the request statelessly with 416 (non-SIP URI),
 * 404, 403 (the resource is bound to a different endpoint), 404 (no event
 * configuration for the handler's event), the handler's own non-2xx code,
 * 503 or 500. On success the response chosen by the handler is sent with
 * sip_publication_respond().
 *
 * \param endpoint Endpoint that sent the PUBLISH
 * \param rdata The received PUBLISH
 * \param handler Publish handler for the request's event package
 *
 * NOTE(review): some guarding conditions and all return statements are not
 * visible in this listing.
 */
1569 static struct ast_sip_publication *publish_request_initial(struct ast_sip_endpoint *endpoint, pjsip_rx_data *rdata,
1570 struct ast_sip_publish_handler *handler)
1572 struct ast_sip_publication *publication;
1573 char *resource_name;
1574 size_t resource_size;
/* Sorcery object reference is released automatically on scope exit */
1575 RAII_VAR(struct ast_sip_publication_resource *, resource, NULL, ao2_cleanup);
1576 struct ast_variable *event_configuration_name = NULL;
1577 pjsip_uri *request_uri;
1578 pjsip_sip_uri *request_uri_sip;
1581 request_uri = rdata->msg_info.msg->line.req.uri;
/* 416: only sip: and sips: request URIs are handled */
1583 if (!PJSIP_URI_SCHEME_IS_SIP(request_uri) && !PJSIP_URI_SCHEME_IS_SIPS(request_uri)) {
1584 char uri_str[PJSIP_MAX_URL_SIZE];
1586 pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, uri_str, sizeof(uri_str));
1587 ast_log(LOG_WARNING, "Request URI '%s' is not a sip: or sips: URI.\n", uri_str);
1588 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 416, NULL, NULL, NULL);
/* The user part of the request URI is the resource name (stack copy, NUL-terminated) */
1592 request_uri_sip = pjsip_uri_get_uri(request_uri);
1593 resource_size = pj_strlen(&request_uri_sip->user) + 1;
1594 resource_name = alloca(resource_size);
1595 ast_copy_pj_str(resource_name, &request_uri_sip->user, resource_size);
/* 404 follows the lookup - presumably sent when no such object exists; confirm */
1597 resource = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "inbound-publication", resource_name);
1599 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
/* 403: the resource names an endpoint and it is not the one that sent this request */
1603 if (!ast_strlen_zero(resource->endpoint) && strcmp(resource->endpoint, ast_sorcery_object_get_id(endpoint))) {
1604 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 403, NULL, NULL, NULL);
/* Search the resource's event list for an entry named after the handler's event package */
1608 for (event_configuration_name = resource->events; event_configuration_name; event_configuration_name = event_configuration_name->next) {
1609 if (!strcmp(event_configuration_name->name, handler->event_name)) {
/* 404: the resource has no configuration for this event */
1614 if (!event_configuration_name) {
1615 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 404, NULL, NULL, NULL);
/* The handler decides the response code; anything outside 2xx is relayed */
1619 resp = handler->new_publication(endpoint, resource_name, event_configuration_name->value);
1621 if (!PJSIP_IS_STATUS_IN_CLASS(resp, 200)) {
1622 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, resp, NULL, NULL, NULL);
/* 503 follows the creation call - presumably sent when creation fails; confirm */
1626 publication = sip_create_publication(endpoint, rdata, S_OR(resource_name, ""), event_configuration_name->value);
1629 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
1633 publication->handler = handler;
/* A non-zero result from the state change callback yields a 500 and discards the publication */
1634 if (publication->handler->publication_state_change(publication, rdata->msg_info.msg->body,
1635 AST_SIP_PUBLISH_STATE_INITIALIZED)) {
1636 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
1637 ao2_cleanup(publication);
1641 sip_publication_respond(publication, resp, rdata);
/*!
 * \brief Task that tells a publication's handler that the publication expired.
 *
 * \param data The struct ast_sip_publication; the reference carried by
 *        \a data is released when the function returns (RAII_VAR with ao2_cleanup)
 */
1646 static int publish_expire_callback(void *data)
1648 RAII_VAR(struct ast_sip_publication *, publication, data, ao2_cleanup);
/* The publish_expire callback is optional */
1650 if (publication->handler->publish_expire) {
1651 publication->handler->publish_expire(publication);
/*!
 * \brief Scheduler callback fired when a publication's expiration elapses.
 *
 * Unlinks the publication from its handler's container, clears the stored
 * scheduler id and queues publish_expire_callback() via ast_sip_push_task().
 *
 * \param data The struct ast_sip_publication that expired
 */
1657 static int publish_expire(const void *data)
1659 struct ast_sip_publication *publication = (struct ast_sip_publication*)data;
1661 ao2_unlink(publication->handler->publications, publication);
1662 publication->sched_id = -1;
/* If the task cannot be queued the callback will never run, so release the publication here */
1664 if (ast_sip_push_task(NULL, publish_expire_callback, publication)) {
1665 ao2_cleanup(publication);
/*!
 * \brief Handle an inbound PUBLISH request.
 *
 * Finds the publish handler from the Event header, classifies the request
 * with determine_sip_publish_type(), looks up (and unlinks) the existing
 * publication for non-initial requests, dispatches on the publish type and
 * finally sends the response with sip_publication_respond().
 *
 * \param rdata The received PUBLISH
 *
 * NOTE(review): many short lines (guards, breaks, returns, some assignments)
 * are not visible in this listing; the comments below describe only what
 * the visible lines show.
 */
1671 static pj_bool_t pubsub_on_rx_publish_request(pjsip_rx_data *rdata)
1673 pjsip_event_hdr *event_header;
1674 struct ast_sip_publish_handler *handler;
1675 RAII_VAR(struct ast_sip_endpoint *, endpoint, NULL, ao2_cleanup);
1677 static const pj_str_t str_sip_if_match = { "SIP-If-Match", 12 };
1678 pjsip_generic_string_hdr *etag_hdr = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_sip_if_match, NULL);
1679 enum sip_publish_type publish_type;
/* The publication reference held here is released automatically on scope exit */
1680 RAII_VAR(struct ast_sip_publication *, publication, NULL, ao2_cleanup);
/* NOTE(review): entity_id has no initializer here */
1681 int expires = 0, entity_id, response = 0;
1683 endpoint = ast_pjsip_rdata_get_endpoint(rdata);
1684 ast_assert(endpoint != NULL);
/* 489: a PUBLISH without an Event header cannot be routed to a handler */
1686 event_header = pjsip_msg_find_hdr_by_name(rdata->msg_info.msg, &str_event_name, rdata->msg_info.msg->hdr.next);
1687 if (!event_header) {
1688 ast_log(LOG_WARNING, "Incoming PUBLISH request with no Event header\n");
1689 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
1692 ast_copy_pj_str(event, &event_header->event_type, sizeof(event));
1694 handler = find_pub_handler(event);
1696 ast_log(LOG_WARNING, "No registered publish handler for event %s\n", event);
1697 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 489, NULL, NULL, NULL);
1701 publish_type = determine_sip_publish_type(rdata, etag_hdr, &expires, &entity_id);
1703 /* If this is not an initial publish ensure that a publication is present */
1704 if ((publish_type != SIP_PUBLISH_INITIAL) && (publish_type != SIP_PUBLISH_UNKNOWN)) {
/* The publication is unlinked while it is processed; 412 when no publication has this entity tag */
1705 if (!(publication = ao2_find(handler->publications, &entity_id, OBJ_KEY | OBJ_UNLINK))) {
1706 static const pj_str_t str_conditional_request_failed = { "Conditional Request Failed", 26 };
1708 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 412, &str_conditional_request_failed,
1713 /* Per the RFC every response has to have a new entity tag */
1714 publication->entity_tag = ast_atomic_fetchadd_int(&esc_etag_counter, +1);
1716 /* Update the expires here so that the created responses will contain the correct value */
1717 publication->expires = expires;
1720 switch (publish_type) {
1721 case SIP_PUBLISH_INITIAL:
1722 publication = publish_request_initial(endpoint, rdata, handler);
/* Refresh and modify are both reported to the handler as ACTIVE */
1724 case SIP_PUBLISH_REFRESH:
1725 case SIP_PUBLISH_MODIFY:
1726 if (handler->publication_state_change(publication, rdata->msg_info.msg->body,
1727 AST_SIP_PUBLISH_STATE_ACTIVE)) {
1728 /* If an error occurs we want to terminate the publication */
1733 case SIP_PUBLISH_REMOVE:
1734 handler->publication_state_change(publication, rdata->msg_info.msg->body,
1735 AST_SIP_PUBLISH_STATE_TERMINATED);
1738 case SIP_PUBLISH_UNKNOWN:
1740 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 400, NULL, NULL, NULL);
/* Put the publication (back) into the handler's container */
1746 ao2_link(handler->publications, publication);
/* (Re)schedule expiry; expires is in seconds, the scheduler is given milliseconds */
1748 AST_SCHED_REPLACE_UNREF(publication->sched_id, sched, expires * 1000, publish_expire, publication,
1749 ao2_ref(publication, -1), ao2_ref(publication, -1), ao2_ref(publication, +1));
/* Cancel any pending expiry, dropping the reference held for the scheduler entry */
1751 AST_SCHED_DEL_UNREF(sched, publication->sched_id, ao2_ref(publication, -1));
1756 sip_publication_respond(publication, response, rdata);
1762 struct ast_sip_endpoint *ast_sip_publication_get_endpoint(struct ast_sip_publication *pub)
1764 return pub->endpoint;
1767 const char *ast_sip_publication_get_resource(const struct ast_sip_publication *pub)
1769 return pub->resource;
1772 const char *ast_sip_publication_get_event_configuration(const struct ast_sip_publication *pub)
1774 return pub->event_configuration_name;
1777 int ast_sip_pubsub_register_body_generator(struct ast_sip_pubsub_body_generator *generator)
1779 struct ast_sip_pubsub_body_generator *existing;
1781 pj_size_t accept_len;
1783 existing = find_body_generator_type_subtype(generator->type, generator->subtype);
1785 ast_log(LOG_WARNING, "Cannot register body generator of %s/%s."
1786 "One is already registered.\n", generator->type, generator->subtype);
1790 AST_RWLIST_WRLOCK(&body_generators);
1791 AST_LIST_INSERT_HEAD(&body_generators, generator, list);
1792 AST_RWLIST_UNLOCK(&body_generators);
1794 /* Lengths of type and subtype plus space for a slash. pj_str_t is not
1795 * null-terminated, so there is no need to allocate for the extra null
1798 accept_len = strlen(generator->type) + strlen(generator->subtype) + 1;
1800 accept.ptr = alloca(accept_len);
1801 accept.slen = accept_len;
1802 /* Safe use of sprintf */
1803 sprintf(accept.ptr, "%s/%s", generator->type, generator->subtype);
1804 pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), &pubsub_module,
1805 PJSIP_H_ACCEPT, NULL, 1, &accept);
/*!
 * \brief Remove a body generator from the body_generators list.
 *
 * The list is walked under the write lock and the entry that is the same
 * pointer as \a generator is removed.
 *
 * \param generator The body generator to unregister
 */
1810 void ast_sip_pubsub_unregister_body_generator(struct ast_sip_pubsub_body_generator *generator)
1812 struct ast_sip_pubsub_body_generator *iter;
1813 SCOPED_LOCK(lock, &body_generators, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
/* The SAFE traversal form is required because the current node may be removed */
1815 AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_generators, iter, list) {
1816 if (iter == generator) {
1817 AST_LIST_REMOVE_CURRENT(list);
1821 AST_RWLIST_TRAVERSE_SAFE_END;
/*!
 * \brief Append a body supplement to the body_supplements list.
 *
 * \param supplement The body supplement to register
 *
 * NOTE(review): the return statement is not visible in this listing.
 */
1824 int ast_sip_pubsub_register_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
/* Write lock held only for the insertion */
1826 AST_RWLIST_WRLOCK(&body_supplements);
1827 AST_RWLIST_INSERT_TAIL(&body_supplements, supplement, list);
1828 AST_RWLIST_UNLOCK(&body_supplements);
/*!
 * \brief Remove a body supplement from the body_supplements list.
 *
 * The list is walked under the write lock and the entry that is the same
 * pointer as \a supplement is removed.
 *
 * \param supplement The body supplement to unregister
 */
1833 void ast_sip_pubsub_unregister_body_supplement(struct ast_sip_pubsub_body_supplement *supplement)
1835 struct ast_sip_pubsub_body_supplement *iter;
1836 SCOPED_LOCK(lock, &body_supplements, AST_RWLIST_WRLOCK, AST_RWLIST_UNLOCK);
/* The SAFE traversal form is required because the current node may be removed */
1838 AST_RWLIST_TRAVERSE_SAFE_BEGIN(&body_supplements, iter, list) {
1839 if (iter == supplement) {
1840 AST_LIST_REMOVE_CURRENT(list);
1844 AST_RWLIST_TRAVERSE_SAFE_END;
1847 const char *ast_sip_subscription_get_body_type(struct ast_sip_subscription *sub)
1849 return sub->body_generator->type;
1852 const char *ast_sip_subscription_get_body_subtype(struct ast_sip_subscription *sub)
1854 return sub->body_generator->subtype;
/*!
 * \brief Generate body text of a given content type from caller data.
 *
 * Visible sequence: find the generator for type/subtype, let it allocate a
 * body, generate the primary content, run every registered supplement whose
 * type and subtype match the generator, convert the body to text in \a str,
 * and destroy the body when the generator provides destroy_body.
 *
 * \param type Content type of the body
 * \param subtype Content subtype of the body
 * \param data Opaque data handed to the generator and supplements
 * \param[out] str String buffer that receives the body text
 *
 * NOTE(review): the guarding conditions, error exits and return are not
 * visible in this listing.
 */
1857 int ast_sip_pubsub_generate_body_content(const char *type, const char *subtype,
1858 void *data, struct ast_str **str)
1860 struct ast_sip_pubsub_body_supplement *supplement;
1861 struct ast_sip_pubsub_body_generator *generator;
1865 generator = find_body_generator_type_subtype(type, subtype);
1867 ast_log(LOG_WARNING, "Unable to find a body generator for %s/%s\n",
1872 body = generator->allocate_body(data);
1874 ast_log(LOG_WARNING, "Unable to allocate a NOTIFY body of type %s/%s\n",
/* A non-zero result signals failure of the primary generator */
1879 if (generator->generate_body_content(body, data)) {
/* Supplements are applied under the read lock */
1884 AST_RWLIST_RDLOCK(&body_supplements);
1885 AST_RWLIST_TRAVERSE(&body_supplements, supplement, list) {
1886 if (!strcmp(generator->type, supplement->type) &&
1887 !strcmp(generator->subtype, supplement->subtype)) {
1888 res = supplement->supplement_body(body, data);
1894 AST_RWLIST_UNLOCK(&body_supplements);
1897 generator->to_string(body, str);
/* destroy_body is optional */
1901 if (generator->destroy_body) {
1902 generator->destroy_body(body);
/*!
 * \brief Module entry point for received requests: dispatch by SIP method.
 *
 * SUBSCRIBE goes to pubsub_on_rx_subscribe_request() and PUBLISH to
 * pubsub_on_rx_publish_request().
 *
 * \param rdata The received request
 *
 * NOTE(review): the result for any other method is not visible in this listing.
 */
1908 static pj_bool_t pubsub_on_rx_request(pjsip_rx_data *rdata)
/* pjsip_method_cmp() returns 0 when the methods are equal */
1910 if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, pjsip_get_subscribe_method())) {
1911 return pubsub_on_rx_subscribe_request(rdata);
1912 } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_publish_method)) {
1913 return pubsub_on_rx_publish_request(rdata);
/*!
 * \brief evsub state-change callback.
 *
 * Fetches the subscription stored as module data on the evsub, invokes the
 * handler's optional subscription_shutdown callback and clears the module
 * data.
 *
 * \param evsub The event subscription whose state changed
 * \param event The event that caused the change (unused in the visible lines)
 *
 * NOTE(review): the body of the state check below is not visible -
 * presumably an early return so only TERMINATED is processed; confirm.
 */
1919 static void pubsub_on_evsub_state(pjsip_evsub *evsub, pjsip_event *event)
1921 struct ast_sip_subscription *sub;
1922 if (pjsip_evsub_get_state(evsub) != PJSIP_EVSUB_STATE_TERMINATED) {
1926 sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1931 if (sub->handler->subscription_shutdown) {
1932 sub->handler->subscription_shutdown(sub);
/* Detach the subscription from the evsub so later callbacks find no module data */
1934 pjsip_evsub_set_mod_data(evsub, pubsub_module.id, NULL);
/*!
 * \brief evsub callback for a received SUBSCRIBE refresh.
 *
 * Chooses the notify reason from the evsub state (TERMINATED or RENEWED) and
 * passes it to the notifier's notify_required() callback.
 *
 * NOTE(review): what happens when notify_required() returns non-zero is not
 * visible in this listing.
 */
1937 static void pubsub_on_rx_refresh(pjsip_evsub *evsub, pjsip_rx_data *rdata,
1938 int *p_st_code, pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
1940 struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1941 enum ast_sip_subscription_notify_reason reason;
1947 if (pjsip_evsub_get_state(sip_subscription_get_evsub(sub)) == PJSIP_EVSUB_STATE_TERMINATED) {
1948 reason = AST_SIP_SUBSCRIPTION_NOTIFY_REASON_TERMINATED;
1950 reason = AST_SIP_SUBSCRIPTION_NOTIFY_REASON_RENEWED;
1952 if (sub->handler->notifier->notify_required(sub, reason)) {
/*!
 * \brief evsub callback for a received NOTIFY.
 *
 * Forwards the NOTIFY body and the current evsub state to the subscriber's
 * state_change() callback.
 */
1957 static void pubsub_on_rx_notify(pjsip_evsub *evsub, pjsip_rx_data *rdata, int *p_st_code,
1958 pj_str_t **p_st_text, pjsip_hdr *res_hdr, pjsip_msg_body **p_body)
1960 struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1966 sub->handler->subscriber->state_change(sub, rdata->msg_info.msg->body,
1967 pjsip_evsub_get_state(evsub));
/*!
 * \brief Task that refreshes an outbound subscription.
 *
 * \param userdata The struct ast_sip_subscription to refresh
 *
 * NOTE(review): the branch structure around the terminate call and the
 * return are not visible; the terminate is presumably the failure branch of
 * pjsip_evsub_initiate() - confirm.
 */
1970 static int serialized_pubsub_on_client_refresh(void *userdata)
1972 struct ast_sip_subscription *sub = userdata;
1974 pjsip_tx_data *tdata;
1976 evsub = sip_subscription_get_evsub(sub);
/* Create the refresh request with a NULL method and -1 expires, then send it */
1978 if (pjsip_evsub_initiate(evsub, NULL, -1, &tdata) == PJ_SUCCESS) {
1979 pjsip_evsub_send_request(evsub, tdata);
1981 pjsip_evsub_terminate(evsub, PJ_TRUE);
/*!
 * \brief evsub callback asking the client side to refresh its subscription.
 *
 * The actual work is pushed to the subscription's serializer as
 * serialized_pubsub_on_client_refresh().
 */
1988 static void pubsub_on_client_refresh(pjsip_evsub *evsub)
1990 struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
1993 ast_sip_push_task(sub->serializer, serialized_pubsub_on_client_refresh, sub);
/*!
 * \brief Task run when an inbound subscription times out.
 *
 * Calls the notifier's notify_required() callback with the TERMINATED reason.
 *
 * \param userdata The struct ast_sip_subscription that timed out
 */
1996 static int serialized_pubsub_on_server_timeout(void *userdata)
1998 struct ast_sip_subscription *sub = userdata;
2000 sub->handler->notifier->notify_required(sub,
2001 AST_SIP_SUBSCRIPTION_NOTIFY_REASON_TERMINATED);
/*!
 * \brief evsub callback fired when the server-side subscription timer expires.
 *
 * The work is pushed to the subscription's serializer as
 * serialized_pubsub_on_server_timeout().
 *
 * NOTE(review): the NULL check that the existing comment describes is not
 * visible in this listing.
 */
2007 static void pubsub_on_server_timeout(pjsip_evsub *evsub)
2009 struct ast_sip_subscription *sub = pjsip_evsub_get_mod_data(evsub, pubsub_module.id);
2012 /* if a subscription has been terminated and the subscription
2013 timeout/expires is less than the time it takes for all pending
2014 transactions to end then the subscription timer will not have
2015 been canceled yet and sub will be null, so do nothing since
2016 the subscription has already been terminated. */
2021 ast_sip_push_task(sub->serializer, serialized_pubsub_on_server_timeout, sub);
/*!
 * \brief Emit one AMI detail event describing a subscription.
 *
 * Creates the event buffer with ast_sip_create_ami_event(), appends the
 * subscription's fields with sip_subscription_to_ami() and writes the result
 * to the manager session.
 *
 * \param sub The subscription to describe
 * \param ami AMI context (session and action data)
 *
 * NOTE(review): the third parameter (used as \c event), the buffer check and
 * the return are not visible in this listing.
 */
2024 static int ami_subscription_detail(struct ast_sip_subscription *sub,
2025 struct ast_sip_ami *ami,
/* The buffer is freed automatically on scope exit */
2028 RAII_VAR(struct ast_str *, buf,
2029 ast_sip_create_ami_event(event, ami), ast_free);
2035 sip_subscription_to_ami(sub, &buf);
/* The extra CRLF adds a blank line after the event text */
2036 astman_append(ami->s, "%s\r\n", ast_str_buffer(buf));
2040 static int ami_subscription_detail_inbound(struct ast_sip_subscription *sub, void *arg)
2042 return sub->role == AST_SIP_NOTIFIER ? ami_subscription_detail(
2043 sub, arg, "InboundSubscriptionDetail") : 0;
2046 static int ami_subscription_detail_outbound(struct ast_sip_subscription *sub, void *arg)
2048 return sub->role == AST_SIP_SUBSCRIBER ? ami_subscription_detail(
2049 sub, arg, "OutboundSubscriptionDetail") : 0;
/*!
 * \brief AMI action handler listing all inbound subscriptions.
 *
 * Sends the list acknowledgement, emits one detail event per inbound
 * subscription through for_each_subscription(), then the
 * InboundSubscriptionDetailComplete event carrying the item count.
 *
 * \param s Manager session
 * \param m The received action message
 *
 * NOTE(review): the declaration of \c num and the return are not visible in
 * this listing.
 */
2052 static int ami_show_subscriptions_inbound(struct mansession *s, const struct message *m)
2054 struct ast_sip_ami ami = { .s = s, .m = m, .action_id = astman_get_header(m, "ActionID"), };
2057 astman_send_listack(s, m, "Following are Events for "
2058 "each inbound Subscription", "start");
2060 num = for_each_subscription(ami_subscription_detail_inbound, &ami);
2062 astman_append(s, "Event: InboundSubscriptionDetailComplete\r\n");
/* Echo the ActionID only when the action supplied one */
2063 if (!ast_strlen_zero(ami.action_id)) {
2064 astman_append(s, "ActionID: %s\r\n", ami.action_id);
2066 astman_append(s, "EventList: Complete\r\n"
2067 "ListItems: %d\r\n\r\n", num);
/*!
 * \brief AMI action handler listing all outbound subscriptions.
 *
 * Sends the list acknowledgement, emits one detail event per outbound
 * subscription through for_each_subscription(), then the
 * OutboundSubscriptionDetailComplete event carrying the item count.
 *
 * \param s Manager session
 * \param m The received action message
 *
 * NOTE(review): the declaration of \c num and the return are not visible in
 * this listing.
 */
2071 static int ami_show_subscriptions_outbound(struct mansession *s, const struct message *m)
2073 struct ast_sip_ami ami = { .s = s, .m = m, .action_id = astman_get_header(m, "ActionID"), };
2076 astman_send_listack(s, m, "Following are Events for "
2077 "each outbound Subscription", "start");
2079 num = for_each_subscription(ami_subscription_detail_outbound, &ami);
2081 astman_append(s, "Event: OutboundSubscriptionDetailComplete\r\n");
/* Echo the ActionID only when the action supplied one */
2082 if (!ast_strlen_zero(ami.action_id)) {
2083 astman_append(s, "ActionID: %s\r\n", ami.action_id);
2085 astman_append(s, "EventList: Complete\r\n"
2086 "ListItems: %d\r\n\r\n", num);
/* AMI action names; they match the <manager name="..."> entries documented at the top of the file */
2090 #define AMI_SHOW_SUBSCRIPTIONS_INBOUND "PJSIPShowSubscriptionsInbound"
2091 #define AMI_SHOW_SUBSCRIPTIONS_OUTBOUND "PJSIPShowSubscriptionsOutbound"
/*!
 * \brief Config handler: store the "endpoint" value on a subscription_persistence object.
 *
 * \param opt The option being handled (unused in the visible lines)
 * \param var The variable whose value is duplicated
 * \param obj The struct subscription_persistence to update
 *
 * NOTE(review): the return statement is not visible; the result of
 * ast_strdup() is not checked on the visible lines.
 */
2093 static int persistence_endpoint_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
2095 struct subscription_persistence *persistence = obj;
2097 persistence->endpoint = ast_strdup(var->value);
/*!
 * \brief Config handler: render the "endpoint" field of a subscription_persistence object.
 *
 * \param obj The struct subscription_persistence to read
 * \param args Handler arguments (unused in the visible lines)
 * \param[out] buf Receives a heap copy of the endpoint string
 *
 * NOTE(review): the return statement is not visible in this listing.
 */
2101 static int persistence_endpoint_struct2str(const void *obj, const intptr_t *args, char **buf)
2103 const struct subscription_persistence *persistence = obj;
2105 *buf = ast_strdup(persistence->endpoint);
/*!
 * \brief Config handler: store the "tag" value on a subscription_persistence object.
 *
 * \param opt The option being handled (unused in the visible lines)
 * \param var The variable whose value is duplicated
 * \param obj The struct subscription_persistence to update
 *
 * NOTE(review): the return statement is not visible; the result of
 * ast_strdup() is not checked on the visible lines.
 */
2109 static int persistence_tag_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
2111 struct subscription_persistence *persistence = obj;
2113 persistence->tag = ast_strdup(var->value);
/*!
 * \brief Config handler: render the "tag" field of a subscription_persistence object.
 *
 * \param obj The struct subscription_persistence to read
 * \param args Handler arguments (unused in the visible lines)
 * \param[out] buf Receives a heap copy of the tag string
 *
 * NOTE(review): the return statement is not visible in this listing.
 */
2117 static int persistence_tag_struct2str(const void *obj, const intptr_t *args, char **buf)
2119 const struct subscription_persistence *persistence = obj;
2121 *buf = ast_strdup(persistence->tag);
2125 static int persistence_expires_str2struct(const struct aco_option *opt, struct ast_variable *var, void *obj)
2127 struct subscription_persistence *persistence = obj;
2128 return ast_get_timeval(var->value, &persistence->expires, ast_tv(0, 0), NULL);
2131 static int persistence_expires_struct2str(const void *obj, const intptr_t *args, char **buf)
2133 const struct subscription_persistence *persistence = obj;
2134 return (ast_asprintf(buf, "%ld", persistence->expires.tv_sec) < 0) ? -1 : 0;
/*!
 * \brief Config handler: set the endpoint of an inbound publication resource.
 *
 * \param opt The option being handled (unused in the visible lines)
 * \param var The variable whose value is duplicated
 * \param obj The struct ast_sip_publication_resource to update
 *
 * NOTE(review): the return statement is not visible in this listing.
 */
2137 static int resource_endpoint_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
2139 struct ast_sip_publication_resource *resource = obj;
/* Release any previous value before storing the new copy */
2141 ast_free(resource->endpoint);
2142 resource->endpoint = ast_strdup(var->value);
/*!
 * \brief Sorcery handler for the "event_*" fields of an inbound-publication
 * resource (registered in load_module() with the pattern "^event_").
 *
 * \param opt Option being applied (not used in the lines shown)
 * \param var Variable named "event_<name>" carrying the configured value
 * \param obj The struct ast_sip_publication_resource being populated
 *
 * Creates an ast_variable keyed by the event name with the "event_" prefix
 * removed and adds it to resource->events.
 */
2147 static int resource_event_handler(const struct aco_option *opt, struct ast_variable *var, void *obj)
2149 struct ast_sip_publication_resource *resource = obj;
2150 /* The event configuration name starts with 'event_' so skip past it to get the real name */
/* 6 is the length of the "event_" prefix. */
2151 const char *event = var->name + 6;
2152 struct ast_variable *item;
/* Guard against an empty event name or an empty value. */
2154 if (ast_strlen_zero(event) || ast_strlen_zero(var->value)) {
2158 item = ast_variable_new(event, var->value, "");
/* The new entry is linked ahead of any existing events and stored as the list head. */
2163 if (resource->events) {
2164 item->next = resource->events;
2166 resource->events = item;
/*!
 * \brief Module load entry point.
 *
 * Creates and starts the scheduler used for publication expiration, advertises
 * PUBLISH in the Allow header, registers the pubsub PJSIP service, registers
 * the "subscription_persistence" and "inbound-publication" sorcery object
 * types with their fields, arranges for persisted subscriptions to be loaded,
 * and registers the two AMI actions.
 *
 * \retval AST_MODULE_LOAD_SUCCESS on success
 * \retval AST_MODULE_LOAD_FAILURE if the scheduler, the service registration or
 *         a sorcery object registration fails
 */
2171 static int load_module(void)
2173 static const pj_str_t str_PUBLISH = { "PUBLISH", 7 };
2174 struct ast_sorcery *sorcery = ast_sip_get_sorcery();
/* Initialize PJSIP's event subscription framework on the shared endpoint. */
2176 pjsip_evsub_init_module(ast_sip_get_pjsip_endpoint());
/* Scheduler context (with its own thread) for publication expiration. */
2178 if (!(sched = ast_sched_context_create())) {
2179 ast_log(LOG_ERROR, "Could not create scheduler for publication expiration\n");
2180 return AST_MODULE_LOAD_FAILURE;
2183 if (ast_sched_start_thread(sched)) {
2184 ast_log(LOG_ERROR, "Could not start scheduler thread for publication expiration\n");
2185 ast_sched_context_destroy(sched);
2186 return AST_MODULE_LOAD_FAILURE;
/* Add PUBLISH to the methods listed in the Allow header. */
2189 pjsip_endpt_add_capability(ast_sip_get_pjsip_endpoint(), NULL, PJSIP_H_ALLOW, NULL, 1, &str_PUBLISH);
2191 if (ast_sip_register_service(&pubsub_module)) {
2192 ast_log(LOG_ERROR, "Could not register pubsub service\n");
2193 ast_sched_context_destroy(sched);
2194 return AST_MODULE_LOAD_FAILURE;
/*
 * "subscription_persistence" objects: the "astdb" wizard is applied as the
 * default, then the object type and its fields are registered.
 * NOTE(review): "transport_key" is a char array field but its default is "0"
 * while the other char array fields default to "" - confirm this is intended.
 */
2197 ast_sorcery_apply_config(sorcery, "res_pjsip_pubsub");
2198 ast_sorcery_apply_default(sorcery, "subscription_persistence", "astdb", "subscription_persistence");
2199 if (ast_sorcery_object_register(sorcery, "subscription_persistence", subscription_persistence_alloc,
2201 ast_log(LOG_ERROR, "Could not register subscription persistence object support\n");
2202 ast_sip_unregister_service(&pubsub_module);
2203 ast_sched_context_destroy(sched);
2204 return AST_MODULE_LOAD_FAILURE;
2206 ast_sorcery_object_field_register(sorcery, "subscription_persistence", "packet", "", OPT_CHAR_ARRAY_T, 0,
2207 CHARFLDSET(struct subscription_persistence, packet));
2208 ast_sorcery_object_field_register(sorcery, "subscription_persistence", "src_name", "", OPT_CHAR_ARRAY_T, 0,
2209 CHARFLDSET(struct subscription_persistence, src_name));
2210 ast_sorcery_object_field_register(sorcery, "subscription_persistence", "src_port", "0", OPT_UINT_T, 0,
2211 FLDSET(struct subscription_persistence, src_port));
2212 ast_sorcery_object_field_register(sorcery, "subscription_persistence", "transport_key", "0", OPT_CHAR_ARRAY_T, 0,
2213 CHARFLDSET(struct subscription_persistence, transport_key));
2214 ast_sorcery_object_field_register(sorcery, "subscription_persistence", "local_name", "", OPT_CHAR_ARRAY_T, 0,
2215 CHARFLDSET(struct subscription_persistence, local_name));
2216 ast_sorcery_object_field_register(sorcery, "subscription_persistence", "local_port", "0", OPT_UINT_T, 0,
2217 FLDSET(struct subscription_persistence, local_port));
2218 ast_sorcery_object_field_register(sorcery, "subscription_persistence", "cseq", "0", OPT_UINT_T, 0,
2219 FLDSET(struct subscription_persistence, cseq));
/* endpoint, tag and expires use the custom string <-> struct handlers defined above. */
2220 ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "endpoint", "",
2221 persistence_endpoint_str2struct, persistence_endpoint_struct2str, NULL, 0, 0);
2222 ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "tag", "",
2223 persistence_tag_str2struct, persistence_tag_struct2str, NULL, 0, 0);
2224 ast_sorcery_object_field_register_custom(sorcery, "subscription_persistence", "expires", "",
2225 persistence_expires_str2struct, persistence_expires_struct2str, NULL, 0, 0);
/*
 * "inbound-publication" objects: read from pjsip.conf sections whose type is
 * inbound-publication.
 * NOTE(review): the failure message logged for this registration is the same
 * "subscription persistence" text used above, although the object type here
 * is inbound-publication - looks copy-pasted; confirm and correct.
 */
2227 ast_sorcery_apply_default(sorcery, "inbound-publication", "config", "pjsip.conf,criteria=type=inbound-publication");
2228 if (ast_sorcery_object_register(sorcery, "inbound-publication", publication_resource_alloc,
2230 ast_log(LOG_ERROR, "Could not register subscription persistence object support\n");
2231 ast_sip_unregister_service(&pubsub_module);
2232 ast_sched_context_destroy(sched);
2233 return AST_MODULE_LOAD_FAILURE;
2235 ast_sorcery_object_field_register(sorcery, "inbound-publication", "type", "", OPT_NOOP_T, 0, 0);
2236 ast_sorcery_object_field_register_custom(sorcery, "inbound-publication", "endpoint", "",
2237 resource_endpoint_handler, NULL, NULL, 0, 0);
/* Every field whose name matches "^event_" is routed to resource_event_handler(). */
2238 ast_sorcery_object_fields_register(sorcery, "inbound-publication", "^event_", resource_event_handler, NULL);
2239 ast_sorcery_reload_object(sorcery, "inbound-publication");
/*
 * When Asterisk is already fully booted, queue subscription_persistence_load
 * as a SIP task right away.
 * NOTE(review): the stasis subscription on the manager topic presumably covers
 * the not-yet-booted case by waiting for a later event - confirm against
 * subscription_persistence_event_cb.
 */
2241 if (ast_test_flag(&ast_options, AST_OPT_FLAG_FULLY_BOOTED)) {
2242 ast_sip_push_task(NULL, subscription_persistence_load, NULL);
2244 stasis_subscribe(ast_manager_get_topic(), subscription_persistence_event_cb, NULL);
/* Expose the inbound/outbound subscription listings as AMI actions. */
2247 ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_INBOUND, EVENT_FLAG_SYSTEM,
2248 ami_show_subscriptions_inbound);
2249 ast_manager_register_xml(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND, EVENT_FLAG_SYSTEM,
2250 ami_show_subscriptions_outbound);
2252 return AST_MODULE_LOAD_SUCCESS;
/*!
 * \brief Module unload entry point.
 *
 * Unregisters the two AMI actions registered by load_module() and destroys
 * the scheduler context.
 */
2255 static int unload_module(void)
2257 ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_OUTBOUND);
2258 ast_manager_unregister(AMI_SHOW_SUBSCRIPTIONS_INBOUND);
2261 ast_sched_context_destroy(sched);
2267 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "PJSIP event resource",
2268 .support_level = AST_MODULE_SUPPORT_CORE,
2269 .load = load_module,
2270 .unload = unload_module,
2271 .load_pri = AST_MODPRI_CHANNEL_DEPEND,