Break res_stasis into smaller files.
[asterisk/asterisk.git] / res / stasis / control.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application control support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
29
30 #include "asterisk/stasis_channels.h"
31
32 #include "command.h"
33 #include "control.h"
34
35 struct stasis_app_control {
36         /*! Queue of commands to dispatch on the channel */
37         struct ao2_container *command_queue;
38         /*!
39          * When set, /c app_stasis should exit and continue in the dialplan.
40          */
41         int is_done:1;
42         /*!
43          * The associated channel.
44          * Be very careful with the threading associated w/ manipulating
45          * the channel.
46          */
47         struct ast_channel *channel;
48 };
49
50 struct stasis_app_control *control_create(struct ast_channel *channel)
51 {
52         struct stasis_app_control *control;
53
54         control = ao2_alloc(sizeof(*control), NULL);
55         if (!control) {
56                 return NULL;
57         }
58
59         control->command_queue = ao2_container_alloc_list(0, 0, NULL, NULL);
60
61         control->channel = channel;
62
63         return control;
64 }
65
66 static struct stasis_app_command *exec_command(
67         struct stasis_app_control *control, stasis_app_command_cb command_fn,
68         void *data)
69 {
70         RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup);
71
72         command = command_create(command_fn, data);
73
74         if (!command) {
75                 return NULL;
76         }
77
78         ao2_lock(control);
79         ao2_ref(command, +1);
80         ao2_link(control->command_queue, command);
81         ao2_unlock(control);
82
83         ao2_ref(command, +1);
84         return command;
85 }
86
87 int control_is_done(struct stasis_app_control *control)
88 {
89         /* Called from stasis_app_exec thread; no lock needed */
90         return control->is_done;
91 }
92
93 static void *app_control_continue(struct stasis_app_control *control,
94         struct ast_channel *chan, void *data)
95 {
96         /* Called from stasis_app_exec thread; no lock needed */
97         control->is_done = 1;
98         return NULL;
99 }
100
101 void stasis_app_control_continue(struct stasis_app_control *control)
102 {
103         stasis_app_send_command_async(control, app_control_continue, NULL);
104 }
105
106 struct ast_channel_snapshot *stasis_app_control_get_snapshot(
107         const struct stasis_app_control *control)
108 {
109         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
110         struct stasis_caching_topic *caching_topic;
111         struct ast_channel_snapshot *snapshot;
112
113         caching_topic = ast_channel_topic_all_cached();
114         ast_assert(caching_topic != NULL);
115
116         msg = stasis_cache_get(caching_topic, ast_channel_snapshot_type(),
117                 stasis_app_control_get_channel_id(control));
118         if (!msg) {
119                 return NULL;
120         }
121
122         snapshot = stasis_message_data(msg);
123         ast_assert(snapshot != NULL);
124
125         ao2_ref(snapshot, +1);
126         return snapshot;
127 }
128
129 void *stasis_app_send_command(struct stasis_app_control *control,
130         stasis_app_command_cb command_fn, void *data)
131 {
132         RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup);
133
134         if (control == NULL) {
135                 return NULL;
136         }
137
138         command = exec_command(control, command_fn, data);
139         if (!command) {
140                 return NULL;
141         }
142
143         return command_join(command);
144 }
145
146 int stasis_app_send_command_async(struct stasis_app_control *control,
147         stasis_app_command_cb command_fn, void *data)
148 {
149         RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup);
150
151         if (control == NULL) {
152                 return -1;
153         }
154
155         command = exec_command(control, command_fn, data);
156         if (!command) {
157                 return -1;
158         }
159
160         return 0;
161 }
162
163 const char *stasis_app_control_get_channel_id(
164         const struct stasis_app_control *control)
165 {
166         return ast_channel_uniqueid(control->channel);
167 }
168
169 void stasis_app_control_publish(
170         struct stasis_app_control *control, struct stasis_message *message)
171 {
172         if (!control || !control->channel || !message) {
173                 return;
174         }
175         stasis_publish(ast_channel_topic(control->channel), message);
176 }
177
178 int control_dispatch_all(struct stasis_app_control *control,
179         struct ast_channel *chan)
180 {
181         int count = 0;
182         struct ao2_iterator i;
183         void *obj;
184
185         SCOPED_AO2LOCK(lock, control);
186
187         ast_assert(control->channel == chan);
188
189         i = ao2_iterator_init(control->command_queue, AO2_ITERATOR_UNLINK);
190
191         while ((obj = ao2_iterator_next(&i))) {
192                 RAII_VAR(struct stasis_app_command *, command, obj, ao2_cleanup);
193                 command_invoke(command, control, chan);
194                 ++count;
195         }
196
197         ao2_iterator_destroy(&i);
198         return count;
199 }