Merge "asterisk.c: When astcanary dies on linux, reset priority on all threads."
[asterisk/asterisk.git] / main / endpoints.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Asterisk endpoint API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_REGISTER_FILE()
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/endpoints.h"
36 #include "asterisk/stasis.h"
37 #include "asterisk/stasis_channels.h"
38 #include "asterisk/stasis_endpoints.h"
39 #include "asterisk/stasis_message_router.h"
40 #include "asterisk/stringfields.h"
41 #include "asterisk/_private.h"
42
/*! Bucket count for each endpoint's container of channel ids. Keep it prime! */
#define ENDPOINT_CHANNEL_BUCKETS 127

/*! Bucket count for the container of endpoints. Keep it prime! */
#define ENDPOINT_BUCKETS 127

/*! Bucket count for the container of technology endpoints. */
#define TECH_ENDPOINT_BUCKETS 11

/*! Endpoints that were created with a resource name, keyed by id. */
static struct ao2_container *endpoints;

/*! Resource-less endpoints, one per technology, keyed by id. */
static struct ao2_container *tech_endpoints;
55
56 struct ast_endpoint {
57         AST_DECLARE_STRING_FIELDS(
58                 AST_STRING_FIELD(tech); /*!< Technology (SIP, IAX2, etc.). */
59                 AST_STRING_FIELD(resource);     /*!< Name, unique to the tech. */
60                 AST_STRING_FIELD(id);   /*!< tech/resource id */
61                 );
62         /*! Endpoint's current state */
63         enum ast_endpoint_state state;
64         /*!
65          * \brief Max channels for this endpoint. -1 means unlimited or unknown.
66          *
67          * Note that this simply documents the limits of an endpoint, and does
68          * nothing to try to enforce the limit.
69          */
70         int max_channels;
71         /*! Topic for this endpoint's messages */
72         struct stasis_cp_single *topics;
73         /*! Router for handling this endpoint's messages */
74         struct stasis_message_router *router;
75         /*! ast_str_container of channels associated with this endpoint */
76         struct ao2_container *channel_ids;
77         /*! Forwarding subscription from an endpoint to its tech endpoint */
78         struct stasis_forward *tech_forward;
79 };
80
81 static int endpoint_hash(const void *obj, int flags)
82 {
83         const struct ast_endpoint *endpoint;
84         const char *key;
85
86         switch (flags & OBJ_SEARCH_MASK) {
87         case OBJ_SEARCH_KEY:
88                 key = obj;
89                 return ast_str_hash(key);
90         case OBJ_SEARCH_OBJECT:
91                 endpoint = obj;
92                 return ast_str_hash(endpoint->id);
93         default:
94                 /* Hash can only work on something with a full key. */
95                 ast_assert(0);
96                 return 0;
97         }
98 }
99
100 static int endpoint_cmp(void *obj, void *arg, int flags)
101 {
102         const struct ast_endpoint *left = obj;
103         const struct ast_endpoint *right = arg;
104         const char *right_key = arg;
105         int cmp;
106
107         switch (flags & OBJ_SEARCH_MASK) {
108         case OBJ_SEARCH_OBJECT:
109                 right_key = right->id;
110                 /* Fall through */
111         case OBJ_SEARCH_KEY:
112                 cmp = strcmp(left->id, right_key);
113                 break;
114         case OBJ_SEARCH_PARTIAL_KEY:
115                 cmp = strncmp(left->id, right_key, strlen(right_key));
116                 break;
117         default:
118                 ast_assert(0);
119                 cmp = 0;
120                 break;
121         }
122         if (cmp) {
123                 return 0;
124         }
125
126         return CMP_MATCH;
127 }
128
129 struct ast_endpoint *ast_endpoint_find_by_id(const char *id)
130 {
131         struct ast_endpoint *endpoint = ao2_find(endpoints, id, OBJ_KEY);
132
133         if (!endpoint) {
134                 endpoint = ao2_find(tech_endpoints, id, OBJ_KEY);
135         }
136
137         return endpoint;
138 }
139
140 struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint)
141 {
142         if (!endpoint) {
143                 return ast_endpoint_topic_all();
144         }
145         return stasis_cp_single_topic(endpoint->topics);
146 }
147
148 struct stasis_topic *ast_endpoint_topic_cached(struct ast_endpoint *endpoint)
149 {
150         if (!endpoint) {
151                 return ast_endpoint_topic_all_cached();
152         }
153         return stasis_cp_single_topic_cached(endpoint->topics);
154 }
155
156 const char *ast_endpoint_state_to_string(enum ast_endpoint_state state)
157 {
158         switch (state) {
159         case AST_ENDPOINT_UNKNOWN:
160                 return "unknown";
161         case AST_ENDPOINT_OFFLINE:
162                 return "offline";
163         case AST_ENDPOINT_ONLINE:
164                 return "online";
165         }
166         return "?";
167 }
168
169 static void endpoint_publish_snapshot(struct ast_endpoint *endpoint)
170 {
171         RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
172         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
173
174         ast_assert(endpoint != NULL);
175         ast_assert(endpoint->topics != NULL);
176
177         if (!ast_endpoint_snapshot_type()) {
178                 return;
179         }
180
181         snapshot = ast_endpoint_snapshot_create(endpoint);
182         if (!snapshot) {
183                 return;
184         }
185         message = stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
186         if (!message) {
187                 return;
188         }
189         stasis_publish(ast_endpoint_topic(endpoint), message);
190 }
191
192 static void endpoint_dtor(void *obj)
193 {
194         struct ast_endpoint *endpoint = obj;
195
196         /* The router should be shut down already */
197         ast_assert(stasis_message_router_is_done(endpoint->router));
198         ao2_cleanup(endpoint->router);
199         endpoint->router = NULL;
200
201         stasis_cp_single_unsubscribe(endpoint->topics);
202         endpoint->topics = NULL;
203
204         ao2_cleanup(endpoint->channel_ids);
205         endpoint->channel_ids = NULL;
206
207         ast_string_field_free_memory(endpoint);
208 }
209
210
211 int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
212         struct ast_channel *chan)
213 {
214         ast_assert(chan != NULL);
215         ast_assert(endpoint != NULL);
216         ast_assert(!ast_strlen_zero(endpoint->resource));
217
218         ast_channel_forward_endpoint(chan, endpoint);
219
220         ao2_lock(endpoint);
221         ast_str_container_add(endpoint->channel_ids, ast_channel_uniqueid(chan));
222         ao2_unlock(endpoint);
223
224         endpoint_publish_snapshot(endpoint);
225
226         return 0;
227 }
228
229 /*! \brief Handler for channel snapshot cache clears */
230 static void endpoint_cache_clear(void *data,
231         struct stasis_subscription *sub,
232         struct stasis_message *message)
233 {
234         struct ast_endpoint *endpoint = data;
235         struct stasis_message *clear_msg = stasis_message_data(message);
236         struct ast_channel_snapshot *clear_snapshot;
237
238         if (stasis_message_type(clear_msg) != ast_channel_snapshot_type()) {
239                 return;
240         }
241
242         clear_snapshot = stasis_message_data(clear_msg);
243
244         ast_assert(endpoint != NULL);
245
246         ao2_lock(endpoint);
247         ast_str_container_remove(endpoint->channel_ids, clear_snapshot->uniqueid);
248         ao2_unlock(endpoint);
249         endpoint_publish_snapshot(endpoint);
250 }
251
/*!
 * \brief Default router callback for an endpoint's message router.
 *
 * Every message without a dedicated route lands here. The only one acted
 * upon is the subscription's final message, at which point one reference to
 * the endpoint is released.
 *
 * \param data The ast_endpoint the default route was registered with.
 * \param sub Subscription the message arrived on.
 * \param message The message being dispatched.
 */
