Avoid unnecessary cleanups during immediate shutdown
[asterisk/asterisk.git] / main / stasis_bridging.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Kinsey Moore <kmoore@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Messages and Data Types for Bridge Objects
22  *
23  * \author Kinsey Moore <kmoore@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/stasis.h"
36 #include "asterisk/channel.h"
37 #include "asterisk/stasis_bridging.h"
38 #include "asterisk/stasis_channels.h"
39 #include "asterisk/bridging.h"
40 #include "asterisk/bridging_technology.h"
41
42 #define SNAPSHOT_CHANNELS_BUCKETS 13
43
44 /*!
45  * @{ \brief Define bridge message types.
46  */
47 STASIS_MESSAGE_TYPE_DEFN(ast_bridge_snapshot_type);
48 STASIS_MESSAGE_TYPE_DEFN(ast_bridge_merge_message_type);
49 STASIS_MESSAGE_TYPE_DEFN(ast_channel_entered_bridge_type);
50 STASIS_MESSAGE_TYPE_DEFN(ast_channel_left_bridge_type);
51 /*! @} */
52
53 /*! \brief Aggregate topic for bridge messages */
54 static struct stasis_topic *bridge_topic_all;
55
56 /*! \brief Caching aggregate topic for bridge snapshots */
57 static struct stasis_caching_topic *bridge_topic_all_cached;
58
59 /*! \brief Topic pool for individual bridge topics */
60 static struct stasis_topic_pool *bridge_topic_pool;
61
62 /*! \brief Destructor for bridge snapshots */
63 static void bridge_snapshot_dtor(void *obj)
64 {
65         struct ast_bridge_snapshot *snapshot = obj;
66         ast_string_field_free_memory(snapshot);
67         ao2_cleanup(snapshot->channels);
68         snapshot->channels = NULL;
69 }
70
71 struct ast_bridge_snapshot *ast_bridge_snapshot_create(struct ast_bridge *bridge)
72 {
73         RAII_VAR(struct ast_bridge_snapshot *, snapshot, NULL, ao2_cleanup);
74         struct ast_bridge_channel *bridge_channel;
75
76         snapshot = ao2_alloc(sizeof(*snapshot), bridge_snapshot_dtor);
77         if (!snapshot || ast_string_field_init(snapshot, 128)) {
78                 return NULL;
79         }
80
81         snapshot->channels = ast_str_container_alloc(SNAPSHOT_CHANNELS_BUCKETS);
82         if (!snapshot->channels) {
83                 return NULL;
84         }
85
86         AST_LIST_TRAVERSE(&bridge->channels, bridge_channel, entry) {
87                 if (ast_str_container_add(snapshot->channels,
88                                 ast_channel_uniqueid(bridge_channel->chan))) {
89                         return NULL;
90                 }
91         }
92
93         ast_string_field_set(snapshot, uniqueid, bridge->uniqueid);
94         ast_string_field_set(snapshot, technology, bridge->technology->name);
95
96         snapshot->feature_flags = bridge->feature_flags;
97         snapshot->num_channels = bridge->num_channels;
98         snapshot->num_active = bridge->num_active;
99
100         ao2_ref(snapshot, +1);
101         return snapshot;
102 }
103
104 struct stasis_topic *ast_bridge_topic(struct ast_bridge *bridge)
105 {
106         struct stasis_topic *bridge_topic = stasis_topic_pool_get_topic(bridge_topic_pool, bridge->uniqueid);
107         if (!bridge_topic) {
108                 return ast_bridge_topic_all();
109         }
110         return bridge_topic;
111 }
112
113 struct stasis_topic *ast_bridge_topic_all(void)
114 {
115         return bridge_topic_all;
116 }
117
118 struct stasis_caching_topic *ast_bridge_topic_all_cached(void)
119 {
120         return bridge_topic_all_cached;
121 }
122
123 void ast_bridge_publish_state(struct ast_bridge *bridge)
124 {
125         RAII_VAR(struct ast_bridge_snapshot *, snapshot, NULL, ao2_cleanup);
126         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
127
128         ast_assert(bridge != NULL);
129
130         snapshot = ast_bridge_snapshot_create(bridge);
131         if (!snapshot) {
132                 return;
133         }
134
135         msg = stasis_message_create(ast_bridge_snapshot_type(), snapshot);
136         if (!msg) {
137                 return;
138         }
139
140         stasis_publish(ast_bridge_topic(bridge), msg);
141 }
142
143 static void bridge_publish_state_from_blob(struct ast_bridge_blob *obj)
144 {
145         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
146
147         ast_assert(obj != NULL);
148
149         msg = stasis_message_create(ast_bridge_snapshot_type(), obj->bridge);
150         if (!msg) {
151                 return;
152         }
153
154         stasis_publish(stasis_topic_pool_get_topic(bridge_topic_pool, obj->bridge->uniqueid), msg);
155 }
156
157 /*! \brief Destructor for bridge merge messages */
158 static void bridge_merge_message_dtor(void *obj)
159 {
160         struct ast_bridge_merge_message *msg = obj;
161
162         ao2_cleanup(msg->to);
163         msg->to = NULL;
164         ao2_cleanup(msg->from);
165         msg->from = NULL;
166 }
167
168 /*! \brief Bridge merge message creation helper */
169 static struct ast_bridge_merge_message *bridge_merge_message_create(struct ast_bridge *to, struct ast_bridge *from)
170 {
171         RAII_VAR(struct ast_bridge_merge_message *, msg, NULL, ao2_cleanup);
172
173         msg = ao2_alloc(sizeof(*msg), bridge_merge_message_dtor);
174         if (!msg) {
175                 return NULL;
176         }
177
178         msg->to = ast_bridge_snapshot_create(to);
179         if (!msg->to) {
180                 return NULL;
181         }
182
183         msg->from = ast_bridge_snapshot_create(from);
184         if (!msg->from) {
185                 return NULL;
186         }
187
188         ao2_ref(msg, +1);
189         return msg;
190 }
191
192 void ast_bridge_publish_merge(struct ast_bridge *to, struct ast_bridge *from)
193 {
194         RAII_VAR(struct ast_bridge_merge_message *, merge_msg, NULL, ao2_cleanup);
195         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
196
197         ast_assert(to != NULL);
198         ast_assert(from != NULL);
199
200         merge_msg = bridge_merge_message_create(to, from);
201         if (!merge_msg) {
202                 return;
203         }
204
205         msg = stasis_message_create(ast_bridge_merge_message_type(), merge_msg);
206         if (!msg) {
207                 return;
208         }
209
210         stasis_publish(ast_bridge_topic_all(), msg);
211 }
212
213 static void bridge_blob_dtor(void *obj)
214 {
215         struct ast_bridge_blob *event = obj;
216         ao2_cleanup(event->bridge);
217         event->bridge = NULL;
218         ao2_cleanup(event->channel);
219         event->channel = NULL;
220         ast_json_unref(event->blob);
221         event->blob = NULL;
222 }
223
224 struct stasis_message *ast_bridge_blob_create(
225         struct stasis_message_type *message_type,
226         struct ast_bridge *bridge,
227         struct ast_channel *chan,
228         struct ast_json *blob)
229 {
230         RAII_VAR(struct ast_bridge_blob *, obj, NULL, ao2_cleanup);
231         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
232
233         obj = ao2_alloc(sizeof(*obj), bridge_blob_dtor);
234         if (!obj) {
235                 return NULL;
236         }
237
238         if (bridge) {
239                 obj->bridge = ast_bridge_snapshot_create(bridge);
240                 if (obj->bridge == NULL) {
241                         return NULL;
242                 }
243         }
244
245         if (chan) {
246                 obj->channel = ast_channel_snapshot_create(chan);
247                 if (obj->channel == NULL) {
248                         return NULL;
249                 }
250         }
251
252         if (blob) {
253                 obj->blob = ast_json_ref(blob);
254         }
255
256         msg = stasis_message_create(message_type, obj);
257         if (!msg) {
258                 return NULL;
259         }
260
261         ao2_ref(msg, +1);
262         return msg;
263 }
264
265 const char *ast_bridge_blob_json_type(struct ast_bridge_blob *obj)
266 {
267         if (obj == NULL) {
268                 return NULL;
269         }
270
271         return ast_json_string_get(ast_json_object_get(obj->blob, "type"));
272 }
273
274 void ast_bridge_publish_enter(struct ast_bridge *bridge, struct ast_channel *chan)
275 {
276         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
277
278         msg = ast_bridge_blob_create(ast_channel_entered_bridge_type(), bridge, chan, NULL);
279         if (!msg) {
280                 return;
281         }
282
283         /* enter blob first, then state */
284         stasis_publish(ast_bridge_topic(bridge), msg);
285         bridge_publish_state_from_blob(stasis_message_data(msg));
286 }
287
288 void ast_bridge_publish_leave(struct ast_bridge *bridge, struct ast_channel *chan)
289 {
290         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
291
292         msg = ast_bridge_blob_create(ast_channel_left_bridge_type(), bridge, chan, NULL);
293         if (!msg) {
294                 return;
295         }
296
297         /* state first, then leave blob (opposite of enter, preserves nesting of events) */
298         bridge_publish_state_from_blob(stasis_message_data(msg));
299         stasis_publish(ast_bridge_topic(bridge), msg);
300 }
301
302 struct ast_json *ast_bridge_snapshot_to_json(const struct ast_bridge_snapshot *snapshot)
303 {
304         RAII_VAR(struct ast_json *, json_chan, NULL, ast_json_unref);
305         int r = 0;
306
307         if (snapshot == NULL) {
308                 return NULL;
309         }
310
311         json_chan = ast_json_object_create();
312         if (!json_chan) { ast_log(LOG_ERROR, "Error creating channel json object\n"); return NULL; }
313
314         r = ast_json_object_set(json_chan, "bridge-uniqueid", ast_json_string_create(snapshot->uniqueid));
315         if (r) { ast_log(LOG_ERROR, "Error adding attrib to channel json object\n"); return NULL; }
316         r = ast_json_object_set(json_chan, "bridge-technology", ast_json_string_create(snapshot->technology));
317         if (r) { ast_log(LOG_ERROR, "Error adding attrib to channel json object\n"); return NULL; }
318
319         return ast_json_ref(json_chan);
320 }
321
322 static void stasis_bridging_cleanup(void)
323 {
324         ao2_cleanup(bridge_topic_all);
325         bridge_topic_all = NULL;
326         bridge_topic_all_cached = stasis_caching_unsubscribe_and_join(
327                 bridge_topic_all_cached);
328         ao2_cleanup(bridge_topic_pool);
329         bridge_topic_pool = NULL;
330
331         STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_snapshot_type);
332         STASIS_MESSAGE_TYPE_CLEANUP(ast_bridge_merge_message_type);
333         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_entered_bridge_type);
334         STASIS_MESSAGE_TYPE_CLEANUP(ast_channel_left_bridge_type);
335 }
336
337 /*! \brief snapshot ID getter for caching topic */
338 static const char *bridge_snapshot_get_id(struct stasis_message *msg)
339 {
340         struct ast_bridge_snapshot *snapshot;
341         if (stasis_message_type(msg) != ast_bridge_snapshot_type()) {
342                 return NULL;
343         }
344         snapshot = stasis_message_data(msg);
345         return snapshot->uniqueid;
346 }
347
348 int ast_stasis_bridging_init(void)
349 {
350         ast_register_cleanup(stasis_bridging_cleanup);
351
352         STASIS_MESSAGE_TYPE_INIT(ast_bridge_snapshot_type);
353         STASIS_MESSAGE_TYPE_INIT(ast_bridge_merge_message_type);
354         STASIS_MESSAGE_TYPE_INIT(ast_channel_entered_bridge_type);
355         STASIS_MESSAGE_TYPE_INIT(ast_channel_left_bridge_type);
356         bridge_topic_all = stasis_topic_create("ast_bridge_topic_all");
357         bridge_topic_all_cached = stasis_caching_topic_create(bridge_topic_all, bridge_snapshot_get_id);
358         bridge_topic_pool = stasis_topic_pool_create(bridge_topic_all);
359         return !bridge_topic_all
360                 || !bridge_topic_all_cached
361                 || !bridge_topic_pool ? -1 : 0;
362 }