0a9669d3bc82cd77caad9b21f7011e38dba1c8e9
[asterisk/asterisk.git] / res / stasis / control.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application control support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
29
30 #include "asterisk/stasis_channels.h"
31
32 #include "command.h"
33 #include "control.h"
34 #include "app.h"
35 #include "asterisk/dial.h"
36 #include "asterisk/bridge.h"
37 #include "asterisk/bridge_after.h"
38 #include "asterisk/bridge_basic.h"
39 #include "asterisk/frame.h"
40 #include "asterisk/pbx.h"
41 #include "asterisk/musiconhold.h"
42 #include "asterisk/app.h"
43
44 AST_LIST_HEAD(app_control_rules, stasis_app_control_rule);
45
46 struct stasis_app_control {
47         ast_cond_t wait_cond;
48         /*! Queue of commands to dispatch on the channel */
49         struct ao2_container *command_queue;
50         /*!
51          * The associated channel.
52          * Be very careful with the threading associated w/ manipulating
53          * the channel.
54          */
55         struct ast_channel *channel;
56         /*!
57          * When a channel is in a bridge, the bridge that it is in.
58          */
59         struct ast_bridge *bridge;
60         /*!
61          * Holding place for channel's PBX while imparted to a bridge.
62          */
63         struct ast_pbx *pbx;
64         /*!
65          * A list of rules to check before adding a channel to a bridge.
66          */
67         struct app_control_rules add_rules;
68         /*!
69          * A list of rules to check before removing a channel from a bridge.
70          */
71         struct app_control_rules remove_rules;
72         /*!
73          * Silence generator, when silence is being generated.
74          */
75         struct ast_silence_generator *silgen;
76         /*!
77          * The app for which this control was created
78          */
79         struct stasis_app *app;
80         /*!
81          * When set, /c app_stasis should exit and continue in the dialplan.
82          */
83         int is_done:1;
84 };
85
86 static void control_dtor(void *obj)
87 {
88         struct stasis_app_control *control = obj;
89
90         AST_LIST_HEAD_DESTROY(&control->add_rules);
91         AST_LIST_HEAD_DESTROY(&control->remove_rules);
92
93         /* We may have a lingering silence generator; free it */
94         ast_channel_stop_silence_generator(control->channel, control->silgen);
95         control->silgen = NULL;
96
97         ao2_cleanup(control->command_queue);
98         ast_cond_destroy(&control->wait_cond);
99         ao2_cleanup(control->app);
100 }
101
102 struct stasis_app_control *control_create(struct ast_channel *channel, struct stasis_app *app)
103 {
104         RAII_VAR(struct stasis_app_control *, control, NULL, ao2_cleanup);
105         int res;
106
107         control = ao2_alloc(sizeof(*control), control_dtor);
108         if (!control) {
109                 return NULL;
110         }
111
112         control->app = ao2_bump(app);
113
114         res = ast_cond_init(&control->wait_cond, NULL);
115         if (res != 0) {
116                 ast_log(LOG_ERROR, "Error initializing ast_cond_t: %s\n",
117                         strerror(errno));
118                 return NULL;
119         }
120
121         control->command_queue = ao2_container_alloc_list(
122                 AO2_ALLOC_OPT_LOCK_MUTEX, 0, NULL, NULL);
123
124         if (!control->command_queue) {
125                 return NULL;
126         }
127
128         control->channel = channel;
129
130         AST_LIST_HEAD_INIT(&control->add_rules);
131         AST_LIST_HEAD_INIT(&control->remove_rules);
132
133         ao2_ref(control, +1);
134         return control;
135 }
136
137 static void app_control_register_rule(
138         const struct stasis_app_control *control,
139         struct app_control_rules *list, struct stasis_app_control_rule *obj)
140 {
141         SCOPED_AO2LOCK(lock, control->command_queue);
142         AST_LIST_INSERT_TAIL(list, obj, next);
143 }
144
145 static void app_control_unregister_rule(
146         const struct stasis_app_control *control,
147         struct app_control_rules *list, struct stasis_app_control_rule *obj)
148 {
149         struct stasis_app_control_rule *rule;
150         SCOPED_AO2LOCK(lock, control->command_queue);
151         AST_RWLIST_TRAVERSE_SAFE_BEGIN(list, rule, next) {
152                 if (rule == obj) {
153                         AST_RWLIST_REMOVE_CURRENT(next);
154                         break;
155                 }
156         }
157         AST_RWLIST_TRAVERSE_SAFE_END;
158 }
159
160 /*!
161  * \internal
162  * \brief Checks to make sure each rule in the given list passes.
163  *
164  * \details Loops over a list of rules checking for rejections or failures.
165  *          If one rule fails its resulting error code is returned.
166  *
167  * \note Command queue should be locked before calling this function.
168  *
169  * \param control The stasis application control
170  * \param list The list of rules to check
171  *
172  * \retval 0 if all rules pass
173  * \retval non-zero error code if a rule fails
174  */
175 static enum stasis_app_control_channel_result app_control_check_rules(
176         const struct stasis_app_control *control,
177         struct app_control_rules *list)
178 {
179         int res = 0;
180         struct stasis_app_control_rule *rule;
181         AST_LIST_TRAVERSE(list, rule, next) {
182                 if ((res = rule->check_rule(control))) {
183                         return res;
184                 }
185         }
186         return res;
187 }
188
189 void stasis_app_control_register_add_rule(
190         struct stasis_app_control *control,
191         struct stasis_app_control_rule *rule)
192 {
193         return app_control_register_rule(control, &control->add_rules, rule);
194 }
195
196 void stasis_app_control_unregister_add_rule(
197         struct stasis_app_control *control,
198         struct stasis_app_control_rule *rule)
199 {
200         app_control_unregister_rule(control, &control->add_rules, rule);
201 }
202
203 void stasis_app_control_register_remove_rule(
204         struct stasis_app_control *control,
205         struct stasis_app_control_rule *rule)
206 {
207         return app_control_register_rule(control, &control->remove_rules, rule);
208 }
209
210 void stasis_app_control_unregister_remove_rule(
211         struct stasis_app_control *control,
212         struct stasis_app_control_rule *rule)
213 {
214         app_control_unregister_rule(control, &control->remove_rules, rule);
215 }
216
217 static int app_control_can_add_channel_to_bridge(
218         struct stasis_app_control *control)
219 {
220         return app_control_check_rules(control, &control->add_rules);
221 }
222
223 static int app_control_can_remove_channel_from_bridge(
224         struct stasis_app_control *control)
225 {
226         return app_control_check_rules(control, &control->remove_rules);
227 }
228
/*!
 * \internal
 * \brief Command callback that does nothing.
 *
 * Substituted by exec_command_on_condition() when no callback is supplied.
 *
 * \retval 0 always
 */
