2 * Asterisk -- An open source telephony toolkit.
4 * Copyright (C) 2013, Digium, Inc.
6 * David M. Lee, II <dlee@digium.com>
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
21 * \brief Asterisk endpoint API.
23 * \author David M. Lee, II <dlee@digium.com>
27 <support_level>core</support_level>
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
34 #include "asterisk/astobj2.h"
35 #include "asterisk/endpoints.h"
36 #include "asterisk/stasis.h"
37 #include "asterisk/stasis_channels.h"
38 #include "asterisk/stasis_endpoints.h"
39 #include "asterisk/stasis_message_router.h"
40 #include "asterisk/stringfields.h"
42 /*! Buckets for endpoint->channel mappings. Keep it prime! */
43 #define ENDPOINT_BUCKETS 127
46 AST_DECLARE_STRING_FIELDS(
47 AST_STRING_FIELD(tech); /*!< Technology (SIP, IAX2, etc.). */
48 AST_STRING_FIELD(resource); /*!< Name, unique to the tech. */
49 AST_STRING_FIELD(id); /*!< tech/resource id */
51 /*! Endpoint's current state */
52 enum ast_endpoint_state state;
54 * \brief Max channels for this endpoint. -1 means unlimited or unknown.
56 * Note that this simply documents the limits of an endpoint, and does
57 * nothing to try to enforce the limit.
60 /*! Topic for this endpoint's messages */
61 struct stasis_topic *topic;
63 * Forwarding subscription sending messages to ast_endpoint_topic_all()
65 struct stasis_subscription *forward;
66 /*! Router for handling this endpoint's messages */
67 struct stasis_message_router *router;
68 /*! ast_str_container of channels associated with this endpoint */
69 struct ao2_container *channel_ids;
72 const char *ast_endpoint_state_to_string(enum ast_endpoint_state state)
75 case AST_ENDPOINT_UNKNOWN:
77 case AST_ENDPOINT_OFFLINE:
79 case AST_ENDPOINT_ONLINE:
85 static void endpoint_publish_snapshot(struct ast_endpoint *endpoint)
87 RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
88 RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
90 ast_assert(endpoint != NULL);
91 ast_assert(endpoint->topic != NULL);
93 snapshot = ast_endpoint_snapshot_create(endpoint);
97 message = stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
101 stasis_publish(endpoint->topic, message);
104 static void endpoint_dtor(void *obj)
106 struct ast_endpoint *endpoint = obj;
108 /* The router should be shut down already */
109 ast_assert(stasis_message_router_is_done(endpoint->router));
110 ao2_cleanup(endpoint->router);
111 endpoint->router = NULL;
113 stasis_unsubscribe(endpoint->forward);
114 endpoint->forward = NULL;
116 ao2_cleanup(endpoint->topic);
117 endpoint->topic = NULL;
119 ast_string_field_free_memory(endpoint);
122 static void endpoint_channel_snapshot(void *data,
123 struct stasis_subscription *sub, struct stasis_topic *topic,
124 struct stasis_message *message)
126 struct ast_endpoint *endpoint = data;
127 struct ast_channel_snapshot *snapshot = stasis_message_data(message);
128 RAII_VAR(char *, existing_id, NULL, ao2_cleanup);
131 ast_assert(endpoint != NULL);
132 ast_assert(snapshot != NULL);
135 existing_id = ao2_find(endpoint->channel_ids, snapshot->uniqueid,
138 ast_str_container_add(endpoint->channel_ids,
142 ao2_unlock(endpoint);
144 endpoint_publish_snapshot(endpoint);
148 /*! \brief Handler for channel snapshot cache clears */
149 static void endpoint_cache_clear(void *data,
150 struct stasis_subscription *sub, struct stasis_topic *topic,
151 struct stasis_message *message)
153 struct ast_endpoint *endpoint = data;
154 struct stasis_message *clear_msg = stasis_message_data(message);
155 struct ast_channel_snapshot *clear_snapshot;
157 if (stasis_message_type(clear_msg) != ast_channel_snapshot_type()) {
161 clear_snapshot = stasis_message_data(clear_msg);
163 ast_assert(endpoint != NULL);
166 ast_str_container_remove(endpoint->channel_ids, clear_snapshot->uniqueid);
167 ao2_unlock(endpoint);
168 endpoint_publish_snapshot(endpoint);
171 static void endpoint_default(void *data,
172 struct stasis_subscription *sub, struct stasis_topic *topic,
173 struct stasis_message *message)
175 struct stasis_endpoint *endpoint = data;
177 if (stasis_subscription_final_message(sub, message)) {
178 ao2_cleanup(endpoint);
182 struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
184 RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
187 if (ast_strlen_zero(tech)) {
188 ast_log(LOG_ERROR, "Endpoint tech cannot be empty\n");
192 if (ast_strlen_zero(resource)) {
193 ast_log(LOG_ERROR, "Endpoint resource cannot be empty\n");
197 endpoint = ao2_alloc(sizeof(*endpoint), endpoint_dtor);
202 endpoint->max_channels = -1;
203 endpoint->state = AST_ENDPOINT_UNKNOWN;
205 if (ast_string_field_init(endpoint, 80) != 0) {
209 ast_string_field_set(endpoint, tech, tech);
210 ast_string_field_set(endpoint, resource, resource);
211 ast_string_field_build(endpoint, id, "%s/%s", tech, resource);
213 /* All access to channel_ids should be covered by the endpoint's
214 * lock; no extra lock needed. */
215 endpoint->channel_ids = ast_str_container_alloc_options(
216 AO2_ALLOC_OPT_LOCK_NOLOCK, ENDPOINT_BUCKETS);
217 if (!endpoint->channel_ids) {
221 endpoint->topic = stasis_topic_create(endpoint->id);
222 if (!endpoint->topic) {
227 stasis_forward_all(endpoint->topic, ast_endpoint_topic_all());
228 if (!endpoint->forward) {
232 endpoint->router = stasis_message_router_create(endpoint->topic);
233 if (!endpoint->router) {
236 r |= stasis_message_router_add(endpoint->router,
237 ast_channel_snapshot_type(), endpoint_channel_snapshot,
239 r |= stasis_message_router_add(endpoint->router,
240 stasis_cache_clear_type(), endpoint_cache_clear,
242 r |= stasis_message_router_set_default(endpoint->router,
243 endpoint_default, endpoint);
245 endpoint_publish_snapshot(endpoint);
247 ao2_ref(endpoint, +1);
251 const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
253 ast_assert(endpoint != NULL);
254 return endpoint->tech;
257 static struct stasis_message *create_endpoint_snapshot_message(struct ast_endpoint *endpoint)
259 RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
260 snapshot = ast_endpoint_snapshot_create(endpoint);
265 return stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
268 void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
270 RAII_VAR(struct stasis_message *, clear_msg, NULL, ao2_cleanup);
272 if (endpoint == NULL) {
276 clear_msg = create_endpoint_snapshot_message(endpoint);
278 RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
279 message = stasis_cache_clear_create(clear_msg);
281 stasis_publish(endpoint->topic, message);
285 /* Bump refcount to hold on to the router */
286 ao2_ref(endpoint->router, +1);
287 stasis_message_router_unsubscribe(endpoint->router);
290 const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
292 return endpoint->resource;
295 struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint)
297 return endpoint ? endpoint->topic : ast_endpoint_topic_all();
300 void ast_endpoint_set_state(struct ast_endpoint *endpoint,
301 enum ast_endpoint_state state)
303 ast_assert(endpoint != NULL);
305 endpoint->state = state;
306 ao2_unlock(endpoint);
307 endpoint_publish_snapshot(endpoint);
310 void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint,
313 ast_assert(endpoint != NULL);
315 endpoint->max_channels = max_channels;
316 ao2_unlock(endpoint);
317 endpoint_publish_snapshot(endpoint);
320 static void endpoint_snapshot_dtor(void *obj)
322 struct ast_endpoint_snapshot *snapshot = obj;
324 ast_assert(snapshot != NULL);
325 ast_string_field_free_memory(snapshot);
328 struct ast_endpoint_snapshot *ast_endpoint_snapshot_create(
329 struct ast_endpoint *endpoint)
331 RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
333 struct ao2_iterator i;
335 SCOPED_AO2LOCK(lock, endpoint);
337 channel_count = ao2_container_count(endpoint->channel_ids);
339 snapshot = ao2_alloc(
340 sizeof(*snapshot) + channel_count * sizeof(char *),
341 endpoint_snapshot_dtor);
343 if (ast_string_field_init(snapshot, 80) != 0) {
347 ast_string_field_build(snapshot, id, "%s/%s", endpoint->tech,
349 ast_string_field_set(snapshot, tech, endpoint->tech);
350 ast_string_field_set(snapshot, resource, endpoint->resource);
352 snapshot->state = endpoint->state;
353 snapshot->max_channels = endpoint->max_channels;
355 i = ao2_iterator_init(endpoint->channel_ids, 0);
356 while ((obj = ao2_iterator_next(&i))) {
357 RAII_VAR(char *, channel_id, obj, ao2_cleanup);
358 snapshot->channel_ids[snapshot->num_channels++] = channel_id;
361 ao2_ref(snapshot, +1);