channels: Allow updating variable value
[asterisk/asterisk.git] / res / res_pjsip / pjsip_transport_management.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2014, Digium, Inc.
5  *
6  * Joshua Colp <jcolp@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 #include "asterisk.h"
20
21 #include <signal.h>
22
23 #include <pjsip.h>
24 #include <pjsip_ua.h>
25
26 #include "asterisk/res_pjsip.h"
27 #include "asterisk/module.h"
28 #include "asterisk/astobj2.h"
29 #include "include/res_pjsip_private.h"
30
/*! \brief Number of hash buckets in the container of monitored transports */
#define TRANSPORTS_BUCKETS 127

/*!
 * \brief Idle timeout, read from the PJSIP transaction "td" setting
 *
 * The shutdown notice divides this value by 1000 to report seconds, so it
 * is presumably expressed in milliseconds.
 */
#define IDLE_TIMEOUT (pjsip_cfg()->tsx.td)

/*! \brief The keep alive packet to send: two CRLF sequences, 4 bytes long */
static const pj_str_t keepalive_packet = { "\r\n\r\n", 4 };

/*! \brief Global container of active transports, looked up by transport obj_name */
static AO2_GLOBAL_OBJ_STATIC(monitored_transports);

/*! \brief Scheduler context for timing out connections with no data received */
static struct ast_sched_context *sched;

/*! \brief Thread keeping things alive; AST_PTHREADT_NULL while no thread exists */
static pthread_t keepalive_thread = AST_PTHREADT_NULL;

/*!
 * \brief The global interval at which to send keepalives
 *
 * The keepalive thread passes this value to sleep() between rounds. A value
 * of zero means keepalives are off and also ends the keepalive thread's loop.
 */
static unsigned int keepalive_interval;
50
/*!
 * \brief Structure for transport to be monitored
 *
 * Instances are ao2 objects created when a transport reports the connected
 * state and are stored in the monitored_transports container.
 */
