72f8519230670ecee4abba5b2fe466ea5db2e9cd
[asterisk/asterisk.git] / res / res_stasis_snoop.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Joshua Colp <jcolp@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application snoop control support.
22  *
23  * \author Joshua Colp <jcolp@digium.com>
24  */
25
26 /*** MODULEINFO
27         <depend type="module">res_stasis</depend>
28         <support_level>core</support_level>
29  ***/
30
31 #include "asterisk.h"
32
33 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
34
35 #include "asterisk/module.h"
36 #include "asterisk/stasis_app_impl.h"
37 #include "asterisk/stasis_app_snoop.h"
38 #include "asterisk/audiohook.h"
39 #include "asterisk/pbx.h"
40 #include "asterisk/timing.h"
41 #include "asterisk/stasis_channels.h"
42 #include "asterisk/json.h"
43 #include "asterisk/format_cache.h"
44
45 /*! \brief The interval (in milliseconds) that the Snoop timer is triggered, also controls length of audio within frames */
46 #define SNOOP_INTERVAL 20
47
48 /*! \brief Index used to keep Snoop channel names unique */
49 static unsigned int chan_idx;
50
51 /*! \brief Structure which contains all of the snoop information */
52 struct stasis_app_snoop {
53         /*! \brief Timer used for waking up Stasis thread */
54         struct ast_timer *timer;
55         /*! \brief Audiohook used to spy on the channel */
56         struct ast_audiohook spy;
57         /*! \brief Direction for spying */
58         enum ast_audiohook_direction spy_direction;
59         /*! \brief Number of samples to be read in when spying */
60         unsigned int spy_samples;
61         /*! \brief Format in use by the spy audiohook */
62         struct ast_format *spy_format;
63         /*! \brief Audiohook used to whisper on the channel */
64         struct ast_audiohook whisper;
65         /*! \brief Direction for whispering */
66         enum ast_audiohook_direction whisper_direction;
67         /*! \brief Stasis application and arguments */
68         struct ast_str *app;
69         /*! \brief Snoop channel */
70         struct ast_channel *chan;
71         /*! \brief Whether the spy capability is active or not */
72         unsigned int spy_active:1;
73         /*! \brief Whether the whisper capability is active or not */
74         unsigned int whisper_active:1;
75         /*! \brief Uniqueid of the channel this snoop is snooping on */
76         char uniqueid[AST_MAX_UNIQUEID];
77 };
78
79 /*! \brief Destructor for snoop structure */
80 static void snoop_destroy(void *obj)
81 {
82         struct stasis_app_snoop *snoop = obj;
83
84         if (snoop->timer) {
85                 ast_timer_close(snoop->timer);
86         }
87
88         if (snoop->spy_active) {
89                 ast_audiohook_destroy(&snoop->spy);
90         }
91
92         if (snoop->whisper_active) {
93                 ast_audiohook_destroy(&snoop->whisper);
94         }
95
96         ast_free(snoop->app);
97
98         ast_channel_cleanup(snoop->chan);
99 }
100
101 /*! \internal
102  * \brief Publish the chanspy message over Stasis-Core
103  * \param snoop The snoop structure
104  * \start start If non-zero, the spying is starting. Otherwise, the spyer is
105  * finishing
106  */
107 static void publish_chanspy_message(struct stasis_app_snoop *snoop, int start)
108 {
109         RAII_VAR(struct ast_json *, blob, NULL, ast_json_unref);
110         RAII_VAR(struct ast_multi_channel_blob *, payload, NULL, ao2_cleanup);
111         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
112         RAII_VAR(struct ast_channel_snapshot *, snoop_snapshot, NULL, ao2_cleanup);
113         RAII_VAR(struct ast_channel_snapshot *, spyee_snapshot, NULL, ao2_cleanup);
114         struct stasis_message_type *type = start ? ast_channel_chanspy_start_type(): ast_channel_chanspy_stop_type();
115
116         blob = ast_json_null();
117         if (!blob || !type) {
118                 return;
119         }
120
121         payload = ast_multi_channel_blob_create(blob);
122         if (!payload) {
123                 return;
124         }
125
126         snoop_snapshot = ast_channel_snapshot_get_latest(ast_channel_uniqueid(snoop->chan));
127         if (!snoop_snapshot) {
128                 return;
129         }
130         ast_multi_channel_blob_add_channel(payload, "spyer_channel", snoop_snapshot);
131
132         spyee_snapshot = ast_channel_snapshot_get_latest(snoop->uniqueid);
133         if (spyee_snapshot) {
134                 ast_multi_channel_blob_add_channel(payload, "spyee_channel", spyee_snapshot);
135         }
136
137         message = stasis_message_create(type, payload);
138         if (!message) {
139                 return;
140         }
141
142         stasis_publish(ast_channel_topic(snoop->chan), message);
143 }
144
145 /*! \brief Callback function for writing to a Snoop whisper audiohook */
146 static int snoop_write(struct ast_channel *chan, struct ast_frame *frame)
147 {
148         struct stasis_app_snoop *snoop = ast_channel_tech_pvt(chan);
149
150         if (!snoop->whisper_active) {
151                 return 0;
152         }
153
154         ast_audiohook_lock(&snoop->whisper);
155         if (snoop->whisper_direction == AST_AUDIOHOOK_DIRECTION_BOTH) {
156                 ast_audiohook_write_frame(&snoop->whisper, AST_AUDIOHOOK_DIRECTION_READ, frame);
157                 ast_audiohook_write_frame(&snoop->whisper, AST_AUDIOHOOK_DIRECTION_WRITE, frame);
158         } else {
159                 ast_audiohook_write_frame(&snoop->whisper, snoop->whisper_direction, frame);
160         }
161         ast_audiohook_unlock(&snoop->whisper);
162
163         return 0;
164 }
165
166 /*! \brief Callback function for reading from a Snoop channel */
167 static struct ast_frame *snoop_read(struct ast_channel *chan)
168 {
169         struct stasis_app_snoop *snoop = ast_channel_tech_pvt(chan);
170         struct ast_frame *frame = NULL;
171
172         /* If we fail to ack the timer OR if any active audiohooks are done hangup */
173         if ((ast_timer_ack(snoop->timer, 1) < 0) ||
174                 (snoop->spy_active && snoop->spy.status != AST_AUDIOHOOK_STATUS_RUNNING) ||
175                 (snoop->whisper_active && snoop->whisper.status != AST_AUDIOHOOK_STATUS_RUNNING)) {
176                 return NULL;
177         }
178
179         /* Only get audio from the spy audiohook if it is active */
180         if (snoop->spy_active) {
181                 ast_audiohook_lock(&snoop->spy);
182                 frame = ast_audiohook_read_frame(&snoop->spy, snoop->spy_samples, snoop->spy_direction, snoop->spy_format);
183                 ast_audiohook_unlock(&snoop->spy);
184         }
185
186         return frame ? frame : &ast_null_frame;
187 }
188
189 /*! \brief Callback function for hanging up a Snoop channel */
190 static int snoop_hangup(struct ast_channel *chan)
191 {
192         struct stasis_app_snoop *snoop = ast_channel_tech_pvt(chan);
193
194         if (snoop->spy_active) {
195                 ast_audiohook_lock(&snoop->spy);
196                 ast_audiohook_detach(&snoop->spy);
197                 ast_audiohook_unlock(&snoop->spy);
198         }
199
200         if (snoop->whisper_active) {
201                 ast_audiohook_lock(&snoop->whisper);
202                 ast_audiohook_detach(&snoop->whisper);
203                 ast_audiohook_unlock(&snoop->whisper);
204         }
205
206         publish_chanspy_message(snoop, 0);
207
208         ao2_cleanup(snoop);
209
210         ast_channel_tech_pvt_set(chan, NULL);
211
212         return 0;
213 }
214
215 static int snoop_fixup(struct ast_channel *oldchan, struct ast_channel *newchan)
216 {
217         struct stasis_app_snoop *snoop = ast_channel_tech_pvt(oldchan);
218
219         if (snoop->chan != oldchan) {
220                 return -1;
221         }
222
223         ast_channel_unref(snoop->chan);
224         ast_channel_ref(newchan);
225         snoop->chan = newchan;
226
227         return 0;
228 }
229
230 /*! \brief Channel interface declaration */
231 static struct ast_channel_tech snoop_tech = {
232         .type = "Snoop",
233         .description = "Snoop Channel Driver",
234         .write = snoop_write,
235         .read = snoop_read,
236         .hangup = snoop_hangup,
237         .fixup = snoop_fixup,
238 };
239
240 /*! \brief Thread used for running the Stasis application */
241 static void *snoop_stasis_thread(void *obj)
242 {
243         RAII_VAR(struct stasis_app_snoop *, snoop, obj, ao2_cleanup);
244         struct ast_app *stasis = pbx_findapp("Stasis");
245
246         if (!stasis) {
247                 ast_hangup(snoop->chan);
248                 return NULL;
249         }
250
251         pbx_exec(snoop->chan, stasis, ast_str_buffer(snoop->app));
252
253         ast_hangup(snoop->chan);
254
255         return NULL;
256 }
257
258 /*! \brief Internal helper function which sets up and attaches a snoop audiohook */
259 static int snoop_setup_audiohook(struct ast_channel *chan, enum ast_audiohook_type type, enum stasis_app_snoop_direction requested_direction,
260         enum ast_audiohook_direction *direction, struct ast_audiohook *audiohook)
261 {
262         ast_audiohook_init(audiohook, type, "Snoop", 0);
263
264         if (requested_direction == STASIS_SNOOP_DIRECTION_OUT) {
265                 *direction = AST_AUDIOHOOK_DIRECTION_WRITE;
266         } else if (requested_direction == STASIS_SNOOP_DIRECTION_IN) {
267                 *direction = AST_AUDIOHOOK_DIRECTION_READ;
268         } else if (requested_direction == STASIS_SNOOP_DIRECTION_BOTH) {
269                 *direction = AST_AUDIOHOOK_DIRECTION_BOTH;
270         } else {
271                 return -1;
272         }
273
274         return ast_audiohook_attach(chan, audiohook);
275 }
276
277 /*! \brief Helper function which gets the format for a Snoop channel based on the channel being snooped on */
278 static void snoop_determine_format(struct ast_channel *chan, struct stasis_app_snoop *snoop)
279 {
280         SCOPED_CHANNELLOCK(lock, chan);
281         unsigned int rate = MAX(ast_format_get_sample_rate(ast_channel_rawwriteformat(chan)),
282                 ast_format_get_sample_rate(ast_channel_rawreadformat(chan)));
283
284         snoop->spy_format = ast_format_cache_get_slin_by_rate(rate);
285 }
286
287 struct ast_channel *stasis_app_control_snoop(struct ast_channel *chan,
288         enum stasis_app_snoop_direction spy, enum stasis_app_snoop_direction whisper,
289         const char *app, const char *app_args, const char *snoop_id)
290 {
291         RAII_VAR(struct stasis_app_snoop *, snoop, NULL, ao2_cleanup);
292         struct ast_format_cap *caps;
293         pthread_t thread;
294         struct ast_assigned_ids assignedids = {
295                 .uniqueid = snoop_id,
296         };
297
298         if (spy == STASIS_SNOOP_DIRECTION_NONE &&
299                 whisper == STASIS_SNOOP_DIRECTION_NONE) {
300                 return NULL;
301         }
302
303         snoop = ao2_alloc_options(sizeof(*snoop), snoop_destroy, AO2_ALLOC_OPT_LOCK_NOLOCK);
304         if (!snoop) {
305                 return NULL;
306         }
307
308         /* Allocate a buffer to store the Stasis application and arguments in */
309         snoop->app = ast_str_create(64);
310         if (!snoop->app) {
311                 return NULL;
312         }
313
314         ast_str_set(&snoop->app, 0, "%s", app);
315         if (!ast_strlen_zero(app_args)) {
316                 ast_str_append(&snoop->app, 0, ",%s", app_args);
317         }
318
319         /* Set up a timer for the Snoop channel so it wakes up at a specific interval */
320         snoop->timer = ast_timer_open();
321         if (!snoop->timer) {
322                 return NULL;
323         }
324         ast_timer_set_rate(snoop->timer, 1000 / SNOOP_INTERVAL);
325
326         /* Determine which signed linear format should be used */
327         snoop_determine_format(chan, snoop);
328
329         /* Allocate a Snoop channel and set up various parameters */
330         snoop->chan = ast_channel_alloc(1, AST_STATE_UP, "", "", "", "", "", &assignedids, NULL, 0, "Snoop/%s-%08x", ast_channel_uniqueid(chan),
331                 (unsigned)ast_atomic_fetchadd_int((int *)&chan_idx, +1));
332         if (!snoop->chan) {
333                 return NULL;
334         }
335
336         strcpy(snoop->uniqueid, ast_channel_uniqueid(chan));
337
338         /* To keep the channel valid on the Snoop structure until it is destroyed we bump the ref up here */
339         ast_channel_ref(snoop->chan);
340
341         ast_channel_tech_set(snoop->chan, &snoop_tech);
342         ao2_ref(snoop, +1);
343         ast_channel_tech_pvt_set(snoop->chan, snoop);
344         ast_channel_set_fd(snoop->chan, 0, ast_timer_fd(snoop->timer));
345
346         /* The format on the Snoop channel will be this signed linear format, and it will never change */
347         caps = ast_format_cap_alloc(AST_FORMAT_CAP_FLAG_DEFAULT);
348         if (!caps) {
349                 return NULL;
350         }
351         ast_format_cap_append(caps, snoop->spy_format, 0);
352         ast_channel_nativeformats_set(snoop->chan, caps);
353         ao2_ref(caps, -1);
354
355         ast_channel_set_writeformat(snoop->chan, snoop->spy_format);
356         ast_channel_set_rawwriteformat(snoop->chan, snoop->spy_format);
357         ast_channel_set_readformat(snoop->chan, snoop->spy_format);
358         ast_channel_set_rawreadformat(snoop->chan, snoop->spy_format);
359
360         ast_channel_unlock(snoop->chan);
361
362         if (spy != STASIS_SNOOP_DIRECTION_NONE) {
363                 if (snoop_setup_audiohook(chan, AST_AUDIOHOOK_TYPE_SPY, spy, &snoop->spy_direction, &snoop->spy)) {
364                         ast_hangup(snoop->chan);
365                         return NULL;
366                 }
367
368                 snoop->spy_samples = ast_format_get_sample_rate(snoop->spy_format) / (1000 / SNOOP_INTERVAL);
369                 snoop->spy_active = 1;
370         }
371
372         /* If whispering is enabled set up the audiohook */
373         if (whisper != STASIS_SNOOP_DIRECTION_NONE) {
374                 if (snoop_setup_audiohook(chan, AST_AUDIOHOOK_TYPE_WHISPER, whisper, &snoop->whisper_direction, &snoop->whisper)) {
375                         ast_hangup(snoop->chan);
376                         return NULL;
377                 }
378
379                 snoop->whisper_active = 1;
380         }
381
382         /* Create the thread which services the Snoop channel */
383         ao2_ref(snoop, +1);
384         if (ast_pthread_create_detached_background(&thread, NULL, snoop_stasis_thread, snoop)) {
385                 ao2_cleanup(snoop);
386
387                 /* No other thread is servicing this channel so we can immediately hang it up */
388                 ast_hangup(snoop->chan);
389                 return NULL;
390         }
391
392         publish_chanspy_message(snoop, 1);
393
394         /* The caller of this has a reference as well */
395         return ast_channel_ref(snoop->chan);
396 }
397
398 static int load_module(void)
399 {
400         return AST_MODULE_LOAD_SUCCESS;
401 }
402
403 static int unload_module(void)
404 {
405         return 0;
406 }
407
408 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application snoop support",
409         .support_level = AST_MODULE_SUPPORT_CORE,
410         .load = load_module,
411         .unload = unload_module,
412         .nonoptreq = "res_stasis");