static int noop_cb(struct stasis_app_control *control,
	struct ast_channel *chan, void *data)
{
	/* Intentionally empty */
	return 0;
}
234
/*!
 * \brief Callback type used to decide whether a command may be executed.
 *
 * A non-zero return makes exec_command_on_condition() complete the command
 * with that value instead of queueing it.
 *
 * \note command_queue is locked during the callback.
 */
typedef int (*app_command_can_exec_cb)(struct stasis_app_control *control);
238
239 static struct stasis_app_command *exec_command_on_condition(
240         struct stasis_app_control *control, stasis_app_command_cb command_fn,
241         void *data, app_command_can_exec_cb can_exec_fn)
242 {
243         int retval;
244         struct stasis_app_command *command;
245
246         command_fn = command_fn ? : noop_cb;
247
248         command = command_create(command_fn, data);
249         if (!command) {
250                 return NULL;
251         }
252
253         ao2_lock(control->command_queue);
254         if (can_exec_fn && (retval = can_exec_fn(control))) {
255                 ao2_unlock(control->command_queue);
256                 command_complete(command, retval);
257                 return command;
258         }
259
260         ao2_link_flags(control->command_queue, command, OBJ_NOLOCK);
261         ast_cond_signal(&control->wait_cond);
262         ao2_unlock(control->command_queue);
263
264         return command;
265 }
266
267 static struct stasis_app_command *exec_command(
268         struct stasis_app_control *control, stasis_app_command_cb command_fn,
269         void *data)
270 {
271         return exec_command_on_condition(control, command_fn, data, NULL);
272 }
273
/*! Payload handed from stasis_app_control_dial() to app_control_dial(). */
struct stasis_app_control_dial_data {
	/*! "tech/resource" string; app_control_dial() splits it at the first '/' */
	char endpoint[AST_CHANNEL_NAME];
	/*! Value passed to ast_dial_set_global_timeout() */
	int timeout;
};
278
279 static int app_control_dial(struct stasis_app_control *control,
280         struct ast_channel *chan, void *data)
281 {
282         RAII_VAR(struct ast_dial *, dial, ast_dial_create(), ast_dial_destroy);
283         RAII_VAR(struct stasis_app_control_dial_data *, dial_data, data, ast_free);
284         enum ast_dial_result res;
285         char *tech, *resource;
286         struct ast_channel *new_chan;
287         RAII_VAR(struct ast_bridge *, bridge, NULL, ao2_cleanup);
288
289         tech = dial_data->endpoint;
290         if (!(resource = strchr(tech, '/'))) {
291                 return -1;
292         }
293         *resource++ = '\0';
294
295         if (!dial) {
296                 ast_log(LOG_ERROR, "Failed to create dialing structure.\n");
297                 return -1;
298         }
299
300         if (ast_dial_append(dial, tech, resource, NULL) < 0) {
301                 ast_log(LOG_ERROR, "Failed to add %s/%s to dialing structure.\n", tech, resource);
302                 return -1;
303         }
304
305         ast_dial_set_global_timeout(dial, dial_data->timeout);
306
307         res = ast_dial_run(dial, NULL, 0);
308         if (res != AST_DIAL_RESULT_ANSWERED || !(new_chan = ast_dial_answered_steal(dial))) {
309                 return -1;
310         }
311
312         if (!(bridge = ast_bridge_basic_new())) {
313                 ast_log(LOG_ERROR, "Failed to create basic bridge.\n");
314                 return -1;
315         }
316
317         if (ast_bridge_impart(bridge, new_chan, NULL, NULL,
318                 AST_BRIDGE_IMPART_CHAN_INDEPENDENT)) {
319                 ast_hangup(new_chan);
320         } else {
321                 control_add_channel_to_bridge(control, chan, bridge);
322         }
323
324         return 0;
325 }
326
327 int stasis_app_control_dial(struct stasis_app_control *control, const char *endpoint, const char *exten, const char *context,
328                             int timeout)
329 {
330         struct stasis_app_control_dial_data *dial_data;
331
332         if (!(dial_data = ast_calloc(1, sizeof(*dial_data)))) {
333                 return -1;
334         }
335
336         if (!ast_strlen_zero(endpoint)) {
337                 ast_copy_string(dial_data->endpoint, endpoint, sizeof(dial_data->endpoint));
338         } else if (!ast_strlen_zero(exten) && !ast_strlen_zero(context)) {
339                 snprintf(dial_data->endpoint, sizeof(dial_data->endpoint), "Local/%s@%s", exten, context);
340         } else {
341                 return -1;
342         }
343
344         if (timeout > 0) {
345                 dial_data->timeout = timeout * 1000;
346         } else if (timeout == -1) {
347                 dial_data->timeout = -1;
348         } else {
349                 dial_data->timeout = 30000;
350         }
351
352         stasis_app_send_command_async(control, app_control_dial, dial_data);
353
354         return 0;
355 }
356
357 int stasis_app_control_add_role(struct stasis_app_control *control, const char *role)
358 {
359         return ast_channel_add_bridge_role(control->channel, role);
360 }
361
362 void stasis_app_control_clear_roles(struct stasis_app_control *control)
363 {
364         ast_channel_clear_bridge_roles(control->channel);
365 }
366
367 int control_command_count(struct stasis_app_control *control)
368 {
369         return ao2_container_count(control->command_queue);
370 }
371
372 int control_is_done(struct stasis_app_control *control)
373 {
374         /* Called from stasis_app_exec thread; no lock needed */
375         return control->is_done;
376 }
377
378 void control_mark_done(struct stasis_app_control *control)
379 {
380         control->is_done = 1;
381 }
382
/*! Payload handed from stasis_app_control_continue() to app_control_continue(). */
struct stasis_app_control_continue_data {
	/*! Context passed to ast_explicit_goto(); may be an empty string */
	char context[AST_MAX_CONTEXT];
	/*! Extension passed to ast_explicit_goto(); may be an empty string */
	char extension[AST_MAX_EXTENSION];
	/*! Priority passed to ast_explicit_goto(); -1 when the caller gave none */
	int priority;
};
388
389 static int app_control_continue(struct stasis_app_control *control,
390         struct ast_channel *chan, void *data)
391 {
392         RAII_VAR(struct stasis_app_control_continue_data *, continue_data, data, ast_free);
393
394         ast_assert(control->channel != NULL);
395
396         /* If we're in a Stasis bridge, depart it before going back to the
397          * dialplan */
398         if (stasis_app_get_bridge(control)) {
399                 ast_bridge_depart(control->channel);
400         }
401
402         /* Called from stasis_app_exec thread; no lock needed */
403         ast_explicit_goto(control->channel, continue_data->context, continue_data->extension, continue_data->priority);
404
405         control->is_done = 1;
406
407         return 0;
408 }
409
410 int stasis_app_control_continue(struct stasis_app_control *control, const char *context, const char *extension, int priority)
411 {
412         struct stasis_app_control_continue_data *continue_data;
413
414         if (!(continue_data = ast_calloc(1, sizeof(*continue_data)))) {
415                 return -1;
416         }
417         ast_copy_string(continue_data->context, S_OR(context, ""), sizeof(continue_data->context));
418         ast_copy_string(continue_data->extension, S_OR(extension, ""), sizeof(continue_data->extension));
419         if (priority > 0) {
420                 continue_data->priority = priority;
421         } else {
422                 continue_data->priority = -1;
423         }
424
425         stasis_app_send_command_async(control, app_control_continue, continue_data);
426
427         return 0;
428 }
429
/*! Payload handed from stasis_app_control_dtmf() to app_control_dtmf(). */
struct stasis_app_control_dtmf_data {
	/*! Passed to ast_safe_sleep() before the digits are sent; 0 skips the sleep */
	int before;
	/*! Passed to ast_dtmf_stream() as its "between" argument */
	int between;
	/*! Passed to ast_dtmf_stream() as its "duration" argument */
	unsigned int duration;
	/*! Passed to ast_safe_sleep() after the digits are sent; 0 skips the sleep */
	int after;
	/*! NUL-terminated digit string, allocated together with the struct */
	char dtmf[];
};
437
438 static int app_control_dtmf(struct stasis_app_control *control,
439         struct ast_channel *chan, void *data)
440 {
441         RAII_VAR(struct stasis_app_control_dtmf_data *, dtmf_data, data, ast_free);
442
443         if (ast_channel_state(chan) != AST_STATE_UP) {
444                 ast_indicate(chan, AST_CONTROL_PROGRESS);
445         }
446
447         if (dtmf_data->before) {
448                 ast_safe_sleep(chan, dtmf_data->before);
449         }
450
451         ast_dtmf_stream(chan, NULL, dtmf_data->dtmf, dtmf_data->between, dtmf_data->duration);
452
453         if (dtmf_data->after) {
454                 ast_safe_sleep(chan, dtmf_data->after);
455         }
456
457         return 0;
458 }
459
460 int stasis_app_control_dtmf(struct stasis_app_control *control, const char *dtmf, int before, int between, unsigned int duration, int after)
461 {
462         struct stasis_app_control_dtmf_data *dtmf_data;
463
464         if (!(dtmf_data = ast_calloc(1, sizeof(*dtmf_data) + strlen(dtmf) + 1))) {
465                 return -1;
466         }
467
468         dtmf_data->before = before;
469         dtmf_data->between = between;
470         dtmf_data->duration = duration;
471         dtmf_data->after = after;
472         strcpy(dtmf_data->dtmf, dtmf);
473
474         stasis_app_send_command_async(control, app_control_dtmf, dtmf_data);
475
476         return 0;
477 }
478
479 static int app_control_ring(struct stasis_app_control *control,
480         struct ast_channel *chan, void *data)
481 {
482         ast_indicate(control->channel, AST_CONTROL_RINGING);
483
484         return 0;
485 }
486
487 int stasis_app_control_ring(struct stasis_app_control *control)
488 {
489         stasis_app_send_command_async(control, app_control_ring, NULL);
490
491         return 0;
492 }
493
494 static int app_control_ring_stop(struct stasis_app_control *control,
495         struct ast_channel *chan, void *data)
496 {
497         ast_indicate(control->channel, -1);
498
499         return 0;
500 }
501
502 int stasis_app_control_ring_stop(struct stasis_app_control *control)
503 {
504         stasis_app_send_command_async(control, app_control_ring_stop, NULL);
505
506         return 0;
507 }
508
/*! Payload shared by the mute and unmute commands. */
struct stasis_app_control_mute_data {
	/*! Frame type handed to ast_channel_suppress() / ast_channel_unsuppress() */
	enum ast_frame_type frametype;
	/*! Direction value handed to ast_channel_suppress() / ast_channel_unsuppress() */
	unsigned int direction;
};
513
514 static int app_control_mute(struct stasis_app_control *control,
515         struct ast_channel *chan, void *data)
516 {
517         RAII_VAR(struct stasis_app_control_mute_data *, mute_data, data, ast_free);
518         SCOPED_CHANNELLOCK(lockvar, chan);
519
520         ast_channel_suppress(control->channel, mute_data->direction, mute_data->frametype);
521
522         return 0;
523 }
524
525 int stasis_app_control_mute(struct stasis_app_control *control, unsigned int direction, enum ast_frame_type frametype)
526 {
527         struct stasis_app_control_mute_data *mute_data;
528
529         if (!(mute_data = ast_calloc(1, sizeof(*mute_data)))) {
530                 return -1;
531         }
532
533         mute_data->direction = direction;
534         mute_data->frametype = frametype;
535
536         stasis_app_send_command_async(control, app_control_mute, mute_data);
537
538         return 0;
539 }
540
541 static int app_control_unmute(struct stasis_app_control *control,
542         struct ast_channel *chan, void *data)
543 {
544         RAII_VAR(struct stasis_app_control_mute_data *, mute_data, data, ast_free);
545         SCOPED_CHANNELLOCK(lockvar, chan);
546
547         ast_channel_unsuppress(control->channel, mute_data->direction, mute_data->frametype);
548
549         return 0;
550 }
551
552 int stasis_app_control_unmute(struct stasis_app_control *control, unsigned int direction, enum ast_frame_type frametype)
553 {
554         struct stasis_app_control_mute_data *mute_data;
555
556         if (!(mute_data = ast_calloc(1, sizeof(*mute_data)))) {
557                 return -1;
558         }
559
560         mute_data->direction = direction;
561         mute_data->frametype = frametype;
562
563         stasis_app_send_command_async(control, app_control_unmute, mute_data);
564
565         return 0;
566 }
567
568 char *stasis_app_control_get_channel_var(struct stasis_app_control *control, const char *variable)
569 {
570         RAII_VAR(struct ast_str *, tmp, ast_str_create(32), ast_free);
571
572         /* You may be tempted to lock the channel you're about to read from. You
573          * would be wrong. Some dialplan functions put the channel into
574          * autoservice, which deadlocks if the channel is already locked.
575          * ast_str_retrieve_variable() does its own locking, and the dialplan
576          * functions need to as well. We should be fine without the lock.
577          */
578
579         if (!tmp) {
580                 return NULL;
581         }
582
583         if (variable[strlen(variable) - 1] == ')') {
584                 if (ast_func_read2(control->channel, variable, &tmp, 0)) {
585                         return NULL;
586                 }
587         } else {
588                 if (!ast_str_retrieve_variable(&tmp, 0, control->channel, NULL, variable)) {
589                         return NULL;
590                 }
591         }
592
593         return ast_strdup(ast_str_buffer(tmp));
594 }
595
596 int stasis_app_control_set_channel_var(struct stasis_app_control *control, const char *variable, const char *value)
597 {
598         return pbx_builtin_setvar_helper(control->channel, variable, value);
599 }
600
601 static int app_control_hold(struct stasis_app_control *control,
602         struct ast_channel *chan, void *data)
603 {
604         ast_indicate(control->channel, AST_CONTROL_HOLD);
605
606         return 0;
607 }
608
609 void stasis_app_control_hold(struct stasis_app_control *control)
610 {
611         stasis_app_send_command_async(control, app_control_hold, NULL);
612 }
613
614 static int app_control_unhold(struct stasis_app_control *control,
615         struct ast_channel *chan, void *data)
616 {
617         ast_indicate(control->channel, AST_CONTROL_UNHOLD);
618
619         return 0;
620 }
621
622 void stasis_app_control_unhold(struct stasis_app_control *control)
623 {
624         stasis_app_send_command_async(control, app_control_unhold, NULL);
625 }
626
627 static int app_control_moh_start(struct stasis_app_control *control,
628         struct ast_channel *chan, void *data)
629 {
630         char *moh_class = data;
631
632         if (ast_channel_state(chan) != AST_STATE_UP) {
633                 ast_indicate(chan, AST_CONTROL_PROGRESS);
634         }
635
636         ast_moh_start(chan, moh_class, NULL);
637
638         ast_free(moh_class);
639         return 0;
640 }
641
642 void stasis_app_control_moh_start(struct stasis_app_control *control, const char *moh_class)
643 {
644         char *data = NULL;
645
646         if (!ast_strlen_zero(moh_class)) {
647                 data = ast_strdup(moh_class);
648         }
649
650         stasis_app_send_command_async(control, app_control_moh_start, data);
651 }
652
/*!
 * \internal
 * \brief Command callback: stop music on hold on the channel.
 *
 * \param control The stasis application control (unused)
 * \param chan The channel to stop music on hold on
 * \param data Unused
 *
 * \retval 0 always
 */
