ARI: Add ability to raise arbitrary User Events
[asterisk/asterisk.git] / main / endpoints.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Asterisk endpoint API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/endpoints.h"
36 #include "asterisk/stasis.h"
37 #include "asterisk/stasis_channels.h"
38 #include "asterisk/stasis_endpoints.h"
39 #include "asterisk/stasis_message_router.h"
40 #include "asterisk/stringfields.h"
41 #include "asterisk/_private.h"
42
43 /*! Buckets for endpoint->channel mappings. Keep it prime! */
44 #define ENDPOINT_CHANNEL_BUCKETS 127
45
46 /*! Buckets for endpoint hash. Keep it prime! */
47 #define ENDPOINT_BUCKETS 127
48
49 static struct ao2_container *endpoints;
50
51 struct ast_endpoint {
52         AST_DECLARE_STRING_FIELDS(
53                 AST_STRING_FIELD(tech); /*!< Technology (SIP, IAX2, etc.). */
54                 AST_STRING_FIELD(resource);     /*!< Name, unique to the tech. */
55                 AST_STRING_FIELD(id);   /*!< tech/resource id */
56                 );
57         /*! Endpoint's current state */
58         enum ast_endpoint_state state;
59         /*!
60          * \brief Max channels for this endpoint. -1 means unlimited or unknown.
61          *
62          * Note that this simply documents the limits of an endpoint, and does
63          * nothing to try to enforce the limit.
64          */
65         int max_channels;
66         /*! Topic for this endpoint's messages */
67         struct stasis_cp_single *topics;
68         /*! Router for handling this endpoint's messages */
69         struct stasis_message_router *router;
70         /*! ast_str_container of channels associated with this endpoint */
71         struct ao2_container *channel_ids;
72 };
73
74 static int endpoint_hash(const void *obj, int flags)
75 {
76         const struct ast_endpoint *endpoint;
77         const char *key;
78
79         switch (flags & OBJ_SEARCH_MASK) {
80         case OBJ_SEARCH_KEY:
81                 key = obj;
82                 return ast_str_hash(key);
83         case OBJ_SEARCH_OBJECT:
84                 endpoint = obj;
85                 return ast_str_hash(endpoint->id);
86         default:
87                 /* Hash can only work on something with a full key. */
88                 ast_assert(0);
89                 return 0;
90         }
91 }
92
93 static int endpoint_cmp(void *obj, void *arg, int flags)
94 {
95         const struct ast_endpoint *left = obj;
96         const struct ast_endpoint *right = arg;
97         const char *right_key = arg;
98         int cmp;
99
100         switch (flags & OBJ_SEARCH_MASK) {
101         case OBJ_SEARCH_OBJECT:
102                 right_key = right->id;
103                 /* Fall through */
104         case OBJ_SEARCH_KEY:
105                 cmp = strcmp(left->id, right_key);
106                 break;
107         case OBJ_SEARCH_PARTIAL_KEY:
108                 cmp = strncmp(left->id, right_key, strlen(right_key));
109                 break;
110         default:
111                 ast_assert(0);
112                 cmp = 0;
113                 break;
114         }
115         if (cmp) {
116                 return 0;
117         }
118
119         return CMP_MATCH;
120 }
121
122 struct ast_endpoint *ast_endpoint_find_by_id(const char *id)
123 {
124         return ao2_find(endpoints, id, OBJ_KEY);
125 }
126
127 struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint)
128 {
129         if (!endpoint) {
130                 return ast_endpoint_topic_all();
131         }
132         return stasis_cp_single_topic(endpoint->topics);
133 }
134
135 struct stasis_topic *ast_endpoint_topic_cached(struct ast_endpoint *endpoint)
136 {
137         if (!endpoint) {
138                 return ast_endpoint_topic_all_cached();
139         }
140         return stasis_cp_single_topic_cached(endpoint->topics);
141 }
142
143 const char *ast_endpoint_state_to_string(enum ast_endpoint_state state)
144 {
145         switch (state) {
146         case AST_ENDPOINT_UNKNOWN:
147                 return "unknown";
148         case AST_ENDPOINT_OFFLINE:
149                 return "offline";
150         case AST_ENDPOINT_ONLINE:
151                 return "online";
152         }
153         return "?";
154 }
155
156 static void endpoint_publish_snapshot(struct ast_endpoint *endpoint)
157 {
158         RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
159         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
160
161         ast_assert(endpoint != NULL);
162         ast_assert(endpoint->topics != NULL);
163
164         snapshot = ast_endpoint_snapshot_create(endpoint);
165         if (!snapshot) {
166                 return;
167         }
168         message = stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
169         if (!message) {
170                 return;
171         }
172         stasis_publish(ast_endpoint_topic(endpoint), message);
173 }
174
175 static void endpoint_dtor(void *obj)
176 {
177         struct ast_endpoint *endpoint = obj;
178
179         /* The router should be shut down already */
180         ast_assert(stasis_message_router_is_done(endpoint->router));
181         ao2_cleanup(endpoint->router);
182         endpoint->router = NULL;
183
184         stasis_cp_single_unsubscribe(endpoint->topics);
185         endpoint->topics = NULL;
186
187         ao2_cleanup(endpoint->channel_ids);
188         endpoint->channel_ids = NULL;
189
190         ast_string_field_free_memory(endpoint);
191 }
192
193
194 int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
195         struct ast_channel *chan)
196 {
197         ast_assert(chan != NULL);
198         ast_assert(endpoint != NULL);
199
200         ast_channel_forward_endpoint(chan, endpoint);
201
202         ao2_lock(endpoint);
203         ast_str_container_add(endpoint->channel_ids, ast_channel_uniqueid(chan));
204         ao2_unlock(endpoint);
205
206         ast_channel_lock(chan);
207         ast_publish_channel_state(chan);
208         ast_channel_unlock(chan);
209         endpoint_publish_snapshot(endpoint);
210
211         return 0;
212 }
213
214 /*! \brief Handler for channel snapshot cache clears */
215 static void endpoint_cache_clear(void *data,
216         struct stasis_subscription *sub,
217         struct stasis_message *message)
218 {
219         struct ast_endpoint *endpoint = data;
220         struct stasis_message *clear_msg = stasis_message_data(message);
221         struct ast_channel_snapshot *clear_snapshot;
222
223         if (stasis_message_type(clear_msg) != ast_channel_snapshot_type()) {
224                 return;
225         }
226
227         clear_snapshot = stasis_message_data(clear_msg);
228
229         ast_assert(endpoint != NULL);
230
231         ao2_lock(endpoint);
232         ast_str_container_remove(endpoint->channel_ids, clear_snapshot->uniqueid);
233         ao2_unlock(endpoint);
234         endpoint_publish_snapshot(endpoint);
235 }
236
237 static void endpoint_default(void *data,
238         struct stasis_subscription *sub,
239         struct stasis_message *message)
240 {
241         struct stasis_endpoint *endpoint = data;
242
243         if (stasis_subscription_final_message(sub, message)) {
244                 ao2_cleanup(endpoint);
245         }
246 }
247
248 struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
249 {
250         RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
251         int r = 0;
252
253         if (ast_strlen_zero(tech)) {
254                 ast_log(LOG_ERROR, "Endpoint tech cannot be empty\n");
255                 return NULL;
256         }
257
258         if (ast_strlen_zero(resource)) {
259                 ast_log(LOG_ERROR, "Endpoint resource cannot be empty\n");
260                 return NULL;
261         }
262
263         endpoint = ao2_alloc(sizeof(*endpoint), endpoint_dtor);
264         if (!endpoint) {
265                 return NULL;
266         }
267
268         endpoint->max_channels = -1;
269         endpoint->state = AST_ENDPOINT_UNKNOWN;
270
271         if (ast_string_field_init(endpoint, 80) != 0) {
272                 return NULL;
273         }
274
275         ast_string_field_set(endpoint, tech, tech);
276         ast_string_field_set(endpoint, resource, resource);
277         ast_string_field_build(endpoint, id, "%s/%s", tech, resource);
278
279         /* All access to channel_ids should be covered by the endpoint's
280          * lock; no extra lock needed. */
281         endpoint->channel_ids = ast_str_container_alloc_options(
282                 AO2_ALLOC_OPT_LOCK_NOLOCK, ENDPOINT_CHANNEL_BUCKETS);
283         if (!endpoint->channel_ids) {
284                 return NULL;
285         }
286
287         endpoint->topics = stasis_cp_single_create(ast_endpoint_cache_all(),
288                 endpoint->id);
289         if (!endpoint->topics) {
290                 return NULL;
291         }
292
293         endpoint->router = stasis_message_router_create(ast_endpoint_topic(endpoint));
294         if (!endpoint->router) {
295                 return NULL;
296         }
297         r |= stasis_message_router_add(endpoint->router,
298                 stasis_cache_clear_type(), endpoint_cache_clear,
299                 endpoint);
300         r |= stasis_message_router_set_default(endpoint->router,
301                 endpoint_default, endpoint);
302
303         endpoint_publish_snapshot(endpoint);
304
305         ao2_link(endpoints, endpoint);
306
307         ao2_ref(endpoint, +1);
308         return endpoint;
309 }
310
311 static struct stasis_message *create_endpoint_snapshot_message(struct ast_endpoint *endpoint)
312 {
313         RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
314         snapshot = ast_endpoint_snapshot_create(endpoint);
315         if (!snapshot) {
316                 return NULL;
317         }
318
319         return stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
320 }
321
322 void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
323 {
324         RAII_VAR(struct stasis_message *, clear_msg, NULL, ao2_cleanup);
325
326         if (endpoint == NULL) {
327                 return;
328         }
329
330         ao2_unlink(endpoints, endpoint);
331
332         clear_msg = create_endpoint_snapshot_message(endpoint);
333         if (clear_msg) {
334                 RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
335                 message = stasis_cache_clear_create(clear_msg);
336                 if (message) {
337                         stasis_publish(ast_endpoint_topic(endpoint), message);
338                 }
339         }
340
341         /* Bump refcount to hold on to the router */
342         ao2_ref(endpoint->router, +1);
343         stasis_message_router_unsubscribe(endpoint->router);
344 }
345
346 const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
347 {
348         if (!endpoint) {
349                 return NULL;
350         }
351         return endpoint->tech;
352 }
353
354 const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
355 {
356         if (!endpoint) {
357                 return NULL;
358         }
359         return endpoint->resource;
360 }
361
362 const char *ast_endpoint_get_id(const struct ast_endpoint *endpoint)
363 {
364         if (!endpoint) {
365                 return NULL;
366         }
367         return endpoint->id;
368 }
369
370 void ast_endpoint_set_state(struct ast_endpoint *endpoint,
371         enum ast_endpoint_state state)
372 {
373         ast_assert(endpoint != NULL);
374         ao2_lock(endpoint);
375         endpoint->state = state;
376         ao2_unlock(endpoint);
377         endpoint_publish_snapshot(endpoint);
378 }
379
380 void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint,
381         int max_channels)
382 {
383         ast_assert(endpoint != NULL);
384         ao2_lock(endpoint);
385         endpoint->max_channels = max_channels;
386         ao2_unlock(endpoint);
387         endpoint_publish_snapshot(endpoint);
388 }
389
390 static void endpoint_snapshot_dtor(void *obj)
391 {
392         struct ast_endpoint_snapshot *snapshot = obj;
393         int channel;
394
395         ast_assert(snapshot != NULL);
396
397         for (channel = 0; channel < snapshot->num_channels; channel++) {
398                 ao2_ref(snapshot->channel_ids[channel], -1);
399         }
400
401         ast_string_field_free_memory(snapshot);
402 }
403
404 struct ast_endpoint_snapshot *ast_endpoint_snapshot_create(
405         struct ast_endpoint *endpoint)
406 {
407         RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
408         int channel_count;
409         struct ao2_iterator i;
410         void *obj;
411         SCOPED_AO2LOCK(lock, endpoint);
412
413         channel_count = ao2_container_count(endpoint->channel_ids);
414
415         snapshot = ao2_alloc(
416                 sizeof(*snapshot) + channel_count * sizeof(char *),
417                 endpoint_snapshot_dtor);
418
419         if (ast_string_field_init(snapshot, 80) != 0) {
420                 return NULL;
421         }
422
423         ast_string_field_build(snapshot, id, "%s/%s", endpoint->tech,
424                 endpoint->resource);
425         ast_string_field_set(snapshot, tech, endpoint->tech);
426         ast_string_field_set(snapshot, resource, endpoint->resource);
427
428         snapshot->state = endpoint->state;
429         snapshot->max_channels = endpoint->max_channels;
430
431         i = ao2_iterator_init(endpoint->channel_ids, 0);
432         while ((obj = ao2_iterator_next(&i))) {
433                 /* The reference is kept so the channel id does not go away until the snapshot is gone */
434                 snapshot->channel_ids[snapshot->num_channels++] = obj;
435         }
436         ao2_iterator_destroy(&i);
437
438         ao2_ref(snapshot, +1);
439         return snapshot;
440 }
441
442 static void endpoint_cleanup(void)
443 {
444         ao2_cleanup(endpoints);
445         endpoints = NULL;
446 }
447
448 int ast_endpoint_init(void)
449 {
450         ast_register_cleanup(endpoint_cleanup);
451
452         endpoints = ao2_container_alloc(ENDPOINT_BUCKETS, endpoint_hash,
453                 endpoint_cmp);
454
455         if (!endpoints) {
456                 return -1;
457         }
458
459         return 0;
460 }