res_pjsip/pjsip_transport_management.c: Fix deadlock with transport keep alive.
[asterisk/asterisk.git] / res / res_pjsip / pjsip_transport_management.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2014, Digium, Inc.
5  *
6  * Joshua Colp <jcolp@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 #include "asterisk.h"
20
21 #include <signal.h>
22
23 #include <pjsip.h>
24 #include <pjsip_ua.h>
25
26 #include "asterisk/res_pjsip.h"
27 #include "asterisk/module.h"
28 #include "asterisk/astobj2.h"
29 #include "include/res_pjsip_private.h"
30
31 /*! \brief Number of buckets for monitored transports */
32 #define TRANSPORTS_BUCKETS 127
33
34 #define IDLE_TIMEOUT (pjsip_cfg()->tsx.td)
35
36 /*! \brief The keep alive packet to send */
37 static const pj_str_t keepalive_packet = { "\r\n\r\n", 4 };
38
39 /*! \brief Global container of active transports */
40 static AO2_GLOBAL_OBJ_STATIC(monitored_transports);
41
42 /*! \brief Scheduler context for timing out connections with no data received */
43 static struct ast_sched_context *sched;
44
45 /*! \brief Thread keeping things alive */
46 static pthread_t keepalive_thread = AST_PTHREADT_NULL;
47
48 /*! \brief The global interval at which to send keepalives */
49 static unsigned int keepalive_interval;
50
51 /*! \brief Structure for transport to be monitored */
52 struct monitored_transport {
53         /*! \brief The underlying PJSIP transport */
54         pjsip_transport *transport;
55         /*! \brief Non-zero if a PJSIP request was received */
56         int sip_received;
57 };
58
59 static void keepalive_transport_send_keepalive(struct monitored_transport *monitored)
60 {
61         pjsip_tpselector selector = {
62                 .type = PJSIP_TPSELECTOR_TRANSPORT,
63                 .u.transport = monitored->transport,
64         };
65
66         pjsip_tpmgr_send_raw(pjsip_endpt_get_tpmgr(ast_sip_get_pjsip_endpoint()),
67                 monitored->transport->key.type,
68                 &selector,
69                 NULL,
70                 keepalive_packet.ptr,
71                 keepalive_packet.slen,
72                 &monitored->transport->key.rem_addr,
73                 pj_sockaddr_get_len(&monitored->transport->key.rem_addr),
74                 NULL, NULL);
75 }
76
77 /*! \brief Thread which sends keepalives to all active connection-oriented transports */
78 static void *keepalive_transport_thread(void *data)
79 {
80         struct ao2_container *transports;
81         pj_thread_desc desc;
82         pj_thread_t *thread;
83
84         if (pj_thread_register("Asterisk Keepalive Thread", desc, &thread) != PJ_SUCCESS) {
85                 ast_log(LOG_ERROR, "Could not register keepalive thread with PJLIB, keepalives will not occur.\n");
86                 return NULL;
87         }
88
89         transports = ao2_global_obj_ref(monitored_transports);
90         if (!transports) {
91                 return NULL;
92         }
93
94         /*
95          * Once loaded this module just keeps on going as it is unsafe to stop
96          * and change the underlying callback for the transport manager.
97          */
98         while (keepalive_interval) {
99                 struct ao2_iterator iter;
100                 struct monitored_transport *monitored;
101
102                 sleep(keepalive_interval);
103
104                 /*
105                  * We must use the iterator to avoid deadlock between the container lock
106                  * and the pjproject transport manager group lock when sending
107                  * the keepalive packet.
108                  */
109                 iter = ao2_iterator_init(transports, 0);
110                 for (; (monitored = ao2_iterator_next(&iter)); ao2_ref(monitored, -1)) {
111                         keepalive_transport_send_keepalive(monitored);
112                 }
113                 ao2_iterator_destroy(&iter);
114         }
115
116         ao2_ref(transports, -1);
117         return NULL;
118 }
119
120 AST_THREADSTORAGE(desc_storage);
121
122 static int idle_sched_cb(const void *data)
123 {
124         struct monitored_transport *monitored = (struct monitored_transport *) data;
125
126         if (!pj_thread_is_registered()) {
127                 pj_thread_t *thread;
128                 pj_thread_desc *desc;
129
130                 desc = ast_threadstorage_get(&desc_storage, sizeof(pj_thread_desc));
131                 if (!desc) {
132                         ast_log(LOG_ERROR, "Could not get thread desc from thread-local storage.\n");
133                         ao2_ref(monitored, -1);
134                         return 0;
135                 }
136
137                 pj_bzero(*desc, sizeof(*desc));
138
139                 pj_thread_register("Transport Monitor", *desc, &thread);
140         }
141
142         if (!monitored->sip_received) {
143                 ast_log(LOG_NOTICE, "Shutting down transport '%s' since no request was received in %d seconds\n",
144                         monitored->transport->info, IDLE_TIMEOUT / 1000);
145                 pjsip_transport_shutdown(monitored->transport);
146         }
147
148         ao2_ref(monitored, -1);
149         return 0;
150 }
151
152 /*! \brief Destructor for keepalive transport */
153 static void monitored_transport_destroy(void *obj)
154 {
155         struct monitored_transport *monitored = obj;
156
157         pjsip_transport_dec_ref(monitored->transport);
158 }
159
160 /*! \brief Callback invoked when transport changes occur */
161 static void monitored_transport_state_callback(pjsip_transport *transport, pjsip_transport_state state,
162         const pjsip_transport_state_info *info)
163 {
164         struct ao2_container *transports;
165
166         /* We only care about reliable transports */
167         if (PJSIP_TRANSPORT_IS_RELIABLE(transport)
168                 && (transport->dir == PJSIP_TP_DIR_INCOMING || keepalive_interval)
169                 && (transports = ao2_global_obj_ref(monitored_transports))) {
170                 struct monitored_transport *monitored;
171
172                 switch (state) {
173                 case PJSIP_TP_STATE_CONNECTED:
174                         monitored = ao2_alloc_options(sizeof(*monitored),
175                                 monitored_transport_destroy, AO2_ALLOC_OPT_LOCK_NOLOCK);
176                         if (!monitored) {
177                                 break;
178                         }
179                         monitored->transport = transport;
180                         pjsip_transport_add_ref(monitored->transport);
181
182                         ao2_link(transports, monitored);
183
184                         if (transport->dir == PJSIP_TP_DIR_INCOMING) {
185                                 /* Let the scheduler inherit the reference from allocation */
186                                 if (ast_sched_add_variable(sched, IDLE_TIMEOUT, idle_sched_cb, monitored, 1) < 0) {
187                                         /* Uh Oh.  Could not schedule the idle check.  Kill the transport. */
188                                         pjsip_transport_shutdown(transport);
189                                 } else {
190                                         /* monitored ref successfully passed to idle_sched_cb() */
191                                         break;
192                                 }
193                         }
194                         ao2_ref(monitored, -1);
195                         break;
196                 case PJSIP_TP_STATE_SHUTDOWN:
197                 case PJSIP_TP_STATE_DISCONNECTED:
198                         ao2_find(transports, transport->obj_name, OBJ_SEARCH_KEY | OBJ_NODATA | OBJ_UNLINK);
199                         break;
200                 default:
201                         break;
202                 }
203
204                 ao2_ref(transports, -1);
205         }
206 }
207
208 struct ast_sip_tpmgr_state_callback monitored_transport_reg = {
209         monitored_transport_state_callback,
210 };
211
212 /*! \brief Hashing function for monitored transport */
213 static int monitored_transport_hash_fn(const void *obj, int flags)
214 {
215         const struct monitored_transport *object;
216         const char *key;
217
218         switch (flags & OBJ_SEARCH_MASK) {
219         case OBJ_SEARCH_KEY:
220                 key = obj;
221                 break;
222         case OBJ_SEARCH_OBJECT:
223                 object = obj;
224                 key = object->transport->obj_name;
225                 break;
226         default:
227                 /* Hash can only work on something with a full key. */
228                 ast_assert(0);
229                 return 0;
230         }
231         return ast_str_hash(key);
232 }
233
234 /*! \brief Comparison function for monitored transport */
235 static int monitored_transport_cmp_fn(void *obj, void *arg, int flags)
236 {
237         const struct monitored_transport *object_left = obj;
238         const struct monitored_transport *object_right = arg;
239         const char *right_key = arg;
240         int cmp;
241
242         switch (flags & OBJ_SEARCH_MASK) {
243         case OBJ_SEARCH_OBJECT:
244                 right_key = object_right->transport->obj_name;
245                 /* Fall through */
246         case OBJ_SEARCH_KEY:
247                 cmp = strcmp(object_left->transport->obj_name, right_key);
248                 break;
249         case OBJ_SEARCH_PARTIAL_KEY:
250                 /*
251                  * We could also use a partial key struct containing a length
252                  * so strlen() does not get called for every comparison instead.
253                  */
254                 cmp = strncmp(object_left->transport->obj_name, right_key, strlen(right_key));
255                 break;
256         default:
257                 /*
258                  * What arg points to is specific to this traversal callback
259                  * and has no special meaning to astobj2.
260                  */
261                 cmp = 0;
262                 break;
263         }
264
265         return !cmp ? CMP_MATCH : 0;
266 }
267
268 static void keepalive_global_loaded(const char *object_type)
269 {
270         unsigned int new_interval = ast_sip_get_keep_alive_interval();
271
272         if (new_interval) {
273                 keepalive_interval = new_interval;
274         } else if (keepalive_interval) {
275                 ast_log(LOG_NOTICE, "Keepalive support can not be disabled once activated.\n");
276                 return;
277         } else {
278                 /* This will occur if no keepalive interval has been specified at initial start */
279                 return;
280         }
281
282         if (keepalive_thread != AST_PTHREADT_NULL) {
283                 return;
284         }
285
286         if (ast_pthread_create(&keepalive_thread, NULL, keepalive_transport_thread, NULL)) {
287                 ast_log(LOG_ERROR, "Could not create thread for sending keepalive messages.\n");
288                 keepalive_thread = AST_PTHREADT_NULL;
289                 keepalive_interval = 0;
290         }
291 }
292
293 /*! \brief Observer which is used to update our interval when the global setting changes */
294 static struct ast_sorcery_observer keepalive_global_observer = {
295         .loaded = keepalive_global_loaded,
296 };
297
298 /*!
299  * \brief
300  * On incoming TCP connections, when we receive a SIP request, we mark that we have
301  * received a valid SIP request. This way, we will not shut the transport down for
302  * idleness
303  */
304 static pj_bool_t idle_monitor_on_rx_request(pjsip_rx_data *rdata)
305 {
306         struct ao2_container *transports;
307         struct monitored_transport *idle_trans;
308
309         transports = ao2_global_obj_ref(monitored_transports);
310         if (!transports) {
311                 return PJ_FALSE;
312         }
313
314         idle_trans = ao2_find(transports, rdata->tp_info.transport->obj_name, OBJ_SEARCH_KEY);
315         ao2_ref(transports, -1);
316         if (!idle_trans) {
317                 return PJ_FALSE;
318         }
319
320         idle_trans->sip_received = 1;
321         ao2_ref(idle_trans, -1);
322
323         return PJ_FALSE;
324 }
325
326 static pjsip_module idle_monitor_module = {
327         .name = {"idle monitor module", 19},
328         .priority = PJSIP_MOD_PRIORITY_TRANSPORT_LAYER + 3,
329         .on_rx_request = idle_monitor_on_rx_request,
330 };
331
332 int ast_sip_initialize_transport_management(void)
333 {
334         struct ao2_container *transports;
335
336         transports = ao2_container_alloc(TRANSPORTS_BUCKETS, monitored_transport_hash_fn,
337                 monitored_transport_cmp_fn);
338         if (!transports) {
339                 ast_log(LOG_ERROR, "Could not create container for transports to perform keepalive on.\n");
340                 return AST_MODULE_LOAD_DECLINE;
341         }
342         ao2_global_obj_replace_unref(monitored_transports, transports);
343         ao2_ref(transports, -1);
344
345         sched = ast_sched_context_create();
346         if (!sched) {
347                 ast_log(LOG_ERROR, "Failed to create keepalive scheduler context.\n");
348                 ao2_global_obj_release(monitored_transports);
349                 return AST_MODULE_LOAD_DECLINE;
350         }
351
352         if (ast_sched_start_thread(sched)) {
353                 ast_log(LOG_ERROR, "Failed to start keepalive scheduler thread\n");
354                 ast_sched_context_destroy(sched);
355                 sched = NULL;
356                 ao2_global_obj_release(monitored_transports);
357                 return AST_MODULE_LOAD_DECLINE;
358         }
359
360         ast_sip_register_service(&idle_monitor_module);
361
362         ast_sip_transport_state_register(&monitored_transport_reg);
363
364         ast_sorcery_observer_add(ast_sip_get_sorcery(), "global", &keepalive_global_observer);
365         ast_sorcery_reload_object(ast_sip_get_sorcery(), "global");
366
367         return AST_MODULE_LOAD_SUCCESS;
368 }
369
370 void ast_sip_destroy_transport_management(void)
371 {
372         if (keepalive_interval) {
373                 keepalive_interval = 0;
374                 if (keepalive_thread != AST_PTHREADT_NULL) {
375                         pthread_kill(keepalive_thread, SIGURG);
376                         pthread_join(keepalive_thread, NULL);
377                         keepalive_thread = AST_PTHREADT_NULL;
378                 }
379         }
380
381         ast_sorcery_observer_remove(ast_sip_get_sorcery(), "global", &keepalive_global_observer);
382
383         ast_sip_transport_state_unregister(&monitored_transport_reg);
384
385         ast_sip_unregister_service(&idle_monitor_module);
386
387         ast_sched_context_destroy(sched);
388         sched = NULL;
389
390         ao2_global_obj_release(monitored_transports);
391 }