astobj2: Remove legacy ao2_container_alloc routine.
[asterisk/asterisk.git] / res / res_pjsip / pjsip_transport_management.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2014, Digium, Inc.
5  *
6  * Joshua Colp <jcolp@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 #include "asterisk.h"
20
21 #include <signal.h>
22
23 #include <pjsip.h>
24 #include <pjsip_ua.h>
25
26 #include "asterisk/res_pjsip.h"
27 #include "asterisk/module.h"
28 #include "asterisk/astobj2.h"
29 #include "include/res_pjsip_private.h"
30
31 /*! \brief Number of buckets for monitored transports */
32 #define TRANSPORTS_BUCKETS 127
33
34 #define IDLE_TIMEOUT (pjsip_cfg()->tsx.td)
35
36 /*! \brief The keep alive packet to send */
37 static const pj_str_t keepalive_packet = { "\r\n\r\n", 4 };
38
39 /*! \brief Global container of active transports */
40 static AO2_GLOBAL_OBJ_STATIC(monitored_transports);
41
42 /*! \brief Scheduler context for timing out connections with no data received */
43 static struct ast_sched_context *sched;
44
45 /*! \brief Thread keeping things alive */
46 static pthread_t keepalive_thread = AST_PTHREADT_NULL;
47
48 /*! \brief The global interval at which to send keepalives */
49 static unsigned int keepalive_interval;
50
51 /*! \brief Structure for transport to be monitored */
52 struct monitored_transport {
53         /*! \brief The underlying PJSIP transport */
54         pjsip_transport *transport;
55         /*! \brief Non-zero if a PJSIP request was received */
56         int sip_received;
57 };
58
59 static void keepalive_transport_send_keepalive(struct monitored_transport *monitored)
60 {
61         pjsip_tpselector selector = {
62                 .type = PJSIP_TPSELECTOR_TRANSPORT,
63                 .u.transport = monitored->transport,
64         };
65
66         pjsip_tpmgr_send_raw(pjsip_endpt_get_tpmgr(ast_sip_get_pjsip_endpoint()),
67                 monitored->transport->key.type,
68                 &selector,
69                 NULL,
70                 keepalive_packet.ptr,
71                 keepalive_packet.slen,
72                 &monitored->transport->key.rem_addr,
73                 pj_sockaddr_get_len(&monitored->transport->key.rem_addr),
74                 NULL, NULL);
75 }
76
77 /*! \brief Thread which sends keepalives to all active connection-oriented transports */
78 static void *keepalive_transport_thread(void *data)
79 {
80         struct ao2_container *transports;
81         pj_thread_desc desc;
82         pj_thread_t *thread;
83
84         if (pj_thread_register("Asterisk Keepalive Thread", desc, &thread) != PJ_SUCCESS) {
85                 ast_log(LOG_ERROR, "Could not register keepalive thread with PJLIB, keepalives will not occur.\n");
86                 return NULL;
87         }
88
89         transports = ao2_global_obj_ref(monitored_transports);
90         if (!transports) {
91                 return NULL;
92         }
93
94         /*
95          * Once loaded this module just keeps on going as it is unsafe to stop
96          * and change the underlying callback for the transport manager.
97          */
98         while (keepalive_interval) {
99                 struct ao2_iterator iter;
100                 struct monitored_transport *monitored;
101
102                 sleep(keepalive_interval);
103
104                 /*
105                  * We must use the iterator to avoid deadlock between the container lock
106                  * and the pjproject transport manager group lock when sending
107                  * the keepalive packet.
108                  */
109                 iter = ao2_iterator_init(transports, 0);
110                 for (; (monitored = ao2_iterator_next(&iter)); ao2_ref(monitored, -1)) {
111                         keepalive_transport_send_keepalive(monitored);
112                 }
113                 ao2_iterator_destroy(&iter);
114         }
115
116         ao2_ref(transports, -1);
117         return NULL;
118 }
119
120 AST_THREADSTORAGE(desc_storage);
121
122 static int idle_sched_init_pj_thread(void)
123 {
124         if (!pj_thread_is_registered()) {
125                 pj_thread_t *thread;
126                 pj_thread_desc *desc;
127
128                 desc = ast_threadstorage_get(&desc_storage, sizeof(pj_thread_desc));
129                 if (!desc) {
130                         ast_log(LOG_ERROR, "Could not get thread desc from thread-local storage.\n");
131                         return -1;
132                 }
133
134                 pj_bzero(*desc, sizeof(*desc));
135
136                 pj_thread_register("Transport Monitor", *desc, &thread);
137         }
138
139         return 0;
140 }
141
142 static int idle_sched_cb(const void *data)
143 {
144         struct monitored_transport *monitored = (struct monitored_transport *) data;
145
146         if (idle_sched_init_pj_thread()) {
147                 ao2_ref(monitored, -1);
148                 return 0;
149         }
150
151         if (!monitored->sip_received) {
152                 ast_log(LOG_NOTICE, "Shutting down transport '%s' since no request was received in %d seconds\n",
153                         monitored->transport->info, IDLE_TIMEOUT / 1000);
154                 pjsip_transport_shutdown(monitored->transport);
155         }
156
157         ao2_ref(monitored, -1);
158         return 0;
159 }
160
161 static int idle_sched_cleanup(const void *data)
162 {
163         struct monitored_transport *monitored = (struct monitored_transport *) data;
164
165         if (!idle_sched_init_pj_thread()) {
166                 pjsip_transport_shutdown(monitored->transport);
167         }
168         ao2_ref(monitored, -1);
169
170         return 0;
171 }
172
173 /*! \brief Destructor for keepalive transport */
174 static void monitored_transport_destroy(void *obj)
175 {
176         struct monitored_transport *monitored = obj;
177
178         pjsip_transport_dec_ref(monitored->transport);
179 }
180
181 /*! \brief Callback invoked when transport changes occur */
182 static void monitored_transport_state_callback(pjsip_transport *transport, pjsip_transport_state state,
183         const pjsip_transport_state_info *info)
184 {
185         struct ao2_container *transports;
186
187         /* We only care about reliable transports */
188         if (PJSIP_TRANSPORT_IS_RELIABLE(transport)
189                 && (transport->dir == PJSIP_TP_DIR_INCOMING || keepalive_interval)
190                 && (transports = ao2_global_obj_ref(monitored_transports))) {
191                 struct monitored_transport *monitored;
192
193                 switch (state) {
194                 case PJSIP_TP_STATE_CONNECTED:
195                         monitored = ao2_alloc_options(sizeof(*monitored),
196                                 monitored_transport_destroy, AO2_ALLOC_OPT_LOCK_NOLOCK);
197                         if (!monitored) {
198                                 break;
199                         }
200                         monitored->transport = transport;
201                         pjsip_transport_add_ref(monitored->transport);
202
203                         ao2_link(transports, monitored);
204
205                         if (transport->dir == PJSIP_TP_DIR_INCOMING) {
206                                 /* Let the scheduler inherit the reference from allocation */
207                                 if (ast_sched_add_variable(sched, IDLE_TIMEOUT, idle_sched_cb, monitored, 1) < 0) {
208                                         /* Uh Oh.  Could not schedule the idle check.  Kill the transport. */
209                                         pjsip_transport_shutdown(transport);
210                                 } else {
211                                         /* monitored ref successfully passed to idle_sched_cb() */
212                                         break;
213                                 }
214                         }
215                         ao2_ref(monitored, -1);
216                         break;
217                 case PJSIP_TP_STATE_SHUTDOWN:
218                 case PJSIP_TP_STATE_DISCONNECTED:
219                         ao2_find(transports, transport->obj_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
220                         break;
221                 default:
222                         break;
223                 }
224
225                 ao2_ref(transports, -1);
226         }
227 }
228
229 struct ast_sip_tpmgr_state_callback monitored_transport_reg = {
230         monitored_transport_state_callback,
231 };
232
233 /*! \brief Hashing function for monitored transport */
234 static int monitored_transport_hash_fn(const void *obj, int flags)
235 {
236         const struct monitored_transport *object;
237         const char *key;
238
239         switch (flags & OBJ_SEARCH_MASK) {
240         case OBJ_SEARCH_KEY:
241                 key = obj;
242                 break;
243         case OBJ_SEARCH_OBJECT:
244                 object = obj;
245                 key = object->transport->obj_name;
246                 break;
247         default:
248                 /* Hash can only work on something with a full key. */
249                 ast_assert(0);
250                 return 0;
251         }
252         return ast_str_hash(key);
253 }
254
255 /*! \brief Comparison function for monitored transport */
256 static int monitored_transport_cmp_fn(void *obj, void *arg, int flags)
257 {
258         const struct monitored_transport *object_left = obj;
259         const struct monitored_transport *object_right = arg;
260         const char *right_key = arg;
261         int cmp;
262
263         switch (flags & OBJ_SEARCH_MASK) {
264         case OBJ_SEARCH_OBJECT:
265                 right_key = object_right->transport->obj_name;
266                 /* Fall through */
267         case OBJ_SEARCH_KEY:
268                 cmp = strcmp(object_left->transport->obj_name, right_key);
269                 break;
270         case OBJ_SEARCH_PARTIAL_KEY:
271                 /*
272                  * We could also use a partial key struct containing a length
273                  * so strlen() does not get called for every comparison instead.
274                  */
275                 cmp = strncmp(object_left->transport->obj_name, right_key, strlen(right_key));
276                 break;
277         default:
278                 /*
279                  * What arg points to is specific to this traversal callback
280                  * and has no special meaning to astobj2.
281                  */
282                 cmp = 0;
283                 break;
284         }
285
286         return !cmp ? CMP_MATCH : 0;
287 }
288
289 static void keepalive_global_loaded(const char *object_type)
290 {
291         unsigned int new_interval = ast_sip_get_keep_alive_interval();
292
293         if (new_interval) {
294                 keepalive_interval = new_interval;
295         } else if (keepalive_interval) {
296                 ast_log(LOG_NOTICE, "Keepalive support can not be disabled once activated.\n");
297                 return;
298         } else {
299                 /* This will occur if no keepalive interval has been specified at initial start */
300                 return;
301         }
302
303         if (keepalive_thread != AST_PTHREADT_NULL) {
304                 return;
305         }
306
307         if (ast_pthread_create(&keepalive_thread, NULL, keepalive_transport_thread, NULL)) {
308                 ast_log(LOG_ERROR, "Could not create thread for sending keepalive messages.\n");
309                 keepalive_thread = AST_PTHREADT_NULL;
310                 keepalive_interval = 0;
311         }
312 }
313
314 /*! \brief Observer which is used to update our interval when the global setting changes */
315 static struct ast_sorcery_observer keepalive_global_observer = {
316         .loaded = keepalive_global_loaded,
317 };
318
319 /*!
320  * \brief
321  * On incoming TCP connections, when we receive a SIP request, we mark that we have
322  * received a valid SIP request. This way, we will not shut the transport down for
323  * idleness
324  */
325 static pj_bool_t idle_monitor_on_rx_request(pjsip_rx_data *rdata)
326 {
327         struct ao2_container *transports;
328         struct monitored_transport *idle_trans;
329
330         transports = ao2_global_obj_ref(monitored_transports);
331         if (!transports) {
332                 return PJ_FALSE;
333         }
334
335         idle_trans = ao2_find(transports, rdata->tp_info.transport->obj_name, OBJ_SEARCH_KEY);
336         ao2_ref(transports, -1);
337         if (!idle_trans) {
338                 return PJ_FALSE;
339         }
340
341         idle_trans->sip_received = 1;
342         ao2_ref(idle_trans, -1);
343
344         return PJ_FALSE;
345 }
346
347 static pjsip_module idle_monitor_module = {
348         .name = {"idle monitor module", 19},
349         .priority = PJSIP_MOD_PRIORITY_TRANSPORT_LAYER + 3,
350         .on_rx_request = idle_monitor_on_rx_request,
351 };
352
353 int ast_sip_initialize_transport_management(void)
354 {
355         struct ao2_container *transports;
356
357         transports = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TRANSPORTS_BUCKETS,
358                 monitored_transport_hash_fn, NULL, monitored_transport_cmp_fn);
359         if (!transports) {
360                 ast_log(LOG_ERROR, "Could not create container for transports to perform keepalive on.\n");
361                 return AST_MODULE_LOAD_DECLINE;
362         }
363         ao2_global_obj_replace_unref(monitored_transports, transports);
364         ao2_ref(transports, -1);
365
366         sched = ast_sched_context_create();
367         if (!sched) {
368                 ast_log(LOG_ERROR, "Failed to create keepalive scheduler context.\n");
369                 ao2_global_obj_release(monitored_transports);
370                 return AST_MODULE_LOAD_DECLINE;
371         }
372
373         if (ast_sched_start_thread(sched)) {
374                 ast_log(LOG_ERROR, "Failed to start keepalive scheduler thread\n");
375                 ast_sched_context_destroy(sched);
376                 sched = NULL;
377                 ao2_global_obj_release(monitored_transports);
378                 return AST_MODULE_LOAD_DECLINE;
379         }
380
381         ast_sip_register_service(&idle_monitor_module);
382
383         ast_sip_transport_state_register(&monitored_transport_reg);
384
385         ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &keepalive_global_observer);
386         ast_sorcery_reload_object(ast_sip_get_sorcery(), "global");
387
388         return AST_MODULE_LOAD_SUCCESS;
389 }
390
391 void ast_sip_destroy_transport_management(void)
392 {
393         if (keepalive_interval) {
394                 keepalive_interval = 0;
395                 if (keepalive_thread != AST_PTHREADT_NULL) {
396                         pthread_kill(keepalive_thread, SIGURG);
397                         pthread_join(keepalive_thread, NULL);
398                         keepalive_thread = AST_PTHREADT_NULL;
399                 }
400         }
401
402         ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &keepalive_global_observer);
403
404         ast_sip_transport_state_unregister(&monitored_transport_reg);
405
406         ast_sip_unregister_service(&idle_monitor_module);
407
408         ast_sched_clean_by_callback(sched, idle_sched_cb, idle_sched_cleanup);
409         ast_sched_context_destroy(sched);
410         sched = NULL;
411
412         ao2_global_obj_release(monitored_transports);
413 }