DNS: Need to use the same serializer for a pjproject SIP transaction.
[asterisk/asterisk.git] / res / res_pjsip / pjsip_distributor.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 #include "asterisk.h"
20
21 #include <pjsip.h>
22
23 #include "asterisk/res_pjsip.h"
24 #include "include/res_pjsip_private.h"
25 #include "asterisk/taskprocessor.h"
26 #include "asterisk/threadpool.h"
27
28 static int distribute(void *data);
29 static pj_bool_t distributor(pjsip_rx_data *rdata);
30 static pj_status_t record_serializer(pjsip_tx_data *tdata);
31
32 static pjsip_module distributor_mod = {
33         .name = {"Request Distributor", 19},
34         .priority = PJSIP_MOD_PRIORITY_TSX_LAYER - 6,
35         .on_tx_request = record_serializer,
36         .on_rx_request = distributor,
37         .on_rx_response = distributor,
38 };
39
40 /*!
41  * \internal
42  * \brief Record the task's serializer name on the tdata structure.
43  * \since 14.0.0
44  *
45  * \param tdata The outgoing message.
46  *
47  * \retval PJ_SUCCESS.
48  */
49 static pj_status_t record_serializer(pjsip_tx_data *tdata)
50 {
51         struct ast_taskprocessor *serializer;
52
53         serializer = ast_threadpool_serializer_get_current();
54         if (serializer) {
55                 const char *name;
56
57                 name = ast_taskprocessor_name(serializer);
58                 if (!ast_strlen_zero(name)
59                         && (!tdata->mod_data[distributor_mod.id]
60                                 || strcmp(tdata->mod_data[distributor_mod.id], name))) {
61                         char *tdata_name;
62
63                         /* The serializer in use changed. */
64                         tdata_name = pj_pool_alloc(tdata->pool, strlen(name) + 1);
65                         strcpy(tdata_name, name);/* Safe */
66
67                         tdata->mod_data[distributor_mod.id] = tdata_name;
68                 }
69         }
70
71         return PJ_SUCCESS;
72 }
73
74 /*!
75  * \internal
76  * \brief Find the request tdata to get the serializer it used.
77  * \since 14.0.0
78  *
79  * \param rdata The incoming message.
80  *
81  * \retval serializer on success.
82  * \retval NULL on error or could not find the serializer.
83  */
84 static struct ast_taskprocessor *find_request_serializer(pjsip_rx_data *rdata)
85 {
86         struct ast_taskprocessor *serializer = NULL;
87         pj_str_t tsx_key;
88         pjsip_transaction *tsx;
89
90         pjsip_tsx_create_key(rdata->tp_info.pool, &tsx_key, PJSIP_ROLE_UAC,
91                 &rdata->msg_info.cseq->method, rdata);
92
93         tsx = pjsip_tsx_layer_find_tsx(&tsx_key, PJ_TRUE);
94         if (!tsx) {
95                 ast_debug(1, "Could not find %.*s transaction for %d response.\n",
96                         (int) pj_strlen(&rdata->msg_info.cseq->method.name),
97                         pj_strbuf(&rdata->msg_info.cseq->method.name),
98                         rdata->msg_info.msg->line.status.code);
99                 return NULL;
100         }
101
102         if (tsx->last_tx) {
103                 const char *serializer_name;
104
105                 serializer_name = tsx->last_tx->mod_data[distributor_mod.id];
106                 if (!ast_strlen_zero(serializer_name)) {
107                         serializer = ast_taskprocessor_get(serializer_name, TPS_REF_IF_EXISTS);
108                 }
109         }
110
111 #ifdef HAVE_PJ_TRANSACTION_GRP_LOCK
112         pj_grp_lock_release(tsx->grp_lock);
113 #else
114         pj_mutex_unlock(tsx->mutex);
115 #endif
116
117         return serializer;
118 }
119
120 /*! Dialog-specific information the distributor uses */
121 struct distributor_dialog_data {
122         /*! Serializer to distribute tasks to for this dialog */
123         struct ast_taskprocessor *serializer;
124         /*! Endpoint associated with this dialog */
125         struct ast_sip_endpoint *endpoint;
126 };
127
128 /*!
129  * \internal
130  *
131  * \note Call this with the dialog locked
132  */
133 static struct distributor_dialog_data *distributor_dialog_data_alloc(pjsip_dialog *dlg)
134 {
135         struct distributor_dialog_data *dist;
136
137         dist = PJ_POOL_ZALLOC_T(dlg->pool, struct distributor_dialog_data);
138         pjsip_dlg_set_mod_data(dlg, distributor_mod.id, dist);
139
140         return dist;
141 }
142
143 void ast_sip_dialog_set_serializer(pjsip_dialog *dlg, struct ast_taskprocessor *serializer)
144 {
145         struct distributor_dialog_data *dist;
146         SCOPED_LOCK(lock, dlg, pjsip_dlg_inc_lock, pjsip_dlg_dec_lock);
147
148         dist = pjsip_dlg_get_mod_data(dlg, distributor_mod.id);
149         if (!dist) {
150                 dist = distributor_dialog_data_alloc(dlg);
151         }
152         dist->serializer = serializer;
153 }
154
155 void ast_sip_dialog_set_endpoint(pjsip_dialog *dlg, struct ast_sip_endpoint *endpoint)
156 {
157         struct distributor_dialog_data *dist;
158         SCOPED_LOCK(lock, dlg, pjsip_dlg_inc_lock, pjsip_dlg_dec_lock);
159
160         dist = pjsip_dlg_get_mod_data(dlg, distributor_mod.id);
161         if (!dist) {
162                 dist = distributor_dialog_data_alloc(dlg);
163         }
164         dist->endpoint = endpoint;
165 }
166
167 struct ast_sip_endpoint *ast_sip_dialog_get_endpoint(pjsip_dialog *dlg)
168 {
169         struct distributor_dialog_data *dist;
170         SCOPED_LOCK(lock, dlg, pjsip_dlg_inc_lock, pjsip_dlg_dec_lock);
171
172         dist = pjsip_dlg_get_mod_data(dlg, distributor_mod.id);
173         if (!dist || !dist->endpoint) {
174                 return NULL;
175         }
176         ao2_ref(dist->endpoint, +1);
177         return dist->endpoint;
178 }
179
180 static pjsip_dialog *find_dialog(pjsip_rx_data *rdata)
181 {
182         pj_str_t tsx_key;
183         pjsip_transaction *tsx;
184         pjsip_dialog *dlg;
185         pj_str_t *local_tag;
186         pj_str_t *remote_tag;
187
188         if (!rdata->msg_info.msg) {
189                 return NULL;
190         }
191
192         if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
193                 local_tag = &rdata->msg_info.to->tag;
194                 remote_tag = &rdata->msg_info.from->tag;
195         } else {
196                 local_tag = &rdata->msg_info.from->tag;
197                 remote_tag = &rdata->msg_info.to->tag;
198         }
199
200         /* We can only call the convenient method for
201          *  1) responses
202          *  2) non-CANCEL requests
203          *  3) CANCEL requests with a to-tag
204          */
205         if (rdata->msg_info.msg->type == PJSIP_RESPONSE_MSG ||
206                         pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method) ||
207                         rdata->msg_info.to->tag.slen != 0) {
208                 return pjsip_ua_find_dialog(&rdata->msg_info.cid->id, local_tag,
209                                 remote_tag, PJ_TRUE);
210         }
211
212         /* Incoming CANCEL without a to-tag can't use same method for finding the
213          * dialog. Instead, we have to find the matching INVITE transaction and
214          * then get the dialog from the transaction
215          */
216         pjsip_tsx_create_key(rdata->tp_info.pool, &tsx_key, PJSIP_ROLE_UAS,
217                         pjsip_get_invite_method(), rdata);
218
219         tsx = pjsip_tsx_layer_find_tsx(&tsx_key, PJ_TRUE);
220         if (!tsx) {
221                 ast_log(LOG_ERROR, "Could not find matching INVITE transaction for CANCEL request\n");
222                 return NULL;
223         }
224
225         dlg = pjsip_tsx_get_dlg(tsx);
226
227 #ifdef HAVE_PJ_TRANSACTION_GRP_LOCK
228         pj_grp_lock_release(tsx->grp_lock);
229 #else
230         pj_mutex_unlock(tsx->mutex);
231 #endif
232
233         if (!dlg) {
234                 return NULL;
235         }
236
237         pjsip_dlg_inc_lock(dlg);
238         return dlg;
239 }
240
241 static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata);
242
243 static pjsip_module endpoint_mod = {
244         .name = {"Endpoint Identifier", 19},
245         .priority = PJSIP_MOD_PRIORITY_TSX_LAYER - 3,
246         .on_rx_request = endpoint_lookup,
247 };
248
249 static pj_bool_t distributor(pjsip_rx_data *rdata)
250 {
251         pjsip_dialog *dlg = find_dialog(rdata);
252         struct distributor_dialog_data *dist = NULL;
253         struct ast_taskprocessor *serializer = NULL;
254         struct ast_taskprocessor *req_serializer = NULL;
255         pjsip_rx_data *clone;
256
257         if (dlg) {
258                 dist = pjsip_dlg_get_mod_data(dlg, distributor_mod.id);
259                 if (dist) {
260                         serializer = dist->serializer;
261                 }
262         }
263
264         if (serializer) {
265                 /* We have a serializer so we know where to send the message. */
266         } else if (rdata->msg_info.msg->type == PJSIP_RESPONSE_MSG) {
267                 req_serializer = find_request_serializer(rdata);
268                 serializer = req_serializer;
269         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method)
270                 || !pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_bye_method)) {
271                 /* We have a BYE or CANCEL request without a serializer. */
272                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata,
273                         PJSIP_SC_CALL_TSX_DOES_NOT_EXIST, NULL, NULL, NULL);
274                 goto end;
275         }
276
277         pjsip_rx_data_clone(rdata, 0, &clone);
278
279         if (dist) {
280                 clone->endpt_info.mod_data[endpoint_mod.id] = ao2_bump(dist->endpoint);
281         }
282
283         ast_sip_push_task(serializer, distribute, clone);
284
285 end:
286         if (dlg) {
287                 pjsip_dlg_dec_lock(dlg);
288         }
289         ast_taskprocessor_unreference(req_serializer);
290
291         return PJ_TRUE;
292 }
293
294 static struct ast_sip_auth *artificial_auth;
295
296 static int create_artificial_auth(void)
297 {
298         if (!(artificial_auth = ast_sorcery_alloc(
299                       ast_sip_get_sorcery(), SIP_SORCERY_AUTH_TYPE, "artificial"))) {
300                 ast_log(LOG_ERROR, "Unable to create artificial auth\n");
301                 return -1;
302         }
303
304         ast_string_field_set(artificial_auth, realm, "asterisk");
305         ast_string_field_set(artificial_auth, auth_user, "");
306         ast_string_field_set(artificial_auth, auth_pass, "");
307         artificial_auth->type = AST_SIP_AUTH_TYPE_ARTIFICIAL;
308         return 0;
309 }
310
311 struct ast_sip_auth *ast_sip_get_artificial_auth(void)
312 {
313         ao2_ref(artificial_auth, +1);
314         return artificial_auth;
315 }
316
317 static struct ast_sip_endpoint *artificial_endpoint = NULL;
318
319 static int create_artificial_endpoint(void)
320 {
321         if (!(artificial_endpoint = ast_sorcery_alloc(
322                       ast_sip_get_sorcery(), "endpoint", NULL))) {
323                 return -1;
324         }
325
326         AST_VECTOR_INIT(&artificial_endpoint->inbound_auths, 1);
327         /* Pushing a bogus value into the vector will ensure that
328          * the proper size of the vector is returned. This value is
329          * not actually used anywhere
330          */
331         AST_VECTOR_APPEND(&artificial_endpoint->inbound_auths, ast_strdup("artificial-auth"));
332         return 0;
333 }
334
335 struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void)
336 {
337         ao2_ref(artificial_endpoint, +1);
338         return artificial_endpoint;
339 }
340
341 static void log_unidentified_request(pjsip_rx_data *rdata)
342 {
343         char from_buf[PJSIP_MAX_URL_SIZE];
344         char callid_buf[PJSIP_MAX_URL_SIZE];
345         pjsip_uri_print(PJSIP_URI_IN_FROMTO_HDR, rdata->msg_info.from->uri, from_buf, PJSIP_MAX_URL_SIZE);
346         ast_copy_pj_str(callid_buf, &rdata->msg_info.cid->id, PJSIP_MAX_URL_SIZE);
347         ast_log(LOG_NOTICE, "Request from '%s' failed for '%s:%d' (callid: %s) - No matching endpoint found\n",
348                 from_buf, rdata->pkt_info.src_name, rdata->pkt_info.src_port, callid_buf);
349 }
350
351 static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata)
352 {
353         struct ast_sip_endpoint *endpoint;
354         int is_ack = rdata->msg_info.msg->line.req.method.id == PJSIP_ACK_METHOD;
355
356         endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
357         if (endpoint) {
358                 return PJ_FALSE;
359         }
360
361         endpoint = ast_sip_identify_endpoint(rdata);
362
363         if (!endpoint && !is_ack) {
364                 char name[AST_UUID_STR_LEN] = "";
365                 pjsip_uri *from = rdata->msg_info.from->uri;
366
367                 /* always use an artificial endpoint - per discussion no reason
368                    to have "alwaysauthreject" as an option.  It is felt using it
369                    was a bug fix and it is not needed since we are not worried about
370                    breaking old stuff and we really don't want to enable the discovery
371                    of SIP accounts */
372                 endpoint = ast_sip_get_artificial_endpoint();
373
374                 if (PJSIP_URI_SCHEME_IS_SIP(from) || PJSIP_URI_SCHEME_IS_SIPS(from)) {
375                         pjsip_sip_uri *sip_from = pjsip_uri_get_uri(from);
376                         ast_copy_pj_str(name, &sip_from->user, sizeof(name));
377                 }
378
379                 log_unidentified_request(rdata);
380                 ast_sip_report_invalid_endpoint(name, rdata);
381         }
382         rdata->endpt_info.mod_data[endpoint_mod.id] = endpoint;
383         return PJ_FALSE;
384 }
385
386 static pj_bool_t authenticate(pjsip_rx_data *rdata)
387 {
388         RAII_VAR(struct ast_sip_endpoint *, endpoint, ast_pjsip_rdata_get_endpoint(rdata), ao2_cleanup);
389         int is_ack = rdata->msg_info.msg->line.req.method.id == PJSIP_ACK_METHOD;
390
391         ast_assert(endpoint != NULL);
392
393         if (!is_ack && ast_sip_requires_authentication(endpoint, rdata)) {
394                 pjsip_tx_data *tdata;
395                 pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, 401, NULL, &tdata);
396                 switch (ast_sip_check_authentication(endpoint, rdata, tdata)) {
397                 case AST_SIP_AUTHENTICATION_CHALLENGE:
398                         /* Send the 401 we created for them */
399                         ast_sip_report_auth_challenge_sent(endpoint, rdata, tdata);
400                         pjsip_endpt_send_response2(ast_sip_get_pjsip_endpoint(), rdata, tdata, NULL, NULL);
401                         return PJ_TRUE;
402                 case AST_SIP_AUTHENTICATION_SUCCESS:
403                         ast_sip_report_auth_success(endpoint, rdata);
404                         pjsip_tx_data_dec_ref(tdata);
405                         return PJ_FALSE;
406                 case AST_SIP_AUTHENTICATION_FAILED:
407                         ast_sip_report_auth_failed_challenge_response(endpoint, rdata);
408                         pjsip_endpt_send_response2(ast_sip_get_pjsip_endpoint(), rdata, tdata, NULL, NULL);
409                         return PJ_TRUE;
410                 case AST_SIP_AUTHENTICATION_ERROR:
411                         ast_sip_report_auth_failed_challenge_response(endpoint, rdata);
412                         pjsip_tx_data_dec_ref(tdata);
413                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
414                         return PJ_TRUE;
415                 }
416         }
417
418         return PJ_FALSE;
419 }
420
421 static pjsip_module auth_mod = {
422         .name = {"Request Authenticator", 21},
423         .priority = PJSIP_MOD_PRIORITY_APPLICATION - 2,
424         .on_rx_request = authenticate,
425 };
426
427 static int distribute(void *data)
428 {
429         static pjsip_process_rdata_param param = {
430                 .start_mod = &distributor_mod,
431                 .idx_after_start = 1,
432         };
433         pj_bool_t handled;
434         pjsip_rx_data *rdata = data;
435         int is_request = rdata->msg_info.msg->type == PJSIP_REQUEST_MSG;
436         int is_ack = is_request ? rdata->msg_info.msg->line.req.method.id == PJSIP_ACK_METHOD : 0;
437         struct ast_sip_endpoint *endpoint;
438
439         pjsip_endpt_process_rx_data(ast_sip_get_pjsip_endpoint(), rdata, &param, &handled);
440         if (!handled && is_request && !is_ack) {
441                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 501, NULL, NULL, NULL);
442         }
443
444         /* The endpoint_mod stores an endpoint reference in the mod_data of rdata. This
445          * is the only appropriate spot to actually decrement the reference.
446          */
447         endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
448         ao2_cleanup(endpoint);
449         pjsip_rx_data_free_cloned(rdata);
450         return 0;
451 }
452
453 struct ast_sip_endpoint *ast_pjsip_rdata_get_endpoint(pjsip_rx_data *rdata)
454 {
455         struct ast_sip_endpoint *endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
456         if (endpoint) {
457                 ao2_ref(endpoint, +1);
458         }
459         return endpoint;
460 }
461
462 int ast_sip_initialize_distributor(void)
463 {
464         if (create_artificial_endpoint() || create_artificial_auth()) {
465                 return -1;
466         }
467
468         if (internal_sip_register_service(&distributor_mod)) {
469                 return -1;
470         }
471         if (internal_sip_register_service(&endpoint_mod)) {
472                 return -1;
473         }
474         if (internal_sip_register_service(&auth_mod)) {
475                 return -1;
476         }
477
478         return 0;
479 }
480
481 void ast_sip_destroy_distributor(void)
482 {
483         internal_sip_unregister_service(&distributor_mod);
484         internal_sip_unregister_service(&endpoint_mod);
485         internal_sip_unregister_service(&auth_mod);
486
487         ao2_cleanup(artificial_auth);
488         ao2_cleanup(artificial_endpoint);
489 }