Remove required type field from channel blobs
[asterisk/asterisk.git] / res / res_stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012 - 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis application support.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/channel.h"
36 #include "asterisk/lock.h"
37 #include "asterisk/module.h"
38 #include "asterisk/stasis.h"
39 #include "asterisk/stasis_app.h"
40 #include "asterisk/stasis_channels.h"
41 #include "asterisk/strings.h"
42
43 /*! Time to wait for a frame in the application */
44 #define MAX_WAIT_MS 200
45
46 /*!
47  * \brief Number of buckets for the Stasis application hash table.  Remember to
48  * keep it a prime number!
49  */
50 #define APPS_NUM_BUCKETS 127
51
52 /*!
53  * \brief Number of buckets for the Stasis application hash table.  Remember to
54  * keep it a prime number!
55  */
56 #define CONTROLS_NUM_BUCKETS 127
57
58 /*!
59  * \brief Stasis application container. Please call apps_registry() instead of
60  * directly accessing.
61  */
62 struct ao2_container *__apps_registry;
63
64 struct ao2_container *__app_controls;
65
66 /*! Ref-counting accessor for the stasis applications container */
67 static struct ao2_container *apps_registry(void)
68 {
69         ao2_ref(__apps_registry, +1);
70         return __apps_registry;
71 }
72
73 static struct ao2_container *app_controls(void)
74 {
75         ao2_ref(__app_controls, +1);
76         return __app_controls;
77 }
78
79 struct app {
80         /*! Callback function for this application. */
81         stasis_app_cb handler;
82         /*! Opaque data to hand to callback function. */
83         void *data;
84         /*! Name of the Stasis application */
85         char name[];
86 };
87
88 static void app_dtor(void *obj)
89 {
90         struct app *app = obj;
91
92         ao2_cleanup(app->data);
93         app->data = NULL;
94 }
95
96 /*! Constructor for \ref app. */
97 static struct app *app_create(const char *name, stasis_app_cb handler, void *data)
98 {
99         struct app *app;
100         size_t size;
101
102         ast_assert(name != NULL);
103         ast_assert(handler != NULL);
104
105         size = sizeof(*app) + strlen(name) + 1;
106         app = ao2_alloc_options(size, app_dtor, AO2_ALLOC_OPT_LOCK_MUTEX);
107
108         if (!app) {
109                 return NULL;
110         }
111
112         strncpy(app->name, name, size - sizeof(*app));
113         app->handler = handler;
114         ao2_ref(data, +1);
115         app->data = data;
116
117         return app;
118 }
119
120 /*! AO2 hash function for \ref app */
121 static int app_hash(const void *obj, const int flags)
122 {
123         const struct app *app = obj;
124         const char *name = flags & OBJ_KEY ? obj : app->name;
125
126         return ast_str_hash(name);
127 }
128
129 /*! AO2 comparison function for \ref app */
130 static int app_compare(void *lhs, void *rhs, int flags)
131 {
132         const struct app *lhs_app = lhs;
133         const struct app *rhs_app = rhs;
134         const char *rhs_name = flags & OBJ_KEY ? rhs : rhs_app->name;
135
136         if (strcmp(lhs_app->name, rhs_name) == 0) {
137                 return CMP_MATCH | CMP_STOP;
138         } else {
139                 return 0;
140         }
141 }
142
143 /*!
144  * \brief Send a message to the given application.
145  * \param app App to send the message to.
146  * \param message Message to send.
147  */
148 static void app_send(struct app *app, struct ast_json *message)
149 {
150         app->handler(app->data, app->name, message);
151 }
152
153 typedef void* (*stasis_app_command_cb)(struct stasis_app_control *control,
154                                        struct ast_channel *chan,
155                                        void *data);
156
157 struct stasis_app_command {
158         ast_mutex_t lock;
159         ast_cond_t condition;
160         stasis_app_command_cb callback;
161         void *data;
162         void *retval;
163         int is_done:1;
164 };
165
166 static void command_dtor(void *obj)
167 {
168         struct stasis_app_command *command = obj;
169         ast_mutex_destroy(&command->lock);
170         ast_cond_destroy(&command->condition);
171 }
172
173 static struct stasis_app_command *command_create(stasis_app_command_cb callback,
174                                                  void *data)
175 {
176         RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup);
177
178         command = ao2_alloc(sizeof(*command), command_dtor);
179         if (!command) {
180                 return NULL;
181         }
182
183         ast_mutex_init(&command->lock);
184         ast_cond_init(&command->condition, 0);
185         command->callback = callback;
186         command->data = data;
187
188         ao2_ref(command, +1);
189         return command;
190 }
191
192 static void command_complete(struct stasis_app_command *command, void *retval)
193 {
194         SCOPED_MUTEX(lock, &command->lock);
195
196         command->is_done = 1;
197         command->retval = retval;
198         ast_cond_signal(&command->condition);
199 }
200
201 static void *wait_for_command(struct stasis_app_command *command)
202 {
203         SCOPED_MUTEX(lock, &command->lock);
204         while (!command->is_done) {
205                 ast_cond_wait(&command->condition, &command->lock);
206         }
207
208         return command->retval;
209 }
210
211 struct stasis_app_control {
212         /*! Queue of commands to dispatch on the channel */
213         struct ao2_container *command_queue;
214         /*!
215          * When set, /c app_stasis should exit and continue in the dialplan.
216          */
217         int continue_to_dialplan:1;
218         /*! Uniqueid of the associated channel */
219         char channel_id[];
220 };
221
222 static struct stasis_app_control *control_create(const char *uniqueid)
223 {
224         struct stasis_app_control *control;
225         size_t size;
226
227         size = sizeof(*control) + strlen(uniqueid) + 1;
228         control = ao2_alloc(size, NULL);
229         if (!control) {
230                 return NULL;
231         }
232
233         control->command_queue = ao2_container_alloc_list(0, 0, NULL, NULL);
234
235         strncpy(control->channel_id, uniqueid, size - sizeof(*control));
236
237         return control;
238 }
239
240 static void *exec_command(struct stasis_app_control *control,
241                           struct stasis_app_command *command)
242 {
243         ao2_lock(control);
244         ao2_ref(command, +1);
245         ao2_link(control->command_queue, command);
246         ao2_unlock(control);
247
248         return wait_for_command(command);
249 }
250
251 /*! AO2 hash function for \ref stasis_app_control */
252 static int control_hash(const void *obj, const int flags)
253 {
254         const struct stasis_app_control *control = obj;
255         const char *id = flags & OBJ_KEY ? obj : control->channel_id;
256
257         return ast_str_hash(id);
258 }
259
260 /*! AO2 comparison function for \ref stasis_app_control */
261 static int control_compare(void *lhs, void *rhs, int flags)
262 {
263         const struct stasis_app_control *lhs_control = lhs;
264         const struct stasis_app_control *rhs_control = rhs;
265         const char *rhs_name =
266                 flags & OBJ_KEY ? rhs : rhs_control->channel_id;
267
268         if (strcmp(lhs_control->channel_id, rhs_name) == 0) {
269                 return CMP_MATCH | CMP_STOP;
270         } else {
271                 return 0;
272         }
273 }
274
275 struct stasis_app_control *stasis_app_control_find_by_channel(
276         const struct ast_channel *chan)
277 {
278         if (chan == NULL) {
279                 return NULL;
280         }
281
282         return stasis_app_control_find_by_channel_id(
283                 ast_channel_uniqueid(chan));
284 }
285
286 struct stasis_app_control *stasis_app_control_find_by_channel_id(
287         const char *channel_id)
288 {
289         RAII_VAR(struct ao2_container *, controls, NULL, ao2_cleanup);
290         controls = app_controls();
291         return ao2_find(controls, channel_id, OBJ_KEY);
292 }
293
294 /*!
295  * \brief Test the \c continue_to_dialplan bit for the given \a app.
296  *
297  * The bit is also reset for the next call.
298  *
299  * \param app Application to check the \c continue_to_dialplan bit.
300  * \return Zero to remain in \c Stasis
301  * \return Non-zero to continue in the dialplan
302  */
303 static int control_continue_test_and_reset(struct stasis_app_control *control)
304 {
305         int r;
306         SCOPED_AO2LOCK(lock, control);
307
308         r = control->continue_to_dialplan;
309         control->continue_to_dialplan = 0;
310         return r;
311 }
312
313 void stasis_app_control_continue(struct stasis_app_control *control)
314 {
315         SCOPED_AO2LOCK(lock, control);
316         control->continue_to_dialplan = 1;
317 }
318
319 static int OK = 0;
320 static int FAIL = -1;
321
322 static void *__app_control_answer(struct stasis_app_control *control,
323                                   struct ast_channel *chan, void *data)
324 {
325         ast_debug(3, "%s: Answering", control->channel_id);
326         return __ast_answer(chan, 0, 1) == 0 ? &OK : &FAIL;
327 }
328
329 int stasis_app_control_answer(struct stasis_app_control *control)
330 {
331         RAII_VAR(struct stasis_app_command *, command, NULL, ao2_cleanup);
332         int *retval;
333
334         ast_debug(3, "%s: Sending answer command\n", control->channel_id);
335
336         command = command_create(__app_control_answer, NULL);
337         retval = exec_command(control, command);
338
339         if (*retval != 0) {
340                 ast_log(LOG_WARNING, "Failed to answer channel");
341         }
342
343         return *retval;
344 }
345
346 static struct ast_json *app_event_create(
347         const char *event_name,
348         const struct ast_channel_snapshot *snapshot,
349         const struct ast_json *extra_info)
350 {
351         RAII_VAR(struct ast_json *, message, NULL, ast_json_unref);
352         RAII_VAR(struct ast_json *, event, NULL, ast_json_unref);
353
354         if (extra_info) {
355                 event = ast_json_deep_copy(extra_info);
356         } else {
357                 event = ast_json_object_create();
358         }
359
360         if (snapshot) {
361                 int ret;
362
363                 /* Mustn't already have a channel field */
364                 ast_assert(ast_json_object_get(event, "channel") == NULL);
365
366                 ret = ast_json_object_set(
367                         event,
368                         "channel", ast_channel_snapshot_to_json(snapshot));
369                 if (ret != 0) {
370                         return NULL;
371                 }
372         }
373
374         message = ast_json_pack("{s: o}", event_name, ast_json_ref(event));
375
376         return ast_json_ref(message);
377 }
378
379 static int send_start_msg(struct app *app, struct ast_channel *chan,
380                           int argc, char *argv[])
381 {
382         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
383         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
384
385         struct ast_json *json_args;
386         int i;
387
388         ast_assert(chan != NULL);
389
390         /* Set channel info */
391         snapshot = ast_channel_snapshot_create(chan);
392         if (!snapshot) {
393                 return -1;
394         }
395
396         msg = ast_json_pack("{s: {s: [], s: o}}",
397                             "stasis-start",
398                             "args",
399                             "channel", ast_channel_snapshot_to_json(snapshot));
400
401         if (!msg) {
402                 return -1;
403         }
404
405         /* Append arguments to args array */
406         json_args = ast_json_object_get(
407                 ast_json_object_get(msg, "stasis-start"),
408                 "args");
409         ast_assert(json_args != NULL);
410         for (i = 0; i < argc; ++i) {
411                 int r = ast_json_array_append(json_args,
412                                               ast_json_string_create(argv[i]));
413                 if (r != 0) {
414                         ast_log(LOG_ERROR, "Error appending start message\n");
415                         return -1;
416                 }
417         }
418
419         app_send(app, msg);
420         return 0;
421 }
422
423 static int send_end_msg(struct app *app, struct ast_channel *chan)
424 {
425         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
426         RAII_VAR(struct ast_channel_snapshot *, snapshot, NULL, ao2_cleanup);
427
428         ast_assert(chan != NULL);
429
430         /* Set channel info */
431         snapshot = ast_channel_snapshot_create(chan);
432         if (snapshot == NULL) {
433                 return -1;
434         }
435         msg = app_event_create("stasis-end", snapshot, NULL);
436         if (!msg) {
437                 return -1;
438         }
439
440         app_send(app, msg);
441         return 0;
442 }
443
444 static void dtmf_handler(struct app *app, struct ast_channel_blob *obj)
445 {
446         RAII_VAR(struct ast_json *, extra, NULL, ast_json_unref);
447         RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
448         const char *direction;
449
450         /* To simplify events, we'll only generate on receive */
451         direction = ast_json_string_get(
452                 ast_json_object_get(obj->blob, "direction"));
453
454         if (strcmp("Received", direction) != 0) {
455                 return;
456         }
457
458         extra = ast_json_pack(
459                 "{s: o}",
460                 "digit", ast_json_ref(ast_json_object_get(obj->blob, "digit")));
461         if (!extra) {
462                 return;
463         }
464
465         msg = app_event_create("dtmf-received", obj->snapshot, extra);
466         if (!msg) {
467                 return;
468         }
469
470         app_send(app, msg);
471 }
472
473 static void sub_handler(void *data, struct stasis_subscription *sub,
474                         struct stasis_topic *topic,
475                         struct stasis_message *message)
476 {
477         struct app *app = data;
478
479         if (stasis_subscription_final_message(sub, message)) {
480                 ao2_cleanup(data);
481                 return;
482         }
483
484         if (ast_channel_snapshot_type() == stasis_message_type(message)) {
485                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
486                 struct ast_channel_snapshot *snapshot =
487                         stasis_message_data(message);
488
489                 msg = app_event_create("channel-state-change", snapshot, NULL);
490                 if (!msg) {
491                         return;
492                 }
493                 app_send(app, msg);
494         } else if (ast_channel_dtmf_end_type() == stasis_message_type(message)) {
495                 /* To simplify events, we'll only generate on DTMF end */
496                 struct ast_channel_blob *blob = stasis_message_data(message);
497                 dtmf_handler(app, blob);
498         }
499
500 }
501
502 /*!
503  * \brief In addition to running ao2_cleanup(), this function also removes the
504  * object from the app_controls() container.
505  */
506 static void control_unlink(struct stasis_app_control *control)
507 {
508         RAII_VAR(struct ao2_container *, controls, NULL, ao2_cleanup);
509
510         if (!control) {
511                 return;
512         }
513
514         controls = app_controls();
515         ao2_unlink_flags(controls, control,
516                          OBJ_POINTER | OBJ_UNLINK | OBJ_NODATA);
517         ao2_cleanup(control);
518 }
519
520 static void dispatch_commands(struct stasis_app_control *control,
521                               struct ast_channel *chan)
522 {
523         struct ao2_iterator i;
524         void *obj;
525
526         SCOPED_AO2LOCK(lock, control);
527
528         i = ao2_iterator_init(control->command_queue, AO2_ITERATOR_UNLINK);
529
530         while ((obj = ao2_iterator_next(&i))) {
531                 RAII_VAR(struct stasis_app_command *, command, obj, ao2_cleanup);
532                 void *retval = command->callback(control, chan, command->data);
533                 command_complete(command, retval);
534         }
535
536         ao2_iterator_destroy(&i);
537 }
538
539
540 /*! /brief Stasis dialplan application callback */
541 int stasis_app_exec(struct ast_channel *chan, const char *app_name, int argc,
542                     char *argv[])
543 {
544         RAII_VAR(struct ao2_container *, apps, apps_registry(), ao2_cleanup);
545         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
546         RAII_VAR(struct stasis_app_control *, control, NULL, control_unlink);
547         RAII_VAR(struct stasis_subscription *, subscription, NULL,
548                  stasis_unsubscribe);
549         int res = 0;
550         int hungup = 0;
551
552         ast_assert(chan != NULL);
553
554         app = ao2_find(apps, app_name, OBJ_KEY);
555         if (!app) {
556                 ast_log(LOG_ERROR,
557                         "Stasis app '%s' not registered\n", app_name);
558                 return -1;
559         }
560
561         {
562                 RAII_VAR(struct ao2_container *, controls, NULL, ao2_cleanup);
563
564                 controls = app_controls();
565                 control = control_create(ast_channel_uniqueid(chan));
566                 if (!control) {
567                         ast_log(LOG_ERROR, "Allocated failed\n");
568                         return -1;
569                 }
570                 ao2_link(controls, control);
571         }
572
573         subscription =
574                 stasis_subscribe(ast_channel_topic(chan), sub_handler, app);
575         if (subscription == NULL) {
576                 ast_log(LOG_ERROR, "Error subscribing app %s to channel %s\n",
577                         app_name, ast_channel_name(chan));
578                 return -1;
579         }
580         ao2_ref(app, +1); /* subscription now has a reference */
581
582         res = send_start_msg(app, chan, argc, argv);
583         if (res != 0) {
584                 ast_log(LOG_ERROR, "Error sending start message to %s\n", app_name);
585                 return res;
586         }
587
588         while (1) {
589                 RAII_VAR(struct ast_frame *, f, NULL, ast_frame_dtor);
590                 int r;
591
592                 if (hungup) {
593                         ast_debug(3, "%s: Hangup\n",
594                                   ast_channel_uniqueid(chan));
595                         break;
596                 }
597
598                 if (control_continue_test_and_reset(control)) {
599                         ast_debug(3, "%s: Continue\n",
600                                   ast_channel_uniqueid(chan));
601                         break;
602                 }
603
604                 r = ast_waitfor(chan, MAX_WAIT_MS);
605
606                 if (r < 0) {
607                         ast_debug(3, "%s: Poll error\n",
608                                   ast_channel_uniqueid(chan));
609                         break;
610                 }
611
612                 dispatch_commands(control, chan);
613
614                 if (r == 0) {
615                         /* Timeout */
616                         continue;
617                 }
618
619                 f = ast_read(chan);
620                 if (!f) {
621                         ast_debug(3, "%s: No more frames. Must be done, I guess.\n", ast_channel_uniqueid(chan));
622                         break;
623                 }
624
625                 switch (f->frametype) {
626                 case AST_FRAME_CONTROL:
627                         if (f->subclass.integer == AST_CONTROL_HANGUP) {
628                                 hungup = 1;
629                         }
630                         break;
631                 default:
632                         /* Not handled; discard */
633                         break;
634                 }
635         }
636
637         res = send_end_msg(app, chan);
638         if (res != 0) {
639                 ast_log(LOG_ERROR,
640                         "Error sending end message to %s\n", app_name);
641                 return res;
642         }
643
644         return res;
645 }
646
647 int stasis_app_send(const char *app_name, struct ast_json *message)
648 {
649         RAII_VAR(struct ao2_container *, apps, apps_registry(), ao2_cleanup);
650         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
651
652         app = ao2_find(apps, app_name, OBJ_KEY);
653
654         if (!app) {
655                 /* XXX We can do a better job handling late binding, queueing up
656                  * the call for a few seconds to wait for the app to register.
657                  */
658                 ast_log(LOG_WARNING,
659                         "Stasis app '%s' not registered\n", app_name);
660                 return -1;
661         }
662
663         app_send(app, message);
664         return 0;
665 }
666
667 int stasis_app_register(const char *app_name, stasis_app_cb handler, void *data)
668 {
669         RAII_VAR(struct ao2_container *, apps, apps_registry(), ao2_cleanup);
670         RAII_VAR(struct app *, app, NULL, ao2_cleanup);
671
672         SCOPED_LOCK(apps_lock, apps, ao2_lock, ao2_unlock);
673
674         app = ao2_find(apps, app_name, OBJ_KEY | OBJ_NOLOCK);
675
676         if (app) {
677                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
678                 SCOPED_LOCK(app_lock, app, ao2_lock, ao2_unlock);
679
680                 msg = app_event_create("application-replaced", NULL, NULL);
681                 app->handler(app->data, app_name, msg);
682
683                 app->handler = handler;
684                 ao2_cleanup(app->data);
685                 ao2_ref(data, +1);
686                 app->data = data;
687         } else {
688                 app = app_create(app_name, handler, data);
689                 if (app) {
690                         ao2_link_flags(apps, app, OBJ_NOLOCK);
691                 } else {
692                         return -1;
693                 }
694         }
695
696         return 0;
697 }
698
699 void stasis_app_unregister(const char *app_name)
700 {
701         RAII_VAR(struct ao2_container *, apps, NULL, ao2_cleanup);
702
703         if (app_name) {
704                 apps = apps_registry();
705                 ao2_cleanup(ao2_find(apps, app_name, OBJ_KEY | OBJ_UNLINK));
706         }
707 }
708
709 static int load_module(void)
710 {
711         int r = 0;
712
713         __apps_registry =
714                 ao2_container_alloc(APPS_NUM_BUCKETS, app_hash, app_compare);
715         if (__apps_registry == NULL) {
716                 return AST_MODULE_LOAD_FAILURE;
717         }
718
719         __app_controls = ao2_container_alloc(CONTROLS_NUM_BUCKETS,
720                                              control_hash, control_compare);
721         if (__app_controls == NULL) {
722                 return AST_MODULE_LOAD_FAILURE;
723         }
724
725         return r;
726 }
727
728 static int unload_module(void)
729 {
730         int r = 0;
731
732         ao2_cleanup(__apps_registry);
733         __apps_registry = NULL;
734
735         ao2_cleanup(__app_controls);
736         __app_controls = NULL;
737
738         return r;
739 }
740
741 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS,
742                 "Stasis application support",
743                 .load = load_module,
744                 .unload = unload_module);