static void endpoint_default(void *data,
        struct stasis_subscription *sub,
        struct stasis_message *message)
{
        /* The router is registered with an ast_endpoint as its data. */
        struct ast_endpoint *endpoint = data;

        if (stasis_subscription_final_message(sub, message)) {
                ao2_cleanup(endpoint);
        }
}
262
263 static struct ast_endpoint *endpoint_internal_create(const char *tech, const char *resource)
264 {
265         RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
266         RAII_VAR(struct ast_endpoint *, tech_endpoint, NULL, ao2_cleanup);
267         int r = 0;
268
269         /* Get/create the technology endpoint */
270         if (!ast_strlen_zero(resource)) {
271                 tech_endpoint = ao2_find(tech_endpoints, tech, OBJ_KEY);
272                 if (!tech_endpoint) {
273                         tech_endpoint = endpoint_internal_create(tech, NULL);
274                         if (!tech_endpoint) {
275                                 return NULL;
276                         }
277                 }
278         }
279
280         endpoint = ao2_alloc(sizeof(*endpoint), endpoint_dtor);
281         if (!endpoint) {
282                 return NULL;
283         }
284
285         endpoint->max_channels = -1;
286         endpoint->state = AST_ENDPOINT_UNKNOWN;
287
288         if (ast_string_field_init(endpoint, 80) != 0) {
289                 return NULL;
290         }
291         ast_string_field_set(endpoint, tech, tech);
292         ast_string_field_set(endpoint, resource, S_OR(resource, ""));
293         ast_string_field_build(endpoint, id, "%s%s%s",
294                 tech,
295                 !ast_strlen_zero(resource) ? "/" : "",
296                 S_OR(resource, ""));
297
298         /* All access to channel_ids should be covered by the endpoint's
299          * lock; no extra lock needed. */
300         endpoint->channel_ids = ast_str_container_alloc_options(
301                 AO2_ALLOC_OPT_LOCK_NOLOCK, ENDPOINT_CHANNEL_BUCKETS);
302         if (!endpoint->channel_ids) {
303                 return NULL;
304         }
305
306         if (!ast_strlen_zero(resource)) {
307
308                 endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(),
309                         endpoint->id);
310                 if (!endpoint->topics) {
311                         return NULL;
312                 }
313
314                 endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));
315                 if (!endpoint->router) {
316                         return NULL;
317                 }
318                 r |= stasis_message_router_add(endpoint->router,
319                         stasis_cache_clear_type(), endpoint_cache_clear,
320                         endpoint);
321                 r |= stasis_message_router_set_default(endpoint->router,
322                         endpoint_default, endpoint);
323                 if (r) {
324                         return NULL;
325                 }
326
327                 endpoint->tech_forward = stasis_forward_all(stasis_cp_single_topic(endpoint->topics),
328                         stasis_cp_single_topic(tech_endpoint->topics));
329
330                 endpoint_publish_snapshot(endpoint);
331                 ao2_link(endpoints, endpoint);
332         } else {
333                 endpoint->topics = stasis_cp_sink_create(ast_endpoint_cache_all(),
334                         endpoint->id);
335                 if (!endpoint->topics) {
336                         return NULL;
337                 }
338
339                 ao2_link(tech_endpoints, endpoint);
340         }
341
342         ao2_ref(endpoint, +1);
343         return endpoint;
344 }
345
346 struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
347 {
348         if (ast_strlen_zero(tech)) {
349                 ast_log(LOG_ERROR, "Endpoint tech cannot be empty\n");
350                 return NULL;
351         }
352
353         if (ast_strlen_zero(resource)) {
354                 ast_log(LOG_ERROR, "Endpoint resource cannot be empty\n");
355                 return NULL;
356         }
357
358         return endpoint_internal_create(tech, resource);
359 }
360
361 static struct stasis_message *create_endpoint_snapshot_message(struct ast_endpoint *endpoint)
362 {
363         RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
364
365         if (!ast_endpoint_snapshot_type()) {
366                 return NULL;
367         }
368
369         snapshot = ast_endpoint_snapshot_create(endpoint);
370         if (!snapshot) {
371                 return NULL;
372         }
373
374         return stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
375 }
376
377 void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
378 {
379         RAII_VAR(struct stasis_message *, clear_msg, NULL, ao2_cleanup);
380
381         if (endpoint == NULL) {
382                 return;
383         }
384
385         ao2_unlink(endpoints, endpoint);
386         endpoint->tech_forward = stasis_forward_cancel(endpoint->tech_forward);
387
388         clear_msg = create_endpoint_snapshot_message(endpoint);
389         if (clear_msg) {
390                 RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
391                 message = stasis_cache_clear_create(clear_msg);
392                 if (message) {
393                         stasis_publish(ast_endpoint_topic(endpoint), message);
394                 }
395         }
396
397         /* Bump refcount to hold on to the router */
398         ao2_ref(endpoint->router, +1);
399         stasis_message_router_unsubscribe(endpoint->router);
400 }
401
402 const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
403 {
404         if (!endpoint) {
405                 return NULL;
406         }
407         return endpoint->tech;
408 }
409
410 const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
411 {
412         if (!endpoint) {
413                 return NULL;
414         }
415         return endpoint->resource;
416 }
417
418 const char *ast_endpoint_get_id(const struct ast_endpoint *endpoint)
419 {
420         if (!endpoint) {
421                 return NULL;
422         }
423         return endpoint->id;
424 }
425
426 enum ast_endpoint_state ast_endpoint_get_state(const struct ast_endpoint *endpoint)
427 {
428         if (!endpoint) {
429                 return AST_ENDPOINT_UNKNOWN;
430         }
431         return endpoint->state;
432 }
433
434 void ast_endpoint_set_state(struct ast_endpoint *endpoint,
435         enum ast_endpoint_state state)
436 {
437         ast_assert(endpoint != NULL);
438         ast_assert(!ast_strlen_zero(endpoint->resource));
439
440         ao2_lock(endpoint);
441         endpoint->state = state;
442         ao2_unlock(endpoint);
443         endpoint_publish_snapshot(endpoint);
444 }
445
446 void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint,
447         int max_channels)
448 {
449         ast_assert(endpoint != NULL);
450         ast_assert(!ast_strlen_zero(endpoint->resource));
451
452         ao2_lock(endpoint);
453         endpoint->max_channels = max_channels;
454         ao2_unlock(endpoint);
455         endpoint_publish_snapshot(endpoint);
456 }
457
458 static void endpoint_snapshot_dtor(void *obj)
459 {
460         struct ast_endpoint_snapshot *snapshot = obj;
461         int channel;
462
463         ast_assert(snapshot != NULL);
464
465         for (channel = 0; channel < snapshot->num_channels; channel++) {
466                 ao2_ref(snapshot->channel_ids[channel], -1);
467         }
468
469         ast_string_field_free_memory(snapshot);
470 }
471
472 struct ast_endpoint_snapshot *ast_endpoint_snapshot_create(
473         struct ast_endpoint *endpoint)
474 {
475         struct ast_endpoint_snapshot *snapshot;
476         int channel_count;
477         struct ao2_iterator i;
478         void *obj;
479         SCOPED_AO2LOCK(lock, endpoint);
480
481         ast_assert(endpoint != NULL);
482         ast_assert(!ast_strlen_zero(endpoint->resource));
483
484         channel_count = ao2_container_count(endpoint->channel_ids);
485
486         snapshot = ao2_alloc_options(
487                 sizeof(*snapshot) + channel_count * sizeof(char *),
488                 endpoint_snapshot_dtor,
489                 AO2_ALLOC_OPT_LOCK_NOLOCK);
490
491         if (!snapshot || ast_string_field_init(snapshot, 80) != 0) {
492                 ao2_cleanup(snapshot);
493                 return NULL;
494         }
495
496         ast_string_field_build(snapshot, id, "%s/%s", endpoint->tech,
497                 endpoint->resource);
498         ast_string_field_set(snapshot, tech, endpoint->tech);
499         ast_string_field_set(snapshot, resource, endpoint->resource);
500
501         snapshot->state = endpoint->state;
502         snapshot->max_channels = endpoint->max_channels;
503
504         i = ao2_iterator_init(endpoint->channel_ids, 0);
505         while ((obj = ao2_iterator_next(&i))) {
506                 /* The reference is kept so the channel id does not go away until the snapshot is gone */
507                 snapshot->channel_ids[snapshot->num_channels++] = obj;
508         }
509         ao2_iterator_destroy(&i);
510
511         return snapshot;
512 }
513
514 static void endpoint_cleanup(void)
515 {
516         ao2_cleanup(endpoints);
517         endpoints = NULL;
518
519         ao2_cleanup(tech_endpoints);
520         tech_endpoints = NULL;
521 }
522
523 int ast_endpoint_init(void)
524 {
525         ast_register_cleanup(endpoint_cleanup);
526
527         endpoints = ao2_container_alloc(ENDPOINT_BUCKETS, endpoint_hash,
528                 endpoint_cmp);
529         if (!endpoints) {
530                 return -1;
531         }
532
533         tech_endpoints = ao2_container_alloc(TECH_ENDPOINT_BUCKETS, endpoint_hash,
534                 endpoint_cmp);
535         if (!tech_endpoints) {
536                 return -1;
537         }
538
539         return 0;
540 }