e666f2e05866b889b6d648f52940bbad64fe06db
[asterisk/asterisk.git] / res / ari / resource_events.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2012 - 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief /api-docs/events.{format} implementation- WebSocket resource
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 #include "asterisk.h"
27
28 ASTERISK_REGISTER_FILE()
29
30 #include "asterisk/astobj2.h"
31 #include "asterisk/stasis_app.h"
32 #include "resource_events.h"
33
34 /*! Number of buckets for the Stasis application hash table. Remember to keep it
35  *  a prime number!
36  */
37 #define APPS_NUM_BUCKETS 7
38
39 /*! \brief A connection to the event WebSocket */
40 struct event_session {
41         struct ast_ari_websocket_session *ws_session;
42         struct ao2_container *websocket_apps;
43 };
44
45 /*!
46  * \brief Explicitly shutdown a session.
47  *
48  * An explicit shutdown is necessary, since stasis-app has a reference to this
49  * session. We also need to be sure to null out the \c ws_session field, since
50  * the websocket is about to go away.
51  *
52  * \param session Session info struct.
53  */
54 static void session_shutdown(struct event_session *session)
55 {
56         struct ao2_iterator i;
57         char *app;
58         SCOPED_AO2LOCK(lock, session);
59
60         i = ao2_iterator_init(session->websocket_apps, 0);
61         while ((app = ao2_iterator_next(&i))) {
62                 stasis_app_unregister(app);
63                 ao2_cleanup(app);
64         }
65         ao2_iterator_destroy(&i);
66         ao2_cleanup(session->websocket_apps);
67
68         session->websocket_apps = NULL;
69         session->ws_session = NULL;
70 }
71
72 static void session_dtor(void *obj)
73 {
74 #ifdef AST_DEVMODE /* Avoid unused variable warning */
75         struct event_session *session = obj;
76 #endif
77
78         /* session_shutdown should have been called before */
79         ast_assert(session->ws_session == NULL);
80         ast_assert(session->websocket_apps == NULL);
81 }
82
83 static void session_cleanup(struct event_session *session)
84 {
85         session_shutdown(session);
86         ao2_cleanup(session);
87 }
88
89 static struct event_session *session_create(
90         struct ast_ari_websocket_session *ws_session)
91 {
92         RAII_VAR(struct event_session *, session, NULL, ao2_cleanup);
93
94         session = ao2_alloc(sizeof(*session), session_dtor);
95
96         session->ws_session = ws_session;
97         session->websocket_apps =
98                 ast_str_container_alloc(APPS_NUM_BUCKETS);
99
100         if (!session->websocket_apps) {
101                 return NULL;
102         }
103
104         ao2_ref(session, +1);
105         return session;
106 }
107
108 /*!
109  * \brief Callback handler for Stasis application messages.
110  */
111 static void app_handler(void *data, const char *app_name,
112                         struct ast_json *message)
113 {
114         struct event_session *session = data;
115         int res;
116         const char *msg_type = S_OR(
117                 ast_json_string_get(ast_json_object_get(message, "type")),
118                 "");
119         const char *msg_application = S_OR(
120                 ast_json_string_get(ast_json_object_get(message, "application")),
121                 "");
122
123         if (!session) {
124                 return;
125         }
126  
127         /* Determine if we've been replaced */
128         if (strcmp(msg_type, "ApplicationReplaced") == 0 &&
129                 strcmp(msg_application, app_name) == 0) {
130                 ao2_find(session->websocket_apps, msg_application,
131                         OBJ_UNLINK | OBJ_NODATA);
132         }
133
134         res = ast_json_object_set(message, "application",
135                                   ast_json_string_create(app_name));
136         if(res != 0) {
137                 return;
138         }
139
140         ao2_lock(session);
141         if (session->ws_session) {
142                 ast_ari_websocket_session_write(session->ws_session, message);
143         }
144         ao2_unlock(session);
145 }
146
147 /*!
148  * \brief Register for all of the apps given.
149  * \param session Session info struct.
150  * \param app_name Name of application to register.
151  */
152 static int session_register_app(struct event_session *session,
153                                  const char *app_name)
154 {
155         SCOPED_AO2LOCK(lock, session);
156
157         ast_assert(session->ws_session != NULL);
158         ast_assert(session->websocket_apps != NULL);
159
160         if (ast_strlen_zero(app_name)) {
161                 return -1;
162         }
163
164         if (ast_str_container_add(session->websocket_apps, app_name)) {
165                 ast_ari_websocket_session_write(session->ws_session,
166                         ast_ari_oom_json());
167                 return -1;
168         }
169
170         stasis_app_register(app_name, app_handler, session);
171
172         return 0;
173 }
174
175 int ast_ari_websocket_events_event_websocket_attempted(struct ast_tcptls_session_instance *ser,
176         struct ast_variable *headers,
177         struct ast_ari_events_event_websocket_args *args)
178 {
179         int res = 0;
180         size_t i, j;
181
182         ast_debug(3, "/events WebSocket attempted\n");
183
184         if (args->app_count == 0) {
185                 ast_http_error(ser, 400, "Bad Request", "Missing param 'app'");
186                 return -1;
187         }
188
189         for (i = 0; i < args->app_count; ++i) {
190                 if (ast_strlen_zero(args->app[i])) {
191                         res = -1;
192                         break;
193                 }
194
195                 res |= stasis_app_register(args->app[i], app_handler, NULL);
196         }
197
198         if (res) {
199                 for (j = 0; j < i; ++j) {
200                         stasis_app_unregister(args->app[j]);
201                 }
202                 ast_http_error(ser, 400, "Bad Request", "Invalid application provided in param 'app'.");
203         }
204
205         return res;
206 }
207
208 void ast_ari_websocket_events_event_websocket_established(struct ast_ari_websocket_session *ws_session,
209         struct ast_variable *headers,
210         struct ast_ari_events_event_websocket_args *args)
211 {
212         RAII_VAR(struct event_session *, session, NULL, session_cleanup);
213         struct ast_json *msg;
214         int res;
215         size_t i;
216
217         ast_debug(3, "/events WebSocket connection\n");
218
219         session = session_create(ws_session);
220         if (!session) {
221                 ast_ari_websocket_session_write(ws_session, ast_ari_oom_json());
222                 return;
223         }
224
225         res = 0;
226         for (i = 0; i < args->app_count; ++i) {
227                 if (ast_strlen_zero(args->app[i])) {
228                         continue;
229                 }
230                 res |= session_register_app(session, args->app[i]);
231         }
232
233         if (ao2_container_count(session->websocket_apps) == 0) {
234                 RAII_VAR(struct ast_json *, msg, NULL, ast_json_unref);
235
236                 msg = ast_json_pack("{s: s, s: [s]}",
237                         "type", "MissingParams",
238                         "params", "app");
239                 if (!msg) {
240                         msg = ast_json_ref(ast_ari_oom_json());
241                 }
242
243                 ast_ari_websocket_session_write(session->ws_session, msg);
244                 return;
245         }
246
247         if (res != 0) {
248                 ast_ari_websocket_session_write(ws_session, ast_ari_oom_json());
249                 return;
250         }
251
252         /* We don't process any input, but we'll consume it waiting for EOF */
253         while ((msg = ast_ari_websocket_session_read(ws_session))) {
254                 ast_json_unref(msg);
255         }
256 }
257
258 void ast_ari_events_user_event(struct ast_variable *headers,
259         struct ast_ari_events_user_event_args *args,
260         struct ast_ari_response *response)
261 {
262         enum stasis_app_user_event_res res;
263         struct ast_json *json_variables = NULL;
264
265         if (args->variables) {
266                 ast_ari_events_user_event_parse_body(args->variables, args);
267                 json_variables = ast_json_object_get(args->variables, "variables");
268         }
269
270         if (ast_strlen_zero(args->application)) {
271                 ast_ari_response_error(response, 400, "Bad Request",
272                         "Missing parameter application");
273                 return;
274         }
275
276         res = stasis_app_user_event(args->application,
277                 args->event_name,
278                 args->source, args->source_count,
279                 json_variables);
280
281         switch (res) {
282         case STASIS_APP_USER_OK:
283                 ast_ari_response_no_content(response);
284                 break;
285
286         case STASIS_APP_USER_APP_NOT_FOUND:
287                 ast_ari_response_error(response, 404, "Not Found",
288                         "Application not found");
289                 break;
290
291         case STASIS_APP_USER_EVENT_SOURCE_NOT_FOUND:
292                 ast_ari_response_error(response, 422, "Unprocessable Entity",
293                         "Event source was not found");
294                 break;
295
296         case STASIS_APP_USER_EVENT_SOURCE_BAD_SCHEME:
297                 ast_ari_response_error(response, 400, "Bad Request",
298                         "Invalid event source URI scheme");
299                 break;
300
301         case STASIS_APP_USER_USEREVENT_INVALID:
302                 ast_ari_response_error(response, 400, "Bad Request",
303                         "Invalid userevnet data");
304                 break;
305
306         case STASIS_APP_USER_INTERNAL_ERROR:
307         default:
308                 ast_ari_response_error(response, 500, "Internal Server Error",
309                         "Error processing request");
310         }
311 }
312