Add a SystemName field to all AMI events.
[asterisk/asterisk.git] / res / res_stasis_websocket.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012 - 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief HTTP binding for the Stasis API
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <depend type="module">res_stasis</depend>
28         <depend type="module">res_http_websocket</depend>
29         <support_level>core</support_level>
30  ***/
31
32 #include "asterisk.h"
33
34 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
35
36 #include "asterisk/astobj2.h"
37 #include "asterisk/http_websocket.h"
38 #include "asterisk/json.h"
39 #include "asterisk/module.h"
40 #include "asterisk/stasis_app.h"
41 #include "asterisk/strings.h"
42 #include "asterisk/utils.h"
43
44 /*! WebSocket protocol for Stasis */
45 static const char * const ws_protocol = "stasis";
46
47 /*! Message to send when out of memory */
48 static struct ast_json *oom_json;
49
50 /*! Number of buckets for the Stasis application hash table. Remember to keep it
51  *  a prime number!
52  */
53 #define APPS_NUM_BUCKETS 7
54
55 /*!
56  * \internal
57  * \brief Helper to write a JSON object to a WebSocket.
58  * \param session WebSocket session.
59  * \param message JSON message.
60  * \return 0 on success.
61  * \return -1 on error.
62  */
63 static int websocket_write_json(struct ast_websocket *session,
64                                 struct ast_json *message)
65 {
66         RAII_VAR(char *, str, ast_json_dump_string(message), ast_free);
67
68         if (str == NULL) {
69                 ast_log(LOG_ERROR, "Failed to encode JSON object\n");
70                 return -1;
71         }
72
73         return ast_websocket_write(session, AST_WEBSOCKET_OPCODE_TEXT, str,
74                                    strlen(str));
75 }
76
77 struct stasis_ws_session_info {
78         struct ast_websocket *ws_session;
79         struct ao2_container *websocket_apps;
80 };
81
82 static void session_dtor(void *obj)
83 {
84 #ifdef AST_DEVMODE /* Avoid unused variable warning */
85         struct stasis_ws_session_info *session = obj;
86 #endif
87
88         /* session_shutdown should have been called before */
89         ast_assert(session->ws_session == NULL);
90         ast_assert(session->websocket_apps == NULL);
91 }
92
93 static struct stasis_ws_session_info *session_create(
94         struct ast_websocket *ws_session)
95 {
96         RAII_VAR(struct stasis_ws_session_info *, session, NULL, ao2_cleanup);
97
98         session = ao2_alloc(sizeof(*session), session_dtor);
99
100         session->ws_session = ws_session;
101         session->websocket_apps =
102                 ast_str_container_alloc(APPS_NUM_BUCKETS);
103
104         if (!session->websocket_apps) {
105                 return NULL;
106         }
107
108         ao2_ref(session, +1);
109         return session;
110 }
111
112 /*!
113  * \brief Explicitly shutdown a session.
114  *
115  * An explicit shutdown is necessary, since stasis-app has a reference to this
116  * session. We also need to be sure to null out the \c ws_session field, since
117  * the websocket is about to go away.
118  *
119  * \param session Session info struct.
120  */
121 static void session_shutdown(struct stasis_ws_session_info *session)
122 {
123         struct ao2_iterator i;
124         char *app;
125         SCOPED_AO2LOCK(lock, session);
126
127         i = ao2_iterator_init(session->websocket_apps, 0);
128         while ((app = ao2_iterator_next(&i))) {
129                 stasis_app_unregister(app);
130                 ao2_cleanup(app);
131         }
132         ao2_iterator_destroy(&i);
133         ao2_cleanup(session->websocket_apps);
134
135         session->websocket_apps = NULL;
136         session->ws_session = NULL;
137 }
138
139 /*!
140  * \brief Callback handler for Stasis application messages.
141  */
142 static void app_handler(void *data, const char *app_name,
143                         struct ast_json *message)
144 {
145         struct stasis_ws_session_info *session = data;
146         int res;
147
148         res = ast_json_object_set(message, "application",
149                                   ast_json_string_create(app_name));
150         if(res != 0) {
151                 return;
152         }
153
154         ao2_lock(session);
155         if (session->ws_session) {
156                 websocket_write_json(session->ws_session, message);
157         }
158         ao2_unlock(session);
159 }
160
161 /*!
162  * \brief Register for all of the apps given.
163  * \param session Session info struct.
164  * \param app_list Comma seperated list of app names to register.
165  */
166 static int session_register_apps(struct stasis_ws_session_info *session,
167                                  const char *app_list)
168 {
169         RAII_VAR(char *, to_free, NULL, ast_free);
170         char *apps, *app_name;
171         SCOPED_AO2LOCK(lock, session);
172
173         ast_assert(session->ws_session != NULL);
174         ast_assert(session->websocket_apps != NULL);
175
176         to_free = apps = ast_strdup(app_list);
177         if (!apps) {
178                 websocket_write_json(session->ws_session, oom_json);
179                 return -1;
180         }
181         while ((app_name = strsep(&apps, ","))) {
182                 if (ast_str_container_add(session->websocket_apps, app_name)) {
183                         websocket_write_json(session->ws_session, oom_json);
184                         return -1;
185                 }
186
187                 stasis_app_register(app_name, app_handler, session);
188         }
189         return 0;
190 }
191
192 static void websocket_callback(struct ast_websocket *ws_session,
193                                struct ast_variable *parameters,
194                                struct ast_variable *headers)
195 {
196         RAII_VAR(struct stasis_ws_session_info *, stasis_session, NULL, ao2_cleanup);
197         struct ast_variable *param = NULL;
198         int res;
199
200         ast_debug(3, "Stasis web socket connection\n");
201
202         if (ast_websocket_set_nonblock(ws_session) != 0) {
203                 ast_log(LOG_ERROR,
204                         "Stasis web socket failed to set nonblock; closing\n");
205                 goto end;
206         }
207
208         stasis_session = session_create(ws_session);
209
210         if (!stasis_session) {
211                 websocket_write_json(ws_session, oom_json);
212                 goto end;
213         }
214
215         for (param = parameters; param; param = param->next) {
216                 if (strcmp(param->name, "app") == 0) {
217                         int ret = session_register_apps(
218                                 stasis_session, param->value);
219                         if (ret != 0) {
220                                 goto end;
221                         }
222                 }
223         }
224
225         if (ao2_container_count(stasis_session->websocket_apps) == 0) {
226                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
227
228                 msg = ast_json_pack("{s: s, s: [s]}",
229                                     "error", "MissingParams",
230                                     "params", "app");
231                 if (msg) {
232                         websocket_write_json(ws_session, msg);
233                 }
234
235                 goto end;
236         }
237
238         while ((res = ast_wait_for_input(ast_websocket_fd(ws_session), -1)) > 0) {
239                 char *payload;
240                 uint64_t payload_len;
241                 enum ast_websocket_opcode opcode;
242                 int fragmented;
243                 int read = ast_websocket_read(ws_session, &payload, &payload_len,
244                                               &opcode, &fragmented);
245
246                 if (read) {
247                         ast_log(LOG_ERROR,
248                                 "Stasis WebSocket read error; closing\n");
249                         break;
250                 }
251
252                 if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
253                         break;
254                 }
255         }
256
257 end:
258         session_shutdown(stasis_session);
259         ast_websocket_unref(ws_session);
260 }
261
262 static int load_module(void)
263 {
264         int r = 0;
265
266         stasis_app_ref();
267         oom_json = ast_json_pack("{s: s}",
268                                  "error", "OutOfMemory");
269         if (!oom_json) {
270                 /* ironic */
271                 return AST_MODULE_LOAD_FAILURE;
272         }
273         r |= ast_websocket_add_protocol(ws_protocol, websocket_callback);
274         return r;
275 }
276
277 static int unload_module(void)
278 {
279         int r = 0;
280
281         stasis_app_unref();
282         ast_json_unref(oom_json);
283         oom_json = NULL;
284         r |= ast_websocket_remove_protocol(ws_protocol, websocket_callback);
285         return r;
286 }
287
288 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "Stasis HTTP bindings",
289         .load = load_module,
290         .unload = unload_module,
291         .nonoptreq = "res_stasis,res_http_websocket",
292         .load_pri = AST_MODPRI_APP_DEPEND,
293         );