static int app_control_moh_stop(struct stasis_app_control *control,
	struct ast_channel *chan, void *data)
{
	ast_moh_stop(chan);

	return 0;
}
659
660 void stasis_app_control_moh_stop(struct stasis_app_control *control)
661 {
662         stasis_app_send_command_async(control, app_control_moh_stop, NULL);
663 }
664
665 static int app_control_silence_start(struct stasis_app_control *control,
666         struct ast_channel *chan, void *data)
667 {
668         if (ast_channel_state(chan) != AST_STATE_UP) {
669                 ast_indicate(chan, AST_CONTROL_PROGRESS);
670         }
671
672         if (control->silgen) {
673                 /* We have a silence generator, but it may have been implicitly
674                  * disabled by media actions (music on hold, playing media,
675                  * etc.) Just stop it and restart a new one.
676                  */
677                 ast_channel_stop_silence_generator(
678                         control->channel, control->silgen);
679         }
680
681         ast_debug(3, "%s: Starting silence generator\n",
682                 stasis_app_control_get_channel_id(control));
683         control->silgen = ast_channel_start_silence_generator(control->channel);
684
685         if (!control->silgen) {
686                 ast_log(LOG_WARNING,
687                         "%s: Failed to start silence generator.\n",
688                         stasis_app_control_get_channel_id(control));
689         }
690
691         return 0;
692 }
693
694 void stasis_app_control_silence_start(struct stasis_app_control *control)
695 {
696         stasis_app_send_command_async(control, app_control_silence_start, NULL);
697 }
698
699 static int app_control_silence_stop(struct stasis_app_control *control,
700         struct ast_channel *chan, void *data)
701 {
702         if (control->silgen) {
703                 ast_debug(3, "%s: Stopping silence generator\n",
704                         stasis_app_control_get_channel_id(control));
705                 ast_channel_stop_silence_generator(
706                         control->channel, control->silgen);
707                 control->silgen = NULL;
708         }
709
710         return 0;
711 }
712
713 void stasis_app_control_silence_stop(struct stasis_app_control *control)
714 {
715         stasis_app_send_command_async(control, app_control_silence_stop, NULL);
716 }
717
718 struct ast_channel_snapshot *stasis_app_control_get_snapshot(
719         const struct stasis_app_control *control)
720 {
721         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
722         struct ast_channel_snapshot *snapshot;
723
724         msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(),
725                 stasis_app_control_get_channel_id(control));
726         if (!msg) {
727                 return NULL;
728         }
729
730         snapshot = stasis_message_data(msg);
731         ast_assert(snapshot != NULL);
732
733         ao2_ref(snapshot, +1);
734         return snapshot;
735 }
736
737 static int app_send_command_on_condition(struct stasis_app_control *control,
738                                          stasis_app_command_cb command_fn, void *data,
739                                          app_command_can_exec_cb can_exec_fn)
740 {
741         RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup);
742
743         if (control == NULL) {
744                 return -1;
745         }
746
747         command = exec_command_on_condition(
748                 control, command_fn, data, can_exec_fn);
749         if (!command) {
750                 return -1;
751         }
752
753         return command_join(command);
754 }
755
756 int stasis_app_send_command(struct stasis_app_control *control,
757         stasis_app_command_cb command_fn, void *data)
758 {
759         return app_send_command_on_condition(control, command_fn, data, NULL);
760 }
761
762 int stasis_app_send_command_async(struct stasis_app_control *control,
763         stasis_app_command_cb command_fn, void *data)
764 {
765         RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup);
766
767         if (control == NULL) {
768                 return -1;
769         }
770
771         command = exec_command(control, command_fn, data);
772         if (!command) {
773                 return -1;
774         }
775
776         return 0;
777 }
778
779 struct ast_bridge *stasis_app_get_bridge(struct stasis_app_control *control)
780 {
781         if (!control) {
782                 return NULL;
783         } else {
784                 SCOPED_AO2LOCK(lock, control);
785                 return control->bridge;
786         }
787 }
788
789 static int bridge_channel_depart(struct stasis_app_control *control,
790         struct ast_channel *chan, void *data)
791 {
792         RAII_VAR(struct ast_bridge_channel *, bridge_channel, data, ao2_cleanup);
793
794         {
795                 SCOPED_CHANNELLOCK(lock, chan);
796
797                 if (bridge_channel != ast_channel_internal_bridge_channel(chan)) {
798                         ast_debug(3, "%s: Channel is no longer in departable state\n",
799                                 ast_channel_uniqueid(chan));
800                         return -1;
801                 }
802         }
803
804         ast_debug(3, "%s: Channel departing bridge\n",
805                 ast_channel_uniqueid(chan));
806
807         ast_bridge_depart(chan);
808
809         return 0;
810 }
811
812 static void bridge_after_cb(struct ast_channel *chan, void *data)
813 {
814         struct stasis_app_control *control = data;
815         SCOPED_AO2LOCK(lock, control);
816         struct ast_bridge_channel *bridge_channel;
817
818         ast_debug(3, "%s, %s: Channel leaving bridge\n",
819                 ast_channel_uniqueid(chan), control->bridge->uniqueid);
820
821         ast_assert(chan == control->channel);
822
823         /* Restore the channel's PBX */
824         ast_channel_pbx_set(control->channel, control->pbx);
825         control->pbx = NULL;
826
827         app_unsubscribe_bridge(control->app, control->bridge);
828
829         /* No longer in the bridge */
830         control->bridge = NULL;
831
832         /* Get the bridge channel so we don't depart from the wrong bridge */
833         ast_channel_lock(chan);
834         bridge_channel = ast_channel_get_bridge_channel(chan);
835         ast_channel_unlock(chan);
836
837         /* Depart this channel from the bridge using the command queue if possible */
838         if (stasis_app_send_command_async(control, bridge_channel_depart, bridge_channel)) {
839                 ao2_cleanup(bridge_channel);
840         }
841 }
842
843 static void bridge_after_cb_failed(enum ast_bridge_after_cb_reason reason,
844         void *data)
845 {
846         struct stasis_app_control *control = data;
847
848         bridge_after_cb(control->channel, data);
849
850         ast_debug(3, "  reason: %s\n",
851                 ast_bridge_after_cb_reason_string(reason));
852 }
853
854 int control_add_channel_to_bridge(
855         struct stasis_app_control *control,
856         struct ast_channel *chan, void *data)
857 {
858         struct ast_bridge *bridge = data;
859         int res;
860
861         if (!control || !bridge) {
862                 return -1;
863         }
864
865         ast_debug(3, "%s: Adding to bridge %s\n",
866                 stasis_app_control_get_channel_id(control),
867                 bridge->uniqueid);
868
869         ast_assert(chan != NULL);
870
871         /* Depart whatever Stasis bridge we're currently in. */
872         if (stasis_app_get_bridge(control)) {
873                 /* Note that it looks like there's a race condition here, since
874                  * we don't have control locked. But this happens from the
875                  * control callback thread, so there won't be any other
876                  * concurrent attempts to bridge.
877                  */
878                 ast_bridge_depart(chan);
879         }
880
881
882         res = ast_bridge_set_after_callback(chan, bridge_after_cb,
883                 bridge_after_cb_failed, control);
884         if (res != 0) {
885                 ast_log(LOG_ERROR, "Error setting after-bridge callback\n");
886                 return -1;
887         }
888
889         {
890                 /* pbx and bridge are modified by the bridging impart thread.
891                  * It shouldn't happen concurrently, but we still need to lock
892                  * for the memory fence.
893                  */
894                 SCOPED_AO2LOCK(lock, control);
895
896                 /* Ensure the controlling application is subscribed early enough
897                  * to receive the ChannelEnteredBridge message. This works in concert
898                  * with the subscription handled in the Stasis application execution
899                  * loop */
900                 app_subscribe_bridge(control->app, bridge);
901
902                 /* Save off the channel's PBX */
903                 ast_assert(control->pbx == NULL);
904                 if (!control->pbx) {
905                         control->pbx = ast_channel_pbx(chan);
906                         ast_channel_pbx_set(chan, NULL);
907                 }
908
909                 res = ast_bridge_impart(bridge,
910                         chan,
911                         NULL, /* swap channel */
912                         NULL, /* features */
913                         AST_BRIDGE_IMPART_CHAN_DEPARTABLE);
914                 if (res != 0) {
915                         ast_log(LOG_ERROR, "Error adding channel to bridge\n");
916                         ast_channel_pbx_set(chan, control->pbx);
917                         control->pbx = NULL;
918                         return -1;
919                 }
920
921                 ast_assert(stasis_app_get_bridge(control) == NULL);
922                 control->bridge = bridge;
923         }
924         return 0;
925 }
926
927 int stasis_app_control_add_channel_to_bridge(
928         struct stasis_app_control *control, struct ast_bridge *bridge)
929 {
930         ast_debug(3, "%s: Sending channel add_to_bridge command\n",
931                         stasis_app_control_get_channel_id(control));
932
933         return app_send_command_on_condition(
934                 control, control_add_channel_to_bridge, bridge,
935                 app_control_can_add_channel_to_bridge);
936 }
937
938 static int app_control_remove_channel_from_bridge(
939         struct stasis_app_control *control,
940         struct ast_channel *chan, void *data)
941 {
942         struct ast_bridge *bridge = data;
943
944         if (!control) {
945                 return -1;
946         }
947
948         /* We should only depart from our own bridge */
949         ast_debug(3, "%s: Departing bridge %s\n",
950                 stasis_app_control_get_channel_id(control),
951                 bridge->uniqueid);
952
953         if (bridge != stasis_app_get_bridge(control)) {
954                 ast_log(LOG_WARNING, "%s: Not in bridge %s; not removing\n",
955                         stasis_app_control_get_channel_id(control),
956                         bridge->uniqueid);
957                 return -1;
958         }
959
960         ast_bridge_depart(chan);
961         return 0;
962 }
963
964 int stasis_app_control_remove_channel_from_bridge(
965         struct stasis_app_control *control, struct ast_bridge *bridge)
966 {
967         ast_debug(3, "%s: Sending channel remove_from_bridge command\n",
968                         stasis_app_control_get_channel_id(control));
969         return app_send_command_on_condition(
970                 control, app_control_remove_channel_from_bridge, bridge,
971                 app_control_can_remove_channel_from_bridge);
972 }
973
974 const char *stasis_app_control_get_channel_id(
975         const struct stasis_app_control *control)
976 {
977         return ast_channel_uniqueid(control->channel);
978 }
979
980 void stasis_app_control_publish(
981         struct stasis_app_control *control, struct stasis_message *message)
982 {
983         if (!control || !control->channel || !message) {
984                 return;
985         }
986         stasis_publish(ast_channel_topic(control->channel), message);
987 }
988
989 int stasis_app_control_queue_control(struct stasis_app_control *control,
990         enum ast_control_frame_type frame_type)
991 {
992         return ast_queue_control(control->channel, frame_type);
993 }
994
995 int control_dispatch_all(struct stasis_app_control *control,
996         struct ast_channel *chan)
997 {
998         int count = 0;
999         struct ao2_iterator i;
1000         void *obj;
1001
1002         ast_assert(control->channel == chan);
1003
1004         i = ao2_iterator_init(control->command_queue, AO2_ITERATOR_UNLINK);
1005
1006         while ((obj = ao2_iterator_next(&i))) {
1007                 RAII_VAR(struct stasis_app_command *, command, obj, ao2_cleanup);
1008                 command_invoke(command, control, chan);
1009                 ++count;
1010         }
1011
1012         ao2_iterator_destroy(&i);
1013         return count;
1014 }
1015
1016 void control_wait(struct stasis_app_control *control)
1017 {
1018         if (!control) {
1019                 return;
1020         }
1021
1022         ast_assert(control->command_queue != NULL);
1023
1024         ao2_lock(control->command_queue);
1025         while (ao2_container_count(control->command_queue) == 0) {
1026                 int res = ast_cond_wait(&control->wait_cond,
1027                         ao2_object_get_lockaddr(control->command_queue));
1028                 if (res < 0) {
1029                         ast_log(LOG_ERROR, "Error waiting on command queue\n");
1030                         break;
1031                 }
1032         }
1033         ao2_unlock(control->command_queue);
1034 }
1035
1036 int control_prestart_dispatch_all(struct stasis_app_control *control,
1037         struct ast_channel *chan)
1038 {
1039         struct ao2_container *command_queue;
1040         int count = 0;
1041         struct ao2_iterator iter;
1042         struct stasis_app_command *command;
1043
1044         ast_channel_lock(chan);
1045         command_queue = command_prestart_get_container(chan);
1046         ast_channel_unlock(chan);
1047         if (!command_queue) {
1048                 return 0;
1049         }
1050
1051         iter = ao2_iterator_init(command_queue, AO2_ITERATOR_UNLINK);
1052
1053         while ((command = ao2_iterator_next(&iter))) {
1054                 command_invoke(command, control, chan);
1055                 ao2_cleanup(command);
1056                 ++count;
1057         }
1058
1059         ao2_iterator_destroy(&iter);
1060         ao2_cleanup(command_queue);
1061         return count;
1062 }
1063
1064 struct stasis_app *control_app(struct stasis_app_control *control)
1065 {
1066         return control->app;
1067 }