2 * Asterisk -- An open source telephony toolkit.
4 * Copyright (C) 2013, Digium, Inc.
6 * Jason Parker <jparker@digium.com>
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
20 * \brief WebSocket transport module
24 <depend>pjproject</depend>
25 <depend>res_pjsip</depend>
26 <depend>res_http_websocket</depend>
27 <support_level>core</support_level>
35 #include "asterisk/module.h"
36 #include "asterisk/http_websocket.h"
37 #include "asterisk/res_pjsip.h"
38 #include "asterisk/res_pjsip_session.h"
39 #include "asterisk/taskprocessor.h"
41 static int transport_type_ws;
42 static int transport_type_wss;
45 * \brief Wrapper for pjsip_transport, for storing the WebSocket session
48 pjsip_transport transport;
50 struct ast_websocket *ws_session;
54 * \brief Send a message over the WebSocket connection.
56 * Called by pjsip transport manager.
58 static pj_status_t ws_send_msg(pjsip_transport *transport,
60 const pj_sockaddr_t *rem_addr,
63 pjsip_transport_callback callback)
65 struct ws_transport *wstransport = (struct ws_transport *)transport;
66 uint64_t len = tdata->buf.cur - tdata->buf.start;
68 if (ast_websocket_write(wstransport->ws_session, AST_WEBSOCKET_OPCODE_TEXT, tdata->buf.start, len)) {
76 * \brief Destroy the pjsip transport.
78 * Called by pjsip transport manager.
80 static pj_status_t ws_destroy(pjsip_transport *transport)
82 struct ws_transport *wstransport = (struct ws_transport *)transport;
83 int fd = ast_websocket_fd(wstransport->ws_session);
86 ast_websocket_close(wstransport->ws_session, 1000);
87 shutdown(fd, SHUT_RDWR);
90 ao2_ref(wstransport, -1);
95 static void transport_dtor(void *arg)
97 struct ws_transport *wstransport = arg;
99 if (wstransport->ws_session) {
100 ast_websocket_unref(wstransport->ws_session);
103 if (wstransport->transport.ref_cnt) {
104 pj_atomic_destroy(wstransport->transport.ref_cnt);
107 if (wstransport->transport.lock) {
108 pj_lock_destroy(wstransport->transport.lock);
111 if (wstransport->transport.endpt && wstransport->transport.pool) {
112 pjsip_endpt_release_pool(wstransport->transport.endpt, wstransport->transport.pool);
115 if (wstransport->rdata.tp_info.pool) {
116 pjsip_endpt_release_pool(wstransport->transport.endpt, wstransport->rdata.tp_info.pool);
120 static int transport_shutdown(void *data)
122 struct ws_transport *wstransport = data;
124 if (!wstransport->transport.is_shutdown && !wstransport->transport.is_destroying) {
125 pjsip_transport_shutdown(&wstransport->transport);
128 /* Note that the destructor calls PJSIP functions,
129 * therefore it must be called in a PJSIP thread.
131 ao2_ref(wstransport, -1);
136 struct transport_create_data {
137 struct ws_transport *transport;
138 struct ast_websocket *ws_session;
142 * \brief Create a pjsip transport.
144 static int transport_create(void *data)
146 struct transport_create_data *create_data = data;
147 struct ws_transport *newtransport = NULL;
149 pjsip_endpoint *endpt = ast_sip_get_pjsip_endpoint();
150 struct pjsip_tpmgr *tpmgr = pjsip_endpt_get_tpmgr(endpt);
156 newtransport = ao2_t_alloc_options(sizeof(*newtransport), transport_dtor,
157 AO2_ALLOC_OPT_LOCK_NOLOCK, "pjsip websocket transport");
159 ast_log(LOG_ERROR, "Failed to allocate WebSocket transport.\n");
163 newtransport->transport.endpt = endpt;
165 if (!(pool = pjsip_endpt_create_pool(endpt, "ws", 512, 512))) {
166 ast_log(LOG_ERROR, "Failed to allocate WebSocket endpoint pool.\n");
170 newtransport->transport.pool = pool;
171 newtransport->ws_session = create_data->ws_session;
173 /* Keep the session until transport dies */
174 ast_websocket_ref(newtransport->ws_session);
176 status = pj_atomic_create(pool, 0, &newtransport->transport.ref_cnt);
177 if (status != PJ_SUCCESS) {
181 status = pj_lock_create_recursive_mutex(pool, pool->obj_name, &newtransport->transport.lock);
182 if (status != PJ_SUCCESS) {
186 pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf, ast_sockaddr_stringify(ast_websocket_remote_address(newtransport->ws_session))), &newtransport->transport.key.rem_addr);
187 newtransport->transport.key.rem_addr.addr.sa_family = pj_AF_INET();
188 newtransport->transport.key.type = ast_websocket_is_secure(newtransport->ws_session) ? transport_type_wss : transport_type_ws;
190 newtransport->transport.addr_len = pj_sockaddr_get_len(&newtransport->transport.key.rem_addr);
192 pj_sockaddr_cp(&newtransport->transport.local_addr, &newtransport->transport.key.rem_addr);
194 newtransport->transport.local_name.host.ptr = (char *)pj_pool_alloc(pool, newtransport->transport.addr_len+4);
195 pj_sockaddr_print(&newtransport->transport.key.rem_addr, newtransport->transport.local_name.host.ptr, newtransport->transport.addr_len+4, 0);
196 newtransport->transport.local_name.host.slen = pj_ansi_strlen(newtransport->transport.local_name.host.ptr);
197 newtransport->transport.local_name.port = pj_sockaddr_get_port(&newtransport->transport.key.rem_addr);
199 newtransport->transport.type_name = (char *)pjsip_transport_get_type_name(newtransport->transport.key.type);
200 newtransport->transport.flag = pjsip_transport_get_flag_from_type((pjsip_transport_type_e)newtransport->transport.key.type);
201 newtransport->transport.info = (char *)pj_pool_alloc(newtransport->transport.pool, 64);
203 newtransport->transport.tpmgr = tpmgr;
204 newtransport->transport.send_msg = &ws_send_msg;
205 newtransport->transport.destroy = &ws_destroy;
207 status = pjsip_transport_register(newtransport->transport.tpmgr,
208 (pjsip_transport *)newtransport);
209 if (status != PJ_SUCCESS) {
213 /* Add a reference for pjsip transport manager */
214 ao2_ref(newtransport, +1);
216 newtransport->rdata.tp_info.transport = &newtransport->transport;
217 newtransport->rdata.tp_info.pool = pjsip_endpt_create_pool(endpt, "rtd%p",
218 PJSIP_POOL_RDATA_LEN, PJSIP_POOL_RDATA_INC);
219 if (!newtransport->rdata.tp_info.pool) {
220 ast_log(LOG_ERROR, "Failed to allocate WebSocket rdata.\n");
221 pjsip_transport_destroy((pjsip_transport *)newtransport);
225 create_data->transport = newtransport;
229 ao2_cleanup(newtransport);
233 struct transport_read_data {
234 struct ws_transport *transport;
236 uint64_t payload_len;
240 * \brief Pass WebSocket data into pjsip transport manager.
242 static int transport_read(void *data)
244 struct transport_read_data *read_data = data;
245 struct ws_transport *newtransport = read_data->transport;
246 struct ast_websocket *session = newtransport->ws_session;
248 pjsip_rx_data *rdata = &newtransport->rdata;
253 pj_gettimeofday(&rdata->pkt_info.timestamp);
255 pjsip_pkt_len = PJSIP_MAX_PKT_LEN < read_data->payload_len ? PJSIP_MAX_PKT_LEN : read_data->payload_len;
256 pj_memcpy(rdata->pkt_info.packet, read_data->payload, pjsip_pkt_len);
257 rdata->pkt_info.len = pjsip_pkt_len;
258 rdata->pkt_info.zero = 0;
260 pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf, ast_sockaddr_stringify(ast_websocket_remote_address(session))), &rdata->pkt_info.src_addr);
261 rdata->pkt_info.src_addr.addr.sa_family = pj_AF_INET();
263 rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr);
265 pj_ansi_strcpy(rdata->pkt_info.src_name, ast_sockaddr_stringify_host(ast_websocket_remote_address(session)));
266 rdata->pkt_info.src_port = ast_sockaddr_port(ast_websocket_remote_address(session));
268 recvd = pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, rdata);
270 pj_pool_reset(rdata->tp_info.pool);
272 return (read_data->payload_len == recvd) ? 0 : -1;
275 static int get_write_timeout(void)
277 int write_timeout = -1;
278 struct ao2_container *transports;
280 transports = ast_sorcery_retrieve_by_fields(ast_sip_get_sorcery(), "transport", AST_RETRIEVE_FLAG_ALL, NULL);
283 struct ao2_iterator it_transports = ao2_iterator_init(transports, 0);
284 struct ast_sip_transport *transport;
286 for (; (transport = ao2_iterator_next(&it_transports)); ao2_cleanup(transport)) {
287 if (transport->type != AST_TRANSPORT_WS && transport->type != AST_TRANSPORT_WSS) {
290 ast_debug(5, "Found %s transport with write timeout: %d\n",
291 transport->type == AST_TRANSPORT_WS ? "WS" : "WSS",
292 transport->write_timeout);
293 write_timeout = MAX(write_timeout, transport->write_timeout);
295 ao2_cleanup(transports);
298 if (write_timeout < 0) {
299 write_timeout = AST_DEFAULT_WEBSOCKET_WRITE_TIMEOUT;
302 ast_debug(1, "Write timeout for WS/WSS transports: %d\n", write_timeout);
303 return write_timeout;
307 \brief WebSocket connection handler.
309 static void websocket_cb(struct ast_websocket *session, struct ast_variable *parameters, struct ast_variable *headers)
311 struct ast_taskprocessor *serializer = NULL;
312 struct transport_create_data create_data;
313 struct ws_transport *transport = NULL;
314 struct transport_read_data read_data;
316 if (ast_websocket_set_nonblock(session)) {
317 ast_websocket_unref(session);
321 if (ast_websocket_set_timeout(session, get_write_timeout())) {
322 ast_websocket_unref(session);
326 if (!(serializer = ast_sip_create_serializer())) {
327 ast_websocket_unref(session);
331 create_data.ws_session = session;
333 if (ast_sip_push_task_synchronous(serializer, transport_create, &create_data)) {
334 ast_log(LOG_ERROR, "Could not create WebSocket transport.\n");
335 ast_websocket_unref(session);
339 transport = create_data.transport;
340 read_data.transport = transport;
342 while (ast_wait_for_input(ast_websocket_fd(session), -1) > 0) {
343 enum ast_websocket_opcode opcode;
346 if (ast_websocket_read(session, &read_data.payload, &read_data.payload_len, &opcode, &fragmented)) {
350 if (opcode == AST_WEBSOCKET_OPCODE_TEXT || opcode == AST_WEBSOCKET_OPCODE_BINARY) {
351 ast_sip_push_task_synchronous(serializer, transport_read, &read_data);
352 } else if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
357 ast_sip_push_task_synchronous(serializer, transport_shutdown, transport);
359 ast_taskprocessor_unreference(serializer);
360 ast_websocket_unref(session);
364 * \brief Store the transport a message came in on, so it can be used for outbound messages to that contact.
366 static pj_bool_t websocket_on_rx_msg(pjsip_rx_data *rdata)
368 static const pj_str_t STR_WS = { "ws", 2 };
369 static const pj_str_t STR_WSS = { "wss", 3 };
370 pjsip_contact_hdr *contact;
372 long type = rdata->tp_info.transport->key.type;
374 if (type != (long)transport_type_ws && type != (long)transport_type_wss) {
378 if ((contact = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_CONTACT, NULL)) && !contact->star &&
379 (PJSIP_URI_SCHEME_IS_SIP(contact->uri) || PJSIP_URI_SCHEME_IS_SIPS(contact->uri))) {
380 pjsip_sip_uri *uri = pjsip_uri_get_uri(contact->uri);
382 pj_cstr(&uri->host, rdata->pkt_info.src_name);
383 uri->port = rdata->pkt_info.src_port;
384 ast_debug(4, "Re-wrote Contact URI host/port to %.*s:%d\n",
385 (int)pj_strlen(&uri->host), pj_strbuf(&uri->host), uri->port);
386 pj_strdup(rdata->tp_info.pool, &uri->transport_param, (type == (long)transport_type_ws) ? &STR_WS : &STR_WSS);
389 rdata->msg_info.via->rport_param = 0;
394 static pjsip_module websocket_module = {
395 .name = { "WebSocket Transport Module", 26 },
397 .priority = PJSIP_MOD_PRIORITY_TRANSPORT_LAYER,
398 .on_rx_request = websocket_on_rx_msg,
399 .on_rx_response = websocket_on_rx_msg,
402 /*! \brief Function called when an INVITE goes out */
403 static void websocket_outgoing_invite_request(struct ast_sip_session *session, struct pjsip_tx_data *tdata)
405 if (session->inv_session->state == PJSIP_INV_STATE_NULL) {
406 pjsip_dlg_add_usage(session->inv_session->dlg, &websocket_module, NULL);
410 /*! \brief Supplement for adding Websocket functionality to dialog */
411 static struct ast_sip_session_supplement websocket_supplement = {
413 .priority = AST_SIP_SUPPLEMENT_PRIORITY_FIRST + 1,
414 .outgoing_request = websocket_outgoing_invite_request,
417 static int load_module(void)
419 CHECK_PJSIP_MODULE_LOADED();
421 pjsip_transport_register_type(PJSIP_TRANSPORT_RELIABLE, "WS", 5060, &transport_type_ws);
422 pjsip_transport_register_type(PJSIP_TRANSPORT_RELIABLE, "WSS", 5060, &transport_type_wss);
424 if (ast_sip_register_service(&websocket_module) != PJ_SUCCESS) {
425 return AST_MODULE_LOAD_DECLINE;
428 if (ast_sip_session_register_supplement(&websocket_supplement)) {
429 ast_sip_unregister_service(&websocket_module);
430 return AST_MODULE_LOAD_DECLINE;
433 if (ast_websocket_add_protocol("sip", websocket_cb)) {
434 ast_sip_session_unregister_supplement(&websocket_supplement);
435 ast_sip_unregister_service(&websocket_module);
436 return AST_MODULE_LOAD_DECLINE;
439 return AST_MODULE_LOAD_SUCCESS;
442 static int unload_module(void)
444 ast_sip_unregister_service(&websocket_module);
445 ast_sip_session_unregister_supplement(&websocket_supplement);
446 ast_websocket_remove_protocol("sip", websocket_cb);
451 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "PJSIP WebSocket Transport Support",
452 .support_level = AST_MODULE_SUPPORT_CORE,
454 .unload = unload_module,
455 .load_pri = AST_MODPRI_APP_DEPEND,