res_pjsip_transport_websocket: Properly set src_name for IPv6
[asterisk/asterisk.git] / res / res_pjsip_transport_websocket.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Jason Parker <jparker@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \brief WebSocket transport module
21  */
22
23 /*** MODULEINFO
24         <depend>pjproject</depend>
25         <depend>res_pjsip</depend>
26         <depend>res_http_websocket</depend>
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 #include <pjsip.h>
33 #include <pjsip_ua.h>
34
35 #include "asterisk/module.h"
36 #include "asterisk/http_websocket.h"
37 #include "asterisk/res_pjsip.h"
38 #include "asterisk/res_pjsip_session.h"
39 #include "asterisk/taskprocessor.h"
40
41 static int transport_type_wss;
42 static int transport_type_wss_ipv6;
43
44 /*!
45  * \brief Wrapper for pjsip_transport, for storing the WebSocket session
46  */
47 struct ws_transport {
48         pjsip_transport transport;
49         pjsip_rx_data rdata;
50         struct ast_websocket *ws_session;
51 };
52
53 /*!
54  * \brief Send a message over the WebSocket connection.
55  *
56  * Called by pjsip transport manager.
57  */
58 static pj_status_t ws_send_msg(pjsip_transport *transport,
59                             pjsip_tx_data *tdata,
60                             const pj_sockaddr_t *rem_addr,
61                             int addr_len,
62                             void *token,
63                             pjsip_transport_callback callback)
64 {
65         struct ws_transport *wstransport = (struct ws_transport *)transport;
66         uint64_t len = tdata->buf.cur - tdata->buf.start;
67
68         if (ast_websocket_write(wstransport->ws_session, AST_WEBSOCKET_OPCODE_TEXT, tdata->buf.start, len)) {
69                 return PJ_EUNKNOWN;
70         }
71
72         return PJ_SUCCESS;
73 }
74
75 /*!
76  * \brief Destroy the pjsip transport.
77  *
78  * Called by pjsip transport manager.
79  */
80 static pj_status_t ws_destroy(pjsip_transport *transport)
81 {
82         struct ws_transport *wstransport = (struct ws_transport *)transport;
83         int fd = ast_websocket_fd(wstransport->ws_session);
84
85         if (fd > 0) {
86                 ast_websocket_close(wstransport->ws_session, 1000);
87                 shutdown(fd, SHUT_RDWR);
88         }
89
90         ao2_ref(wstransport, -1);
91
92         return PJ_SUCCESS;
93 }
94
95 static void transport_dtor(void *arg)
96 {
97         struct ws_transport *wstransport = arg;
98
99         if (wstransport->ws_session) {
100                 ast_websocket_unref(wstransport->ws_session);
101         }
102
103         if (wstransport->transport.ref_cnt) {
104                 pj_atomic_destroy(wstransport->transport.ref_cnt);
105         }
106
107         if (wstransport->transport.lock) {
108                 pj_lock_destroy(wstransport->transport.lock);
109         }
110
111         if (wstransport->transport.endpt && wstransport->transport.pool) {
112                 pjsip_endpt_release_pool(wstransport->transport.endpt, wstransport->transport.pool);
113         }
114
115         if (wstransport->rdata.tp_info.pool) {
116                 pjsip_endpt_release_pool(wstransport->transport.endpt, wstransport->rdata.tp_info.pool);
117         }
118 }
119
120 static int transport_shutdown(void *data)
121 {
122         struct ws_transport *wstransport = data;
123
124         if (!wstransport->transport.is_shutdown && !wstransport->transport.is_destroying) {
125                 pjsip_transport_shutdown(&wstransport->transport);
126         }
127
128         /* Note that the destructor calls PJSIP functions,
129          * therefore it must be called in a PJSIP thread.
130          */
131         ao2_ref(wstransport, -1);
132
133         return 0;
134 }
135
136 struct transport_create_data {
137         struct ws_transport *transport;
138         struct ast_websocket *ws_session;
139 };
140
141 /*!
142  * \brief Create a pjsip transport.
143  */
144 static int transport_create(void *data)
145 {
146         struct transport_create_data *create_data = data;
147         struct ws_transport *newtransport = NULL;
148         pjsip_tp_state_callback state_cb;
149
150         pjsip_endpoint *endpt = ast_sip_get_pjsip_endpoint();
151         struct pjsip_tpmgr *tpmgr = pjsip_endpt_get_tpmgr(endpt);
152
153         char *ws_addr_str;
154         pj_pool_t *pool;
155         pj_str_t buf;
156         pj_status_t status;
157
158         newtransport = ao2_t_alloc_options(sizeof(*newtransport), transport_dtor,
159                         AO2_ALLOC_OPT_LOCK_NOLOCK, "pjsip websocket transport");
160         if (!newtransport) {
161                 ast_log(LOG_ERROR, "Failed to allocate WebSocket transport.\n");
162                 goto on_error;
163         }
164
165         /* Give websocket transport a unique name for its lifetime */
166         snprintf(newtransport->transport.obj_name, PJ_MAX_OBJ_NAME, "ws%p",
167                 &newtransport->transport);
168
169         newtransport->transport.endpt = endpt;
170
171         if (!(pool = pjsip_endpt_create_pool(endpt, "ws", 512, 512))) {
172                 ast_log(LOG_ERROR, "Failed to allocate WebSocket endpoint pool.\n");
173                 goto on_error;
174         }
175
176         newtransport->transport.pool = pool;
177         newtransport->ws_session = create_data->ws_session;
178
179         /* Keep the session until transport dies */
180         ast_websocket_ref(newtransport->ws_session);
181
182         status = pj_atomic_create(pool, 0, &newtransport->transport.ref_cnt);
183         if (status != PJ_SUCCESS) {
184                 goto on_error;
185         }
186
187         status = pj_lock_create_recursive_mutex(pool, pool->obj_name, &newtransport->transport.lock);
188         if (status != PJ_SUCCESS) {
189                 goto on_error;
190         }
191
192         /*
193          * The type_name here is mostly used by log messages eihter in
194          * pjproject or Asterisk.  Other places are reconstituting subscriptions
195          * after a restart (which could never work for a websocket connection anyway),
196          * received MESSAGE requests to set PJSIP_TRANSPORT, and most importantly
197          * by pjproject when generating the Via header.
198          */
199         newtransport->transport.type_name = ast_websocket_is_secure(newtransport->ws_session)
200                 ? "WSS" : "WS";
201
202         ws_addr_str = ast_sockaddr_stringify(ast_websocket_remote_address(newtransport->ws_session));
203         ast_debug(4, "Creating websocket transport for %s:%s\n",
204                 newtransport->transport.type_name, ws_addr_str);
205
206         newtransport->transport.info = (char *) pj_pool_alloc(newtransport->transport.pool,
207                 strlen(newtransport->transport.type_name) + strlen(ws_addr_str) + sizeof(" to "));
208         sprintf(newtransport->transport.info, "%s to %s", newtransport->transport.type_name, ws_addr_str);
209
210         pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf, ws_addr_str), &newtransport->transport.key.rem_addr);
211         if (newtransport->transport.key.rem_addr.addr.sa_family == pj_AF_INET6()) {
212                 newtransport->transport.key.type = transport_type_wss_ipv6;
213         } else {
214                 newtransport->transport.key.type = transport_type_wss;
215         }
216
217         newtransport->transport.addr_len = pj_sockaddr_get_len(&newtransport->transport.key.rem_addr);
218
219         ws_addr_str = ast_sockaddr_stringify(ast_websocket_local_address(newtransport->ws_session));
220         pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf, ws_addr_str), &newtransport->transport.local_addr);
221         pj_strdup2(pool, &newtransport->transport.local_name.host, ast_sockaddr_stringify_host(ast_websocket_local_address(newtransport->ws_session)));
222         newtransport->transport.local_name.port = ast_sockaddr_port(ast_websocket_local_address(newtransport->ws_session));
223
224         newtransport->transport.flag = pjsip_transport_get_flag_from_type((pjsip_transport_type_e)newtransport->transport.key.type);
225         newtransport->transport.dir = PJSIP_TP_DIR_INCOMING;
226         newtransport->transport.tpmgr = tpmgr;
227         newtransport->transport.send_msg = &ws_send_msg;
228         newtransport->transport.destroy = &ws_destroy;
229
230         status = pjsip_transport_register(newtransport->transport.tpmgr,
231                         (pjsip_transport *)newtransport);
232         if (status != PJ_SUCCESS) {
233                 goto on_error;
234         }
235
236         /* Add a reference for pjsip transport manager */
237         ao2_ref(newtransport, +1);
238
239         newtransport->rdata.tp_info.transport = &newtransport->transport;
240         newtransport->rdata.tp_info.pool = pjsip_endpt_create_pool(endpt, "rtd%p",
241                 PJSIP_POOL_RDATA_LEN, PJSIP_POOL_RDATA_INC);
242         if (!newtransport->rdata.tp_info.pool) {
243                 ast_log(LOG_ERROR, "Failed to allocate WebSocket rdata.\n");
244                 pjsip_transport_destroy((pjsip_transport *)newtransport);
245                 goto on_error;
246         }
247
248         create_data->transport = newtransport;
249
250         /* Notify application of transport state */
251         state_cb = pjsip_tpmgr_get_state_cb(newtransport->transport.tpmgr);
252         if (state_cb) {
253                 pjsip_transport_state_info state_info;
254
255                 memset(&state_info, 0, sizeof(state_info));
256                 state_cb(&newtransport->transport, PJSIP_TP_STATE_CONNECTED, &state_info);
257         }
258
259         return 0;
260
261 on_error:
262         ao2_cleanup(newtransport);
263         return -1;
264 }
265
266 struct transport_read_data {
267         struct ws_transport *transport;
268         char *payload;
269         uint64_t payload_len;
270 };
271
272 /*!
273  * \brief Pass WebSocket data into pjsip transport manager.
274  */
275 static int transport_read(void *data)
276 {
277         struct transport_read_data *read_data = data;
278         struct ws_transport *newtransport = read_data->transport;
279         struct ast_websocket *session = newtransport->ws_session;
280
281         pjsip_rx_data *rdata = &newtransport->rdata;
282         int recvd;
283         pj_str_t buf;
284         int pjsip_pkt_len;
285
286         pj_gettimeofday(&rdata->pkt_info.timestamp);
287
288         pjsip_pkt_len = PJSIP_MAX_PKT_LEN < read_data->payload_len ? PJSIP_MAX_PKT_LEN : read_data->payload_len;
289         pj_memcpy(rdata->pkt_info.packet, read_data->payload, pjsip_pkt_len);
290         rdata->pkt_info.len = pjsip_pkt_len;
291         rdata->pkt_info.zero = 0;
292
293         pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf, ast_sockaddr_stringify(ast_websocket_remote_address(session))), &rdata->pkt_info.src_addr);
294         rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr);
295
296         pj_ansi_strcpy(rdata->pkt_info.src_name, ast_sockaddr_stringify_addr(ast_websocket_remote_address(session)));
297         rdata->pkt_info.src_port = ast_sockaddr_port(ast_websocket_remote_address(session));
298
299         recvd = pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, rdata);
300
301         pj_pool_reset(rdata->tp_info.pool);
302
303         return (read_data->payload_len == recvd) ? 0 : -1;
304 }
305
306 static int get_write_timeout(void)
307 {
308         int write_timeout = -1;
309         struct ao2_container *transport_states;
310
311         transport_states = ast_sip_get_transport_states();
312
313         if (transport_states) {
314                 struct ao2_iterator it_transport_states = ao2_iterator_init(transport_states, 0);
315                 struct ast_sip_transport_state *transport_state;
316
317                 for (; (transport_state = ao2_iterator_next(&it_transport_states)); ao2_cleanup(transport_state)) {
318                         struct ast_sip_transport *transport;
319
320                         if (transport_state->type != AST_TRANSPORT_WS && transport_state->type != AST_TRANSPORT_WSS) {
321                                 continue;
322                         }
323                         transport = ast_sorcery_retrieve_by_id(ast_sip_get_sorcery(), "transport", transport_state->id);
324                         if (!transport) {
325                                 continue;
326                         }
327                         ast_debug(5, "Found %s transport with write timeout: %d\n",
328                                 transport->type == AST_TRANSPORT_WS ? "WS" : "WSS",
329                                 transport->write_timeout);
330                         write_timeout = MAX(write_timeout, transport->write_timeout);
331                 }
332                 ao2_iterator_destroy(&it_transport_states);
333                 ao2_cleanup(transport_states);
334         }
335
336         if (write_timeout < 0) {
337                 write_timeout = AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT;
338         }
339
340         ast_debug(1, "Write timeout for WS/WSS transports: %d\n", write_timeout);
341         return write_timeout;
342 }
343
344 static struct ast_taskprocessor *create_websocket_serializer(void)
345 {
346         char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
347
348         /* Create name with seq number appended. */
349         ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "pjsip/websocket");
350
351         return ast_sip_create_serializer(tps_name);
352 }
353
354 /*! \brief WebSocket connection handler. */
355 static void websocket_cb(struct ast_websocket *session, struct ast_variable *parameters, struct ast_variable *headers)
356 {
357         struct ast_taskprocessor *serializer;
358         struct transport_create_data create_data;
359         struct ws_transport *transport;
360         struct transport_read_data read_data;
361
362         if (ast_websocket_set_nonblock(session)) {
363                 ast_websocket_unref(session);
364                 return;
365         }
366
367         if (ast_websocket_set_timeout(session, get_write_timeout())) {
368                 ast_websocket_unref(session);
369                 return;
370         }
371
372         serializer = create_websocket_serializer();
373         if (!serializer) {
374                 ast_websocket_unref(session);
375                 return;
376         }
377
378         create_data.ws_session = session;
379
380         if (ast_sip_push_task_wait_serializer(serializer, transport_create, &create_data)) {
381                 ast_log(LOG_ERROR, "Could not create WebSocket transport.\n");
382                 ast_taskprocessor_unreference(serializer);
383                 ast_websocket_unref(session);
384                 return;
385         }
386
387         transport = create_data.transport;
388         read_data.transport = transport;
389
390         while (ast_wait_for_input(ast_websocket_fd(session), -1) > 0) {
391                 enum ast_websocket_opcode opcode;
392                 int fragmented;
393
394                 if (ast_websocket_read(session, &read_data.payload, &read_data.payload_len, &opcode, &fragmented)) {
395                         break;
396                 }
397
398                 if (opcode == AST_WEBSOCKET_OPCODE_TEXT || opcode == AST_WEBSOCKET_OPCODE_BINARY) {
399                         ast_sip_push_task_wait_serializer(serializer, transport_read, &read_data);
400                 } else if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
401                         break;
402                 }
403         }
404
405         ast_sip_push_task_wait_serializer(serializer, transport_shutdown, transport);
406
407         ast_taskprocessor_unreference(serializer);
408         ast_websocket_unref(session);
409 }
410
411 /*!
412  * \brief Store the transport a message came in on, so it can be used for outbound messages to that contact.
413  */
414 static pj_bool_t websocket_on_rx_msg(pjsip_rx_data *rdata)
415 {
416         static const pj_str_t STR_WS = { "ws", 2 };
417         pjsip_contact_hdr *contact;
418
419         long type = rdata->tp_info.transport->key.type;
420
421         if (type != (long) transport_type_wss && type != (long) transport_type_wss_ipv6) {
422                 return PJ_FALSE;
423         }
424
425         contact = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_CONTACT, NULL);
426         if (contact
427                 && !contact->star
428                 && (PJSIP_URI_SCHEME_IS_SIP(contact->uri) || PJSIP_URI_SCHEME_IS_SIPS(contact->uri))) {
429                 pjsip_sip_uri *uri = pjsip_uri_get_uri(contact->uri);
430                 const pj_str_t *txp_str = &STR_WS;
431
432                 ast_debug(4, "%s re-writing Contact URI from %.*s:%d%s%.*s to %s:%d;transport=%s\n",
433                         pjsip_rx_data_get_info(rdata),
434                         (int)pj_strlen(&uri->host), pj_strbuf(&uri->host), uri->port,
435                         pj_strlen(&uri->transport_param) ? ";transport=" : "",
436                         (int)pj_strlen(&uri->transport_param), pj_strbuf(&uri->transport_param),
437                         rdata->pkt_info.src_name ?: "", rdata->pkt_info.src_port, pj_strbuf(txp_str));
438
439                 pj_cstr(&uri->host, rdata->pkt_info.src_name);
440                 uri->port = rdata->pkt_info.src_port;
441                 pj_strdup(rdata->tp_info.pool, &uri->transport_param, txp_str);
442         }
443
444         rdata->msg_info.via->rport_param = 0;
445
446         return PJ_FALSE;
447 }
448
449 static pjsip_module websocket_module = {
450         .name = { "WebSocket Transport Module", 26 },
451         .id = -1,
452         .priority = PJSIP_MOD_PRIORITY_TRANSPORT_LAYER,
453         .on_rx_request = websocket_on_rx_msg,
454         .on_rx_response = websocket_on_rx_msg,
455 };
456
457 /*! \brief Function called when an INVITE goes out */
458 static void websocket_outgoing_invite_request(struct ast_sip_session *session, struct pjsip_tx_data *tdata)
459 {
460         if (session->inv_session->state == PJSIP_INV_STATE_NULL) {
461                 pjsip_dlg_add_usage(session->inv_session->dlg, &websocket_module, NULL);
462         }
463 }
464
465 /*! \brief Supplement for adding Websocket functionality to dialog */
466 static struct ast_sip_session_supplement websocket_supplement = {
467         .method = "INVITE",
468         .priority = AST_SIP_SUPPLEMENT_PRIORITY_FIRST + 1,
469         .outgoing_request = websocket_outgoing_invite_request,
470 };
471
472 static int load_module(void)
473 {
474         /*
475          * We only need one transport type name (ws) defined.  Firefox
476          * and Chrome do not support anything other than secure websockets
477          * anymore.
478          *
479          * Also we really cannot have two transports with the same name
480          * and address family because it would be ambiguous.  Outgoing
481          * requests may try to find the transport by name and pjproject
482          * only finds the first one registered.
483          */
484         pjsip_transport_register_type(PJSIP_TRANSPORT_RELIABLE | PJSIP_TRANSPORT_SECURE, "ws", 5060, &transport_type_wss);
485         pjsip_transport_register_type(PJSIP_TRANSPORT_RELIABLE | PJSIP_TRANSPORT_SECURE | PJSIP_TRANSPORT_IPV6, "ws", 5060, &transport_type_wss_ipv6);
486
487         if (ast_sip_register_service(&websocket_module) != PJ_SUCCESS) {
488                 return AST_MODULE_LOAD_DECLINE;
489         }
490
491         ast_sip_session_register_supplement(&websocket_supplement);
492
493         if (ast_websocket_add_protocol("sip", websocket_cb)) {
494                 ast_sip_session_unregister_supplement(&websocket_supplement);
495                 ast_sip_unregister_service(&websocket_module);
496                 return AST_MODULE_LOAD_DECLINE;
497         }
498
499         return AST_MODULE_LOAD_SUCCESS;
500 }
501
502 static int unload_module(void)
503 {
504         ast_sip_unregister_service(&websocket_module);
505         ast_sip_session_unregister_supplement(&websocket_supplement);
506         ast_websocket_remove_protocol("sip", websocket_cb);
507
508         return 0;
509 }
510
511 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "PJSIP WebSocket Transport Support",
512         .support_level = AST_MODULE_SUPPORT_CORE,
513         .load = load_module,
514         .unload = unload_module,
515         .load_pri = AST_MODPRI_APP_DEPEND,
516         .requires = "res_pjsip,res_http_websocket",
517 );