res_pjsip WebRTC/websockets: Fix usage of WS vs WSS.
[asterisk/asterisk.git] / res / res_pjsip_transport_websocket.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Jason Parker <jparker@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \brief WebSocket transport module
21  */
22
23 /*** MODULEINFO
24         <depend>pjproject</depend>
25         <depend>res_pjsip</depend>
26         <depend>res_http_websocket</depend>
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 #include <pjsip.h>
33 #include <pjsip_ua.h>
34
35 #include "asterisk/module.h"
36 #include "asterisk/http_websocket.h"
37 #include "asterisk/res_pjsip.h"
38 #include "asterisk/res_pjsip_session.h"
39 #include "asterisk/taskprocessor.h"
40
41 static int transport_type_wss;
42
43 /*!
44  * \brief Wrapper for pjsip_transport, for storing the WebSocket session
45  */
46 struct ws_transport {
47         pjsip_transport transport;
48         pjsip_rx_data rdata;
49         struct ast_websocket *ws_session;
50 };
51
52 /*!
53  * \brief Send a message over the WebSocket connection.
54  *
55  * Called by pjsip transport manager.
56  */
57 static pj_status_t ws_send_msg(pjsip_transport *transport,
58                             pjsip_tx_data *tdata,
59                             const pj_sockaddr_t *rem_addr,
60                             int addr_len,
61                             void *token,
62                             pjsip_transport_callback callback)
63 {
64         struct ws_transport *wstransport = (struct ws_transport *)transport;
65         uint64_t len = tdata->buf.cur - tdata->buf.start;
66
67         if (ast_websocket_write(wstransport->ws_session, AST_WEBSOCKET_OPCODE_TEXT, tdata->buf.start, len)) {
68                 return PJ_EUNKNOWN;
69         }
70
71         return PJ_SUCCESS;
72 }
73
74 /*!
75  * \brief Destroy the pjsip transport.
76  *
77  * Called by pjsip transport manager.
78  */
79 static pj_status_t ws_destroy(pjsip_transport *transport)
80 {
81         struct ws_transport *wstransport = (struct ws_transport *)transport;
82         int fd = ast_websocket_fd(wstransport->ws_session);
83
84         if (fd > 0) {
85                 ast_websocket_close(wstransport->ws_session, 1000);
86                 shutdown(fd, SHUT_RDWR);
87         }
88
89         ao2_ref(wstransport, -1);
90
91         return PJ_SUCCESS;
92 }
93
94 static void transport_dtor(void *arg)
95 {
96         struct ws_transport *wstransport = arg;
97
98         if (wstransport->ws_session) {
99                 ast_websocket_unref(wstransport->ws_session);
100         }
101
102         if (wstransport->transport.ref_cnt) {
103                 pj_atomic_destroy(wstransport->transport.ref_cnt);
104         }
105
106         if (wstransport->transport.lock) {
107                 pj_lock_destroy(wstransport->transport.lock);
108         }
109
110         if (wstransport->transport.endpt && wstransport->transport.pool) {
111                 pjsip_endpt_release_pool(wstransport->transport.endpt, wstransport->transport.pool);
112         }
113
114         if (wstransport->rdata.tp_info.pool) {
115                 pjsip_endpt_release_pool(wstransport->transport.endpt, wstransport->rdata.tp_info.pool);
116         }
117 }
118
119 static int transport_shutdown(void *data)
120 {
121         struct ws_transport *wstransport = data;
122
123         if (!wstransport->transport.is_shutdown && !wstransport->transport.is_destroying) {
124                 pjsip_transport_shutdown(&wstransport->transport);
125         }
126
127         /* Note that the destructor calls PJSIP functions,
128          * therefore it must be called in a PJSIP thread.
129          */
130         ao2_ref(wstransport, -1);
131
132         return 0;
133 }
134
135 struct transport_create_data {
136         struct ws_transport *transport;
137         struct ast_websocket *ws_session;
138 };
139
140 /*!
141  * \brief Create a pjsip transport.
142  */
143 static int transport_create(void *data)
144 {
145         struct transport_create_data *create_data = data;
146         struct ws_transport *newtransport = NULL;
147
148         pjsip_endpoint *endpt = ast_sip_get_pjsip_endpoint();
149         struct pjsip_tpmgr *tpmgr = pjsip_endpt_get_tpmgr(endpt);
150
151         char *ws_addr_str;
152         pj_pool_t *pool;
153         pj_str_t buf;
154         pj_status_t status;
155
156         newtransport = ao2_t_alloc_options(sizeof(*newtransport), transport_dtor,
157                         AO2_ALLOC_OPT_LOCK_NOLOCK, "pjsip websocket transport");
158         if (!newtransport) {
159                 ast_log(LOG_ERROR, "Failed to allocate WebSocket transport.\n");
160                 goto on_error;
161         }
162
163         newtransport->transport.endpt = endpt;
164
165         if (!(pool = pjsip_endpt_create_pool(endpt, "ws", 512, 512))) {
166                 ast_log(LOG_ERROR, "Failed to allocate WebSocket endpoint pool.\n");
167                 goto on_error;
168         }
169
170         newtransport->transport.pool = pool;
171         newtransport->ws_session = create_data->ws_session;
172
173         /* Keep the session until transport dies */
174         ast_websocket_ref(newtransport->ws_session);
175
176         status = pj_atomic_create(pool, 0, &newtransport->transport.ref_cnt);
177         if (status != PJ_SUCCESS) {
178                 goto on_error;
179         }
180
181         status = pj_lock_create_recursive_mutex(pool, pool->obj_name, &newtransport->transport.lock);
182         if (status != PJ_SUCCESS) {
183                 goto on_error;
184         }
185
186         /*
187          * The type_name here is mostly used by log messages eihter in
188          * pjproject or Asterisk.  Other places are reconstituting subscriptions
189          * after a restart (which could never work for a websocket connection anyway),
190          * received MESSAGE requests to set PJSIP_TRANSPORT, and most importantly
191          * by pjproject when generating the Via header.
192          */
193         newtransport->transport.type_name = ast_websocket_is_secure(newtransport->ws_session)
194                 ? "WSS" : "WS";
195
196         ws_addr_str = ast_sockaddr_stringify(ast_websocket_remote_address(newtransport->ws_session));
197         ast_debug(4, "Creating websocket transport for %s:%s\n",
198                 newtransport->transport.type_name, ws_addr_str);
199
200         pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf, ws_addr_str), &newtransport->transport.key.rem_addr);
201         newtransport->transport.key.rem_addr.addr.sa_family = pj_AF_INET();
202         newtransport->transport.key.type = transport_type_wss;
203
204         newtransport->transport.addr_len = pj_sockaddr_get_len(&newtransport->transport.key.rem_addr);
205
206         pj_sockaddr_cp(&newtransport->transport.local_addr, &newtransport->transport.key.rem_addr);
207
208         newtransport->transport.local_name.host.ptr = (char *)pj_pool_alloc(pool, newtransport->transport.addr_len+4);
209         pj_sockaddr_print(&newtransport->transport.key.rem_addr, newtransport->transport.local_name.host.ptr, newtransport->transport.addr_len+4, 0);
210         newtransport->transport.local_name.host.slen = pj_ansi_strlen(newtransport->transport.local_name.host.ptr);
211         newtransport->transport.local_name.port = pj_sockaddr_get_port(&newtransport->transport.key.rem_addr);
212
213         newtransport->transport.flag = pjsip_transport_get_flag_from_type((pjsip_transport_type_e)newtransport->transport.key.type);
214         newtransport->transport.info = (char *)pj_pool_alloc(newtransport->transport.pool, 64);
215
216         newtransport->transport.tpmgr = tpmgr;
217         newtransport->transport.send_msg = &ws_send_msg;
218         newtransport->transport.destroy = &ws_destroy;
219
220         status = pjsip_transport_register(newtransport->transport.tpmgr,
221                         (pjsip_transport *)newtransport);
222         if (status != PJ_SUCCESS) {
223                 goto on_error;
224         }
225
226         /* Add a reference for pjsip transport manager */
227         ao2_ref(newtransport, +1);
228
229         newtransport->rdata.tp_info.transport = &newtransport->transport;
230         newtransport->rdata.tp_info.pool = pjsip_endpt_create_pool(endpt, "rtd%p",
231                 PJSIP_POOL_RDATA_LEN, PJSIP_POOL_RDATA_INC);
232         if (!newtransport->rdata.tp_info.pool) {
233                 ast_log(LOG_ERROR, "Failed to allocate WebSocket rdata.\n");
234                 pjsip_transport_destroy((pjsip_transport *)newtransport);
235                 goto on_error;
236         }
237
238         create_data->transport = newtransport;
239         return 0;
240
241 on_error:
242         ao2_cleanup(newtransport);
243         return -1;
244 }
245
246 struct transport_read_data {
247         struct ws_transport *transport;
248         char *payload;
249         uint64_t payload_len;
250 };
251
252 /*!
253  * \brief Pass WebSocket data into pjsip transport manager.
254  */
255 static int transport_read(void *data)
256 {
257         struct transport_read_data *read_data = data;
258         struct ws_transport *newtransport = read_data->transport;
259         struct ast_websocket *session = newtransport->ws_session;
260
261         pjsip_rx_data *rdata = &newtransport->rdata;
262         int recvd;
263         pj_str_t buf;
264         int pjsip_pkt_len;
265
266         pj_gettimeofday(&rdata->pkt_info.timestamp);
267
268         pjsip_pkt_len = PJSIP_MAX_PKT_LEN < read_data->payload_len ? PJSIP_MAX_PKT_LEN : read_data->payload_len;
269         pj_memcpy(rdata->pkt_info.packet, read_data->payload, pjsip_pkt_len);
270         rdata->pkt_info.len = pjsip_pkt_len;
271         rdata->pkt_info.zero = 0;
272
273         pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf, ast_sockaddr_stringify(ast_websocket_remote_address(session))), &rdata->pkt_info.src_addr);
274         rdata->pkt_info.src_addr.addr.sa_family = pj_AF_INET();
275
276         rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr);
277
278         pj_ansi_strcpy(rdata->pkt_info.src_name, ast_sockaddr_stringify_host(ast_websocket_remote_address(session)));
279         rdata->pkt_info.src_port = ast_sockaddr_port(ast_websocket_remote_address(session));
280
281         recvd = pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, rdata);
282
283         pj_pool_reset(rdata->tp_info.pool);
284
285         return (read_data->payload_len == recvd) ? 0 : -1;
286 }
287
288 static int get_write_timeout(void)
289 {
290         int write_timeout = -1;
291         struct ao2_container *transport_states;
292
293         transport_states = ast_sip_get_transport_states();
294
295         if (transport_states) {
296                 struct ao2_iterator it_transport_states = ao2_iterator_init(transport_states, 0);
297                 struct ast_sip_transport_state *transport_state;
298
299                 for (; (transport_state = ao2_iterator_next(&it_transport_states)); ao2_cleanup(transport_state)) {
300                         struct ast_sip_transport *transport;
301                         if (transport_state->type != AST_TRANSPORT_WS && transport_state->type != AST_TRANSPORT_WSS) {
302                                 continue;
303                         }
304                         transport = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "transport", transport_state->id);
305                         ast_debug(5, "Found %s transport with write timeout: %d\n",
306                                 transport->type == AST_TRANSPORT_WS ? "WS" : "WSS",
307                                 transport->write_timeout);
308                         write_timeout = MAX(write_timeout, transport->write_timeout);
309                 }
310                 ao2_iterator_destroy(&it_transport_states);
311                 ao2_cleanup(transport_states);
312         }
313
314         if (write_timeout < 0) {
315                 write_timeout = AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT;
316         }
317
318         ast_debug(1, "Write timeout for WS/WSS transports: %d\n", write_timeout);
319         return write_timeout;
320 }
321
322 static struct ast_taskprocessor *create_websocket_serializer(void)
323 {
324         char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
325
326         /* Create name with seq number appended. */
327         ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/websocket");
328
329         return ast_sip_create_serializer(tps_name);
330 }
331
332 /*! \brief WebSocket connection handler. */
333 static void websocket_cb(struct ast_websocket *session, struct ast_variable *parameters, struct ast_variable *headers)
334 {
335         struct ast_taskprocessor *serializer;
336         struct transport_create_data create_data;
337         struct ws_transport *transport;
338         struct transport_read_data read_data;
339
340         if (ast_websocket_set_nonblock(session)) {
341                 ast_websocket_unref(session);
342                 return;
343         }
344
345         if (ast_websocket_set_timeout(session, get_write_timeout())) {
346                 ast_websocket_unref(session);
347                 return;
348         }
349
350         serializer = create_websocket_serializer();
351         if (!serializer) {
352                 ast_websocket_unref(session);
353                 return;
354         }
355
356         create_data.ws_session = session;
357
358         if (ast_sip_push_task_synchronous(serializer, transport_create, &create_data)) {
359                 ast_log(LOG_ERROR, "Could not create WebSocket transport.\n");
360                 ast_websocket_unref(session);
361                 return;
362         }
363
364         transport = create_data.transport;
365         read_data.transport = transport;
366
367         while (ast_wait_for_input(ast_websocket_fd(session), -1) > 0) {
368                 enum ast_websocket_opcode opcode;
369                 int fragmented;
370
371                 if (ast_websocket_read(session, &read_data.payload, &read_data.payload_len, &opcode, &fragmented)) {
372                         break;
373                 }
374
375                 if (opcode == AST_WEBSOCKET_OPCODE_TEXT || opcode == AST_WEBSOCKET_OPCODE_BINARY) {
376                         ast_sip_push_task_synchronous(serializer, transport_read, &read_data);
377                 } else if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
378                         break;
379                 }
380         }
381
382         ast_sip_push_task_synchronous(serializer, transport_shutdown, transport);
383
384         ast_taskprocessor_unreference(serializer);
385         ast_websocket_unref(session);
386 }
387
388 /*!
389  * \brief Store the transport a message came in on, so it can be used for outbound messages to that contact.
390  */
391 static pj_bool_t websocket_on_rx_msg(pjsip_rx_data *rdata)
392 {
393         static const pj_str_t STR_WS = { "ws", 2 };
394         pjsip_contact_hdr *contact;
395
396         long type = rdata->tp_info.transport->key.type;
397
398         if (type != (long) transport_type_wss) {
399                 return PJ_FALSE;
400         }
401
402         contact = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_CONTACT, NULL);
403         if (contact
404                 && !contact->star
405                 && (PJSIP_URI_SCHEME_IS_SIP(contact->uri) || PJSIP_URI_SCHEME_IS_SIPS(contact->uri))) {
406                 pjsip_sip_uri *uri = pjsip_uri_get_uri(contact->uri);
407                 const pj_str_t *txp_str = &STR_WS;
408
409                 ast_debug(4, "%s re-writing Contact URI from %.*s:%d%s%.*s to %s:%d;transport=%s\n",
410                         pjsip_rx_data_get_info(rdata),
411                         (int)pj_strlen(&uri->host), pj_strbuf(&uri->host), uri->port,
412                         pj_strlen(&uri->transport_param) ? ";transport=" : "",
413                         (int)pj_strlen(&uri->transport_param), pj_strbuf(&uri->transport_param),
414                         rdata->pkt_info.src_name ?: "", rdata->pkt_info.src_port, pj_strbuf(txp_str));
415
416                 pj_cstr(&uri->host, rdata->pkt_info.src_name);
417                 uri->port = rdata->pkt_info.src_port;
418                 pj_strdup(rdata->tp_info.pool, &uri->transport_param, txp_str);
419         }
420
421         rdata->msg_info.via->rport_param = 0;
422
423         return PJ_FALSE;
424 }
425
426 static pjsip_module websocket_module = {
427         .name = { "WebSocket Transport Module", 26 },
428         .id = -1,
429         .priority = PJSIP_MOD_PRIORITY_TRANSPORT_LAYER,
430         .on_rx_request = websocket_on_rx_msg,
431         .on_rx_response = websocket_on_rx_msg,
432 };
433
434 /*! \brief Function called when an INVITE goes out */
435 static void websocket_outgoing_invite_request(struct ast_sip_session *session, struct pjsip_tx_data *tdata)
436 {
437         if (session->inv_session->state == PJSIP_INV_STATE_NULL) {
438                 pjsip_dlg_add_usage(session->inv_session->dlg, &websocket_module, NULL);
439         }
440 }
441
442 /*! \brief Supplement for adding Websocket functionality to dialog */
443 static struct ast_sip_session_supplement websocket_supplement = {
444         .method = "INVITE",
445         .priority = AST_SIP_SUPPLEMENT_PRIORITY_FIRST + 1,
446         .outgoing_request = websocket_outgoing_invite_request,
447 };
448
449 static int load_module(void)
450 {
451         CHECK_PJSIP_MODULE_LOADED();
452
453         /*
454          * We only need one transport type defined.  Firefox and Chrome
455          * do not support anything other than secure websockets anymore.
456          *
457          * Also we really cannot have two transports with the same name
458          * because it would be ambiguous.  Outgoing requests may try to
459          * find the transport by name and pjproject only finds the first
460          * one registered.
461          */
462         pjsip_transport_register_type(PJSIP_TRANSPORT_RELIABLE | PJSIP_TRANSPORT_SECURE, "ws", 5060, &transport_type_wss);
463
464         if (ast_sip_register_service(&websocket_module) != PJ_SUCCESS) {
465                 return AST_MODULE_LOAD_DECLINE;
466         }
467
468         if (ast_sip_session_register_supplement(&websocket_supplement)) {
469                 ast_sip_unregister_service(&websocket_module);
470                 return AST_MODULE_LOAD_DECLINE;
471         }
472
473         if (ast_websocket_add_protocol("sip", websocket_cb)) {
474                 ast_sip_session_unregister_supplement(&websocket_supplement);
475                 ast_sip_unregister_service(&websocket_module);
476                 return AST_MODULE_LOAD_DECLINE;
477         }
478
479         return AST_MODULE_LOAD_SUCCESS;
480 }
481
482 static int unload_module(void)
483 {
484         ast_sip_unregister_service(&websocket_module);
485         ast_sip_session_unregister_supplement(&websocket_supplement);
486         ast_websocket_remove_protocol("sip", websocket_cb);
487
488         return 0;
489 }
490
491 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "PJSIP WebSocket Transport Support",
492         .support_level = AST_MODULE_SUPPORT_CORE,
493         .load = load_module,
494         .unload = unload_module,
495         .load_pri = AST_MODPRI_APP_DEPEND,
496 );