/*
 * Asterisk -- An open source telephony toolkit.
 *
 * Copyright (C) 2013, Digium, Inc.
 *
 * Kinsey Moore <kmoore@digium.com>
 *
 * See http://www.asterisk.org for more information about
 * the Asterisk project. Please do not directly contact
 * any of the maintainers of this project for assistance;
 * the project provides a web site, mailing lists and IRC
 * channels for your use.
 *
 * This program is free software, distributed under the terms of
 * the GNU General Public License Version 2. See the LICENSE file
 * at the top of the source tree.
 */

/*! \file
 *
 * \brief Stasis Messages and Data Types for Bridge Objects
 *
 * \author Kinsey Moore <kmoore@digium.com>
 */

/*** MODULEINFO
	<support_level>core</support_level>
 ***/

32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis.h"
36 #include "asterisk/channel.h"
37 #include "asterisk/stasis_bridging.h"
38 #include "asterisk/stasis_channels.h"
39 #include "asterisk/bridging.h"
40 #include "asterisk/bridging_technology.h"
/*! \brief Bucket count passed to ast_str_container_alloc() for a snapshot's channel container */
#define SNAPSHOT_CHANNELS_BUCKETS 13

/*!
 * @{ \brief Define bridge message types.
 */
STASIS_MESSAGE_TYPE_DEFN(ast_bridge_snapshot_type);
STASIS_MESSAGE_TYPE_DEFN(ast_bridge_merge_message_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_entered_bridge_type);
STASIS_MESSAGE_TYPE_DEFN(ast_channel_left_bridge_type);
/*! @} */

/*! \brief Aggregate topic for bridge messages */
static struct stasis_topic *bridge_topic_all;

/*! \brief Caching aggregate topic for bridge snapshots */
static struct stasis_caching_topic *bridge_topic_all_cached;

/*! \brief Topic pool for individual bridge topics */
static struct stasis_topic_pool *bridge_topic_pool;

