70e5c2abf2bb6b0f1f8d5579a92b10acea2b632c
[asterisk/asterisk.git] / res / res_pjsip_transport_websocket.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Jason Parker <jparker@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \brief WebSocket transport module
21  */
22
23 /*** MODULEINFO
24         <depend>pjproject</depend>
25         <depend>res_pjsip</depend>
26         <depend>res_http_websocket</depend>
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 #include <pjsip.h>
33 #include <pjsip_ua.h>
34
35 #include "asterisk/module.h"
36 #include "asterisk/http_websocket.h"
37 #include "asterisk/res_pjsip.h"
38 #include "asterisk/res_pjsip_session.h"
39 #include "asterisk/taskprocessor.h"
40
41 static int transport_type_ws;
42 static int transport_type_wss;
43
44 /*!
45  * \brief Wrapper for pjsip_transport, for storing the WebSocket session
46  */
47 struct ws_transport {
48         pjsip_transport transport;
49         pjsip_rx_data rdata;
50         struct ast_websocket *ws_session;
51 };
52
53 /*!
54  * \brief Send a message over the WebSocket connection.
55  *
56  * Called by pjsip transport manager.
57  */
58 static pj_status_t ws_send_msg(pjsip_transport *transport,
59                             pjsip_tx_data *tdata,
60                             const pj_sockaddr_t *rem_addr,
61                             int addr_len,
62                             void *token,
63                             pjsip_transport_callback callback)
64 {
65         struct ws_transport *wstransport = (struct ws_transport *)transport;
66
67         if (ast_websocket_write(wstransport->ws_session, AST_WEBSOCKET_OPCODE_TEXT, tdata->buf.start, (int)(tdata->buf.cur - tdata->buf.start))) {
68                 return PJ_EUNKNOWN;
69         }
70
71         return PJ_SUCCESS;
72 }
73
74 /*!
75  * \brief Destroy the pjsip transport.
76  *
77  * Called by pjsip transport manager.
78  */
79 static pj_status_t ws_destroy(pjsip_transport *transport)
80 {
81         struct ws_transport *wstransport = (struct ws_transport *)transport;
82
83         if (wstransport->transport.ref_cnt) {
84                 pj_atomic_destroy(wstransport->transport.ref_cnt);
85         }
86
87         if (wstransport->transport.lock) {
88                 pj_lock_destroy(wstransport->transport.lock);
89         }
90
91         pjsip_endpt_release_pool(wstransport->transport.endpt, wstransport->transport.pool);
92
93         if (wstransport->rdata.tp_info.pool) {
94                 pjsip_endpt_release_pool(wstransport->transport.endpt, wstransport->rdata.tp_info.pool);
95         }
96
97         return PJ_SUCCESS;
98 }
99
100 static int transport_shutdown(void *data)
101 {
102         pjsip_transport *transport = data;
103
104         pjsip_transport_shutdown(transport);
105         return 0;
106 }
107
108 struct transport_create_data {
109         struct ws_transport *transport;
110         struct ast_websocket *ws_session;
111 };
112
113 /*!
114  * \brief Create a pjsip transport.
115  */
116 static int transport_create(void *data)
117 {
118         struct transport_create_data *create_data = data;
119         struct ws_transport *newtransport;
120
121         pjsip_endpoint *endpt = ast_sip_get_pjsip_endpoint();
122         struct pjsip_tpmgr *tpmgr = pjsip_endpt_get_tpmgr(endpt);
123
124         pj_pool_t *pool;
125
126         pj_str_t buf;
127
128         if (!(pool = pjsip_endpt_create_pool(endpt, "ws", 512, 512))) {
129                 ast_log(LOG_ERROR, "Failed to allocate WebSocket endpoint pool.\n");
130                 return -1;
131         }
132
133         if (!(newtransport = PJ_POOL_ZALLOC_T(pool, struct ws_transport))) {
134                 ast_log(LOG_ERROR, "Failed to allocate WebSocket transport.\n");
135                 pjsip_endpt_release_pool(endpt, pool);
136                 return -1;
137         }
138
139         newtransport->ws_session = create_data->ws_session;
140
141         pj_atomic_create(pool, 0, &newtransport->transport.ref_cnt);
142         pj_lock_create_recursive_mutex(pool, pool->obj_name, &newtransport->transport.lock);
143
144         newtransport->transport.pool = pool;
145         pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf, ast_sockaddr_stringify(ast_websocket_remote_address(newtransport->ws_session))), &newtransport->transport.key.rem_addr);
146         newtransport->transport.key.rem_addr.addr.sa_family = pj_AF_INET();
147         newtransport->transport.key.type = ast_websocket_is_secure(newtransport->ws_session) ? transport_type_wss : transport_type_ws;
148
149         newtransport->transport.addr_len = pj_sockaddr_get_len(&newtransport->transport.key.rem_addr);
150
151         pj_sockaddr_cp(&newtransport->transport.local_addr, &newtransport->transport.key.rem_addr);
152
153         newtransport->transport.local_name.host.ptr = (char *)pj_pool_alloc(pool, newtransport->transport.addr_len+4);
154         pj_sockaddr_print(&newtransport->transport.key.rem_addr, newtransport->transport.local_name.host.ptr, newtransport->transport.addr_len+4, 0);
155         newtransport->transport.local_name.host.slen = pj_ansi_strlen(newtransport->transport.local_name.host.ptr);
156         newtransport->transport.local_name.port = pj_sockaddr_get_port(&newtransport->transport.key.rem_addr);
157
158         newtransport->transport.type_name = (char *)pjsip_transport_get_type_name(newtransport->transport.key.type);
159         newtransport->transport.flag = pjsip_transport_get_flag_from_type((pjsip_transport_type_e)newtransport->transport.key.type);
160         newtransport->transport.info = (char *)pj_pool_alloc(newtransport->transport.pool, 64);
161
162         newtransport->transport.endpt = endpt;
163         newtransport->transport.tpmgr = tpmgr;
164         newtransport->transport.send_msg = &ws_send_msg;
165         newtransport->transport.destroy = &ws_destroy;
166
167         pjsip_transport_register(newtransport->transport.tpmgr, (pjsip_transport *)newtransport);
168
169         newtransport->rdata.tp_info.transport = &newtransport->transport;
170         newtransport->rdata.tp_info.pool = pjsip_endpt_create_pool(endpt, "rtd%p",
171                 PJSIP_POOL_RDATA_LEN, PJSIP_POOL_RDATA_INC);
172         if (!newtransport->rdata.tp_info.pool) {
173                 ast_log(LOG_ERROR, "Failed to allocate WebSocket rdata.\n");
174                 pjsip_endpt_release_pool(endpt, pool);
175                 return -1;
176         }
177
178         create_data->transport = newtransport;
179         return 0;
180 }
181
182 struct transport_read_data {
183         struct ws_transport *transport;
184         char *payload;
185         uint64_t payload_len;
186 };
187
188 /*!
189  * \brief Pass WebSocket data into pjsip transport manager.
190  */
191 static int transport_read(void *data)
192 {
193         struct transport_read_data *read_data = data;
194         struct ws_transport *newtransport = read_data->transport;
195         struct ast_websocket *session = newtransport->ws_session;
196
197         pjsip_rx_data *rdata = &newtransport->rdata;
198         int recvd;
199         pj_str_t buf;
200
201         pj_gettimeofday(&rdata->pkt_info.timestamp);
202
203         pj_memcpy(rdata->pkt_info.packet, read_data->payload,
204                 PJSIP_MAX_PKT_LEN < read_data->payload_len ? PJSIP_MAX_PKT_LEN : read_data->payload_len);
205         rdata->pkt_info.len = read_data->payload_len;
206         rdata->pkt_info.zero = 0;
207
208         pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf, ast_sockaddr_stringify(ast_websocket_remote_address(session))), &rdata->pkt_info.src_addr);
209         rdata->pkt_info.src_addr.addr.sa_family = pj_AF_INET();
210
211         rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr);
212
213         pj_ansi_strcpy(rdata->pkt_info.src_name, ast_sockaddr_stringify_host(ast_websocket_remote_address(session)));
214         rdata->pkt_info.src_port = ast_sockaddr_port(ast_websocket_remote_address(session));
215
216         recvd = pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, rdata);
217
218         pj_pool_reset(rdata->tp_info.pool);
219
220         return (read_data->payload_len == recvd) ? 0 : -1;
221 }
222
223 static int get_write_timeout(void)
224 {
225         int write_timeout = -1;
226         struct ao2_container *transports;
227
228         transports = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), "transport", AST_RETRIEVE_FLAG_ALL, NULL);
229
230         if (transports) {
231                 struct ao2_iterator it_transports = ao2_iterator_init(transports, 0);
232                 struct ast_sip_transport *transport;
233
234                 for (; (transport = ao2_iterator_next(&it_transports)); ao2_cleanup(transport)) {
235                         if (transport->type != AST_TRANSPORT_WS && transport->type != AST_TRANSPORT_WSS) {
236                                 continue;
237                         }
238                         ast_debug(5, "Found %s transport with write timeout: %d\n",
239                                 transport->type == AST_TRANSPORT_WS ? "WS" : "WSS",
240                                 transport->write_timeout);
241                         write_timeout = MAX(write_timeout, transport->write_timeout);
242                 }
243                 ao2_cleanup(transports);
244         }
245
246         if (write_timeout < 0) {
247                 write_timeout = AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT;
248         }
249
250         ast_debug(1, "Write timeout for WS/WSS transports: %d\n", write_timeout);
251         return write_timeout;
252 }
253
254 /*!
255  \brief WebSocket connection handler.
256  */
257 static void websocket_cb(struct ast_websocket *session, struct ast_variable *parameters, struct ast_variable *headers)
258 {
259         struct ast_taskprocessor *serializer = NULL;
260         struct transport_create_data create_data;
261         struct ws_transport *transport = NULL;
262         struct transport_read_data read_data;
263
264         if (ast_websocket_set_nonblock(session)) {
265                 ast_websocket_unref(session);
266                 return;
267         }
268
269         if (ast_websocket_set_timeout(session, get_write_timeout())) {
270                 ast_websocket_unref(session);
271                 return;
272         }
273
274         if (!(serializer = ast_sip_create_serializer())) {
275                 ast_websocket_unref(session);
276                 return;
277         }
278
279         create_data.ws_session = session;
280
281         if (ast_sip_push_task_synchronous(serializer, transport_create, &create_data)) {
282                 ast_log(LOG_ERROR, "Could not create WebSocket transport.\n");
283                 ast_websocket_unref(session);
284                 return;
285         }
286
287         transport = create_data.transport;
288         read_data.transport = transport;
289
290         while (ast_wait_for_input(ast_websocket_fd(session), -1) > 0) {
291                 enum ast_websocket_opcode opcode;
292                 int fragmented;
293
294                 if (ast_websocket_read(session, &read_data.payload, &read_data.payload_len, &opcode, &fragmented)) {
295                         break;
296                 }
297
298                 if (opcode == AST_WEBSOCKET_OPCODE_TEXT || opcode == AST_WEBSOCKET_OPCODE_BINARY) {
299                         ast_sip_push_task_synchronous(serializer, transport_read, &read_data);
300                 } else if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
301                         break;
302                 }
303         }
304
305         ast_sip_push_task_synchronous(serializer, transport_shutdown, transport);
306
307         ast_taskprocessor_unreference(serializer);
308         ast_websocket_unref(session);
309 }
310
311 /*!
312  * \brief Store the transport a message came in on, so it can be used for outbound messages to that contact.
313  */
314 static pj_bool_t websocket_on_rx_msg(pjsip_rx_data *rdata)
315 {
316         static const pj_str_t STR_WS = { "ws", 2 };
317         static const pj_str_t STR_WSS = { "wss", 3 };
318         pjsip_contact_hdr *contact;
319
320         long type = rdata->tp_info.transport->key.type;
321
322         if (type != (long)transport_type_ws && type != (long)transport_type_wss) {
323                 return PJ_FALSE;
324         }
325
326         if ((contact = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_CONTACT, NULL)) && !contact->star &&
327                 (PJSIP_URI_SCHEME_IS_SIP(contact->uri) || PJSIP_URI_SCHEME_IS_SIPS(contact->uri))) {
328                 pjsip_sip_uri *uri = pjsip_uri_get_uri(contact->uri);
329
330                 pj_cstr(&uri->host, rdata->pkt_info.src_name);
331                 uri->port = rdata->pkt_info.src_port;
332                 ast_debug(4, "Re-wrote Contact URI host/port to %.*s:%d\n",
333                         (int)pj_strlen(&uri->host), pj_strbuf(&uri->host), uri->port);
334                 pj_strdup(rdata->tp_info.pool, &uri->transport_param, (type == (long)transport_type_ws) ? &STR_WS : &STR_WSS);
335         }
336
337         rdata->msg_info.via->rport_param = 0;
338
339         return PJ_FALSE;
340 }
341
342 static pjsip_module websocket_module = {
343         .name = { "WebSocket Transport Module", 26 },
344         .id = -1,
345         .priority = PJSIP_MOD_PRIORITY_TRANSPORT_LAYER,
346         .on_rx_request = websocket_on_rx_msg,
347         .on_rx_response = websocket_on_rx_msg,
348 };
349
350 /*! \brief Function called when an INVITE goes out */
351 static void websocket_outgoing_invite_request(struct ast_sip_session *session, struct pjsip_tx_data *tdata)
352 {
353         if (session->inv_session->state == PJSIP_INV_STATE_NULL) {
354                 pjsip_dlg_add_usage(session->inv_session->dlg, &websocket_module, NULL);
355         }
356 }
357
358 /*! \brief Supplement for adding Websocket functionality to dialog */
359 static struct ast_sip_session_supplement websocket_supplement = {
360         .method = "INVITE",
361         .priority = AST_SIP_SUPPLEMENT_PRIORITY_FIRST + 1,
362         .outgoing_request = websocket_outgoing_invite_request,
363 };
364
365 static int load_module(void)
366 {
367         CHECK_PJSIP_MODULE_LOADED();
368
369         pjsip_transport_register_type(PJSIP_TRANSPORT_RELIABLE, "WS", 5060, &transport_type_ws);
370         pjsip_transport_register_type(PJSIP_TRANSPORT_RELIABLE, "WSS", 5060, &transport_type_wss);
371
372         if (ast_sip_register_service(&websocket_module) != PJ_SUCCESS) {
373                 return AST_MODULE_LOAD_DECLINE;
374         }
375
376         if (ast_sip_session_register_supplement(&websocket_supplement)) {
377                 ast_sip_unregister_service(&websocket_module);
378                 return AST_MODULE_LOAD_DECLINE;
379         }
380
381         if (ast_websocket_add_protocol("sip", websocket_cb)) {
382                 ast_sip_session_unregister_supplement(&websocket_supplement);
383                 ast_sip_unregister_service(&websocket_module);
384                 return AST_MODULE_LOAD_DECLINE;
385         }
386
387         return AST_MODULE_LOAD_SUCCESS;
388 }
389
390 static int unload_module(void)
391 {
392         ast_sip_unregister_service(&websocket_module);
393         ast_sip_session_unregister_supplement(&websocket_supplement);
394         ast_websocket_remove_protocol("sip", websocket_cb);
395
396         return 0;
397 }
398
399 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "PJSIP WebSocket Transport Support",
400         .support_level = AST_MODULE_SUPPORT_CORE,
401         .load = load_module,
402         .unload = unload_module,
403         .load_pri = AST_MODPRI_APP_DEPEND,
404 );