struct monitored_transport {
        /*! \brief The underlying PJSIP transport (a reference is taken when the object is set up) */
        pjsip_transport *transport;
        /*! \brief Non-zero if a PJSIP request was received */
        int sip_received;
};
58
59 static void keepalive_transport_send_keepalive(struct monitored_transport *monitored)
60 {
61         pjsip_tpselector selector = {
62                 .type = PJSIP_TPSELECTOR_TRANSPORT,
63                 .u.transport = monitored->transport,
64         };
65
66         pjsip_tpmgr_send_raw(pjsip_endpt_get_tpmgr(ast_sip_get_pjsip_endpoint()),
67                 monitored->transport->key.type,
68                 &selector,
69                 NULL,
70                 keepalive_packet.ptr,
71                 keepalive_packet.slen,
72                 &monitored->transport->key.rem_addr,
73                 pj_sockaddr_get_len(&monitored->transport->key.rem_addr),
74                 NULL, NULL);
75 }
76
77 /*! \brief Thread which sends keepalives to all active connection-oriented transports */
78 static void *keepalive_transport_thread(void *data)
79 {
80         struct ao2_container *transports;
81         pj_thread_desc desc;
82         pj_thread_t *thread;
83
84         if (pj_thread_register("Asterisk Keepalive Thread", desc, &thread) != PJ_SUCCESS) {
85                 ast_log(LOG_ERROR, "Could not register keepalive thread with PJLIB, keepalives will not occur.\n");
86                 return NULL;
87         }
88
89         transports = ao2_global_obj_ref(monitored_transports);
90         if (!transports) {
91                 return NULL;
92         }
93
94         /*
95          * Once loaded this module just keeps on going as it is unsafe to stop
96          * and change the underlying callback for the transport manager.
97          */
98         while (keepalive_interval) {
99                 struct ao2_iterator iter;
100                 struct monitored_transport *monitored;
101
102                 sleep(keepalive_interval);
103
104                 /*
105                  * We must use the iterator to avoid deadlock between the container lock
106                  * and the pjproject transport manager group lock when sending
107                  * the keepalive packet.
108                  */
109                 iter = ao2_iterator_init(transports, 0);
110                 for (; (monitored = ao2_iterator_next(&iter)); ao2_ref(monitored, -1)) {
111                         keepalive_transport_send_keepalive(monitored);
112                 }
113                 ao2_iterator_destroy(&iter);
114         }
115
116         ao2_ref(transports, -1);
117         return NULL;
118 }
119
120 AST_THREADSTORAGE(desc_storage);
121
122 static int idle_sched_init_pj_thread(void)
123 {
124         if (!pj_thread_is_registered()) {
125                 pj_thread_t *thread;
126                 pj_thread_desc *desc;
127
128                 desc = ast_threadstorage_get(&desc_storage, sizeof(pj_thread_desc));
129                 if (!desc) {
130                         ast_log(LOG_ERROR, "Could not get thread desc from thread-local storage.\n");
131                         return -1;
132                 }
133
134                 pj_bzero(*desc, sizeof(*desc));
135
136                 pj_thread_register("Transport Monitor", *desc, &thread);
137         }
138
139         return 0;
140 }
141
142 static struct monitored_transport *get_monitored_transport_by_name(const char *obj_name)
143 {
144         struct ao2_container *transports;
145         struct monitored_transport *monitored = NULL;
146
147         transports = ao2_global_obj_ref(monitored_transports);
148         if (transports) {
149                 monitored = ao2_find(transports, obj_name, OBJ_SEARCH_KEY);
150         }
151         ao2_cleanup(transports);
152
153         /* Caller is responsible for cleaning up reference */
154         return monitored;
155 }
156
157 static int idle_sched_cb(const void *data)
158 {
159         char *obj_name = (char *) data;
160         struct monitored_transport *monitored;
161
162         if (idle_sched_init_pj_thread()) {
163                 ast_free(obj_name);
164                 return 0;
165         }
166
167         monitored = get_monitored_transport_by_name(obj_name);
168         if (monitored) {
169                 if (!monitored->sip_received) {
170                         ast_log(LOG_NOTICE, "Shutting down transport '%s' since no request was received in %d seconds\n",
171                                 monitored->transport->info, IDLE_TIMEOUT / 1000);
172                         pjsip_transport_shutdown(monitored->transport);
173                 }
174                 ao2_ref(monitored, -1);
175         }
176
177         ast_free(obj_name);
178         return 0;
179 }
180
181 static int idle_sched_cleanup(const void *data)
182 {
183         char *obj_name = (char *) data;
184         struct monitored_transport *monitored;
185
186         if (idle_sched_init_pj_thread()) {
187                 ast_free(obj_name);
188                 return 0;
189         }
190
191         monitored = get_monitored_transport_by_name(obj_name);
192         if (monitored) {
193                 pjsip_transport_shutdown(monitored->transport);
194                 ao2_ref(monitored, -1);
195         }
196
197         ast_free(obj_name);
198         return 0;
199 }
200
201 /*! \brief Destructor for keepalive transport */
202 static void monitored_transport_destroy(void *obj)
203 {
204         struct monitored_transport *monitored = obj;
205
206         pjsip_transport_dec_ref(monitored->transport);
207 }
208
209 /*! \brief Callback invoked when transport changes occur */
210 static void monitored_transport_state_callback(pjsip_transport *transport, pjsip_transport_state state,
211         const pjsip_transport_state_info *info)
212 {
213         struct ao2_container *transports;
214
215         /* We only care about reliable transports */
216         if (PJSIP_TRANSPORT_IS_RELIABLE(transport)
217                 && (transport->dir == PJSIP_TP_DIR_INCOMING || keepalive_interval)
218                 && (transports = ao2_global_obj_ref(monitored_transports))) {
219                 struct monitored_transport *monitored;
220
221                 switch (state) {
222                 case PJSIP_TP_STATE_CONNECTED:
223                         monitored = ao2_alloc_options(sizeof(*monitored),
224                                 monitored_transport_destroy, AO2_ALLOC_OPT_LOCK_NOLOCK);
225                         if (!monitored) {
226                                 break;
227                         }
228                         monitored->transport = transport;
229                         pjsip_transport_add_ref(monitored->transport);
230
231                         ao2_link(transports, monitored);
232
233                         if (transport->dir == PJSIP_TP_DIR_INCOMING) {
234                                 char *obj_name = ast_strdup(transport->obj_name);
235
236                                 if (!obj_name
237                                    || ast_sched_add_variable(sched, IDLE_TIMEOUT, idle_sched_cb, obj_name, 1) < 0) {
238                                         /* Shut down the transport if anything fails */
239                                         pjsip_transport_shutdown(transport);
240                                         ast_free(obj_name);
241                                 }
242                         }
243                         ao2_ref(monitored, -1);
244                         break;
245                 case PJSIP_TP_STATE_SHUTDOWN:
246                 case PJSIP_TP_STATE_DISCONNECTED:
247                         ao2_find(transports, transport->obj_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
248                         break;
249                 default:
250                         break;
251                 }
252
253                 ao2_ref(transports, -1);
254         }
255 }
256
/*!
 * \brief Transport state callback registration
 *
 * Registered with ast_sip_transport_state_register() at initialization and
 * removed with ast_sip_transport_state_unregister() at teardown.
 */
struct ast_sip_tpmgr_state_callback monitored_transport_reg = {
        monitored_transport_state_callback,
};
260
261 /*! \brief Hashing function for monitored transport */
262 static int monitored_transport_hash_fn(const void *obj, int flags)
263 {
264         const struct monitored_transport *object;
265         const char *key;
266
267         switch (flags & OBJ_SEARCH_MASK) {
268         case OBJ_SEARCH_KEY:
269                 key = obj;
270                 break;
271         case OBJ_SEARCH_OBJECT:
272                 object = obj;
273                 key = object->transport->obj_name;
274                 break;
275         default:
276                 /* Hash can only work on something with a full key. */
277                 ast_assert(0);
278                 return 0;
279         }
280         return ast_str_hash(key);
281 }
282
283 /*! \brief Comparison function for monitored transport */
284 static int monitored_transport_cmp_fn(void *obj, void *arg, int flags)
285 {
286         const struct monitored_transport *object_left = obj;
287         const struct monitored_transport *object_right = arg;
288         const char *right_key = arg;
289         int cmp;
290
291         switch (flags & OBJ_SEARCH_MASK) {
292         case OBJ_SEARCH_OBJECT:
293                 right_key = object_right->transport->obj_name;
294                 /* Fall through */
295         case OBJ_SEARCH_KEY:
296                 cmp = strcmp(object_left->transport->obj_name, right_key);
297                 break;
298         case OBJ_SEARCH_PARTIAL_KEY:
299                 /*
300                  * We could also use a partial key struct containing a length
301                  * so strlen() does not get called for every comparison instead.
302                  */
303                 cmp = strncmp(object_left->transport->obj_name, right_key, strlen(right_key));
304                 break;
305         default:
306                 /*
307                  * What arg points to is specific to this traversal callback
308                  * and has no special meaning to astobj2.
309                  */
310                 cmp = 0;
311                 break;
312         }
313
314         return !cmp ? CMP_MATCH : 0;
315 }
316
317 static void keepalive_global_loaded(const char *object_type)
318 {
319         unsigned int new_interval = ast_sip_get_keep_alive_interval();
320
321         if (new_interval) {
322                 keepalive_interval = new_interval;
323         } else if (keepalive_interval) {
324                 ast_log(LOG_NOTICE, "Keepalive support can not be disabled once activated.\n");
325                 return;
326         } else {
327                 /* This will occur if no keepalive interval has been specified at initial start */
328                 return;
329         }
330
331         if (keepalive_thread != AST_PTHREADT_NULL) {
332                 return;
333         }
334
335         if (ast_pthread_create(&keepalive_thread, NULL, keepalive_transport_thread, NULL)) {
336                 ast_log(LOG_ERROR, "Could not create thread for sending keepalive messages.\n");
337                 keepalive_thread = AST_PTHREADT_NULL;
338                 keepalive_interval = 0;
339         }
340 }
341
/*!
 * \brief Observer which is used to update our interval when the global setting changes
 *
 * Attached to the "global" sorcery object type at initialization; its loaded
 * callback re-reads the keepalive interval.
 */
static struct ast_sorcery_observer keepalive_global_observer = {
        .loaded = keepalive_global_loaded,
};
346
347 /*!
348  * \brief
349  * On incoming TCP connections, when we receive a SIP request, we mark that we have
350  * received a valid SIP request. This way, we will not shut the transport down for
351  * idleness
352  */
353 static pj_bool_t idle_monitor_on_rx_request(pjsip_rx_data *rdata)
354 {
355         struct monitored_transport *idle_trans;
356
357         idle_trans = get_monitored_transport_by_name(rdata->tp_info.transport->obj_name);
358         if (idle_trans) {
359                 idle_trans->sip_received = 1;
360                 ao2_ref(idle_trans, -1);
361         }
362
363         return PJ_FALSE;
364 }
365
/*!
 * \brief PJSIP module which routes received requests to idle_monitor_on_rx_request()
 *
 * Registered as a service at initialization and unregistered at teardown.
 * The name length (19) is the length of the name string.
 */
static pjsip_module idle_monitor_module = {
        .name = {"idle monitor module", 19},
        .priority = PJSIP_MOD_PRIORITY_TRANSPORT_LAYER + 3,
        .on_rx_request = idle_monitor_on_rx_request,
};
371
372 int ast_sip_initialize_transport_management(void)
373 {
374         struct ao2_container *transports;
375
376         transports = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, TRANSPORTS_BUCKETS,
377                 monitored_transport_hash_fn, NULL, monitored_transport_cmp_fn);
378         if (!transports) {
379                 ast_log(LOG_ERROR, "Could not create container for transports to perform keepalive on.\n");
380                 return AST_MODULE_LOAD_DECLINE;
381         }
382         ao2_global_obj_replace_unref(monitored_transports, transports);
383         ao2_ref(transports, -1);
384
385         sched = ast_sched_context_create();
386         if (!sched) {
387                 ast_log(LOG_ERROR, "Failed to create keepalive scheduler context.\n");
388                 ao2_global_obj_release(monitored_transports);
389                 return AST_MODULE_LOAD_DECLINE;
390         }
391
392         if (ast_sched_start_thread(sched)) {
393                 ast_log(LOG_ERROR, "Failed to start keepalive scheduler thread\n");
394                 ast_sched_context_destroy(sched);
395                 sched = NULL;
396                 ao2_global_obj_release(monitored_transports);
397                 return AST_MODULE_LOAD_DECLINE;
398         }
399
400         ast_sip_register_service(&idle_monitor_module);
401
402         ast_sip_transport_state_register(&monitored_transport_reg);
403
404         ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &keepalive_global_observer);
405         ast_sorcery_reload_object(ast_sip_get_sorcery(), "global");
406
407         return AST_MODULE_LOAD_SUCCESS;
408 }
409
410 void ast_sip_destroy_transport_management(void)
411 {
412         if (keepalive_interval) {
413                 keepalive_interval = 0;
414                 if (keepalive_thread != AST_PTHREADT_NULL) {
415                         pthread_kill(keepalive_thread, SIGURG);
416                         pthread_join(keepalive_thread, NULL);
417                         keepalive_thread = AST_PTHREADT_NULL;
418                 }
419         }
420
421         ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &keepalive_global_observer);
422
423         ast_sip_transport_state_unregister(&monitored_transport_reg);
424
425         ast_sip_unregister_service(&idle_monitor_module);
426
427         ast_sched_clean_by_callback(sched, idle_sched_cb, idle_sched_cleanup);
428         ast_sched_context_destroy(sched);
429         sched = NULL;
430
431         ao2_global_obj_release(monitored_transports);
432 }