Refactor CEL channel events on top of Stasis-Core
[asterisk/asterisk.git] / main / stasis_bridging.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Kinsey Moore <kmoore@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Messages and Data Types for Bridge Objects
22  *
23  * \author Kinsey Moore <kmoore@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis.h"
36 #include "asterisk/channel.h"
37 #include "asterisk/stasis_bridging.h"
38 #include "asterisk/stasis_channels.h"
39 #include "asterisk/bridging.h"
40 #include "asterisk/bridging_technology.h"
41
42 #define SNAPSHOT_CHANNELS_BUCKETS 13
43
44 /*!
45  * @{ \brief Define bridge message types.
46  */
47 STASIS_MESSAGE_TYPE_DEFN(ast_bridge_snapshot_type);
48 STASIS_MESSAGE_TYPE_DEFN(ast_bridge_merge_message_type);
49 STASIS_MESSAGE_TYPE_DEFN(ast_channel_entered_bridge_type);
50 STASIS_MESSAGE_TYPE_DEFN(ast_channel_left_bridge_type);
51 /*! @} */
52
53 /*! \brief Aggregate topic for bridge messages */
54 static struct stasis_topic *bridge_topic_all;
55
56 /*! \brief Caching aggregate topic for bridge snapshots */
57 static struct stasis_caching_topic *bridge_topic_all_cached;
58
59 /*! \brief Topic pool for individual bridge topics */
60 static struct stasis_topic_pool *bridge_topic_pool;
61
62 /*! \brief Destructor for bridge snapshots */
63 static void bridge_snapshot_dtor(void *obj)
64 {
65         struct ast_bridge_snapshot *snapshot = obj;
66         ast_string_field_free_memory(snapshot);
67         ao2_cleanup(snapshot->channels);
68         snapshot->channels = NULL;
69 }
70
71 struct ast_bridge_snapshot *ast_bridge_snapshot_create(struct ast_bridge *bridge)
72 {
73         RAII_VAR(struct ast_bridge_snapshot *, snapshot, NULL, ao2_cleanup);
74         struct ast_bridge_channel *bridge_channel;
75
76         snapshot = ao2_alloc(sizeof(*snapshot), bridge_snapshot_dtor);
77         if (!snapshot || ast_string_field_init(snapshot, 128)) {
78                 return NULL;
79         }
80
81         snapshot->channels = ast_str_container_alloc(SNAPSHOT_CHANNELS_BUCKETS);
82         if (!snapshot->channels) {
83                 return NULL;
84         }
85
86         AST_LIST_TRAVERSE(&bridge->channels, bridge_channel, entry) {
87                 if (ast_str_container_add(snapshot->channels,
88                                 ast_channel_uniqueid(bridge_channel->chan))) {
89                         return NULL;
90                 }
91         }
92
93         ast_string_field_set(snapshot, uniqueid, bridge->uniqueid);
94         ast_string_field_set(snapshot, technology, bridge->technology->name);
95
96         snapshot->feature_flags = bridge->feature_flags;
97         snapshot->capabilities = bridge->technology->capabilities;
98         snapshot->num_channels = bridge->num_channels;
99         snapshot->num_active = bridge->num_active;
100
101         ao2_ref(snapshot, +1);
102         return snapshot;
103 }
104
105 struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge)
106 {
107         struct stasis_topic *bridge_topic = stasis_topic_pool_get_topic(bridge_topic_pool, bridge->uniqueid);
108         if (!bridge_topic) {
109                 return ast_bridge_topic_all();
110         }
111         return bridge_topic;
112 }
113
114 struct stasis_topic *ast_bridge_topic_all(void)
115 {
116         return bridge_topic_all;
117 }
118
119 struct stasis_caching_topic *ast_bridge_topic_all_cached(void)
120 {
121         return bridge_topic_all_cached;
122 }
123
124 void ast_bridge_publish_state(struct ast_bridge *bridge)
125 {
126         RAII_VAR(struct ast_bridge_snapshot *, snapshot, NULL, ao2_cleanup);
127         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
128
129         ast_assert(bridge != NULL);
130
131         snapshot = ast_bridge_snapshot_create(bridge);
132         if (!snapshot) {
133                 return;
134         }
135
136         msg = stasis_message_create(ast_bridge_snapshot_type(), snapshot);
137         if (!msg) {
138                 return;
139         }
140
141         stasis_publish(ast_bridge_topic(bridge), msg);
142 }
143
144 static void bridge_publish_state_from_blob(struct ast_bridge_blob *obj)
145 {
146         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
147
148         ast_assert(obj != NULL);
149
150         msg = stasis_message_create(ast_bridge_snapshot_type(), obj->bridge);
151         if (!msg) {
152                 return;
153         }
154
155         stasis_publish(stasis_topic_pool_get_topic(bridge_topic_pool, obj->bridge->uniqueid), msg);
156 }
157
158 /*! \brief Destructor for bridge merge messages */
159 static void bridge_merge_message_dtor(void *obj)
160 {
161         struct ast_bridge_merge_message *msg = obj;
162
163         ao2_cleanup(msg->to);
164         msg->to = NULL;
165         ao2_cleanup(msg->from);
166         msg->from = NULL;
167 }
168
169 /*! \brief Bridge merge message creation helper */
170 static struct ast_bridge_merge_message *bridge_merge_message_create(struct ast_bridge *to, struct ast_bridge *from)
171 {
172         RAII_VAR(struct ast_bridge_merge_message *, msg, NULL, ao2_cleanup);
173
174         msg = ao2_alloc(sizeof(*msg), bridge_merge_message_dtor);
175         if (!msg) {
176                 return NULL;
177         }
178
179         msg->to = ast_bridge_snapshot_create(to);
180         if (!msg->to) {
181                 return NULL;
182         }
183
184         msg->from = ast_bridge_snapshot_create(from);
185         if (!msg->from) {
186                 return NULL;
187         }
188
189         ao2_ref(msg, +1);
190         return msg;
191 }
192
193 void ast_bridge_publish_merge(struct ast_bridge *to, struct ast_bridge *from)
194 {
195         RAII_VAR(struct ast_bridge_merge_message *, merge_msg, NULL, ao2_cleanup);
196         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
197
198         ast_assert(to != NULL);
199         ast_assert(from != NULL);
200
201         merge_msg = bridge_merge_message_create(to, from);
202         if (!merge_msg) {
203                 return;
204         }
205
206         msg = stasis_message_create(ast_bridge_merge_message_type(), merge_msg);
207         if (!msg) {
208                 return;
209         }
210
211         stasis_publish(ast_bridge_topic_all(), msg);
212 }
213
214 static void bridge_blob_dtor(void *obj)
215 {
216         struct ast_bridge_blob *event = obj;
217         ao2_cleanup(event->bridge);
218         event->bridge = NULL;
219         ao2_cleanup(event->channel);
220         event->channel = NULL;
221         ast_json_unref(event->blob);
222         event->blob = NULL;
223 }
224
225 struct stasis_message *ast_bridge_blob_create(
226         struct stasis_message_type *message_type,
227         struct ast_bridge *bridge,
228         struct ast_channel *chan,
229         struct ast_json *blob)
230 {
231         RAII_VAR(struct ast_bridge_blob *, obj, NULL, ao2_cleanup);
232         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
233
234         obj = ao2_alloc(sizeof(*obj), bridge_blob_dtor);
235         if (!obj) {
236                 return NULL;
237         }
238
239         if (bridge) {
240                 obj->bridge = ast_bridge_snapshot_create(bridge);
241                 if (obj->bridge == NULL) {
242                         return NULL;
243                 }
244         }
245
246         if (chan) {
247                 obj->channel = ast_channel_snapshot_create(chan);
248                 if (obj->channel == NULL) {
249                         return NULL;
250                 }
251         }
252
253         if (blob) {
254                 obj->blob = ast_json_ref(blob);
255         }
256
257         msg = stasis_message_create(message_type, obj);
258         if (!msg) {
259                 return NULL;
260         }
261
262         ao2_ref(msg, +1);
263         return msg;
264 }
265
266 void ast_bridge_publish_enter(struct ast_bridge *bridge, struct ast_channel *chan)
267 {
268         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
269
270         msg = ast_bridge_blob_create(ast_channel_entered_bridge_type(), bridge, chan, NULL);
271         if (!msg) {
272                 return;
273         }
274
275         /* enter blob first, then state */
276         stasis_publish(ast_bridge_topic(bridge), msg);
277         bridge_publish_state_from_blob(stasis_message_data(msg));
278 }
279
280 void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *chan)
281 {
282         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
283
284         msg = ast_bridge_blob_create(ast_channel_left_bridge_type(), bridge, chan, NULL);
285         if (!msg) {
286                 return;
287         }
288
289         /* state first, then leave blob (opposite of enter, preserves nesting of events) */
290         bridge_publish_state_from_blob(stasis_message_data(msg));
291         stasis_publish(ast_bridge_topic(bridge), msg);
292 }
293
294 struct ast_json *ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot)
295 {
296         RAII_VAR(struct ast_json *, json_chan, NULL, ast_json_unref);
297         int r = 0;
298
299         if (snapshot == NULL) {
300                 return NULL;
301         }
302
303         json_chan = ast_json_object_create();
304         if (!json_chan) { ast_log(LOG_ERROR, "Error creating channel json object\n"); return NULL; }
305
306         r = ast_json_object_set(json_chan, "bridge-uniqueid", ast_json_string_create(snapshot->uniqueid));
307         if (r) { ast_log(LOG_ERROR, "Error adding attrib to channel json object\n"); return NULL; }
308         r = ast_json_object_set(json_chan, "bridge-technology", ast_json_string_create(snapshot->technology));
309         if (r) { ast_log(LOG_ERROR, "Error adding attrib to channel json object\n"); return NULL; }
310
311         return ast_json_ref(json_chan);
312 }
313
314 struct ast_bridge_snapshot *ast_bridge_snapshot_get_latest(const char *uniqueid)
315 {
316         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
317         struct ast_bridge_snapshot *snapshot;
318
319         ast_assert(!ast_strlen_zero(uniqueid));
320
321         message = stasis_cache_get(ast_bridge_topic_all_cached(),
322                         ast_bridge_snapshot_type(),
323                         uniqueid);
324         if (!message) {
325                 return NULL;
326         }
327
328         snapshot = stasis_message_data(message);
329         if (!snapshot) {
330                 return NULL;
331         }
332         ao2_ref(snapshot, +1);
333         return snapshot;
334 }
335
336 static void stasis_bridging_cleanup(void)
337 {
338         ao2_cleanup(bridge_topic_all);
339         bridge_topic_all = NULL;
340         bridge_topic_all_cached = stasis_caching_unsubscribe_and_join(
341                 bridge_topic_all_cached);
342         ao2_cleanup(bridge_topic_pool);
343         bridge_topic_pool = NULL;
344
345         STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_snapshot_type);
346         STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_merge_message_type);
347         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_entered_bridge_type);
348         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_left_bridge_type);
349 }
350
351 /*! \brief snapshot ID getter for caching topic */
352 static const char *bridge_snapshot_get_id(struct stasis_message *msg)
353 {
354         struct ast_bridge_snapshot *snapshot;
355         if (stasis_message_type(msg) != ast_bridge_snapshot_type()) {
356                 return NULL;
357         }
358         snapshot = stasis_message_data(msg);
359         return snapshot->uniqueid;
360 }
361
362 int ast_stasis_bridging_init(void)
363 {
364         ast_register_cleanup(stasis_bridging_cleanup);
365
366         STASIS_MESSAGE_TYPE_INIT(ast_bridge_snapshot_type);
367         STASIS_MESSAGE_TYPE_INIT(ast_bridge_merge_message_type);
368         STASIS_MESSAGE_TYPE_INIT(ast_channel_entered_bridge_type);
369         STASIS_MESSAGE_TYPE_INIT(ast_channel_left_bridge_type);
370         bridge_topic_all = stasis_topic_create("ast_bridge_topic_all");
371         bridge_topic_all_cached = stasis_caching_topic_create(bridge_topic_all, bridge_snapshot_get_id);
372         bridge_topic_pool = stasis_topic_pool_create(bridge_topic_all);
373         return !bridge_topic_all
374                 || !bridge_topic_all_cached
375                 || !bridge_topic_pool ? -1 : 0;
376 }