CLI: Create ast_cli_completion_add function.
[asterisk/asterisk.git] / main / endpoints.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Asterisk endpoint API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 #include "asterisk/astobj2.h"
33 #include "asterisk/endpoints.h"
34 #include "asterisk/stasis.h"
35 #include "asterisk/stasis_channels.h"
36 #include "asterisk/stasis_endpoints.h"
37 #include "asterisk/stasis_message_router.h"
38 #include "asterisk/stringfields.h"
39 #include "asterisk/_private.h"
40
41 /*! Buckets for endpoint->channel mappings. Keep it prime! */
42 #define ENDPOINT_CHANNEL_BUCKETS 127
43
44 /*! Buckets for endpoint hash. Keep it prime! */
45 #define ENDPOINT_BUCKETS 127
46
47 /*! Buckets for technology endpoints. */
48 #define TECH_ENDPOINT_BUCKETS 11
49
50 static struct ao2_container *endpoints;
51
52 static struct ao2_container *tech_endpoints;
53
54 struct ast_endpoint {
55         AST_DECLARE_STRING_FIELDS(
56                 AST_STRING_FIELD(tech); /*!< Technology (SIP, IAX2, etc.). */
57                 AST_STRING_FIELD(resource);     /*!< Name, unique to the tech. */
58                 AST_STRING_FIELD(id);   /*!< tech/resource id */
59                 );
60         /*! Endpoint's current state */
61         enum ast_endpoint_state state;
62         /*!
63          * \brief Max channels for this endpoint. -1 means unlimited or unknown.
64          *
65          * Note that this simply documents the limits of an endpoint, and does
66          * nothing to try to enforce the limit.
67          */
68         int max_channels;
69         /*! Topic for this endpoint's messages */
70         struct stasis_cp_single *topics;
71         /*! Router for handling this endpoint's messages */
72         struct stasis_message_router *router;
73         /*! ast_str_container of channels associated with this endpoint */
74         struct ao2_container *channel_ids;
75         /*! Forwarding subscription from an endpoint to its tech endpoint */
76         struct stasis_forward *tech_forward;
77 };
78
79 static int endpoint_hash(const void *obj, int flags)
80 {
81         const struct ast_endpoint *endpoint;
82         const char *key;
83
84         switch (flags & OBJ_SEARCH_MASK) {
85         case OBJ_SEARCH_KEY:
86                 key = obj;
87                 return ast_str_hash(key);
88         case OBJ_SEARCH_OBJECT:
89                 endpoint = obj;
90                 return ast_str_hash(endpoint->id);
91         default:
92                 /* Hash can only work on something with a full key. */
93                 ast_assert(0);
94                 return 0;
95         }
96 }
97
98 static int endpoint_cmp(void *obj, void *arg, int flags)
99 {
100         const struct ast_endpoint *left = obj;
101         const struct ast_endpoint *right = arg;
102         const char *right_key = arg;
103         int cmp;
104
105         switch (flags & OBJ_SEARCH_MASK) {
106         case OBJ_SEARCH_OBJECT:
107                 right_key = right->id;
108                 /* Fall through */
109         case OBJ_SEARCH_KEY:
110                 cmp = strcmp(left->id, right_key);
111                 break;
112         case OBJ_SEARCH_PARTIAL_KEY:
113                 cmp = strncmp(left->id, right_key, strlen(right_key));
114                 break;
115         default:
116                 ast_assert(0);
117                 cmp = 0;
118                 break;
119         }
120         if (cmp) {
121                 return 0;
122         }
123
124         return CMP_MATCH;
125 }
126
127 struct ast_endpoint *ast_endpoint_find_by_id(const char *id)
128 {
129         struct ast_endpoint *endpoint = ao2_find(endpoints, id, OBJ_KEY);
130
131         if (!endpoint) {
132                 endpoint = ao2_find(tech_endpoints, id, OBJ_KEY);
133         }
134
135         return endpoint;
136 }
137
138 struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint)
139 {
140         if (!endpoint) {
141                 return ast_endpoint_topic_all();
142         }
143         return stasis_cp_single_topic(endpoint->topics);
144 }
145
146 struct stasis_topic *ast_endpoint_topic_cached(struct ast_endpoint *endpoint)
147 {
148         if (!endpoint) {
149                 return ast_endpoint_topic_all_cached();
150         }
151         return stasis_cp_single_topic_cached(endpoint->topics);
152 }
153
154 const char *ast_endpoint_state_to_string(enum ast_endpoint_state state)
155 {
156         switch (state) {
157         case AST_ENDPOINT_UNKNOWN:
158                 return "unknown";
159         case AST_ENDPOINT_OFFLINE:
160                 return "offline";
161         case AST_ENDPOINT_ONLINE:
162                 return "online";
163         }
164         return "?";
165 }
166
167 static void endpoint_publish_snapshot(struct ast_endpoint *endpoint)
168 {
169         RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
170         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
171
172         ast_assert(endpoint != NULL);
173         ast_assert(endpoint->topics != NULL);
174
175         if (!ast_endpoint_snapshot_type()) {
176                 return;
177         }
178
179         snapshot = ast_endpoint_snapshot_create(endpoint);
180         if (!snapshot) {
181                 return;
182         }
183         message = stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
184         if (!message) {
185                 return;
186         }
187         stasis_publish(ast_endpoint_topic(endpoint), message);
188 }
189
190 static void endpoint_dtor(void *obj)
191 {
192         struct ast_endpoint *endpoint = obj;
193
194         /* The router should be shut down already */
195         ast_assert(stasis_message_router_is_done(endpoint->router));
196         ao2_cleanup(endpoint->router);
197         endpoint->router = NULL;
198
199         stasis_cp_single_unsubscribe(endpoint->topics);
200         endpoint->topics = NULL;
201
202         ao2_cleanup(endpoint->channel_ids);
203         endpoint->channel_ids = NULL;
204
205         ast_string_field_free_memory(endpoint);
206 }
207
208
209 int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
210         struct ast_channel *chan)
211 {
212         ast_assert(chan != NULL);
213         ast_assert(endpoint != NULL);
214         ast_assert(!ast_strlen_zero(endpoint->resource));
215
216         ast_channel_forward_endpoint(chan, endpoint);
217
218         ao2_lock(endpoint);
219         ast_str_container_add(endpoint->channel_ids, ast_channel_uniqueid(chan));
220         ao2_unlock(endpoint);
221
222         endpoint_publish_snapshot(endpoint);
223
224         return 0;
225 }
226
227 /*! \brief Handler for channel snapshot cache clears */
228 static void endpoint_cache_clear(void *data,
229         struct stasis_subscription *sub,
230         struct stasis_message *message)
231 {
232         struct ast_endpoint *endpoint = data;
233         struct stasis_message *clear_msg = stasis_message_data(message);
234         struct ast_channel_snapshot *clear_snapshot;
235
236         if (stasis_message_type(clear_msg) != ast_channel_snapshot_type()) {
237                 return;
238         }
239
240         clear_snapshot = stasis_message_data(clear_msg);
241
242         ast_assert(endpoint != NULL);
243
244         ao2_lock(endpoint);
245         ast_str_container_remove(endpoint->channel_ids, clear_snapshot->uniqueid);
246         ao2_unlock(endpoint);
247         endpoint_publish_snapshot(endpoint);
248 }
249
250 static void endpoint_default(void *data,
251         struct stasis_subscription *sub,
252         struct stasis_message *message)
253 {
254         struct stasis_endpoint *endpoint = data;
255
256         if (stasis_subscription_final_message(sub, message)) {
257                 ao2_cleanup(endpoint);
258         }
259 }
260
261 static struct ast_endpoint *endpoint_internal_create(const char *tech, const char *resource)
262 {
263         RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
264         RAII_VAR(struct ast_endpoint *, tech_endpoint, NULL, ao2_cleanup);
265         int r = 0;
266
267         /* Get/create the technology endpoint */
268         if (!ast_strlen_zero(resource)) {
269                 tech_endpoint = ao2_find(tech_endpoints, tech, OBJ_KEY);
270                 if (!tech_endpoint) {
271                         tech_endpoint = endpoint_internal_create(tech, NULL);
272                         if (!tech_endpoint) {
273                                 return NULL;
274                         }
275                 }
276         }
277
278         endpoint = ao2_alloc(sizeof(*endpoint), endpoint_dtor);
279         if (!endpoint) {
280                 return NULL;
281         }
282
283         endpoint->max_channels = -1;
284         endpoint->state = AST_ENDPOINT_UNKNOWN;
285
286         if (ast_string_field_init(endpoint, 80) != 0) {
287                 return NULL;
288         }
289         ast_string_field_set(endpoint, tech, tech);
290         ast_string_field_set(endpoint, resource, S_OR(resource, ""));
291         ast_string_field_build(endpoint, id, "%s%s%s",
292                 tech,
293                 !ast_strlen_zero(resource) ? "/" : "",
294                 S_OR(resource, ""));
295
296         /* All access to channel_ids should be covered by the endpoint's
297          * lock; no extra lock needed. */
298         endpoint->channel_ids = ast_str_container_alloc_options(
299                 AO2_ALLOC_OPT_LOCK_NOLOCK, ENDPOINT_CHANNEL_BUCKETS);
300         if (!endpoint->channel_ids) {
301                 return NULL;
302         }
303
304         if (!ast_strlen_zero(resource)) {
305
306                 endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(),
307                         endpoint->id);
308                 if (!endpoint->topics) {
309                         return NULL;
310                 }
311
312                 endpoint->router = stasis_message_router_create_pool(ast_endpoint_topic(endpoint));
313                 if (!endpoint->router) {
314                         return NULL;
315                 }
316                 r |= stasis_message_router_add(endpoint->router,
317                         stasis_cache_clear_type(), endpoint_cache_clear,
318                         endpoint);
319                 r |= stasis_message_router_set_default(endpoint->router,
320                         endpoint_default, endpoint);
321                 if (r) {
322                         return NULL;
323                 }
324
325                 endpoint->tech_forward = stasis_forward_all(stasis_cp_single_topic(endpoint->topics),
326                         stasis_cp_single_topic(tech_endpoint->topics));
327
328                 endpoint_publish_snapshot(endpoint);
329                 ao2_link(endpoints, endpoint);
330         } else {
331                 endpoint->topics = stasis_cp_sink_create(ast_endpoint_cache_all(),
332                         endpoint->id);
333                 if (!endpoint->topics) {
334                         return NULL;
335                 }
336
337                 ao2_link(tech_endpoints, endpoint);
338         }
339
340         ao2_ref(endpoint, +1);
341         return endpoint;
342 }
343
344 struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
345 {
346         if (ast_strlen_zero(tech)) {
347                 ast_log(LOG_ERROR, "Endpoint tech cannot be empty\n");
348                 return NULL;
349         }
350
351         if (ast_strlen_zero(resource)) {
352                 ast_log(LOG_ERROR, "Endpoint resource cannot be empty\n");
353                 return NULL;
354         }
355
356         return endpoint_internal_create(tech, resource);
357 }
358
359 static struct stasis_message *create_endpoint_snapshot_message(struct ast_endpoint *endpoint)
360 {
361         RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
362
363         if (!ast_endpoint_snapshot_type()) {
364                 return NULL;
365         }
366
367         snapshot = ast_endpoint_snapshot_create(endpoint);
368         if (!snapshot) {
369                 return NULL;
370         }
371
372         return stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
373 }
374
375 void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
376 {
377         RAII_VAR(struct stasis_message *, clear_msg, NULL, ao2_cleanup);
378
379         if (endpoint == NULL) {
380                 return;
381         }
382
383         ao2_unlink(endpoints, endpoint);
384         endpoint->tech_forward = stasis_forward_cancel(endpoint->tech_forward);
385
386         clear_msg = create_endpoint_snapshot_message(endpoint);
387         if (clear_msg) {
388                 RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
389                 message = stasis_cache_clear_create(clear_msg);
390                 if (message) {
391                         stasis_publish(ast_endpoint_topic(endpoint), message);
392                 }
393         }
394
395         /* Bump refcount to hold on to the router */
396         ao2_ref(endpoint->router, +1);
397         stasis_message_router_unsubscribe(endpoint->router);
398 }
399
400 const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
401 {
402         if (!endpoint) {
403                 return NULL;
404         }
405         return endpoint->tech;
406 }
407
408 const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
409 {
410         if (!endpoint) {
411                 return NULL;
412         }
413         return endpoint->resource;
414 }
415
416 const char *ast_endpoint_get_id(const struct ast_endpoint *endpoint)
417 {
418         if (!endpoint) {
419                 return NULL;
420         }
421         return endpoint->id;
422 }
423
424 enum ast_endpoint_state ast_endpoint_get_state(const struct ast_endpoint *endpoint)
425 {
426         if (!endpoint) {
427                 return AST_ENDPOINT_UNKNOWN;
428         }
429         return endpoint->state;
430 }
431
432 void ast_endpoint_set_state(struct ast_endpoint *endpoint,
433         enum ast_endpoint_state state)
434 {
435         ast_assert(endpoint != NULL);
436         ast_assert(!ast_strlen_zero(endpoint->resource));
437
438         ao2_lock(endpoint);
439         endpoint->state = state;
440         ao2_unlock(endpoint);
441         endpoint_publish_snapshot(endpoint);
442 }
443
444 void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint,
445         int max_channels)
446 {
447         ast_assert(endpoint != NULL);
448         ast_assert(!ast_strlen_zero(endpoint->resource));
449
450         ao2_lock(endpoint);
451         endpoint->max_channels = max_channels;
452         ao2_unlock(endpoint);
453         endpoint_publish_snapshot(endpoint);
454 }
455
456 static void endpoint_snapshot_dtor(void *obj)
457 {
458         struct ast_endpoint_snapshot *snapshot = obj;
459         int channel;
460
461         ast_assert(snapshot != NULL);
462
463         for (channel = 0; channel < snapshot->num_channels; channel++) {
464                 ao2_ref(snapshot->channel_ids[channel], -1);
465         }
466
467         ast_string_field_free_memory(snapshot);
468 }
469
470 struct ast_endpoint_snapshot *ast_endpoint_snapshot_create(
471         struct ast_endpoint *endpoint)
472 {
473         struct ast_endpoint_snapshot *snapshot;
474         int channel_count;
475         struct ao2_iterator i;
476         void *obj;
477         SCOPED_AO2LOCK(lock, endpoint);
478
479         ast_assert(endpoint != NULL);
480         ast_assert(!ast_strlen_zero(endpoint->resource));
481
482         channel_count = ao2_container_count(endpoint->channel_ids);
483
484         snapshot = ao2_alloc_options(
485                 sizeof(*snapshot) + channel_count * sizeof(char *),
486                 endpoint_snapshot_dtor,
487                 AO2_ALLOC_OPT_LOCK_NOLOCK);
488
489         if (!snapshot || ast_string_field_init(snapshot, 80) != 0) {
490                 ao2_cleanup(snapshot);
491                 return NULL;
492         }
493
494         ast_string_field_build(snapshot, id, "%s/%s", endpoint->tech,
495                 endpoint->resource);
496         ast_string_field_set(snapshot, tech, endpoint->tech);
497         ast_string_field_set(snapshot, resource, endpoint->resource);
498
499         snapshot->state = endpoint->state;
500         snapshot->max_channels = endpoint->max_channels;
501
502         i = ao2_iterator_init(endpoint->channel_ids, 0);
503         while ((obj = ao2_iterator_next(&i))) {
504                 /* The reference is kept so the channel id does not go away until the snapshot is gone */
505                 snapshot->channel_ids[snapshot->num_channels++] = obj;
506         }
507         ao2_iterator_destroy(&i);
508
509         return snapshot;
510 }
511
512 static void endpoint_cleanup(void)
513 {
514         ao2_cleanup(endpoints);
515         endpoints = NULL;
516
517         ao2_cleanup(tech_endpoints);
518         tech_endpoints = NULL;
519 }
520
521 int ast_endpoint_init(void)
522 {
523         ast_register_cleanup(endpoint_cleanup);
524
525         endpoints = ao2_container_alloc(ENDPOINT_BUCKETS, endpoint_hash,
526                 endpoint_cmp);
527         if (!endpoints) {
528                 return -1;
529         }
530
531         tech_endpoints = ao2_container_alloc(TECH_ENDPOINT_BUCKETS, endpoint_hash,
532                 endpoint_cmp);
533         if (!tech_endpoints) {
534                 return -1;
535         }
536
537         return 0;
538 }