8a77deae47307d243c23aed764863917bb336828
[asterisk/asterisk.git] / res / stasis / control.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application control support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
29
30 #include "asterisk/stasis_channels.h"
31
32 #include "command.h"
33 #include "control.h"
34 #include "asterisk/dial.h"
35 #include "asterisk/bridge.h"
36 #include "asterisk/bridge_after.h"
37 #include "asterisk/bridge_basic.h"
38 #include "asterisk/frame.h"
39 #include "asterisk/pbx.h"
40 #include "asterisk/musiconhold.h"
41
42 struct stasis_app_control {
43         ast_cond_t wait_cond;
44         /*! Queue of commands to dispatch on the channel */
45         struct ao2_container *command_queue;
46         /*!
47          * The associated channel.
48          * Be very careful with the threading associated w/ manipulating
49          * the channel.
50          */
51         struct ast_channel *channel;
52         /*!
53          * When a channel is in a bridge, the bridge that it is in.
54          */
55         struct ast_bridge *bridge;
56         /*!
57          * Holding place for channel's PBX while imparted to a bridge.
58          */
59         struct ast_pbx *pbx;
60         /*!
61          * When set, /c app_stasis should exit and continue in the dialplan.
62          */
63         int is_done:1;
64 };
65
66 static void control_dtor(void *obj)
67 {
68         struct stasis_app_control *control = obj;
69
70         ao2_cleanup(control->command_queue);
71         ast_cond_destroy(&control->wait_cond);
72 }
73
74 struct stasis_app_control *control_create(struct ast_channel *channel)
75 {
76         RAII_VAR(struct stasis_app_control *, control, NULL, ao2_cleanup);
77         int res;
78
79         control = ao2_alloc(sizeof(*control), control_dtor);
80         if (!control) {
81                 return NULL;
82         }
83
84         res = ast_cond_init(&control->wait_cond, NULL);
85         if (res != 0) {
86                 ast_log(LOG_ERROR, "Error initializing ast_cond_t: %s\n",
87                         strerror(errno));
88                 return NULL;
89         }
90
91         control->command_queue = ao2_container_alloc_list(
92                 AO2_ALLOC_OPT_LOCK_MUTEX, 0, NULL, NULL);
93
94         if (!control->command_queue) {
95                 return NULL;
96         }
97
98         control->channel = channel;
99
100         ao2_ref(control, +1);
101         return control;
102 }
103
104 static void *noop_cb(struct stasis_app_control *control,
105         struct ast_channel *chan, void *data)
106 {
107         return NULL;
108 }
109
110
111 static struct stasis_app_command *exec_command(
112         struct stasis_app_control *control, stasis_app_command_cb command_fn,
113         void *data)
114 {
115         RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup);
116
117         command_fn = command_fn ? : noop_cb;
118
119         command = command_create(command_fn, data);
120         if (!command) {
121                 return NULL;
122         }
123
124         ao2_lock(control->command_queue);
125         ao2_link_flags(control->command_queue, command, OBJ_NOLOCK);
126         ast_cond_signal(&control->wait_cond);
127         ao2_unlock(control->command_queue);
128
129         ao2_ref(command, +1);
130         return command;
131 }
132
133 struct stasis_app_control_dial_data {
134         char endpoint[AST_CHANNEL_NAME];
135         int timeout;
136 };
137
138 static void *app_control_add_channel_to_bridge(
139         struct stasis_app_control *control,
140         struct ast_channel *chan, void *data);
141
142 static void *app_control_dial(struct stasis_app_control *control,
143         struct ast_channel *chan, void *data)
144 {
145         RAII_VAR(struct ast_dial *, dial, ast_dial_create(), ast_dial_destroy);
146         RAII_VAR(struct stasis_app_control_dial_data *, dial_data, data, ast_free);
147         enum ast_dial_result res;
148         char *tech, *resource;
149         struct ast_channel *new_chan;
150         RAII_VAR(struct ast_bridge *, bridge, NULL, ao2_cleanup);
151
152         tech = dial_data->endpoint;
153         if (!(resource = strchr(tech, '/'))) {
154                 return NULL;
155         }
156         *resource++ = '\0';
157
158         if (!dial) {
159                 ast_log(LOG_ERROR, "Failed to create dialing structure.\n");
160                 return NULL;
161         }
162
163         if (ast_dial_append(dial, tech, resource) < 0) {
164                 ast_log(LOG_ERROR, "Failed to add %s/%s to dialing structure.\n", tech, resource);
165                 return NULL;
166         }
167
168         ast_dial_set_global_timeout(dial, dial_data->timeout);
169
170         res = ast_dial_run(dial, NULL, 0);
171         if (res != AST_DIAL_RESULT_ANSWERED || !(new_chan = ast_dial_answered_steal(dial))) {
172                 return NULL;
173         }
174
175         if (!(bridge = ast_bridge_basic_new())) {
176                 ast_log(LOG_ERROR, "Failed to create basic bridge.\n");
177                 return NULL;
178         }
179
180         if (ast_bridge_impart(bridge, new_chan, NULL, NULL,
181                 AST_BRIDGE_IMPART_CHAN_INDEPENDENT)) {
182                 ast_hangup(new_chan);
183         } else {
184                 app_control_add_channel_to_bridge(control, chan, bridge);
185         }
186
187         return NULL;
188 }
189
190 int stasis_app_control_dial(struct stasis_app_control *control, const char *endpoint, const char *exten, const char *context,
191                             int timeout)
192 {
193         struct stasis_app_control_dial_data *dial_data;
194
195         if (!(dial_data = ast_calloc(1, sizeof(*dial_data)))) {
196                 return -1;
197         }
198
199         if (!ast_strlen_zero(endpoint)) {
200                 ast_copy_string(dial_data->endpoint, endpoint, sizeof(dial_data->endpoint));
201         } else if (!ast_strlen_zero(exten) && !ast_strlen_zero(context)) {
202                 snprintf(dial_data->endpoint, sizeof(dial_data->endpoint), "Local/%s@%s", exten, context);
203         } else {
204                 return -1;
205         }
206
207         if (timeout > 0) {
208                 dial_data->timeout = timeout * 1000;
209         } else if (timeout == -1) {
210                 dial_data->timeout = -1;
211         } else {
212                 dial_data->timeout = 30000;
213         }
214
215         stasis_app_send_command_async(control, app_control_dial, dial_data);
216
217         return 0;
218 }
219
220 int stasis_app_control_add_role(struct stasis_app_control *control, const char *role)
221 {
222         return ast_channel_add_bridge_role(control->channel, role);
223 }
224
225 void stasis_app_control_clear_roles(struct stasis_app_control *control)
226 {
227         ast_channel_clear_bridge_roles(control->channel);
228 }
229
230 int control_is_done(struct stasis_app_control *control)
231 {
232         /* Called from stasis_app_exec thread; no lock needed */
233         return control->is_done;
234 }
235
236 struct stasis_app_control_continue_data {
237         char context[AST_MAX_CONTEXT];
238         char extension[AST_MAX_EXTENSION];
239         int priority;
240 };
241
242 static void *app_control_continue(struct stasis_app_control *control,
243         struct ast_channel *chan, void *data)
244 {
245         RAII_VAR(struct stasis_app_control_continue_data *, continue_data, data, ast_free);
246
247         ast_assert(control->channel != NULL);
248
249         /* If we're in a Stasis bridge, depart it before going back to the
250          * dialplan */
251         if (stasis_app_get_bridge(control)) {
252                 ast_bridge_depart(control->channel);
253         }
254
255         /* Called from stasis_app_exec thread; no lock needed */
256         ast_explicit_goto(control->channel, continue_data->context, continue_data->extension, continue_data->priority);
257
258         control->is_done = 1;
259
260         return NULL;
261 }
262
263 int stasis_app_control_continue(struct stasis_app_control *control, const char *context, const char *extension, int priority)
264 {
265         struct stasis_app_control_continue_data *continue_data;
266
267         if (!(continue_data = ast_calloc(1, sizeof(*continue_data)))) {
268                 return -1;
269         }
270         ast_copy_string(continue_data->context, S_OR(context, ""), sizeof(continue_data->context));
271         ast_copy_string(continue_data->extension, S_OR(extension, ""), sizeof(continue_data->extension));
272         if (priority > 0) {
273                 continue_data->priority = priority;
274         } else {
275                 continue_data->priority = -1;
276         }
277
278         stasis_app_send_command_async(control, app_control_continue, continue_data);
279
280         return 0;
281 }
282
283 struct stasis_app_control_mute_data {
284         enum ast_frame_type frametype;
285         unsigned int direction;
286 };
287
288 static void *app_control_mute(struct stasis_app_control *control,
289         struct ast_channel *chan, void *data)
290 {
291         RAII_VAR(struct stasis_app_control_mute_data *, mute_data, data, ast_free);
292         SCOPED_CHANNELLOCK(lockvar, chan);
293
294         ast_channel_suppress(control->channel, mute_data->direction, mute_data->frametype);
295
296         return NULL;
297 }
298
299 int stasis_app_control_mute(struct stasis_app_control *control, unsigned int direction, enum ast_frame_type frametype)
300 {
301         struct stasis_app_control_mute_data *mute_data;
302
303         if (!(mute_data = ast_calloc(1, sizeof(*mute_data)))) {
304                 return -1;
305         }
306
307         mute_data->direction = direction;
308         mute_data->frametype = frametype;
309
310         stasis_app_send_command_async(control, app_control_mute, mute_data);
311
312         return 0;
313 }
314
315 static void *app_control_unmute(struct stasis_app_control *control,
316         struct ast_channel *chan, void *data)
317 {
318         RAII_VAR(struct stasis_app_control_mute_data *, mute_data, data, ast_free);
319         SCOPED_CHANNELLOCK(lockvar, chan);
320
321         ast_channel_unsuppress(control->channel, mute_data->direction, mute_data->frametype);
322
323         return NULL;
324 }
325
326 int stasis_app_control_unmute(struct stasis_app_control *control, unsigned int direction, enum ast_frame_type frametype)
327 {
328         struct stasis_app_control_mute_data *mute_data;
329
330         if (!(mute_data = ast_calloc(1, sizeof(*mute_data)))) {
331                 return -1;
332         }
333
334         mute_data->direction = direction;
335         mute_data->frametype = frametype;
336
337         stasis_app_send_command_async(control, app_control_unmute, mute_data);
338
339         return 0;
340 }
341
342 char *stasis_app_control_get_channel_var(struct stasis_app_control *control, const char *variable)
343 {
344         RAII_VAR(struct ast_str *, tmp, ast_str_create(32), ast_free);
345         SCOPED_CHANNELLOCK(lockvar, control->channel);
346
347         if (!tmp) {
348                 return NULL;
349         }
350
351         if (variable[strlen(variable) - 1] == ')') {
352                 if (ast_func_read2(control->channel, variable, &tmp, 0)) {
353                         return NULL;
354                 }
355         } else {
356                 if (!ast_str_retrieve_variable(&tmp, 0, control->channel, NULL, variable)) {
357                         return NULL;
358                 }
359         }
360
361         return ast_strdup(ast_str_buffer(tmp));
362 }
363
364 int stasis_app_control_set_channel_var(struct stasis_app_control *control, const char *variable, const char *value)
365 {
366         return pbx_builtin_setvar_helper(control->channel, variable, value);
367 }
368
369 static void *app_control_hold(struct stasis_app_control *control,
370         struct ast_channel *chan, void *data)
371 {
372         ast_indicate(control->channel, AST_CONTROL_HOLD);
373
374         return NULL;
375 }
376
377 void stasis_app_control_hold(struct stasis_app_control *control)
378 {
379         stasis_app_send_command_async(control, app_control_hold, NULL);
380 }
381
382 static void *app_control_unhold(struct stasis_app_control *control,
383         struct ast_channel *chan, void *data)
384 {
385         ast_indicate(control->channel, AST_CONTROL_UNHOLD);
386
387         return NULL;
388 }
389
390 void stasis_app_control_unhold(struct stasis_app_control *control)
391 {
392         stasis_app_send_command_async(control, app_control_unhold, NULL);
393 }
394
395 static void *app_control_moh_start(struct stasis_app_control *control,
396         struct ast_channel *chan, void *data)
397 {
398         char *moh_class = data;
399
400         ast_moh_start(chan, moh_class, NULL);
401
402         ast_free(moh_class);
403         return NULL;
404 }
405
406 void stasis_app_control_moh_start(struct stasis_app_control *control, const char *moh_class)
407 {
408         char *data = NULL;
409
410         if (!ast_strlen_zero(moh_class)) {
411                 data = ast_strdup(moh_class);
412         }
413
414         stasis_app_send_command_async(control, app_control_moh_start, data);
415 }
416
417 static void *app_control_moh_stop(struct stasis_app_control *control,
418         struct ast_channel *chan, void *data)
419 {
420         ast_moh_stop(chan);
421         return NULL;
422 }
423
424 void stasis_app_control_moh_stop(struct stasis_app_control *control)
425 {
426         stasis_app_send_command_async(control, app_control_moh_stop, NULL);
427 }
428
429 struct ast_channel_snapshot *stasis_app_control_get_snapshot(
430         const struct stasis_app_control *control)
431 {
432         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
433         struct ast_channel_snapshot *snapshot;
434
435         msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(),
436                 stasis_app_control_get_channel_id(control));
437         if (!msg) {
438                 return NULL;
439         }
440
441         snapshot = stasis_message_data(msg);
442         ast_assert(snapshot != NULL);
443
444         ao2_ref(snapshot, +1);
445         return snapshot;
446 }
447
448 void *stasis_app_send_command(struct stasis_app_control *control,
449         stasis_app_command_cb command_fn, void *data)
450 {
451         RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup);
452
453         if (control == NULL) {
454                 return NULL;
455         }
456
457         command = exec_command(control, command_fn, data);
458         if (!command) {
459                 return NULL;
460         }
461
462         return command_join(command);
463 }
464
465 int stasis_app_send_command_async(struct stasis_app_control *control,
466         stasis_app_command_cb command_fn, void *data)
467 {
468         RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup);
469
470         if (control == NULL) {
471                 return -1;
472         }
473
474         command = exec_command(control, command_fn, data);
475         if (!command) {
476                 return -1;
477         }
478
479         return 0;
480 }
481
482 struct ast_bridge *stasis_app_get_bridge(struct stasis_app_control *control)
483 {
484         if (!control) {
485                 return NULL;
486         } else {
487                 SCOPED_AO2LOCK(lock, control);
488                 return control->bridge;
489         }
490 }
491
492 static void *bridge_channel_depart(struct stasis_app_control *control,
493         struct ast_channel *chan, void *data)
494 {
495         RAII_VAR(struct ast_bridge_channel *, bridge_channel, data, ao2_cleanup);
496
497         {
498                 SCOPED_CHANNELLOCK(lock, chan);
499
500                 if (bridge_channel != ast_channel_internal_bridge_channel(chan)) {
501                         ast_debug(3, "%s: Channel is no longer in departable state\n",
502                                 ast_channel_uniqueid(chan));
503                         return NULL;
504                 }
505         }
506
507         ast_debug(3, "%s: Channel departing bridge\n",
508                 ast_channel_uniqueid(chan));
509
510         ast_bridge_depart(chan);
511
512         return NULL;
513 }
514
515 static void bridge_after_cb(struct ast_channel *chan, void *data)
516 {
517         struct stasis_app_control *control = data;
518         SCOPED_AO2LOCK(lock, control);
519         struct ast_bridge_channel *bridge_channel;
520
521         ast_debug(3, "%s, %s: Channel leaving bridge\n",
522                 ast_channel_uniqueid(chan), control->bridge->uniqueid);
523
524         ast_assert(chan == control->channel);
525
526         /* Restore the channel's PBX */
527         ast_channel_pbx_set(control->channel, control->pbx);
528         control->pbx = NULL;
529
530         /* No longer in the bridge */
531         control->bridge = NULL;
532
533         /* Get the bridge channel so we don't depart from the wrong bridge */
534         ast_channel_lock(chan);
535         bridge_channel = ast_channel_get_bridge_channel(chan);
536         ast_channel_unlock(chan);
537
538         /* Depart this channel from the bridge using the command queue if possible */
539         if (stasis_app_send_command_async(control, bridge_channel_depart, bridge_channel)) {
540                 ao2_cleanup(bridge_channel);
541         }
542 }
543
544 static void bridge_after_cb_failed(enum ast_bridge_after_cb_reason reason,
545         void *data)
546 {
547         struct stasis_app_control *control = data;
548
549         bridge_after_cb(control->channel, data);
550
551         ast_debug(3, "  reason: %s\n",
552                 ast_bridge_after_cb_reason_string(reason));
553 }
554
555 static int OK = 0;
556 static int FAIL = -1;
557
558 static void *app_control_add_channel_to_bridge(
559         struct stasis_app_control *control,
560         struct ast_channel *chan, void *data)
561 {
562         struct ast_bridge *bridge = data;
563         int res;
564
565         if (!control || !bridge) {
566                 return NULL;
567         }
568
569         ast_debug(3, "%s: Adding to bridge %s\n",
570                 stasis_app_control_get_channel_id(control),
571                 bridge->uniqueid);
572
573         ast_assert(chan != NULL);
574
575         /* Depart whatever Stasis bridge we're currently in. */
576         if (stasis_app_get_bridge(control)) {
577                 /* Note that it looks like there's a race condition here, since
578                  * we don't have control locked. But this happens from the
579                  * control callback thread, so there won't be any other
580                  * concurrent attempts to bridge.
581                  */
582                 ast_bridge_depart(chan);
583         }
584
585
586         res = ast_bridge_set_after_callback(chan, bridge_after_cb,
587                 bridge_after_cb_failed, control);
588         if (res != 0) {
589                 ast_log(LOG_ERROR, "Error setting after-bridge callback\n");
590                 return &FAIL;
591         }
592
593         {
594                 /* pbx and bridge are modified by the bridging impart thread.
595                  * It shouldn't happen concurrently, but we still need to lock
596                  * for the memory fence.
597                  */
598                 SCOPED_AO2LOCK(lock, control);
599
600                 /* Save off the channel's PBX */
601                 ast_assert(control->pbx == NULL);
602                 if (!control->pbx) {
603                         control->pbx = ast_channel_pbx(chan);
604                         ast_channel_pbx_set(chan, NULL);
605                 }
606
607                 res = ast_bridge_impart(bridge,
608                         chan,
609                         NULL, /* swap channel */
610                         NULL, /* features */
611                         AST_BRIDGE_IMPART_CHAN_DEPARTABLE);
612                 if (res != 0) {
613                         ast_log(LOG_ERROR, "Error adding channel to bridge\n");
614                         ast_channel_pbx_set(chan, control->pbx);
615                         control->pbx = NULL;
616                         return &FAIL;
617                 }
618
619                 ast_assert(stasis_app_get_bridge(control) == NULL);
620                 control->bridge = bridge;
621         }
622         return &OK;
623 }
624
625 int stasis_app_control_add_channel_to_bridge(
626         struct stasis_app_control *control, struct ast_bridge *bridge)
627 {
628         int *res;
629         ast_debug(3, "%s: Sending channel add_to_bridge command\n",
630                         stasis_app_control_get_channel_id(control));
631         res = stasis_app_send_command(control,
632                 app_control_add_channel_to_bridge, bridge);
633         return *res;
634 }
635
636 static void *app_control_remove_channel_from_bridge(
637         struct stasis_app_control *control,
638         struct ast_channel *chan, void *data)
639 {
640         struct ast_bridge *bridge = data;
641
642         if (!control) {
643                 return &FAIL;
644         }
645
646         /* We should only depart from our own bridge */
647         ast_debug(3, "%s: Departing bridge %s\n",
648                 stasis_app_control_get_channel_id(control),
649                 bridge->uniqueid);
650
651         if (bridge != stasis_app_get_bridge(control)) {
652                 ast_log(LOG_WARNING, "%s: Not in bridge %s; not removing\n",
653                         stasis_app_control_get_channel_id(control),
654                         bridge->uniqueid);
655                 return &FAIL;
656         }
657
658         ast_bridge_depart(chan);
659         return &OK;
660 }
661
662 int stasis_app_control_remove_channel_from_bridge(
663         struct stasis_app_control *control, struct ast_bridge *bridge)
664 {
665         int *res;
666         ast_debug(3, "%s: Sending channel remove_from_bridge command\n",
667                         stasis_app_control_get_channel_id(control));
668         res = stasis_app_send_command(control,
669                 app_control_remove_channel_from_bridge, bridge);
670         return *res;
671 }
672
673 const char *stasis_app_control_get_channel_id(
674         const struct stasis_app_control *control)
675 {
676         return ast_channel_uniqueid(control->channel);
677 }
678
679 void stasis_app_control_publish(
680         struct stasis_app_control *control, struct stasis_message *message)
681 {
682         if (!control || !control->channel || !message) {
683                 return;
684         }
685         stasis_publish(ast_channel_topic(control->channel), message);
686 }
687
688 int stasis_app_control_queue_control(struct stasis_app_control *control,
689         enum ast_control_frame_type frame_type)
690 {
691         return ast_queue_control(control->channel, frame_type);
692 }
693
694 int control_dispatch_all(struct stasis_app_control *control,
695         struct ast_channel *chan)
696 {
697         int count = 0;
698         struct ao2_iterator i;
699         void *obj;
700
701         ast_assert(control->channel == chan);
702
703         i = ao2_iterator_init(control->command_queue, AO2_ITERATOR_UNLINK);
704
705         while ((obj = ao2_iterator_next(&i))) {
706                 RAII_VAR(struct stasis_app_command *, command, obj, ao2_cleanup);
707                 command_invoke(command, control, chan);
708                 ++count;
709         }
710
711         ao2_iterator_destroy(&i);
712         return count;
713 }
714
715 void control_wait(struct stasis_app_control *control)
716 {
717         if (!control) {
718                 return;
719         }
720
721         ast_assert(control->command_queue != NULL);
722
723         ao2_lock(control->command_queue);
724         while (ao2_container_count(control->command_queue) == 0) {
725                 int res = ast_cond_wait(&control->wait_cond,
726                         ao2_object_get_lockaddr(control->command_queue));
727                 if (res < 0) {
728                         ast_log(LOG_ERROR, "Error waiting on command queue\n");
729                         break;
730                 }
731         }
732         ao2_unlock(control->command_queue);
733 }