pjsip_distributor: Add missing newline to NOTICE
[asterisk/asterisk.git] / res / res_pjsip / pjsip_distributor.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Mark Michelson <mmichelson@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 #include "asterisk.h"
20
21 #include <pjsip.h>
22
23 #include "asterisk/res_pjsip.h"
24 #include "include/res_pjsip_private.h"
25 #include "asterisk/taskprocessor.h"
26 #include "asterisk/threadpool.h"
27 #include "asterisk/res_pjsip_cli.h"
28
29 static int distribute(void *data);
30 static pj_bool_t distributor(pjsip_rx_data *rdata);
31 static pj_status_t record_serializer(pjsip_tx_data *tdata);
32
33 static pjsip_module distributor_mod = {
34         .name = {"Request Distributor", 19},
35         .priority = PJSIP_MOD_PRIORITY_TSX_LAYER - 6,
36         .on_tx_request = record_serializer,
37         .on_rx_request = distributor,
38         .on_rx_response = distributor,
39 };
40
41 struct ast_sched_context *prune_context;
42
43 /* From the auth/realm realtime column size */
44 #define MAX_REALM_LENGTH 40
45 static char default_realm[MAX_REALM_LENGTH + 1];
46
47 #define DEFAULT_SUSPECTS_BUCKETS 53
48
49 static struct ao2_container *unidentified_requests;
50 static unsigned int unidentified_count;
51 static unsigned int unidentified_period;
52 static unsigned int unidentified_prune_interval;
53 static int using_auth_username;
54
55 struct unidentified_request{
56         struct timeval first_seen;
57         int count;
58         char src_name[];
59 };
60
61 /*!
62  * \internal
63  * \brief Record the task's serializer name on the tdata structure.
64  * \since 14.0.0
65  *
66  * \param tdata The outgoing message.
67  *
68  * \retval PJ_SUCCESS.
69  */
70 static pj_status_t record_serializer(pjsip_tx_data *tdata)
71 {
72         struct ast_taskprocessor *serializer;
73
74         serializer = ast_threadpool_serializer_get_current();
75         if (serializer) {
76                 const char *name;
77
78                 name = ast_taskprocessor_name(serializer);
79                 if (!ast_strlen_zero(name)
80                         && (!tdata->mod_data[distributor_mod.id]
81                                 || strcmp(tdata->mod_data[distributor_mod.id], name))) {
82                         char *tdata_name;
83
84                         /* The serializer in use changed. */
85                         tdata_name = pj_pool_alloc(tdata->pool, strlen(name) + 1);
86                         strcpy(tdata_name, name);/* Safe */
87
88                         tdata->mod_data[distributor_mod.id] = tdata_name;
89                 }
90         }
91
92         return PJ_SUCCESS;
93 }
94
95 /*!
96  * \internal
97  * \brief Find the request tdata to get the serializer it used.
98  * \since 14.0.0
99  *
100  * \param rdata The incoming message.
101  *
102  * \retval serializer on success.
103  * \retval NULL on error or could not find the serializer.
104  */
105 static struct ast_taskprocessor *find_request_serializer(pjsip_rx_data *rdata)
106 {
107         struct ast_taskprocessor *serializer = NULL;
108         pj_str_t tsx_key;
109         pjsip_transaction *tsx;
110
111         pjsip_tsx_create_key(rdata->tp_info.pool, &tsx_key, PJSIP_ROLE_UAC,
112                 &rdata->msg_info.cseq->method, rdata);
113
114         tsx = pjsip_tsx_layer_find_tsx(&tsx_key, PJ_TRUE);
115         if (!tsx) {
116                 ast_debug(1, "Could not find %.*s transaction for %d response.\n",
117                         (int) pj_strlen(&rdata->msg_info.cseq->method.name),
118                         pj_strbuf(&rdata->msg_info.cseq->method.name),
119                         rdata->msg_info.msg->line.status.code);
120                 return NULL;
121         }
122
123         if (tsx->last_tx) {
124                 const char *serializer_name;
125
126                 serializer_name = tsx->last_tx->mod_data[distributor_mod.id];
127                 if (!ast_strlen_zero(serializer_name)) {
128                         serializer = ast_taskprocessor_get(serializer_name, TPS_REF_IF_EXISTS);
129                         if (serializer) {
130                                 ast_debug(3, "Found serializer %s on transaction %s\n",
131                                                 serializer_name, tsx->obj_name);
132                         }
133                 }
134         }
135
136 #ifdef HAVE_PJ_TRANSACTION_GRP_LOCK
137         pj_grp_lock_release(tsx->grp_lock);
138 #else
139         pj_mutex_unlock(tsx->mutex);
140 #endif
141
142         return serializer;
143 }
144
145 /*! Dialog-specific information the distributor uses */
146 struct distributor_dialog_data {
147         /*! Serializer to distribute tasks to for this dialog */
148         struct ast_taskprocessor *serializer;
149         /*! Endpoint associated with this dialog */
150         struct ast_sip_endpoint *endpoint;
151 };
152
153 /*!
154  * \internal
155  *
156  * \note Call this with the dialog locked
157  */
158 static struct distributor_dialog_data *distributor_dialog_data_alloc(pjsip_dialog *dlg)
159 {
160         struct distributor_dialog_data *dist;
161
162         dist = PJ_POOL_ZALLOC_T(dlg->pool, struct distributor_dialog_data);
163         pjsip_dlg_set_mod_data(dlg, distributor_mod.id, dist);
164
165         return dist;
166 }
167
168 void ast_sip_dialog_set_serializer(pjsip_dialog *dlg, struct ast_taskprocessor *serializer)
169 {
170         struct distributor_dialog_data *dist;
171         SCOPED_LOCK(lock, dlg, pjsip_dlg_inc_lock, pjsip_dlg_dec_lock);
172
173         dist = pjsip_dlg_get_mod_data(dlg, distributor_mod.id);
174         if (!dist) {
175                 dist = distributor_dialog_data_alloc(dlg);
176         }
177         dist->serializer = serializer;
178 }
179
180 void ast_sip_dialog_set_endpoint(pjsip_dialog *dlg, struct ast_sip_endpoint *endpoint)
181 {
182         struct distributor_dialog_data *dist;
183         SCOPED_LOCK(lock, dlg, pjsip_dlg_inc_lock, pjsip_dlg_dec_lock);
184
185         dist = pjsip_dlg_get_mod_data(dlg, distributor_mod.id);
186         if (!dist) {
187                 dist = distributor_dialog_data_alloc(dlg);
188         }
189         dist->endpoint = endpoint;
190 }
191
192 struct ast_sip_endpoint *ast_sip_dialog_get_endpoint(pjsip_dialog *dlg)
193 {
194         struct distributor_dialog_data *dist;
195         SCOPED_LOCK(lock, dlg, pjsip_dlg_inc_lock, pjsip_dlg_dec_lock);
196
197         dist = pjsip_dlg_get_mod_data(dlg, distributor_mod.id);
198         if (!dist || !dist->endpoint) {
199                 return NULL;
200         }
201         ao2_ref(dist->endpoint, +1);
202         return dist->endpoint;
203 }
204
205 static pjsip_dialog *find_dialog(pjsip_rx_data *rdata)
206 {
207         pj_str_t tsx_key;
208         pjsip_transaction *tsx;
209         pjsip_dialog *dlg;
210         pj_str_t *local_tag;
211         pj_str_t *remote_tag;
212
213         if (!rdata->msg_info.msg) {
214                 return NULL;
215         }
216
217         if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
218                 local_tag = &rdata->msg_info.to->tag;
219                 remote_tag = &rdata->msg_info.from->tag;
220         } else {
221                 local_tag = &rdata->msg_info.from->tag;
222                 remote_tag = &rdata->msg_info.to->tag;
223         }
224
225         /* We can only call the convenient method for
226          *  1) responses
227          *  2) non-CANCEL requests
228          *  3) CANCEL requests with a to-tag
229          */
230         if (rdata->msg_info.msg->type == PJSIP_RESPONSE_MSG ||
231                         pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method) ||
232                         rdata->msg_info.to->tag.slen != 0) {
233                 return pjsip_ua_find_dialog(&rdata->msg_info.cid->id, local_tag,
234                                 remote_tag, PJ_TRUE);
235         }
236
237         /* Incoming CANCEL without a to-tag can't use same method for finding the
238          * dialog. Instead, we have to find the matching INVITE transaction and
239          * then get the dialog from the transaction
240          */
241         pjsip_tsx_create_key(rdata->tp_info.pool, &tsx_key, PJSIP_ROLE_UAS,
242                         pjsip_get_invite_method(), rdata);
243
244         tsx = pjsip_tsx_layer_find_tsx(&tsx_key, PJ_TRUE);
245         if (!tsx) {
246                 ast_log(LOG_ERROR, "Could not find matching INVITE transaction for CANCEL request\n");
247                 return NULL;
248         }
249
250         dlg = pjsip_tsx_get_dlg(tsx);
251
252 #ifdef HAVE_PJ_TRANSACTION_GRP_LOCK
253         pj_grp_lock_release(tsx->grp_lock);
254 #else
255         pj_mutex_unlock(tsx->mutex);
256 #endif
257
258         if (!dlg) {
259                 return NULL;
260         }
261
262         pjsip_dlg_inc_lock(dlg);
263         return dlg;
264 }
265
266 static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata);
267
268 static pjsip_module endpoint_mod = {
269         .name = {"Endpoint Identifier", 19},
270         .priority = PJSIP_MOD_PRIORITY_TSX_LAYER - 3,
271         .on_rx_request = endpoint_lookup,
272 };
273
274 #define SIP_MAX_QUEUE (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 3)
275
276 static pj_bool_t distributor(pjsip_rx_data *rdata)
277 {
278         pjsip_dialog *dlg = find_dialog(rdata);
279         struct distributor_dialog_data *dist = NULL;
280         struct ast_taskprocessor *serializer = NULL;
281         pjsip_rx_data *clone;
282
283         if (dlg) {
284                 ast_debug(3, "Searching for serializer on dialog %s for %s\n",
285                                 dlg->obj_name, rdata->msg_info.info);
286                 dist = pjsip_dlg_get_mod_data(dlg, distributor_mod.id);
287                 if (dist) {
288                         serializer = ao2_bump(dist->serializer);
289                         if (serializer) {
290                                 ast_debug(3, "Found serializer %s on dialog %s\n",
291                                                 ast_taskprocessor_name(serializer), dlg->obj_name);
292                         }
293                 }
294                 pjsip_dlg_dec_lock(dlg);
295         }
296
297         if (serializer) {
298                 /* We have a serializer so we know where to send the message. */
299         } else if (rdata->msg_info.msg->type == PJSIP_RESPONSE_MSG) {
300                 ast_debug(3, "No dialog serializer for response %s. Using request transaction as basis\n",
301                                 rdata->msg_info.info);
302                 serializer = find_request_serializer(rdata);
303         } else if (!pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_cancel_method)
304                 || !pjsip_method_cmp(&rdata->msg_info.msg->line.req.method, &pjsip_bye_method)) {
305                 /* We have a BYE or CANCEL request without a serializer. */
306                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata,
307                         PJSIP_SC_CALL_TSX_DOES_NOT_EXIST, NULL, NULL, NULL);
308                 return PJ_TRUE;
309         }
310
311         pjsip_rx_data_clone(rdata, 0, &clone);
312
313         if (dist) {
314                 clone->endpt_info.mod_data[endpoint_mod.id] = ao2_bump(dist->endpoint);
315         }
316
317         if (ast_sip_threadpool_queue_size() > SIP_MAX_QUEUE) {
318                 /* When the threadpool is backed up this much, there is a good chance that we have encountered
319                  * some sort of terrible condition and don't need to be adding more work to the threadpool.
320                  * It's in our best interest to send back a 503 response and be done with it.
321                  */
322                 if (rdata->msg_info.msg->type == PJSIP_REQUEST_MSG) {
323                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 503, NULL, NULL, NULL);
324                 }
325                 ao2_cleanup(clone->endpt_info.mod_data[endpoint_mod.id]);
326                 pjsip_rx_data_free_cloned(clone);
327         } else {
328                 ast_sip_push_task(serializer, distribute, clone);
329         }
330
331         ast_taskprocessor_unreference(serializer);
332
333         return PJ_TRUE;
334 }
335
336 static struct ast_sip_auth *artificial_auth;
337
338 static int create_artificial_auth(void)
339 {
340         if (!(artificial_auth = ast_sorcery_alloc(
341                       ast_sip_get_sorcery(), SIP_SORCERY_AUTH_TYPE, "artificial"))) {
342                 ast_log(LOG_ERROR, "Unable to create artificial auth\n");
343                 return -1;
344         }
345
346         ast_string_field_set(artificial_auth, realm, default_realm);
347         ast_string_field_set(artificial_auth, auth_user, "");
348         ast_string_field_set(artificial_auth, auth_pass, "");
349         artificial_auth->type = AST_SIP_AUTH_TYPE_ARTIFICIAL;
350         return 0;
351 }
352
353 struct ast_sip_auth *ast_sip_get_artificial_auth(void)
354 {
355         ao2_ref(artificial_auth, +1);
356         return artificial_auth;
357 }
358
359 static struct ast_sip_endpoint *artificial_endpoint = NULL;
360
361 static int create_artificial_endpoint(void)
362 {
363         if (!(artificial_endpoint = ast_sorcery_alloc(
364                       ast_sip_get_sorcery(), "endpoint", NULL))) {
365                 return -1;
366         }
367
368         AST_VECTOR_INIT(&artificial_endpoint->inbound_auths, 1);
369         /* Pushing a bogus value into the vector will ensure that
370          * the proper size of the vector is returned. This value is
371          * not actually used anywhere
372          */
373         AST_VECTOR_APPEND(&artificial_endpoint->inbound_auths, ast_strdup("artificial-auth"));
374         return 0;
375 }
376
377 struct ast_sip_endpoint *ast_sip_get_artificial_endpoint(void)
378 {
379         ao2_ref(artificial_endpoint, +1);
380         return artificial_endpoint;
381 }
382
383 static void log_unidentified_request(pjsip_rx_data *rdata, unsigned int count, unsigned int period)
384 {
385         char from_buf[PJSIP_MAX_URL_SIZE];
386         char callid_buf[PJSIP_MAX_URL_SIZE];
387         pjsip_uri_print(PJSIP_URI_IN_FROMTO_HDR, rdata->msg_info.from->uri, from_buf, PJSIP_MAX_URL_SIZE);
388         ast_copy_pj_str(callid_buf, &rdata->msg_info.cid->id, PJSIP_MAX_URL_SIZE);
389         if (count) {
390                 ast_log(LOG_NOTICE, "Request from '%s' failed for '%s:%d' (callid: %s) - No matching endpoint found"
391                         " after %u tries in %.3f ms\n",
392                         from_buf, rdata->pkt_info.src_name, rdata->pkt_info.src_port, callid_buf, count, period / 1000.0);
393         } else {
394                 ast_log(LOG_NOTICE, "Request from '%s' failed for '%s:%d' (callid: %s) - No matching endpoint found\n",
395                         from_buf, rdata->pkt_info.src_name, rdata->pkt_info.src_port, callid_buf);
396         }
397 }
398
399 static void check_endpoint(pjsip_rx_data *rdata, struct unidentified_request *unid,
400         const char *name)
401 {
402         int64_t ms = ast_tvdiff_ms(ast_tvnow(), unid->first_seen);
403
404         ao2_wrlock(unid);
405         unid->count++;
406
407         if (ms < (unidentified_period * 1000) && unid->count >= unidentified_count) {
408                 log_unidentified_request(rdata, unid->count, ms);
409                 ast_sip_report_invalid_endpoint(name, rdata);
410         }
411         ao2_unlock(unid);
412 }
413
414 static pj_bool_t endpoint_lookup(pjsip_rx_data *rdata)
415 {
416         struct ast_sip_endpoint *endpoint;
417         struct unidentified_request *unid;
418         int is_ack = rdata->msg_info.msg->line.req.method.id == PJSIP_ACK_METHOD;
419
420         endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
421         if (endpoint) {
422                 /*
423                  * ao2_find with OBJ_UNLINK always write locks the container before even searching
424                  * for the object.  Since the majority case is that the object won't be found, do
425                  * the find without OBJ_UNLINK to prevent the unnecessary write lock, then unlink
426                  * if needed.
427                  */
428                 if ((unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY))) {
429                         ao2_unlink(unidentified_requests, unid);
430                         ao2_ref(unid, -1);
431                 }
432                 return PJ_FALSE;
433         }
434
435         endpoint = ast_sip_identify_endpoint(rdata);
436         if (endpoint) {
437                 if ((unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY))) {
438                         ao2_unlink(unidentified_requests, unid);
439                         ao2_ref(unid, -1);
440                 }
441         }
442
443         if (!endpoint && !is_ack) {
444                 char name[AST_UUID_STR_LEN] = "";
445                 pjsip_uri *from = rdata->msg_info.from->uri;
446
447                 /* always use an artificial endpoint - per discussion no reason
448                    to have "alwaysauthreject" as an option.  It is felt using it
449                    was a bug fix and it is not needed since we are not worried about
450                    breaking old stuff and we really don't want to enable the discovery
451                    of SIP accounts */
452                 endpoint = ast_sip_get_artificial_endpoint();
453
454                 if (PJSIP_URI_SCHEME_IS_SIP(from) || PJSIP_URI_SCHEME_IS_SIPS(from)) {
455                         pjsip_sip_uri *sip_from = pjsip_uri_get_uri(from);
456                         ast_copy_pj_str(name, &sip_from->user, sizeof(name));
457                 }
458
459                 if ((unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY))) {
460                         check_endpoint(rdata, unid, name);
461                         ao2_ref(unid, -1);
462                 } else if (using_auth_username) {
463                         ao2_wrlock(unidentified_requests);
464                         /* The check again with the write lock held allows us to eliminate the DUPS_REPLACE and sort_fn */
465                         if ((unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY | OBJ_NOLOCK))) {
466                                 check_endpoint(rdata, unid, name);
467                         } else {
468                                 unid = ao2_alloc_options(sizeof(*unid) + strlen(rdata->pkt_info.src_name) + 1, NULL,
469                                         AO2_ALLOC_OPT_LOCK_RWLOCK);
470                                 if (!unid) {
471                                         ao2_unlock(unidentified_requests);
472                                         return PJ_TRUE;
473                                 }
474                                 strcpy(unid->src_name, rdata->pkt_info.src_name); /* Safe */
475                                 unid->first_seen = ast_tvnow();
476                                 unid->count = 1;
477                                 ao2_link_flags(unidentified_requests, unid, OBJ_NOLOCK);
478                         }
479                         ao2_ref(unid, -1);
480                         ao2_unlock(unidentified_requests);
481                 } else {
482                         log_unidentified_request(rdata, 0, 0);
483                         ast_sip_report_invalid_endpoint(name, rdata);
484                 }
485         }
486         rdata->endpt_info.mod_data[endpoint_mod.id] = endpoint;
487         return PJ_FALSE;
488 }
489
490 static pj_bool_t authenticate(pjsip_rx_data *rdata)
491 {
492         RAII_VAR(struct ast_sip_endpoint *, endpoint, ast_pjsip_rdata_get_endpoint(rdata), ao2_cleanup);
493         int is_ack = rdata->msg_info.msg->line.req.method.id == PJSIP_ACK_METHOD;
494
495         ast_assert(endpoint != NULL);
496
497         if (!is_ack && ast_sip_requires_authentication(endpoint, rdata)) {
498                 pjsip_tx_data *tdata;
499                 struct unidentified_request *unid;
500
501                 pjsip_endpt_create_response(ast_sip_get_pjsip_endpoint(), rdata, 401, NULL, &tdata);
502                 switch (ast_sip_check_authentication(endpoint, rdata, tdata)) {
503                 case AST_SIP_AUTHENTICATION_CHALLENGE:
504                         /* Send the 401 we created for them */
505                         ast_sip_report_auth_challenge_sent(endpoint, rdata, tdata);
506                         pjsip_endpt_send_response2(ast_sip_get_pjsip_endpoint(), rdata, tdata, NULL, NULL);
507                         return PJ_TRUE;
508                 case AST_SIP_AUTHENTICATION_SUCCESS:
509                         /* See note in endpoint_lookup about not holding an unnecessary write lock */
510                         if ((unid = ao2_find(unidentified_requests, rdata->pkt_info.src_name, OBJ_SEARCH_KEY))) {
511                                 ao2_unlink(unidentified_requests, unid);
512                                 ao2_ref(unid, -1);
513                         }
514                         ast_sip_report_auth_success(endpoint, rdata);
515                         pjsip_tx_data_dec_ref(tdata);
516                         return PJ_FALSE;
517                 case AST_SIP_AUTHENTICATION_FAILED:
518                         ast_sip_report_auth_failed_challenge_response(endpoint, rdata);
519                         pjsip_endpt_send_response2(ast_sip_get_pjsip_endpoint(), rdata, tdata, NULL, NULL);
520                         return PJ_TRUE;
521                 case AST_SIP_AUTHENTICATION_ERROR:
522                         ast_sip_report_auth_failed_challenge_response(endpoint, rdata);
523                         pjsip_tx_data_dec_ref(tdata);
524                         pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 500, NULL, NULL, NULL);
525                         return PJ_TRUE;
526                 }
527         }
528
529         return PJ_FALSE;
530 }
531
532 static pjsip_module auth_mod = {
533         .name = {"Request Authenticator", 21},
534         .priority = PJSIP_MOD_PRIORITY_APPLICATION - 2,
535         .on_rx_request = authenticate,
536 };
537
538 static int distribute(void *data)
539 {
540         static pjsip_process_rdata_param param = {
541                 .start_mod = &distributor_mod,
542                 .idx_after_start = 1,
543         };
544         pj_bool_t handled;
545         pjsip_rx_data *rdata = data;
546         int is_request = rdata->msg_info.msg->type == PJSIP_REQUEST_MSG;
547         int is_ack = is_request ? rdata->msg_info.msg->line.req.method.id == PJSIP_ACK_METHOD : 0;
548         struct ast_sip_endpoint *endpoint;
549
550         pjsip_endpt_process_rx_data(ast_sip_get_pjsip_endpoint(), rdata, &param, &handled);
551         if (!handled && is_request && !is_ack) {
552                 pjsip_endpt_respond_stateless(ast_sip_get_pjsip_endpoint(), rdata, 501, NULL, NULL, NULL);
553         }
554
555         /* The endpoint_mod stores an endpoint reference in the mod_data of rdata. This
556          * is the only appropriate spot to actually decrement the reference.
557          */
558         endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
559         ao2_cleanup(endpoint);
560         pjsip_rx_data_free_cloned(rdata);
561         return 0;
562 }
563
564 struct ast_sip_endpoint *ast_pjsip_rdata_get_endpoint(pjsip_rx_data *rdata)
565 {
566         struct ast_sip_endpoint *endpoint = rdata->endpt_info.mod_data[endpoint_mod.id];
567         if (endpoint) {
568                 ao2_ref(endpoint, +1);
569         }
570         return endpoint;
571 }
572
573 static int suspects_sort(const void *obj, const void *arg, int flags)
574 {
575         const struct unidentified_request *object_left = obj;
576         const struct unidentified_request *object_right = arg;
577         const char *right_key = arg;
578         int cmp;
579
580         switch (flags & OBJ_SEARCH_MASK) {
581         case OBJ_SEARCH_OBJECT:
582                 right_key = object_right->src_name;
583                 /* Fall through */
584         case OBJ_SEARCH_KEY:
585                 cmp = strcmp(object_left->src_name, right_key);
586                 break;
587         case OBJ_SEARCH_PARTIAL_KEY:
588                 cmp = strncmp(object_left->src_name, right_key, strlen(right_key));
589                 break;
590         default:
591                 cmp = 0;
592                 break;
593         }
594         return cmp;
595 }
596
597 static int suspects_compare(void *obj, void *arg, int flags)
598 {
599         const struct unidentified_request *object_left = obj;
600         const struct unidentified_request *object_right = arg;
601         const char *right_key = arg;
602         int cmp = 0;
603
604         switch (flags & OBJ_SEARCH_MASK) {
605         case OBJ_SEARCH_OBJECT:
606                 right_key = object_right->src_name;
607                 /* Fall through */
608         case OBJ_SEARCH_KEY:
609                 if (strcmp(object_left->src_name, right_key) == 0) {
610                         cmp = CMP_MATCH | CMP_STOP;
611                 }
612                 break;
613         case OBJ_SEARCH_PARTIAL_KEY:
614                 if (strncmp(object_left->src_name, right_key, strlen(right_key)) == 0) {
615                         cmp = CMP_MATCH;
616                 }
617                 break;
618         default:
619                 cmp = 0;
620                 break;
621         }
622         return cmp;
623 }
624
625 static int suspects_hash(const void *obj, int flags) {
626         const struct unidentified_request *object_left = obj;
627
628         if (flags & OBJ_SEARCH_OBJECT) {
629                 return ast_str_hash(object_left->src_name);
630         } else if (flags & OBJ_SEARCH_KEY) {
631                 return ast_str_hash(obj);
632         }
633         return -1;
634 }
635
636 static struct ao2_container *cli_unid_get_container(const char *regex)
637 {
638         struct ao2_container *s_container;
639
640         s_container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0,
641                 suspects_sort, suspects_compare);
642         if (!s_container) {
643                 return NULL;
644         }
645
646         if (ao2_container_dup(s_container, unidentified_requests, 0)) {
647                 ao2_ref(s_container, -1);
648                 return NULL;
649         }
650
651         return s_container;
652 }
653
654 static int cli_unid_iterate(void *container, ao2_callback_fn callback, void *args)
655 {
656         ao2_callback(container, 0, callback, args);
657
658         return 0;
659 }
660
661 static void *cli_unid_retrieve_by_id(const char *id)
662 {
663         return ao2_find(unidentified_requests, id, OBJ_SEARCH_KEY);
664 }
665
666 static const char *cli_unid_get_id(const void *obj)
667 {
668         const struct unidentified_request *unid = obj;
669
670         return unid->src_name;
671 }
672
673 static int cli_unid_print_header(void *obj, void *arg, int flags)
674 {
675         struct ast_sip_cli_context *context = arg;
676         RAII_VAR(struct ast_sip_cli_formatter_entry *, formatter_entry, NULL, ao2_cleanup);
677
678         int indent = CLI_INDENT_TO_SPACES(context->indent_level);
679         int filler = CLI_LAST_TABSTOP - indent - 7;
680
681         ast_assert(context->output_buffer != NULL);
682
683         ast_str_append(&context->output_buffer, 0,
684                 "%*s:  <IP Address%*.*s>  <Count> <Age(sec)>\n",
685                 indent, "Request", filler, filler, CLI_HEADER_FILLER);
686
687         return 0;
688 }
689 static int cli_unid_print_body(void *obj, void *arg, int flags)
690 {
691         struct unidentified_request *unid = obj;
692         struct ast_sip_cli_context *context = arg;
693         int indent;
694         int flexwidth;
695         int64_t ms = ast_tvdiff_ms(ast_tvnow(), unid->first_seen);
696
697         ast_assert(context->output_buffer != NULL);
698
699         indent = CLI_INDENT_TO_SPACES(context->indent_level);
700         flexwidth = CLI_LAST_TABSTOP - 4;
701
702         ast_str_append(&context->output_buffer, 0, "%*s:  %-*.*s  %7d %10.3f\n",
703                 indent,
704                 "Request",
705                 flexwidth, flexwidth,
706                 unid->src_name, unid->count,  ms / 1000.0);
707
708         return 0;
709 }
710
711 static struct ast_cli_entry cli_commands[] = {
712         AST_CLI_DEFINE(ast_sip_cli_traverse_objects, "Show PJSIP Unidentified Requests",
713                 .command = "pjsip show unidentified_requests",
714                 .usage = "Usage: pjsip show unidentified_requests\n"
715                                 "       Show the PJSIP Unidentified Requests\n"),
716 };
717
718 struct ast_sip_cli_formatter_entry *unid_formatter;
719
720 static int expire_requests(void *object, void *arg, int flags)
721 {
722         struct unidentified_request *unid = object;
723         int *maxage = arg;
724         int64_t ms = ast_tvdiff_ms(ast_tvnow(), unid->first_seen);
725
726         if (ms > (*maxage) * 2 * 1000) {
727                 return CMP_MATCH;
728         }
729
730         return 0;
731 }
732
733 static int prune_task(const void *data)
734 {
735         unsigned int maxage;
736
737         ast_sip_get_unidentified_request_thresholds(&unidentified_count, &unidentified_period, &unidentified_prune_interval);
738         maxage = unidentified_period * 2;
739         ao2_callback(unidentified_requests, OBJ_MULTIPLE | OBJ_NODATA | OBJ_UNLINK, expire_requests, &maxage);
740
741         return unidentified_prune_interval * 1000;
742 }
743
744 static int clean_task(const void *data)
745 {
746         return 0;
747 }
748
749 static void global_loaded(const char *object_type)
750 {
751         char *identifier_order = ast_sip_get_endpoint_identifier_order();
752         char *io_copy = ast_strdupa(identifier_order);
753         char *identify_method;
754
755         ast_free(identifier_order);
756         using_auth_username = 0;
757         while ((identify_method = ast_strip(strsep(&io_copy, ",")))) {
758                 if (!strcmp(identify_method, "auth_username")) {
759                         using_auth_username = 1;
760                         break;
761                 }
762         }
763
764         ast_sip_get_default_realm(default_realm, sizeof(default_realm));
765         ast_sip_get_unidentified_request_thresholds(&unidentified_count, &unidentified_period, &unidentified_prune_interval);
766
767         /* Clean out the old task, if any */
768         ast_sched_clean_by_callback(prune_context, prune_task, clean_task);
769         if (ast_sched_add_variable(prune_context, unidentified_prune_interval * 1000, prune_task, NULL, 1) < 0) {
770                 return;
771         }
772 }
773
774 /*! \brief Observer which is used to update our interval and default_realm when the global setting changes */
775 static struct ast_sorcery_observer global_observer = {
776         .loaded = global_loaded,
777 };
778
779
780 int ast_sip_initialize_distributor(void)
781 {
782         unidentified_requests = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0,
783                 DEFAULT_SUSPECTS_BUCKETS, suspects_hash, NULL, suspects_compare);
784         if (!unidentified_requests) {
785                 return -1;
786         }
787
788         prune_context = ast_sched_context_create();
789         if (!prune_context) {
790                 ast_sip_destroy_distributor();
791                 return -1;
792         }
793
794         if (ast_sched_start_thread(prune_context)) {
795                 ast_sip_destroy_distributor();
796                 return -1;
797         }
798
799         ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &global_observer);
800         ast_sorcery_reload_object(ast_sip_get_sorcery(), "global");
801
802         if (create_artificial_endpoint() || create_artificial_auth()) {
803                 ast_sip_destroy_distributor();
804                 return -1;
805         }
806
807         if (internal_sip_register_service(&distributor_mod)) {
808                 ast_sip_destroy_distributor();
809                 return -1;
810         }
811         if (internal_sip_register_service(&endpoint_mod)) {
812                 ast_sip_destroy_distributor();
813                 return -1;
814         }
815         if (internal_sip_register_service(&auth_mod)) {
816                 ast_sip_destroy_distributor();
817                 return -1;
818         }
819
820         unid_formatter = ao2_alloc(sizeof(struct ast_sip_cli_formatter_entry), NULL);
821         if (!unid_formatter) {
822                 ast_log(LOG_ERROR, "Unable to allocate memory for unid_formatter\n");
823                 return -1;
824         }
825         unid_formatter->name = "unidentified_request";
826         unid_formatter->print_header = cli_unid_print_header;
827         unid_formatter->print_body = cli_unid_print_body;
828         unid_formatter->get_container = cli_unid_get_container;
829         unid_formatter->iterate = cli_unid_iterate;
830         unid_formatter->get_id = cli_unid_get_id;
831         unid_formatter->retrieve_by_id = cli_unid_retrieve_by_id;
832         ast_sip_register_cli_formatter(unid_formatter);
833         ast_cli_register_multiple(cli_commands, ARRAY_LEN(cli_commands));
834
835         return 0;
836 }
837
838 void ast_sip_destroy_distributor(void)
839 {
840         ast_cli_unregister_multiple(cli_commands, ARRAY_LEN(cli_commands));
841         ast_sip_unregister_cli_formatter(unid_formatter);
842
843         internal_sip_unregister_service(&distributor_mod);
844         internal_sip_unregister_service(&endpoint_mod);
845         internal_sip_unregister_service(&auth_mod);
846
847         ao2_cleanup(artificial_auth);
848         ao2_cleanup(artificial_endpoint);
849         ao2_cleanup(unidentified_requests);
850
851         ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &global_observer);
852
853         if (prune_context) {
854                 ast_sched_context_destroy(prune_context);
855         }
856 }