The large GULP->PJSIP renaming effort.
[asterisk/asterisk.git] / res / res_pjsip_transport_websocket.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Jason Parker <jparker@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \brief WebSocket transport module
21  */
22
23 /*** MODULEINFO
24         <depend>pjproject</depend>
25         <depend>res_pjsip</depend>
26         <depend>res_http_websocket</depend>
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 #include <pjsip.h>
33 #include <pjsip_ua.h>
34
35 #include "asterisk/module.h"
36 #include "asterisk/http_websocket.h"
37 #include "asterisk/res_pjsip.h"
38 #include "asterisk/res_pjsip_session.h"
39 #include "asterisk/taskprocessor.h"
40
41 static int transport_type_ws;
42 static int transport_type_wss;
43
44 /*!
45  * \brief Wrapper for pjsip_transport, for storing the WebSocket session
46  */
47 struct ws_transport {
48         pjsip_transport transport;
49         pjsip_rx_data rdata;
50         struct ast_websocket *ws_session;
51 };
52
53 /*!
54  * \brief Send a message over the WebSocket connection.
55  *
56  * Called by pjsip transport manager.
57  */
58 static pj_status_t ws_send_msg(pjsip_transport *transport,
59                             pjsip_tx_data *tdata,
60                             const pj_sockaddr_t *rem_addr,
61                             int addr_len,
62                             void *token,
63                             pjsip_transport_callback callback)
64 {
65         struct ws_transport *wstransport = (struct ws_transport *)transport;
66
67         if (ast_websocket_write(wstransport->ws_session, AST_WEBSOCKET_OPCODE_TEXT, tdata->buf.start, (int)(tdata->buf.cur - tdata->buf.start))) {
68                 return PJ_EUNKNOWN;
69         }
70
71         return PJ_SUCCESS;
72 }
73
74 /*!
75  * \brief Destroy the pjsip transport.
76  *
77  * Called by pjsip transport manager.
78  */
79 static pj_status_t ws_destroy(pjsip_transport *transport)
80 {
81         struct ws_transport *wstransport = (struct ws_transport *)transport;
82
83         if (wstransport->transport.ref_cnt) {
84                 pj_atomic_destroy(wstransport->transport.ref_cnt);
85         }
86
87         if (wstransport->transport.lock) {
88                 pj_lock_destroy(wstransport->transport.lock);
89         }
90
91         pjsip_endpt_release_pool(wstransport->transport.endpt, wstransport->transport.pool);
92
93         return PJ_SUCCESS;
94 }
95
96 static int transport_shutdown(void *data)
97 {
98         RAII_VAR(struct ast_sip_contact_transport *, ct, NULL, ao2_cleanup);
99         pjsip_transport *transport = data;
100
101         if ((ct = ast_sip_location_retrieve_contact_transport_by_transport(transport))) {
102                 ast_sip_location_delete_contact_transport(ct);
103         }
104
105         pjsip_transport_shutdown(transport);
106         return 0;
107 }
108
109 struct transport_create_data {
110         struct ws_transport *transport;
111         struct ast_websocket *ws_session;
112 };
113
114 /*!
115  * \brief Create a pjsip transport.
116  */
117 static int transport_create(void *data)
118 {
119         struct transport_create_data *create_data = data;
120         struct ws_transport *newtransport;
121
122         pjsip_endpoint *endpt = ast_sip_get_pjsip_endpoint();
123         struct pjsip_tpmgr *tpmgr = pjsip_endpt_get_tpmgr(endpt);
124
125         pj_pool_t *pool;
126
127         pj_str_t buf;
128
129         if (!(pool = pjsip_endpt_create_pool(endpt, "ws", 512, 512))) {
130                 ast_log(LOG_ERROR, "Failed to allocate WebSocket endpoint pool.\n");
131                 return -1;
132         }
133
134         if (!(newtransport = PJ_POOL_ZALLOC_T(pool, struct ws_transport))) {
135                 ast_log(LOG_ERROR, "Failed to allocate WebSocket transport.\n");
136                 pjsip_endpt_release_pool(endpt, pool);
137                 return -1;
138         }
139
140         newtransport->ws_session = create_data->ws_session;
141
142         pj_atomic_create(pool, 0, &newtransport->transport.ref_cnt);
143         pj_lock_create_recursive_mutex(pool, pool->obj_name, &newtransport->transport.lock);
144
145         newtransport->transport.pool = pool;
146         pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf, ast_sockaddr_stringify(ast_websocket_remote_address(newtransport->ws_session))), &newtransport->transport.key.rem_addr);
147         newtransport->transport.key.rem_addr.addr.sa_family = pj_AF_INET();
148         newtransport->transport.key.type = ast_websocket_is_secure(newtransport->ws_session) ? transport_type_wss : transport_type_ws;
149
150         newtransport->transport.addr_len = pj_sockaddr_get_len(&newtransport->transport.key.rem_addr);
151
152         pj_sockaddr_cp(&newtransport->transport.local_addr, &newtransport->transport.key.rem_addr);
153
154         newtransport->transport.local_name.host.ptr = (char *)pj_pool_alloc(pool, newtransport->transport.addr_len+4);
155         pj_sockaddr_print(&newtransport->transport.key.rem_addr, newtransport->transport.local_name.host.ptr, newtransport->transport.addr_len+4, 0);
156         newtransport->transport.local_name.host.slen = pj_ansi_strlen(newtransport->transport.local_name.host.ptr);
157         newtransport->transport.local_name.port = pj_sockaddr_get_port(&newtransport->transport.key.rem_addr);
158
159         newtransport->transport.type_name = (char *)pjsip_transport_get_type_name(newtransport->transport.key.type);
160         newtransport->transport.flag = pjsip_transport_get_flag_from_type((pjsip_transport_type_e)newtransport->transport.key.type);
161         newtransport->transport.info = (char *)pj_pool_alloc(newtransport->transport.pool, 64);
162
163         newtransport->transport.endpt = endpt;
164         newtransport->transport.tpmgr = tpmgr;
165         newtransport->transport.send_msg = &ws_send_msg;
166         newtransport->transport.destroy = &ws_destroy;
167
168         pjsip_transport_register(newtransport->transport.tpmgr, (pjsip_transport *)newtransport);
169
170         create_data->transport = newtransport;
171         return 0;
172 }
173
174 struct transport_read_data {
175         struct ws_transport *transport;
176         char *payload;
177         uint64_t payload_len;
178 };
179
180 /*!
181  * \brief Pass WebSocket data into pjsip transport manager.
182  */
183 static int transport_read(void *data)
184 {
185         struct transport_read_data *read_data = data;
186         struct ws_transport *newtransport = read_data->transport;
187         struct ast_websocket *session = newtransport->ws_session;
188
189         pjsip_rx_data *rdata = &newtransport->rdata;
190         int recvd;
191         pj_str_t buf;
192
193         rdata->tp_info.pool = newtransport->transport.pool;
194         rdata->tp_info.transport = &newtransport->transport;
195
196         pj_gettimeofday(&rdata->pkt_info.timestamp);
197
198         pj_memcpy(rdata->pkt_info.packet, read_data->payload, sizeof(rdata->pkt_info.packet));
199         rdata->pkt_info.len = read_data->payload_len;
200         rdata->pkt_info.zero = 0;
201
202         pj_sockaddr_parse(pj_AF_UNSPEC(), 0, pj_cstr(&buf, ast_sockaddr_stringify(ast_websocket_remote_address(session))), &rdata->pkt_info.src_addr);
203         rdata->pkt_info.src_addr.addr.sa_family = pj_AF_INET();
204
205         rdata->pkt_info.src_addr_len = sizeof(rdata->pkt_info.src_addr);
206
207         pj_ansi_strcpy(rdata->pkt_info.src_name, ast_sockaddr_stringify_host(ast_websocket_remote_address(session)));
208         rdata->pkt_info.src_port = ast_sockaddr_port(ast_websocket_remote_address(session));
209
210         recvd = pjsip_tpmgr_receive_packet(rdata->tp_info.transport->tpmgr, rdata);
211
212         return (read_data->payload_len == recvd) ? 0 : -1;
213 }
214
215 /*!
216  \brief WebSocket connection handler.
217  */
218 static void websocket_cb(struct ast_websocket *session, struct ast_variable *parameters, struct ast_variable *headers)
219 {
220         struct ast_taskprocessor *serializer = NULL;
221         struct transport_create_data create_data;
222         struct ws_transport *transport = NULL;
223
224         if (ast_websocket_set_nonblock(session)) {
225                 ast_websocket_unref(session);
226                 return;
227         }
228
229         if (!(serializer = ast_sip_create_serializer())) {
230                 ast_websocket_unref(session);
231                 return;
232         }
233
234         create_data.ws_session = session;
235
236         if (ast_sip_push_task_synchronous(serializer, transport_create, &create_data)) {
237                 ast_log(LOG_ERROR, "Could not create WebSocket transport.\n");
238                 ast_websocket_unref(session);
239                 return;
240         }
241
242         transport = create_data.transport;
243
244         while (ast_wait_for_input(ast_websocket_fd(session), -1) > 0) {
245                 struct transport_read_data read_data;
246                 enum ast_websocket_opcode opcode;
247                 int fragmented;
248
249                 if (ast_websocket_read(session, &read_data.payload, &read_data.payload_len, &opcode, &fragmented)) {
250                         break;
251                 }
252
253                 if (opcode == AST_WEBSOCKET_OPCODE_TEXT || opcode == AST_WEBSOCKET_OPCODE_BINARY) {
254                         read_data.transport = transport;
255
256                         ast_sip_push_task(serializer, transport_read, &read_data);
257                 } else if (opcode == AST_WEBSOCKET_OPCODE_CLOSE) {
258                         break;
259                 }
260         }
261
262         ast_sip_push_task_synchronous(serializer, transport_shutdown, transport);
263
264         ast_taskprocessor_unreference(serializer);
265         ast_websocket_unref(session);
266 }
267
268 /*!
269  * \brief Session supplement handler for avoiding DNS lookup on bogus address.
270  */
271 static void websocket_outgoing_request(struct ast_sip_session *session, struct pjsip_tx_data *tdata)
272 {
273         char contact_uri[PJSIP_MAX_URL_SIZE] = { 0, };
274         RAII_VAR(struct ast_sip_contact_transport *, ct, NULL, ao2_cleanup);
275         pjsip_tpselector selector = { .type = PJSIP_TPSELECTOR_TRANSPORT, };
276
277         const pjsip_sip_uri *request_uri = pjsip_uri_get_uri(tdata->msg->line.req.uri);
278
279         if (pj_stricmp2(&request_uri->transport_param, "WS") && pj_stricmp2(&request_uri->transport_param, "WSS")) {
280                 return;
281         }
282
283         pjsip_uri_print(PJSIP_URI_IN_REQ_URI, request_uri, contact_uri, sizeof(contact_uri));
284
285         if (!(ct = ast_sip_location_retrieve_contact_transport_by_uri(contact_uri))) {
286                 return;
287         }
288
289         selector.u.transport = ct->transport;
290
291         pjsip_tx_data_set_transport(tdata, &selector);
292
293         tdata->dest_info.addr.count = 1;
294         tdata->dest_info.addr.entry[0].type = ct->transport->key.type;
295         tdata->dest_info.addr.entry[0].addr = ct->transport->key.rem_addr;
296         tdata->dest_info.addr.entry[0].addr_len = ct->transport->addr_len;
297 }
298
299 static struct ast_sip_session_supplement websocket_supplement = {
300         .outgoing_request = websocket_outgoing_request,
301 };
302
303 /*!
304  * \brief Destructor for ast_sip_contact_transport
305  */
306 static void contact_transport_destroy(void *obj)
307 {
308         struct ast_sip_contact_transport *ct = obj;
309
310         ast_string_field_free_memory(ct);
311 }
312
313 static void *contact_transport_alloc(void)
314 {
315         struct ast_sip_contact_transport *ct = ao2_alloc(sizeof(*ct), contact_transport_destroy);
316
317         if (!ct) {
318                 return NULL;
319         }
320
321         if (ast_string_field_init(ct, 256)) {
322                 ao2_cleanup(ct);
323                 return NULL;
324         }
325
326         return ct;
327 }
328
329 /*!
330  * \brief Store the transport a message came in on, so it can be used for outbound messages to that contact.
331  */
332 static pj_bool_t websocket_on_rx_msg(pjsip_rx_data *rdata)
333 {
334         pjsip_contact_hdr *contact_hdr = NULL;
335
336         long type = rdata->tp_info.transport->key.type;
337
338         if (type != (long)transport_type_ws && type != (long)transport_type_wss) {
339                 return PJ_FALSE;
340         }
341
342         if ((contact_hdr = pjsip_msg_find_hdr(rdata->msg_info.msg, PJSIP_H_CONTACT, NULL))) {
343                 RAII_VAR(struct ast_sip_contact_transport *, ct, NULL, ao2_cleanup);
344                 char contact_uri[PJSIP_MAX_URL_SIZE];
345
346                 pjsip_uri_print(PJSIP_URI_IN_CONTACT_HDR, pjsip_uri_get_uri(contact_hdr->uri), contact_uri, sizeof(contact_uri));
347
348                 if (!(ct = ast_sip_location_retrieve_contact_transport_by_uri(contact_uri))) {
349                         if (!(ct = contact_transport_alloc())) {
350                                 return PJ_FALSE;
351                         }
352
353                         ast_string_field_set(ct, uri, contact_uri);
354                         ct->transport = rdata->tp_info.transport;
355
356                         ast_sip_location_add_contact_transport(ct);
357                 }
358         }
359
360         return PJ_FALSE;
361 }
362
363 static pjsip_module websocket_module = {
364         .name = { "WebSocket Transport Module", 26 },
365         .id = -1,
366         .priority = PJSIP_MOD_PRIORITY_TRANSPORT_LAYER,
367         .on_rx_request = websocket_on_rx_msg,
368 };
369
370 static int load_module(void)
371 {
372         pjsip_transport_register_type(PJSIP_TRANSPORT_RELIABLE, "WS", 5060, &transport_type_ws);
373         pjsip_transport_register_type(PJSIP_TRANSPORT_RELIABLE, "WSS", 5060, &transport_type_wss);
374
375         if (ast_sip_register_service(&websocket_module) != PJ_SUCCESS) {
376                 return AST_MODULE_LOAD_DECLINE;
377         }
378
379         ast_sip_session_register_supplement(&websocket_supplement);
380
381         if (ast_websocket_add_protocol("sip", websocket_cb)) {
382                 ast_sip_unregister_service(&websocket_module);
383                 return AST_MODULE_LOAD_DECLINE;
384         }
385
386         return AST_MODULE_LOAD_SUCCESS;
387 }
388
389 static int unload_module(void)
390 {
391         ast_sip_unregister_service(&websocket_module);
392         ast_sip_session_unregister_supplement(&websocket_supplement);
393         ast_websocket_remove_protocol("sip", websocket_cb);
394
395         return 0;
396 }
397
398 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_LOAD_ORDER, "PJSIP WebSocket Transport Support",
399                 .load = load_module,
400                 .unload = unload_module,
401                 .load_pri = AST_MODPRI_APP_DEPEND,
402            );