8530abd9f6d118c2c8d1401111dd2878bd775b5c
[asterisk/asterisk.git] / res / stasis / control.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application control support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
29
30 #include "asterisk/stasis_channels.h"
31
32 #include "command.h"
33 #include "control.h"
34 #include "asterisk/dial.h"
35 #include "asterisk/bridge.h"
36 #include "asterisk/bridge_after.h"
37 #include "asterisk/bridge_basic.h"
38 #include "asterisk/frame.h"
39 #include "asterisk/pbx.h"
40 #include "asterisk/musiconhold.h"
41
42 struct stasis_app_control {
43         ast_cond_t wait_cond;
44         /*! Queue of commands to dispatch on the channel */
45         struct ao2_container *command_queue;
46         /*!
47          * The associated channel.
48          * Be very careful with the threading associated w/ manipulating
49          * the channel.
50          */
51         struct ast_channel *channel;
52         /*!
53          * When a channel is in a bridge, the bridge that it is in.
54          */
55         struct ast_bridge *bridge;
56         /*!
57          * Holding place for channel's PBX while imparted to a bridge.
58          */
59         struct ast_pbx *pbx;
60         /*!
61          * When set, /c app_stasis should exit and continue in the dialplan.
62          */
63         int is_done:1;
64 };
65
66 static void control_dtor(void *obj)
67 {
68         struct stasis_app_control *control = obj;
69
70         ao2_cleanup(control->command_queue);
71         ast_cond_destroy(&control->wait_cond);
72 }
73
74 struct stasis_app_control *control_create(struct ast_channel *channel)
75 {
76         RAII_VAR(struct stasis_app_control *, control, NULL, ao2_cleanup);
77         int res;
78
79         control = ao2_alloc(sizeof(*control), control_dtor);
80         if (!control) {
81                 return NULL;
82         }
83
84         res = ast_cond_init(&control->wait_cond, NULL);
85         if (res != 0) {
86                 ast_log(LOG_ERROR, "Error initializing ast_cond_t: %s\n",
87                         strerror(errno));
88                 return NULL;
89         }
90
91         control->command_queue = ao2_container_alloc_list(
92                 AO2_ALLOC_OPT_LOCK_MUTEX, 0, NULL, NULL);
93
94         if (!control->command_queue) {
95                 return NULL;
96         }
97
98         control->channel = channel;
99
100         ao2_ref(control, +1);
101         return control;
102 }
103
104 static void *noop_cb(struct stasis_app_control *control,
105         struct ast_channel *chan, void *data)
106 {
107         return NULL;
108 }
109
110
111 static struct stasis_app_command *exec_command(
112         struct stasis_app_control *control, stasis_app_command_cb command_fn,
113         void *data)
114 {
115         RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup);
116
117         command_fn = command_fn ? : noop_cb;
118
119         command = command_create(command_fn, data);
120
121         if (!command) {
122                 return NULL;
123         }
124
125         ao2_lock(control->command_queue);
126         ao2_link_flags(control->command_queue, command, OBJ_NOLOCK);
127         ast_cond_signal(&control->wait_cond);
128         ao2_unlock(control->command_queue);
129
130         ao2_ref(command, +1);
131         return command;
132 }
133
134 struct stasis_app_control_dial_data {
135         char endpoint[AST_CHANNEL_NAME];
136         int timeout;
137 };
138
139 static void *app_control_dial(struct stasis_app_control *control,
140         struct ast_channel *chan, void *data)
141 {
142         RAII_VAR(struct ast_dial *, dial, ast_dial_create(), ast_dial_destroy);
143         RAII_VAR(struct stasis_app_control_dial_data *, dial_data, data, ast_free);
144         enum ast_dial_result res;
145         char *tech, *resource;
146
147         struct ast_channel *new_chan;
148         struct ast_bridge *bridge;
149
150         tech = dial_data->endpoint;
151         if (!(resource = strchr(tech, '/'))) {
152                 return NULL;
153         }
154         *resource++ = '\0';
155
156         if (!dial) {
157                 ast_log(LOG_ERROR, "Failed to create dialing structure.\n");
158                 return NULL;
159         }
160
161         if (ast_dial_append(dial, tech, resource) < 0) {
162                 ast_log(LOG_ERROR, "Failed to add %s/%s to dialing structure.\n", tech, resource);
163                 return NULL;
164         }
165
166         ast_dial_set_global_timeout(dial, dial_data->timeout);
167
168         res = ast_dial_run(dial, NULL, 0);
169
170         if (res != AST_DIAL_RESULT_ANSWERED || !(new_chan = ast_dial_answered_steal(dial))) {
171                 return NULL;
172         }
173
174         if (!(bridge = ast_bridge_basic_new())) {
175                 ast_log(LOG_ERROR, "Failed to create basic bridge.\n");
176                 return NULL;
177         }
178
179         ast_bridge_impart(bridge, new_chan, NULL, NULL, 1);
180         stasis_app_control_add_channel_to_bridge(control, bridge);
181
182         return NULL;
183 }
184
185 int stasis_app_control_dial(struct stasis_app_control *control, const char *endpoint, int timeout)
186 {
187         struct stasis_app_control_dial_data *dial_data;
188
189         if (!(dial_data = ast_calloc(1, sizeof(*dial_data)))) {
190                 return -1;
191         }
192
193         ast_copy_string(dial_data->endpoint, endpoint, sizeof(dial_data->endpoint));
194
195         if (timeout > 0) {
196                 dial_data->timeout = timeout * 1000;
197         } else if (timeout == -1) {
198                 dial_data->timeout = -1;
199         } else {
200                 dial_data->timeout = 30000;
201         }
202
203         stasis_app_send_command_async(control, app_control_dial, dial_data);
204
205         return 0;
206 }
207
208 int stasis_app_control_add_role(struct stasis_app_control *control, const char *role)
209 {
210         return ast_channel_add_bridge_role(control->channel, role);
211 }
212
213 void stasis_app_control_clear_roles(struct stasis_app_control *control)
214 {
215         ast_channel_clear_bridge_roles(control->channel);
216 }
217
218 int control_is_done(struct stasis_app_control *control)
219 {
220         /* Called from stasis_app_exec thread; no lock needed */
221         return control->is_done;
222 }
223
224 struct stasis_app_control_continue_data {
225         char context[AST_MAX_CONTEXT];
226         char extension[AST_MAX_EXTENSION];
227         int priority;
228 };
229
230 static void *app_control_continue(struct stasis_app_control *control,
231         struct ast_channel *chan, void *data)
232 {
233         RAII_VAR(struct stasis_app_control_continue_data *, continue_data, data, ast_free);
234
235         ast_assert(control->channel != NULL);
236
237         /* If we're in a Stasis bridge, depart it before going back to the
238          * dialplan */
239         if (stasis_app_get_bridge(control)) {
240                 ast_bridge_depart(control->channel);
241         }
242
243         /* Called from stasis_app_exec thread; no lock needed */
244         ast_explicit_goto(control->channel, continue_data->context, continue_data->extension, continue_data->priority);
245
246         control->is_done = 1;
247
248         return NULL;
249 }
250
251 int stasis_app_control_continue(struct stasis_app_control *control, const char *context, const char *extension, int priority)
252 {
253         struct stasis_app_control_continue_data *continue_data;
254
255         if (!(continue_data = ast_calloc(1, sizeof(*continue_data)))) {
256                 return -1;
257         }
258         ast_copy_string(continue_data->context, S_OR(context, ""), sizeof(continue_data->context));
259         ast_copy_string(continue_data->extension, S_OR(extension, ""), sizeof(continue_data->extension));
260         if (priority > 0) {
261                 continue_data->priority = priority;
262         } else {
263                 continue_data->priority = -1;
264         }
265
266         stasis_app_send_command_async(control, app_control_continue, continue_data);
267
268         return 0;
269 }
270
271 struct stasis_app_control_mute_data {
272         enum ast_frame_type frametype;
273         unsigned int direction;
274 };
275
276 static void *app_control_mute(struct stasis_app_control *control,
277         struct ast_channel *chan, void *data)
278 {
279         RAII_VAR(struct stasis_app_control_mute_data *, mute_data, data, ast_free);
280         SCOPED_CHANNELLOCK(lockvar, chan);
281
282         ast_channel_suppress(control->channel, mute_data->direction, mute_data->frametype);
283
284         return NULL;
285 }
286
287 int stasis_app_control_mute(struct stasis_app_control *control, unsigned int direction, enum ast_frame_type frametype)
288 {
289         struct stasis_app_control_mute_data *mute_data;
290
291         if (!(mute_data = ast_calloc(1, sizeof(*mute_data)))) {
292                 return -1;
293         }
294
295         mute_data->direction = direction;
296         mute_data->frametype = frametype;
297
298         stasis_app_send_command_async(control, app_control_mute, mute_data);
299
300         return 0;
301 }
302
303 static void *app_control_unmute(struct stasis_app_control *control,
304         struct ast_channel *chan, void *data)
305 {
306         RAII_VAR(struct stasis_app_control_mute_data *, mute_data, data, ast_free);
307         SCOPED_CHANNELLOCK(lockvar, chan);
308
309         ast_channel_unsuppress(control->channel, mute_data->direction, mute_data->frametype);
310
311         return NULL;
312 }
313
314 int stasis_app_control_unmute(struct stasis_app_control *control, unsigned int direction, enum ast_frame_type frametype)
315 {
316         struct stasis_app_control_mute_data *mute_data;
317
318         if (!(mute_data = ast_calloc(1, sizeof(*mute_data)))) {
319                 return -1;
320         }
321
322         mute_data->direction = direction;
323         mute_data->frametype = frametype;
324
325         stasis_app_send_command_async(control, app_control_unmute, mute_data);
326
327         return 0;
328 }
329
330 char *stasis_app_control_get_channel_var(struct stasis_app_control *control, const char *variable)
331 {
332         RAII_VAR(struct ast_str *, tmp, ast_str_create(32), ast_free);
333         SCOPED_CHANNELLOCK(lockvar, control->channel);
334
335         if (!tmp) {
336                 return NULL;
337         }
338
339         if (variable[strlen(variable) - 1] == ')') {
340                 if (ast_func_read2(control->channel, variable, &tmp, 0)) {
341                         return NULL;
342                 }
343         } else {
344                 if (!ast_str_retrieve_variable(&tmp, 0, control->channel, NULL, variable)) {
345                         return NULL;
346                 }
347         }
348
349         return ast_strdup(ast_str_buffer(tmp));
350 }
351
352 int stasis_app_control_set_channel_var(struct stasis_app_control *control, const char *variable, const char *value)
353 {
354         return pbx_builtin_setvar_helper(control->channel, variable, value);
355 }
356
357 static void *app_control_hold(struct stasis_app_control *control,
358         struct ast_channel *chan, void *data)
359 {
360         ast_indicate(control->channel, AST_CONTROL_HOLD);
361
362         return NULL;
363 }
364
365 void stasis_app_control_hold(struct stasis_app_control *control)
366 {
367         stasis_app_send_command_async(control, app_control_hold, NULL);
368 }
369
370 static void *app_control_unhold(struct stasis_app_control *control,
371         struct ast_channel *chan, void *data)
372 {
373         ast_indicate(control->channel, AST_CONTROL_UNHOLD);
374
375         return NULL;
376 }
377
378 void stasis_app_control_unhold(struct stasis_app_control *control)
379 {
380         stasis_app_send_command_async(control, app_control_unhold, NULL);
381 }
382
383 static void *app_control_moh_start(struct stasis_app_control *control,
384         struct ast_channel *chan, void *data)
385 {
386         char *moh_class = data;
387
388         ast_moh_start(chan, moh_class, NULL);
389
390         ast_free(moh_class);
391         return NULL;
392 }
393
394 void stasis_app_control_moh_start(struct stasis_app_control *control, const char *moh_class)
395 {
396         char *data = NULL;
397
398         if (!ast_strlen_zero(moh_class)) {
399                 data = ast_strdup(moh_class);
400         }
401
402         stasis_app_send_command_async(control, app_control_moh_start, data);
403 }
404
405 static void *app_control_moh_stop(struct stasis_app_control *control,
406         struct ast_channel *chan, void *data)
407 {
408         ast_moh_stop(chan);
409         return NULL;
410 }
411
412 void stasis_app_control_moh_stop(struct stasis_app_control *control)
413 {
414         stasis_app_send_command_async(control, app_control_moh_stop, NULL);
415 }
416
417 struct ast_channel_snapshot *stasis_app_control_get_snapshot(
418         const struct stasis_app_control *control)
419 {
420         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
421         struct ast_channel_snapshot *snapshot;
422
423         msg = stasis_cache_get(ast_channel_cache(), ast_channel_snapshot_type(),
424                 stasis_app_control_get_channel_id(control));
425         if (!msg) {
426                 return NULL;
427         }
428
429         snapshot = stasis_message_data(msg);
430         ast_assert(snapshot != NULL);
431
432         ao2_ref(snapshot, +1);
433         return snapshot;
434 }
435
436 void *stasis_app_send_command(struct stasis_app_control *control,
437         stasis_app_command_cb command_fn, void *data)
438 {
439         RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup);
440
441         if (control == NULL) {
442                 return NULL;
443         }
444
445         command = exec_command(control, command_fn, data);
446         if (!command) {
447                 return NULL;
448         }
449
450         return command_join(command);
451 }
452
453 int stasis_app_send_command_async(struct stasis_app_control *control,
454         stasis_app_command_cb command_fn, void *data)
455 {
456         RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup);
457
458         if (control == NULL) {
459                 return -1;
460         }
461
462         command = exec_command(control, command_fn, data);
463         if (!command) {
464                 return -1;
465         }
466
467         return 0;
468 }
469
470 struct ast_bridge *stasis_app_get_bridge(struct stasis_app_control *control)
471 {
472         if (!control) {
473                 return NULL;
474         } else {
475                 SCOPED_AO2LOCK(lock, control);
476                 return control->bridge;
477         }
478 }
479
480
481 static void bridge_after_cb(struct ast_channel *chan, void *data)
482 {
483         struct stasis_app_control *control = data;
484         SCOPED_AO2LOCK(lock, control);
485
486         ast_debug(3, "%s, %s: Channel leaving bridge\n",
487                 ast_channel_uniqueid(chan), control->bridge->uniqueid);
488
489         ast_assert(chan == control->channel);
490
491         /* Restore the channel's PBX */
492         ast_channel_pbx_set(control->channel, control->pbx);
493         control->pbx = NULL;
494
495         /* No longer in the bridge */
496         control->bridge = NULL;
497
498         /* Wakeup the command_queue loop */
499         exec_command(control, NULL, NULL);
500 }
501
502 static void bridge_after_cb_failed(enum ast_bridge_after_cb_reason reason,
503         void *data)
504 {
505         struct stasis_app_control *control = data;
506
507         bridge_after_cb(control->channel, data);
508
509         ast_debug(3, "  reason: %s\n",
510                 ast_bridge_after_cb_reason_string(reason));
511 }
512
513 static int OK = 0;
514 static int FAIL = -1;
515
516 static void *app_control_add_channel_to_bridge(
517         struct stasis_app_control *control,
518         struct ast_channel *chan, void *data)
519 {
520         struct ast_bridge *bridge = data;
521         int res;
522
523         if (!control || !bridge) {
524                 return NULL;
525         }
526
527         ast_debug(3, "%s: Adding to bridge %s\n",
528                 stasis_app_control_get_channel_id(control),
529                 bridge->uniqueid);
530
531         ast_assert(chan != NULL);
532
533         /* Depart whatever Stasis bridge we're currently in. */
534         if (stasis_app_get_bridge(control)) {
535                 /* Note that it looks like there's a race condition here, since
536                  * we don't have control locked. But this happens from the
537                  * control callback thread, so there won't be any other
538                  * concurrent attempts to bridge.
539                  */
540                 ast_bridge_depart(chan);
541         }
542
543
544         res = ast_bridge_set_after_callback(chan, bridge_after_cb,
545                 bridge_after_cb_failed, control);
546         if (res != 0) {
547                 ast_log(LOG_ERROR, "Error setting after-bridge callback\n");
548                 return &FAIL;
549         }
550
551         {
552                 /* pbx and bridge are modified by the bridging impart thread.
553                  * It shouldn't happen concurrently, but we still need to lock
554                  * for the memory fence.
555                  */
556                 SCOPED_AO2LOCK(lock, control);
557
558                 /* Save off the channel's PBX */
559                 ast_assert(control->pbx == NULL);
560                 if (!control->pbx) {
561                         control->pbx = ast_channel_pbx(chan);
562                         ast_channel_pbx_set(chan, NULL);
563                 }
564
565                 res = ast_bridge_impart(bridge,
566                         chan,
567                         NULL, /* swap channel */
568                         NULL, /* features */
569                         0); /* independent - false allows us to ast_bridge_depart() */
570
571                 if (res != 0) {
572                         ast_log(LOG_ERROR, "Error adding channel to bridge\n");
573                         ast_channel_pbx_set(chan, control->pbx);
574                         control->pbx = NULL;
575                         return &FAIL;
576                 }
577
578                 ast_assert(stasis_app_get_bridge(control) == NULL);
579                 control->bridge = bridge;
580         }
581         return &OK;
582 }
583
584 int stasis_app_control_add_channel_to_bridge(
585         struct stasis_app_control *control, struct ast_bridge *bridge)
586 {
587         int *res;
588         ast_debug(3, "%s: Sending channel add_to_bridge command\n",
589                         stasis_app_control_get_channel_id(control));
590         res = stasis_app_send_command(control,
591                 app_control_add_channel_to_bridge, bridge);
592         return *res;
593 }
594
595 static void *app_control_remove_channel_from_bridge(
596         struct stasis_app_control *control,
597         struct ast_channel *chan, void *data)
598 {
599         struct ast_bridge *bridge = data;
600
601         if (!control) {
602                 return &FAIL;
603         }
604
605         /* We should only depart from our own bridge */
606         ast_debug(3, "%s: Departing bridge %s\n",
607                 stasis_app_control_get_channel_id(control),
608                 bridge->uniqueid);
609
610         if (bridge != stasis_app_get_bridge(control)) {
611                 ast_log(LOG_WARNING, "%s: Not in bridge %s; not removing\n",
612                         stasis_app_control_get_channel_id(control),
613                         bridge->uniqueid);
614                 return &FAIL;
615         }
616
617         ast_bridge_depart(chan);
618         return &OK;
619 }
620
621 int stasis_app_control_remove_channel_from_bridge(
622         struct stasis_app_control *control, struct ast_bridge *bridge)
623 {
624         int *res;
625         ast_debug(3, "%s: Sending channel remove_from_bridge command\n",
626                         stasis_app_control_get_channel_id(control));
627         res = stasis_app_send_command(control,
628                 app_control_remove_channel_from_bridge, bridge);
629         return *res;
630 }
631
632 const char *stasis_app_control_get_channel_id(
633         const struct stasis_app_control *control)
634 {
635         return ast_channel_uniqueid(control->channel);
636 }
637
638 void stasis_app_control_publish(
639         struct stasis_app_control *control, struct stasis_message *message)
640 {
641         if (!control || !control->channel || !message) {
642                 return;
643         }
644         stasis_publish(ast_channel_topic(control->channel), message);
645 }
646
647 int stasis_app_control_queue_control(struct stasis_app_control *control,
648         enum ast_control_frame_type frame_type)
649 {
650         return ast_queue_control(control->channel, frame_type);
651 }
652
653 int control_dispatch_all(struct stasis_app_control *control,
654         struct ast_channel *chan)
655 {
656         int count = 0;
657         struct ao2_iterator i;
658         void *obj;
659
660         ast_assert(control->channel == chan);
661
662         i = ao2_iterator_init(control->command_queue, AO2_ITERATOR_UNLINK);
663
664         while ((obj = ao2_iterator_next(&i))) {
665                 RAII_VAR(struct stasis_app_command *, command, obj, ao2_cleanup);
666                 command_invoke(command, control, chan);
667                 ++count;
668         }
669
670         ao2_iterator_destroy(&i);
671         return count;
672 }
673
674 void control_wait(struct stasis_app_control *control)
675 {
676         if (!control) {
677                 return;
678         }
679
680         ast_assert(control->command_queue != NULL);
681
682         ao2_lock(control->command_queue);
683         while (ao2_container_count(control->command_queue) == 0) {
684                 int res = ast_cond_wait(&control->wait_cond,
685                         ao2_object_get_lockaddr(control->command_queue));
686                 if (res < 0) {
687                         ast_log(LOG_ERROR, "Error waiting on command queue\n");
688                         break;
689                 }
690         }
691         ao2_unlock(control->command_queue);
692 }