Fix some bad whitespace
[asterisk/asterisk.git] / res / res_stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012 - 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/channel.h"
36 #include "asterisk/lock.h"
37 #include "asterisk/module.h"
38 #include "asterisk/stasis.h"
39 #include "asterisk/stasis_app.h"
40 #include "asterisk/stasis_channels.h"
41 #include "asterisk/strings.h"
42
43 /*! Time to wait for a frame in the application */
44 #define MAX_WAIT_MS 200
45
46 /*!
47  * \brief Number of buckets for the Stasis application hash table.  Remember to
48  * keep it a prime number!
49  */
50 #define APPS_NUM_BUCKETS 127
51
52 /*!
53  * \brief Number of buckets for the Stasis application hash table.  Remember to
54  * keep it a prime number!
55  */
56 #define CONTROLS_NUM_BUCKETS 127
57
58 /*!
59  * \brief Stasis application container. Please call apps_registry() instead of
60  * directly accessing.
61  */
62 struct ao2_container *__apps_registry;
63
64 struct ao2_container *__app_controls;
65
66 /*! Ref-counting accessor for the stasis applications container */
67 static struct ao2_container *apps_registry(void)
68 {
69         ao2_ref(__apps_registry, +1);
70         return __apps_registry;
71 }
72
73 static struct ao2_container *app_controls(void)
74 {
75         ao2_ref(__app_controls, +1);
76         return __app_controls;
77 }
78
79 struct app {
80         /*! Callback function for this application. */
81         stasis_app_cb handler;
82         /*! Opaque data to hand to callback function. */
83         void *data;
84         /*! Name of the Stasis application */
85         char name[];
86 };
87
88 static void app_dtor(void *obj)
89 {
90         struct app *app = obj;
91
92         ao2_cleanup(app->data);
93         app->data = NULL;
94 }
95
96 /*! Constructor for \ref app. */
97 static struct app *app_create(const char *name, stasis_app_cb handler, void *data)
98 {
99         struct app *app;
100         size_t size;
101
102         ast_assert(name != NULL);
103         ast_assert(handler != NULL);
104
105         size = sizeof(*app) + strlen(name) + 1;
106         app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
107
108         if (!app) {
109                 return NULL;
110         }
111
112         strncpy(app->name, name, size - sizeof(*app));
113         app->handler = handler;
114         ao2_ref(data, +1);
115         app->data = data;
116
117         return app;
118 }
119
120 /*! AO2 hash function for \ref app */
121 static int app_hash(const void *obj, const int flags)
122 {
123         const struct app *app = obj;
124         const char *name = flags & OBJ_KEY ? obj : app->name;
125
126         return ast_str_hash(name);
127 }
128
129 /*! AO2 comparison function for \ref app */
130 static int app_compare(void *lhs, void *rhs, int flags)
131 {
132         const struct app *lhs_app = lhs;
133         const struct app *rhs_app = rhs;
134         const char *rhs_name = flags & OBJ_KEY ? rhs : rhs_app->name;
135
136         if (strcmp(lhs_app->name, rhs_name) == 0) {
137                 return CMP_MATCH | CMP_STOP;
138         } else {
139                 return 0;
140         }
141 }
142
143 /*!
144  * \brief Send a message to the given application.
145  * \param app App to send the message to.
146  * \param message Message to send.
147  */
148 static void app_send(struct app *app, struct ast_json *message)
149 {
150         app->handler(app->data, app->name, message);
151 }
152
153 typedef void* (*stasis_app_command_cb)(struct stasis_app_control *control,
154                                        struct ast_channel *chan,
155                                        void *data);
156
157 struct stasis_app_command {
158         ast_mutex_t lock;
159         ast_cond_t condition;
160         stasis_app_command_cb callback;
161         void *data;
162         void *retval;
163         int is_done:1;
164 };
165
166 static void command_dtor(void *obj)
167 {
168         struct stasis_app_command *command = obj;
169         ast_mutex_destroy(&command->lock);
170         ast_cond_destroy(&command->condition);
171 }
172
173 static struct stasis_app_command *command_create(stasis_app_command_cb callback,
174                                                  void *data)
175 {
176         RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup);
177
178         command = ao2_alloc(sizeof(*command), command_dtor);
179         if (!command) {
180                 return NULL;
181         }
182
183         ast_mutex_init(&command->lock);
184         ast_cond_init(&command->condition, 0);
185         command->callback = callback;
186         command->data = data;
187
188         ao2_ref(command, +1);
189         return command;
190 }
191
192 static void command_complete(struct stasis_app_command *command, void *retval)
193 {
194         SCOPED_MUTEX(lock, &command->lock);
195
196         command->is_done = 1;
197         command->retval = retval;
198         ast_cond_signal(&command->condition);
199 }
200
201 static void *wait_for_command(struct stasis_app_command *command)
202 {
203         SCOPED_MUTEX(lock, &command->lock);
204         while (!command->is_done) {
205                 ast_cond_wait(&command->condition, &command->lock);
206         }
207
208         return command->retval;
209 }
210
211 struct stasis_app_control {
212         /*! Queue of commands to dispatch on the channel */
213         struct ao2_container *command_queue;
214         /*!
215          * When set, /c app_stasis should exit and continue in the dialplan.
216          */
217         int continue_to_dialplan:1;
218         /*! Uniqueid of the associated channel */
219         char channel_id[];
220 };
221
222 static struct stasis_app_control *control_create(const char *uniqueid)
223 {
224         struct stasis_app_control *control;
225         size_t size;
226
227         size = sizeof(*control) + strlen(uniqueid) + 1;
228         control = ao2_alloc(size, NULL);
229         if (!control) {
230                 return NULL;
231         }
232
233         control->command_queue = ao2_container_alloc_list(0, 0, NULL, NULL);
234
235         strncpy(control->channel_id, uniqueid, size - sizeof(*control));
236
237         return control;
238 }
239
240 static void *exec_command(struct stasis_app_control *control,
241                           struct stasis_app_command *command)
242 {
243         ao2_lock(control);
244         ao2_ref(command, +1);
245         ao2_link(control->command_queue, command);
246         ao2_unlock(control);
247
248         return wait_for_command(command);
249 }
250
251 /*! AO2 hash function for \ref stasis_app_control */
252 static int control_hash(const void *obj, const int flags)
253 {
254         const struct stasis_app_control *control = obj;
255         const char *id = flags & OBJ_KEY ? obj : control->channel_id;
256
257         return ast_str_hash(id);
258 }
259
260 /*! AO2 comparison function for \ref stasis_app_control */
261 static int control_compare(void *lhs, void *rhs, int flags)
262 {
263         const struct stasis_app_control *lhs_control = lhs;
264         const struct stasis_app_control *rhs_control = rhs;
265         const char *rhs_name =
266                 flags & OBJ_KEY ? rhs : rhs_control->channel_id;
267
268         if (strcmp(lhs_control->channel_id, rhs_name) == 0) {
269                 return CMP_MATCH | CMP_STOP;
270         } else {
271                 return 0;
272         }
273 }
274
275 struct stasis_app_control *stasis_app_control_find_by_channel(
276         const struct ast_channel *chan)
277 {
278         if (chan == NULL) {
279                 return NULL;
280         }
281
282         return stasis_app_control_find_by_channel_id(
283                 ast_channel_uniqueid(chan));
284 }
285
286 struct stasis_app_control *stasis_app_control_find_by_channel_id(
287         const char *channel_id)
288 {
289         RAII_VAR(struct ao2_container *, controls, NULL, ao2_cleanup);
290         controls = app_controls();
291         return ao2_find(controls, channel_id, OBJ_KEY);
292 }
293
294 /*!
295  * \brief Test the \c continue_to_dialplan bit for the given \a app.
296  *
297  * The bit is also reset for the next call.
298  *
299  * \param app Application to check the \c continue_to_dialplan bit.
300  * \return Zero to remain in \c Stasis
301  * \return Non-zero to continue in the dialplan
302  */
303 static int control_continue_test_and_reset(struct stasis_app_control *control)
304 {
305         int r;
306         SCOPED_AO2LOCK(lock, control);
307
308         r = control->continue_to_dialplan;
309         control->continue_to_dialplan = 0;
310         return r;
311 }
312
313 void stasis_app_control_continue(struct stasis_app_control *control)
314 {
315         SCOPED_AO2LOCK(lock, control);
316         control->continue_to_dialplan = 1;
317 }
318
319 static int OK = 0;
320 static int FAIL = -1;
321
322 static void *__app_control_answer(struct stasis_app_control *control,
323                                   struct ast_channel *chan, void *data)
324 {
325         ast_debug(3, "%s: Answering", control->channel_id);
326         return __ast_answer(chan, 0, 1) == 0 ? &OK : &FAIL;
327 }
328
329 int stasis_app_control_answer(struct stasis_app_control *control)
330 {
331         RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup);
332         int *retval;
333
334         ast_debug(3, "%s: Sending answer command\n", control->channel_id);
335
336         command = command_create(__app_control_answer, NULL);
337         retval = exec_command(control, command);
338
339         if (*retval != 0) {
340                 ast_log(LOG_WARNING, "Failed to answer channel");
341         }
342
343         return *retval;
344 }
345
346 static struct ast_json *app_event_create(
347         const char *event_name,
348         const struct ast_channel_snapshot *snapshot,
349         const struct ast_json *extra_info)
350 {
351         RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
352         RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
353
354         if (extra_info) {
355                 event = ast_json_deep_copy(extra_info);
356         } else {
357                 event = ast_json_object_create();
358         }
359
360         if (snapshot) {
361                 int ret;
362
363                 /* Mustn't already have a channel field */
364                 ast_assert(ast_json_object_get(event, "channel") == NULL);
365
366                 ret = ast_json_object_set(
367                         event,
368                         "channel", ast_channel_snapshot_to_json(snapshot));
369                 if (ret != 0) {
370                         return NULL;
371                 }
372         }
373
374         message = ast_json_pack("{s: o}", event_name, ast_json_ref(event));
375
376         return ast_json_ref(message);
377 }
378
379 static int send_start_msg(struct app *app, struct ast_channel *chan,
380                           int argc, char *argv[])
381 {
382         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
383         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
384
385         struct ast_json *json_args;
386         int i;
387
388         ast_assert(chan != NULL);
389
390         /* Set channel info */
391         snapshot = ast_channel_snapshot_create(chan);
392         if (!snapshot) {
393                 return -1;
394         }
395
396         msg = ast_json_pack("{s: {s: [], s: o}}",
397                             "stasis-start",
398                             "args",
399                             "channel", ast_channel_snapshot_to_json(snapshot));
400
401         if (!msg) {
402                 return -1;
403         }
404
405         /* Append arguments to args array */
406         json_args = ast_json_object_get(
407                 ast_json_object_get(msg, "stasis-start"),
408                 "args");
409         ast_assert(json_args != NULL);
410         for (i = 0; i < argc; ++i) {
411                 int r = ast_json_array_append(json_args,
412                                               ast_json_string_create(argv[i]));
413                 if (r != 0) {
414                         ast_log(LOG_ERROR, "Error appending start message\n");
415                         return -1;
416                 }
417         }
418
419         app_send(app, msg);
420         return 0;
421 }
422
423 static int send_end_msg(struct app *app, struct ast_channel *chan)
424 {
425         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
426         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
427
428         ast_assert(chan != NULL);
429
430         /* Set channel info */
431         snapshot = ast_channel_snapshot_create(chan);
432         if (snapshot == NULL) {
433                 return -1;
434         }
435         msg = app_event_create("stasis-end", snapshot, NULL);
436         if (!msg) {
437                 return -1;
438         }
439
440         app_send(app, msg);
441         return 0;
442 }
443
444 static void dtmf_handler(struct app *app, struct ast_channel_blob *obj)
445 {
446         RAII_VAR(struct ast_json *, extra, NULL, ast_json_unref);
447         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
448         const char *direction;
449
450         /* To simplify events, we'll only generate on receive */
451         direction = ast_json_string_get(
452                 ast_json_object_get(obj->blob, "direction"));
453
454         if (strcmp("Received", direction) != 0) {
455                 return;
456         }
457
458         extra = ast_json_pack(
459                 "{s: o}",
460                 "digit", ast_json_ref(ast_json_object_get(obj->blob, "digit")));
461         if (!extra) {
462                 return;
463         }
464
465         msg = app_event_create("dtmf-received", obj->snapshot, extra);
466         if (!msg) {
467                 return;
468         }
469
470         app_send(app, msg);
471 }
472
473 static void blob_handler(struct app *app, struct ast_channel_blob *blob)
474 {
475         /* To simplify events, we'll only generate on DTMF end */
476         if (strcmp(ast_channel_blob_json_type(blob), "dtmf_end") == 0) {
477                 dtmf_handler(app, blob);
478         }
479 }
480
481 static void sub_handler(void *data, struct stasis_subscription *sub,
482                         struct stasis_topic *topic,
483                         struct stasis_message *message)
484 {
485         struct app *app = data;
486         if (ast_channel_snapshot_type() == stasis_message_type(message)) {
487                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
488                 struct ast_channel_snapshot *snapshot =
489                         stasis_message_data(message);
490
491                 msg = app_event_create("channel-state-change", snapshot, NULL);
492                 if (!msg) {
493                         return;
494                 }
495                 app_send(app, msg);
496         } else if (ast_channel_blob_type() == stasis_message_type(message)) {
497                 struct ast_channel_blob *blob = stasis_message_data(message);
498                 blob_handler(app, blob);
499         }
500         if (stasis_subscription_final_message(sub, message)) {
501                 ao2_cleanup(data);
502         }
503 }
504
505 /*!
506  * \brief In addition to running ao2_cleanup(), this function also removes the
507  * object from the app_controls() container.
508  */
509 static void control_unlink(struct stasis_app_control *control)
510 {
511         RAII_VAR(struct ao2_container *, controls, NULL, ao2_cleanup);
512
513         if (!control) {
514                 return;
515         }
516
517         controls = app_controls();
518         ao2_unlink_flags(controls, control,
519                          OBJ_POINTER | OBJ_UNLINK | OBJ_NODATA);
520         ao2_cleanup(control);
521 }
522
523 static void dispatch_commands(struct stasis_app_control *control,
524                               struct ast_channel *chan)
525 {
526         struct ao2_iterator i;
527         void *obj;
528
529         SCOPED_AO2LOCK(lock, control);
530
531         i = ao2_iterator_init(control->command_queue, AO2_ITERATOR_UNLINK);
532
533         while ((obj = ao2_iterator_next(&i))) {
534                 RAII_VAR(struct stasis_app_command *, command, obj, ao2_cleanup);
535                 void *retval = command->callback(control, chan, command->data);
536                 command_complete(command, retval);
537         }
538
539         ao2_iterator_destroy(&i);
540 }
541
542
543 /*! /brief Stasis dialplan application callback */
544 int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
545                     char *argv[])
546 {
547         RAII_VAR(struct ao2_container *, apps, apps_registry(), ao2_cleanup);
548         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
549         RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
550         RAII_VAR(struct stasis_subscription *, subscription, NULL,
551                  stasis_unsubscribe);
552         int res = 0;
553         int hungup = 0;
554
555         ast_assert(chan != NULL);
556
557         app = ao2_find(apps, app_name, OBJ_KEY);
558         if (!app) {
559                 ast_log(LOG_ERROR,
560                         "Stasis app '%s' not registered\n", app_name);
561                 return -1;
562         }
563
564         {
565                 RAII_VAR(struct ao2_container *, controls, NULL, ao2_cleanup);
566
567                 controls = app_controls();
568                 control = control_create(ast_channel_uniqueid(chan));
569                 if (!control) {
570                         ast_log(LOG_ERROR, "Allocated failed\n");
571                         return -1;
572                 }
573                 ao2_link(controls, control);
574         }
575
576         subscription =
577                 stasis_subscribe(ast_channel_topic(chan), sub_handler, app);
578         if (subscription == NULL) {
579                 ast_log(LOG_ERROR, "Error subscribing app %s to channel %s\n",
580                         app_name, ast_channel_name(chan));
581                 return -1;
582         }
583         ao2_ref(app, +1); /* subscription now has a reference */
584
585         res = send_start_msg(app, chan, argc, argv);
586         if (res != 0) {
587                 ast_log(LOG_ERROR, "Error sending start message to %s\n", app_name);
588                 return res;
589         }
590
591         while (1) {
592                 RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
593                 int r;
594
595                 if (hungup) {
596                         ast_debug(3, "%s: Hangup\n",
597                                   ast_channel_uniqueid(chan));
598                         break;
599                 }
600
601                 if (control_continue_test_and_reset(control)) {
602                         ast_debug(3, "%s: Continue\n",
603                                   ast_channel_uniqueid(chan));
604                         break;
605                 }
606
607                 r = ast_waitfor(chan, MAX_WAIT_MS);
608
609                 if (r < 0) {
610                         ast_debug(3, "%s: Poll error\n",
611                                   ast_channel_uniqueid(chan));
612                         break;
613                 }
614
615                 dispatch_commands(control, chan);
616
617                 if (r == 0) {
618                         /* Timeout */
619                         continue;
620                 }
621
622                 f = ast_read(chan);
623                 if (!f) {
624                         ast_debug(3, "%s: No more frames. Must be done, I guess.\n", ast_channel_uniqueid(chan));
625                         break;
626                 }
627
628                 switch (f->frametype) {
629                 case AST_FRAME_CONTROL:
630                         if (f->subclass.integer == AST_CONTROL_HANGUP) {
631                                 hungup = 1;
632                         }
633                         break;
634                 default:
635                         /* Not handled; discard */
636                         break;
637                 }
638         }
639
640         res = send_end_msg(app, chan);
641         if (res != 0) {
642                 ast_log(LOG_ERROR,
643                         "Error sending end message to %s\n", app_name);
644                 return res;
645         }
646
647         return res;
648 }
649
650 int stasis_app_send(const char *app_name, struct ast_json *message)
651 {
652         RAII_VAR(struct ao2_container *, apps, apps_registry(), ao2_cleanup);
653         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
654
655         app = ao2_find(apps, app_name, OBJ_KEY);
656
657         if (!app) {
658                 /* XXX We can do a better job handling late binding, queueing up
659                  * the call for a few seconds to wait for the app to register.
660                  */
661                 ast_log(LOG_WARNING,
662                         "Stasis app '%s' not registered\n", app_name);
663                 return -1;
664         }
665
666         app_send(app, message);
667         return 0;
668 }
669
670 int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
671 {
672         RAII_VAR(struct ao2_container *, apps, apps_registry(), ao2_cleanup);
673         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
674
675         SCOPED_LOCK(apps_lock, apps, ao2_lock, ao2_unlock);
676
677         app = ao2_find(apps, app_name, OBJ_KEY | OBJ_NOLOCK);
678
679         if (app) {
680                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
681                 SCOPED_LOCK(app_lock, app, ao2_lock, ao2_unlock);
682
683                 msg = app_event_create("application-replaced", NULL, NULL);
684                 app->handler(app->data, app_name, msg);
685
686                 app->handler = handler;
687                 ao2_cleanup(app->data);
688                 ao2_ref(data, +1);
689                 app->data = data;
690         } else {
691                 app = app_create(app_name, handler, data);
692                 if (app) {
693                         ao2_link_flags(apps, app, OBJ_NOLOCK);
694                 } else {
695                         return -1;
696                 }
697         }
698
699         return 0;
700 }
701
702 void stasis_app_unregister(const char *app_name)
703 {
704         RAII_VAR(struct ao2_container *, apps, NULL, ao2_cleanup);
705
706         if (app_name) {
707                 apps = apps_registry();
708                 ao2_cleanup(ao2_find(apps, app_name, OBJ_KEY | OBJ_UNLINK));
709         }
710 }
711
712 static int load_module(void)
713 {
714         int r = 0;
715
716         __apps_registry =
717                 ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);
718         if (__apps_registry == NULL) {
719                 return AST_MODULE_LOAD_FAILURE;
720         }
721
722         __app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS,
723                                              control_hash, control_compare);
724         if (__app_controls == NULL) {
725                 return AST_MODULE_LOAD_FAILURE;
726         }
727
728         return r;
729 }
730
731 static int unload_module(void)
732 {
733         int r = 0;
734
735         ao2_cleanup(__apps_registry);
736         __apps_registry = NULL;
737
738         ao2_cleanup(__app_controls);
739         __app_controls = NULL;
740
741         return r;
742 }
743
744 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS,
745                 "Stasis application support",
746                 .load = load_module,
747                 .unload = unload_module);