CI: Various updates to buildAsterisk.sh
[asterisk/asterisk.git] / main / endpoints.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Asterisk endpoint API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 #include "asterisk/astobj2.h"
33 #include "asterisk/endpoints.h"
34 #include "asterisk/stasis.h"
35 #include "asterisk/stasis_channels.h"
36 #include "asterisk/stasis_endpoints.h"
37 #include "asterisk/stasis_message_router.h"
38 #include "asterisk/stringfields.h"
39 #include "asterisk/_private.h"
40
41 /*! Buckets for endpoint->channel mappings. Keep it prime! */
42 #define ENDPOINT_CHANNEL_BUCKETS 127
43
44 /*! Buckets for endpoint hash. Keep it prime! */
45 #define ENDPOINT_BUCKETS 127
46
47 /*! Buckets for technology endpoints. */
48 #define TECH_ENDPOINT_BUCKETS 11
49
50 static struct ao2_container *endpoints;
51
52 static struct ao2_container *tech_endpoints;
53
54 struct ast_endpoint {
55         AST_DECLARE_STRING_FIELDS(
56                 AST_STRING_FIELD(tech); /*!< Technology (SIP, IAX2, etc.). */
57                 AST_STRING_FIELD(resource);     /*!< Name, unique to the tech. */
58                 AST_STRING_FIELD(id);   /*!< tech/resource id */
59                 );
60         /*! Endpoint's current state */
61         enum ast_endpoint_state state;
62         /*!
63          * \brief Max channels for this endpoint. -1 means unlimited or unknown.
64          *
65          * Note that this simply documents the limits of an endpoint, and does
66          * nothing to try to enforce the limit.
67          */
68         int max_channels;
69         /*! Topic for this endpoint's messages */
70         struct stasis_cp_single *topics;
71         /*! Router for handling this endpoint's messages */
72         struct stasis_message_router *router;
73         /*! ast_str_container of channels associated with this endpoint */
74         struct ao2_container *channel_ids;
75         /*! Forwarding subscription from an endpoint to its tech endpoint */
76         struct stasis_forward *tech_forward;
77 };
78
79 AO2_STRING_FIELD_HASH_FN(ast_endpoint, id)
80 AO2_STRING_FIELD_CMP_FN(ast_endpoint, id)
81
82 struct ast_endpoint *ast_endpoint_find_by_id(const char *id)
83 {
84         struct ast_endpoint *endpoint = ao2_find(endpoints, id, OBJ_KEY);
85
86         if (!endpoint) {
87                 endpoint = ao2_find(tech_endpoints, id, OBJ_KEY);
88         }
89
90         return endpoint;
91 }
92
93 struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint)
94 {
95         if (!endpoint) {
96                 return ast_endpoint_topic_all();
97         }
98         return stasis_cp_single_topic(endpoint->topics);
99 }
100
101 struct stasis_topic *ast_endpoint_topic_cached(struct ast_endpoint *endpoint)
102 {
103         if (!endpoint) {
104                 return ast_endpoint_topic_all_cached();
105         }
106         return stasis_cp_single_topic_cached(endpoint->topics);
107 }
108
109 const char *ast_endpoint_state_to_string(enum ast_endpoint_state state)
110 {
111         switch (state) {
112         case AST_ENDPOINT_UNKNOWN:
113                 return "unknown";
114         case AST_ENDPOINT_OFFLINE:
115                 return "offline";
116         case AST_ENDPOINT_ONLINE:
117                 return "online";
118         }
119         return "?";
120 }
121
122 static void endpoint_publish_snapshot(struct ast_endpoint *endpoint)
123 {
124         RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
125         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
126
127         ast_assert(endpoint != NULL);
128         ast_assert(endpoint->topics != NULL);
129
130         if (!ast_endpoint_snapshot_type()) {
131                 return;
132         }
133
134         snapshot = ast_endpoint_snapshot_create(endpoint);
135         if (!snapshot) {
136                 return;
137         }
138         message = stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
139         if (!message) {
140                 return;
141         }
142         stasis_publish(ast_endpoint_topic(endpoint), message);
143 }
144
145 static void endpoint_dtor(void *obj)
146 {
147         struct ast_endpoint *endpoint = obj;
148
149         /* The router should be shut down already */
150         ast_assert(stasis_message_router_is_done(endpoint->router));
151         ao2_cleanup(endpoint->router);
152         endpoint->router = NULL;
153
154         stasis_cp_single_unsubscribe(endpoint->topics);
155         endpoint->topics = NULL;
156
157         ao2_cleanup(endpoint->channel_ids);
158         endpoint->channel_ids = NULL;
159
160         ast_string_field_free_memory(endpoint);
161 }
162
163
164 int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
165         struct ast_channel *chan)
166 {
167         ast_assert(chan != NULL);
168         ast_assert(endpoint != NULL);
169         ast_assert(!ast_strlen_zero(endpoint->resource));
170
171         ast_channel_forward_endpoint(chan, endpoint);
172
173         ao2_lock(endpoint);
174         ast_str_container_add(endpoint->channel_ids, ast_channel_uniqueid(chan));
175         ao2_unlock(endpoint);
176
177         endpoint_publish_snapshot(endpoint);
178
179         return 0;
180 }
181
182 /*! \brief Handler for channel snapshot update */
183 static void endpoint_cache_clear(void *data,
184         struct stasis_subscription *sub,
185         struct stasis_message *message)
186 {
187         struct ast_endpoint *endpoint = data;
188         struct ast_channel_snapshot_update *update = stasis_message_data(message);
189
190         /* Only when the channel is dead do we remove it */
191         if (!ast_test_flag(&update->new_snapshot->flags, AST_FLAG_DEAD)) {
192                 return;
193         }
194
195         ast_assert(endpoint != NULL);
196
197         ao2_lock(endpoint);
198         ast_str_container_remove(endpoint->channel_ids, update->new_snapshot->base->uniqueid);
199         ao2_unlock(endpoint);
200         endpoint_publish_snapshot(endpoint);
201 }
202
203 static void endpoint_subscription_change(void *data,
204         struct stasis_subscription *sub,
205         struct stasis_message *message)
206 {
207         struct stasis_endpoint *endpoint = data;
208
209         if (stasis_subscription_final_message(sub, message)) {
210                 ao2_cleanup(endpoint);
211         }
212 }
213
214 static struct ast_endpoint *endpoint_internal_create(const char *tech, const char *resource)
215 {
216         RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
217         RAII_VAR(struct ast_endpoint *, tech_endpoint, NULL, ao2_cleanup);
218         int r = 0;
219
220         /* Get/create the technology endpoint */
221         if (!ast_strlen_zero(resource)) {
222                 tech_endpoint = ao2_find(tech_endpoints, tech, OBJ_KEY);
223                 if (!tech_endpoint) {
224                         tech_endpoint = endpoint_internal_create(tech, NULL);
225                         if (!tech_endpoint) {
226                                 return NULL;
227                         }
228                 }
229         }
230
231         endpoint = ao2_alloc(sizeof(*endpoint), endpoint_dtor);
232         if (!endpoint) {
233                 return NULL;
234         }
235
236         endpoint->max_channels = -1;
237         endpoint->state = AST_ENDPOINT_UNKNOWN;
238
239         if (ast_string_field_init(endpoint, 80) != 0) {
240                 return NULL;
241         }
242         ast_string_field_set(endpoint, tech, tech);
243         ast_string_field_set(endpoint, resource, S_OR(resource, ""));
244         ast_string_field_build(endpoint, id, "%s%s%s",
245                 tech,
246                 !ast_strlen_zero(resource) ? "/" : "",
247                 S_OR(resource, ""));
248
249         /* All access to channel_ids should be covered by the endpoint's
250          * lock; no extra lock needed. */
251         endpoint->channel_ids = ast_str_container_alloc_options(
252                 AO2_ALLOC_OPT_LOCK_NOLOCK, ENDPOINT_CHANNEL_BUCKETS);
253         if (!endpoint->channel_ids) {
254                 return NULL;
255         }
256
257         if (!ast_strlen_zero(resource)) {
258
259                 endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(),
260                         endpoint->id);
261                 if (!endpoint->topics) {
262                         return NULL;
263                 }
264                 stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());
265                 stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
266
267                 endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));
268                 if (!endpoint->router) {
269                         return NULL;
270                 }
271                 r |= stasis_message_router_add(endpoint->router,
272                         ast_channel_snapshot_type(), endpoint_cache_clear,
273                         endpoint);
274                 r |= stasis_message_router_add(endpoint->router,
275                         stasis_subscription_change_type(), endpoint_subscription_change,
276                         endpoint);
277                 if (r) {
278                         return NULL;
279                 }
280
281                 endpoint->tech_forward = stasis_forward_all(stasis_cp_single_topic(endpoint->topics),
282                         stasis_cp_single_topic(tech_endpoint->topics));
283
284                 endpoint_publish_snapshot(endpoint);
285                 ao2_link(endpoints, endpoint);
286         } else {
287                 endpoint->topics = stasis_cp_sink_create(ast_endpoint_cache_all(),
288                         endpoint->id);
289                 if (!endpoint->topics) {
290                         return NULL;
291                 }
292                 stasis_cp_single_accept_message_type(endpoint->topics, ast_endpoint_snapshot_type());
293                 stasis_cp_single_set_filter(endpoint->topics, STASIS_SUBSCRIPTION_FILTER_SELECTIVE);
294
295                 ao2_link(tech_endpoints, endpoint);
296         }
297
298         ao2_ref(endpoint, +1);
299         return endpoint;
300 }
301
302 struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
303 {
304         if (ast_strlen_zero(tech)) {
305                 ast_log(LOG_ERROR, "Endpoint tech cannot be empty\n");
306                 return NULL;
307         }
308
309         if (ast_strlen_zero(resource)) {
310                 ast_log(LOG_ERROR, "Endpoint resource cannot be empty\n");
311                 return NULL;
312         }
313
314         return endpoint_internal_create(tech, resource);
315 }
316
317 static struct stasis_message *create_endpoint_snapshot_message(struct ast_endpoint *endpoint)
318 {
319         RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
320
321         if (!ast_endpoint_snapshot_type()) {
322                 return NULL;
323         }
324
325         snapshot = ast_endpoint_snapshot_create(endpoint);
326         if (!snapshot) {
327                 return NULL;
328         }
329
330         return stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
331 }
332
333 void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
334 {
335         RAII_VAR(struct stasis_message *, clear_msg, NULL, ao2_cleanup);
336
337         if (endpoint == NULL) {
338                 return;
339         }
340
341         ao2_unlink(endpoints, endpoint);
342         endpoint->tech_forward = stasis_forward_cancel(endpoint->tech_forward);
343
344         clear_msg = create_endpoint_snapshot_message(endpoint);
345         if (clear_msg) {
346                 RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
347                 message = stasis_cache_clear_create(clear_msg);
348                 if (message) {
349                         stasis_publish(ast_endpoint_topic(endpoint), message);
350                 }
351         }
352
353         /* Bump refcount to hold on to the router */
354         ao2_ref(endpoint->router, +1);
355         stasis_message_router_unsubscribe(endpoint->router);
356 }
357
358 const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
359 {
360         if (!endpoint) {
361                 return NULL;
362         }
363         return endpoint->tech;
364 }
365
366 const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
367 {
368         if (!endpoint) {
369                 return NULL;
370         }
371         return endpoint->resource;
372 }
373
374 const char *ast_endpoint_get_id(const struct ast_endpoint *endpoint)
375 {
376         if (!endpoint) {
377                 return NULL;
378         }
379         return endpoint->id;
380 }
381
382 enum ast_endpoint_state ast_endpoint_get_state(const struct ast_endpoint *endpoint)
383 {
384         if (!endpoint) {
385                 return AST_ENDPOINT_UNKNOWN;
386         }
387         return endpoint->state;
388 }
389
390 void ast_endpoint_set_state(struct ast_endpoint *endpoint,
391         enum ast_endpoint_state state)
392 {
393         ast_assert(endpoint != NULL);
394         ast_assert(!ast_strlen_zero(endpoint->resource));
395
396         ao2_lock(endpoint);
397         endpoint->state = state;
398         ao2_unlock(endpoint);
399         endpoint_publish_snapshot(endpoint);
400 }
401
402 void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint,
403         int max_channels)
404 {
405         ast_assert(endpoint != NULL);
406         ast_assert(!ast_strlen_zero(endpoint->resource));
407
408         ao2_lock(endpoint);
409         endpoint->max_channels = max_channels;
410         ao2_unlock(endpoint);
411         endpoint_publish_snapshot(endpoint);
412 }
413
414 static void endpoint_snapshot_dtor(void *obj)
415 {
416         struct ast_endpoint_snapshot *snapshot = obj;
417         int channel;
418
419         ast_assert(snapshot != NULL);
420
421         for (channel = 0; channel < snapshot->num_channels; channel++) {
422                 ao2_ref(snapshot->channel_ids[channel], -1);
423         }
424
425         ast_string_field_free_memory(snapshot);
426 }
427
428 struct ast_endpoint_snapshot *ast_endpoint_snapshot_create(
429         struct ast_endpoint *endpoint)
430 {
431         struct ast_endpoint_snapshot *snapshot;
432         int channel_count;
433         struct ao2_iterator i;
434         void *obj;
435         SCOPED_AO2LOCK(lock, endpoint);
436
437         ast_assert(endpoint != NULL);
438         ast_assert(!ast_strlen_zero(endpoint->resource));
439
440         channel_count = ao2_container_count(endpoint->channel_ids);
441
442         snapshot = ao2_alloc_options(
443                 sizeof(*snapshot) + channel_count * sizeof(char *),
444                 endpoint_snapshot_dtor,
445                 AO2_ALLOC_OPT_LOCK_NOLOCK);
446
447         if (!snapshot || ast_string_field_init(snapshot, 80) != 0) {
448                 ao2_cleanup(snapshot);
449                 return NULL;
450         }
451
452         ast_string_field_build(snapshot, id, "%s/%s", endpoint->tech,
453                 endpoint->resource);
454         ast_string_field_set(snapshot, tech, endpoint->tech);
455         ast_string_field_set(snapshot, resource, endpoint->resource);
456
457         snapshot->state = endpoint->state;
458         snapshot->max_channels = endpoint->max_channels;
459
460         i = ao2_iterator_init(endpoint->channel_ids, 0);
461         while ((obj = ao2_iterator_next(&i))) {
462                 /* The reference is kept so the channel id does not go away until the snapshot is gone */
463                 snapshot->channel_ids[snapshot->num_channels++] = obj;
464         }
465         ao2_iterator_destroy(&i);
466
467         return snapshot;
468 }
469
470 static void endpoint_cleanup(void)
471 {
472         ao2_cleanup(endpoints);
473         endpoints = NULL;
474
475         ao2_cleanup(tech_endpoints);
476         tech_endpoints = NULL;
477 }
478
479 int ast_endpoint_init(void)
480 {
481         ast_register_cleanup(endpoint_cleanup);
482
483         endpoints = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0, ENDPOINT_BUCKETS,
484                 ast_endpoint_hash_fn, NULL, ast_endpoint_cmp_fn);
485         if (!endpoints) {
486                 return -1;
487         }
488
489         tech_endpoints = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_MUTEX, 0,
490                 TECH_ENDPOINT_BUCKETS, ast_endpoint_hash_fn, NULL, ast_endpoint_cmp_fn);
491         if (!tech_endpoints) {
492                 return -1;
493         }
494
495         return 0;
496 }