62 /*! \brief Destructor for bridge snapshots */
63 static void bridge_snapshot_dtor(void *obj)
65 struct ast_bridge_snapshot *snapshot = obj;
66 ast_string_field_free_memory(snapshot);
67 ao2_cleanup(snapshot->channels);
68 snapshot->channels = NULL;
71 struct ast_bridge_snapshot *ast_bridge_snapshot_create(struct ast_bridge *bridge)
73 RAII_VAR(struct ast_bridge_snapshot *, snapshot, NULL, ao2_cleanup);
74 struct ast_bridge_channel *bridge_channel;
76 snapshot = ao2_alloc(sizeof(*snapshot), bridge_snapshot_dtor);
77 if (!snapshot || ast_string_field_init(snapshot, 128)) {
81 snapshot->channels = ast_str_container_alloc(SNAPSHOT_CHANNELS_BUCKETS);
82 if (!snapshot->channels) {
86 AST_LIST_TRAVERSE(&bridge->channels, bridge_channel, entry) {
87 if (ast_str_container_add(snapshot->channels,
88 ast_channel_uniqueid(bridge_channel->chan))) {
93 ast_string_field_set(snapshot, uniqueid, bridge->uniqueid);
94 ast_string_field_set(snapshot, technology, bridge->technology->name);
96 snapshot->feature_flags = bridge->feature_flags;
97 snapshot->capabilities = bridge->technology->capabilities;
98 snapshot->num_channels = bridge->num_channels;
99 snapshot->num_active = bridge->num_active;
101 ao2_ref(snapshot, +1);
105 struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge)
107 struct stasis_topic *bridge_topic = stasis_topic_pool_get_topic(bridge_topic_pool, bridge->uniqueid);
109 return ast_bridge_topic_all();
114 struct stasis_topic *ast_bridge_topic_all(void)
116 return bridge_topic_all;
119 struct stasis_caching_topic *ast_bridge_topic_all_cached(void)
121 return bridge_topic_all_cached;
124 void ast_bridge_publish_state(struct ast_bridge *bridge)
126 RAII_VAR(struct ast_bridge_snapshot *, snapshot, NULL, ao2_cleanup);
127 RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
129 ast_assert(bridge != NULL);
131 snapshot = ast_bridge_snapshot_create(bridge);
136 msg = stasis_message_create(ast_bridge_snapshot_type(), snapshot);
141 stasis_publish(ast_bridge_topic(bridge), msg);
144 static void bridge_publish_state_from_blob(struct ast_bridge_blob *obj)
146 RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
148 ast_assert(obj != NULL);
150 msg = stasis_message_create(ast_bridge_snapshot_type(), obj->bridge);
155 stasis_publish(stasis_topic_pool_get_topic(bridge_topic_pool, obj->bridge->uniqueid), msg);
158 /*! \brief Destructor for bridge merge messages */
159 static void bridge_merge_message_dtor(void *obj)
161 struct ast_bridge_merge_message *msg = obj;
163 ao2_cleanup(msg->to);
165 ao2_cleanup(msg->from);
169 /*! \brief Bridge merge message creation helper */
170 static struct ast_bridge_merge_message *bridge_merge_message_create(struct ast_bridge *to, struct ast_bridge *from)
172 RAII_VAR(struct ast_bridge_merge_message *, msg, NULL, ao2_cleanup);
174 msg = ao2_alloc(sizeof(*msg), bridge_merge_message_dtor);
179 msg->to = ast_bridge_snapshot_create(to);
184 msg->from = ast_bridge_snapshot_create(from);
193 void ast_bridge_publish_merge(struct ast_bridge *to, struct ast_bridge *from)
195 RAII_VAR(struct ast_bridge_merge_message *, merge_msg, NULL, ao2_cleanup);
196 RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
198 ast_assert(to != NULL);
199 ast_assert(from != NULL);
201 merge_msg = bridge_merge_message_create(to, from);
206 msg = stasis_message_create(ast_bridge_merge_message_type(), merge_msg);
211 stasis_publish(ast_bridge_topic_all(), msg);
214 static void bridge_blob_dtor(void *obj)
216 struct ast_bridge_blob *event = obj;
217 ao2_cleanup(event->bridge);
218 event->bridge = NULL;
219 ao2_cleanup(event->channel);
220 event->channel = NULL;
221 ast_json_unref(event->blob);
225 struct stasis_message *ast_bridge_blob_create(
226 struct stasis_message_type *message_type,
227 struct ast_bridge *bridge,
228 struct ast_channel *chan,
229 struct ast_json *blob)
231 RAII_VAR(struct ast_bridge_blob *, obj, NULL, ao2_cleanup);
232 RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
234 obj = ao2_alloc(sizeof(*obj), bridge_blob_dtor);
240 obj->bridge = ast_bridge_snapshot_create(bridge);
241 if (obj->bridge == NULL) {
247 obj->channel = ast_channel_snapshot_create(chan);
248 if (obj->channel == NULL) {
254 obj->blob = ast_json_ref(blob);
257 msg = stasis_message_create(message_type, obj);
266 void ast_bridge_publish_enter(struct ast_bridge *bridge, struct ast_channel *chan)
268 RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
270 msg = ast_bridge_blob_create(ast_channel_entered_bridge_type(), bridge, chan, NULL);
275 /* enter blob first, then state */
276 stasis_publish(ast_bridge_topic(bridge), msg);
277 bridge_publish_state_from_blob(stasis_message_data(msg));
280 void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *chan)
282 RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
284 msg = ast_bridge_blob_create(ast_channel_left_bridge_type(), bridge, chan, NULL);
289 /* state first, then leave blob (opposite of enter, preserves nesting of events) */
290 bridge_publish_state_from_blob(stasis_message_data(msg));
291 stasis_publish(ast_bridge_topic(bridge), msg);
294 struct ast_json *ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot)
296 RAII_VAR(struct ast_json *, json_chan, NULL, ast_json_unref);
299 if (snapshot == NULL) {
303 json_chan = ast_json_object_create();
304 if (!json_chan) { ast_log(LOG_ERROR, "Error creating channel json object\n"); return NULL; }
306 r = ast_json_object_set(json_chan, "bridge-uniqueid", ast_json_string_create(snapshot->uniqueid));
307 if (r) { ast_log(LOG_ERROR, "Error adding attrib to channel json object\n"); return NULL; }
308 r = ast_json_object_set(json_chan, "bridge-technology", ast_json_string_create(snapshot->technology));
309 if (r) { ast_log(LOG_ERROR, "Error adding attrib to channel json object\n"); return NULL; }
311 return ast_json_ref(json_chan);
314 struct ast_bridge_snapshot *ast_bridge_snapshot_get_latest(const char *uniqueid)
316 RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
317 struct ast_bridge_snapshot *snapshot;
319 ast_assert(!ast_strlen_zero(uniqueid));
321 message = stasis_cache_get(ast_bridge_topic_all_cached(),
322 ast_bridge_snapshot_type(),
328 snapshot = stasis_message_data(message);
332 ao2_ref(snapshot, +1);
336 static void stasis_bridging_cleanup(void)
338 ao2_cleanup(bridge_topic_all);
339 bridge_topic_all = NULL;
340 bridge_topic_all_cached = stasis_caching_unsubscribe_and_join(
341 bridge_topic_all_cached);
342 ao2_cleanup(bridge_topic_pool);
343 bridge_topic_pool = NULL;
345 STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_snapshot_type);
346 STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_merge_message_type);
347 STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_entered_bridge_type);
348 STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_left_bridge_type);
351 /*! \brief snapshot ID getter for caching topic */
352 static const char *bridge_snapshot_get_id(struct stasis_message *msg)
354 struct ast_bridge_snapshot *snapshot;
355 if (stasis_message_type(msg) != ast_bridge_snapshot_type()) {
358 snapshot = stasis_message_data(msg);
359 return snapshot->uniqueid;
362 int ast_stasis_bridging_init(void)
364 ast_register_cleanup(stasis_bridging_cleanup);
366 STASIS_MESSAGE_TYPE_INIT(ast_bridge_snapshot_type);
367 STASIS_MESSAGE_TYPE_INIT(ast_bridge_merge_message_type);
368 STASIS_MESSAGE_TYPE_INIT(ast_channel_entered_bridge_type);
369 STASIS_MESSAGE_TYPE_INIT(ast_channel_left_bridge_type);
370 bridge_topic_all = stasis_topic_create("ast_bridge_topic_all");
371 bridge_topic_all_cached = stasis_caching_topic_create(bridge_topic_all, bridge_snapshot_get_id);
372 bridge_topic_pool = stasis_topic_pool_create(bridge_topic_all);
373 return !bridge_topic_all
374 || !bridge_topic_all_cached
375 || !bridge_topic_pool ? -1 : 0;