Fix memory leak while loading modules, adding formats, and destroying endpoints
[asterisk/asterisk.git] / main / endpoints.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Asterisk endpoint API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/endpoints.h"
36 #include "asterisk/stasis.h"
37 #include "asterisk/stasis_channels.h"
38 #include "asterisk/stasis_endpoints.h"
39 #include "asterisk/stasis_message_router.h"
40 #include "asterisk/stringfields.h"
41
42 /*! Buckets for endpoint->channel mappings. Keep it prime! */
43 #define ENDPOINT_BUCKETS 127
44
45 struct ast_endpoint {
46         AST_DECLARE_STRING_FIELDS(
47                 AST_STRING_FIELD(tech); /*!< Technology (SIP, IAX2, etc.). */
48                 AST_STRING_FIELD(resource);     /*!< Name, unique to the tech. */
49                 AST_STRING_FIELD(id);   /*!< tech/resource id */
50                 );
51         /*! Endpoint's current state */
52         enum ast_endpoint_state state;
53         /*!
54          * \brief Max channels for this endpoint. -1 means unlimited or unknown.
55          *
56          * Note that this simply documents the limits of an endpoint, and does
57          * nothing to try to enforce the limit.
58          */
59         int max_channels;
60         /*! Topic for this endpoint's messages */
61         struct stasis_topic *topic;
62         /*!
63          * Forwarding subscription sending messages to ast_endpoint_topic_all()
64          */
65         struct stasis_subscription *forward;
66         /*! Router for handling this endpoint's messages */
67         struct stasis_message_router *router;
68         /*! ast_str_container of channels associated with this endpoint */
69         struct ao2_container *channel_ids;
70 };
71
72 const char *ast_endpoint_state_to_string(enum ast_endpoint_state state)
73 {
74         switch (state) {
75         case AST_ENDPOINT_UNKNOWN:
76                 return "unknown";
77         case AST_ENDPOINT_OFFLINE:
78                 return "offline";
79         case AST_ENDPOINT_ONLINE:
80                 return "online";
81         }
82         return "?";
83 }
84
85 static void endpoint_publish_snapshot(struct ast_endpoint *endpoint)
86 {
87         RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
88         RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
89
90         ast_assert(endpoint != NULL);
91         ast_assert(endpoint->topic != NULL);
92
93         snapshot = ast_endpoint_snapshot_create(endpoint);
94         if (!snapshot) {
95                 return;
96         }
97         message = stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
98         if (!message) {
99                 return;
100         }
101         stasis_publish(endpoint->topic, message);
102 }
103
104 static void endpoint_dtor(void *obj)
105 {
106         struct ast_endpoint *endpoint = obj;
107
108         /* The router should be shut down already */
109         ast_assert(stasis_message_router_is_done(endpoint->router));
110         ao2_cleanup(endpoint->router);
111         endpoint->router = NULL;
112
113         stasis_unsubscribe(endpoint->forward);
114         endpoint->forward = NULL;
115
116         ao2_cleanup(endpoint->topic);
117         endpoint->topic = NULL;
118
119         ao2_cleanup(endpoint->channel_ids);
120         endpoint->channel_ids = NULL;
121
122         ast_string_field_free_memory(endpoint);
123 }
124
125 static void endpoint_channel_snapshot(void *data,
126         struct stasis_subscription *sub, struct stasis_topic *topic,
127         struct stasis_message *message)
128 {
129         struct ast_endpoint *endpoint = data;
130         struct ast_channel_snapshot *snapshot = stasis_message_data(message);
131         RAII_VAR(char *, existing_id, NULL, ao2_cleanup);
132         int publish = 0;
133
134         ast_assert(endpoint != NULL);
135         ast_assert(snapshot != NULL);
136
137         ao2_lock(endpoint);
138         existing_id = ao2_find(endpoint->channel_ids, snapshot->uniqueid,
139                 OBJ_POINTER);
140         if (!existing_id) {
141                 ast_str_container_add(endpoint->channel_ids,
142                         snapshot->uniqueid);
143                 publish = 1;
144         }
145         ao2_unlock(endpoint);
146         if (publish) {
147                 endpoint_publish_snapshot(endpoint);
148         }
149 }
150
151 /*! \brief Handler for channel snapshot cache clears */
152 static void endpoint_cache_clear(void *data,
153         struct stasis_subscription *sub, struct stasis_topic *topic,
154         struct stasis_message *message)
155 {
156         struct ast_endpoint *endpoint = data;
157         struct stasis_message *clear_msg = stasis_message_data(message);
158         struct ast_channel_snapshot *clear_snapshot;
159
160         if (stasis_message_type(clear_msg) != ast_channel_snapshot_type()) {
161                 return;
162         }
163
164         clear_snapshot = stasis_message_data(clear_msg);
165
166         ast_assert(endpoint != NULL);
167
168         ao2_lock(endpoint);
169         ast_str_container_remove(endpoint->channel_ids, clear_snapshot->uniqueid);
170         ao2_unlock(endpoint);
171         endpoint_publish_snapshot(endpoint);
172 }
173
174 static void endpoint_default(void *data,
175         struct stasis_subscription *sub, struct stasis_topic *topic,
176         struct stasis_message *message)
177 {
178         struct stasis_endpoint *endpoint = data;
179
180         if (stasis_subscription_final_message(sub, message)) {
181                 ao2_cleanup(endpoint);
182         }
183 }
184
185 struct ast_endpoint *ast_endpoint_create(const char *tech, const char *resource)
186 {
187         RAII_VAR(struct ast_endpoint *, endpoint, NULL, ao2_cleanup);
188         int r = 0;
189
190         if (ast_strlen_zero(tech)) {
191                 ast_log(LOG_ERROR, "Endpoint tech cannot be empty\n");
192                 return NULL;
193         }
194
195         if (ast_strlen_zero(resource)) {
196                 ast_log(LOG_ERROR, "Endpoint resource cannot be empty\n");
197                 return NULL;
198         }
199
200         endpoint = ao2_alloc(sizeof(*endpoint), endpoint_dtor);
201         if (!endpoint) {
202                 return NULL;
203         }
204
205         endpoint->max_channels = -1;
206         endpoint->state = AST_ENDPOINT_UNKNOWN;
207
208         if (ast_string_field_init(endpoint, 80) != 0) {
209                 return NULL;
210         }
211
212         ast_string_field_set(endpoint, tech, tech);
213         ast_string_field_set(endpoint, resource, resource);
214         ast_string_field_build(endpoint, id, "%s/%s", tech, resource);
215
216         /* All access to channel_ids should be covered by the endpoint's
217          * lock; no extra lock needed. */
218         endpoint->channel_ids = ast_str_container_alloc_options(
219                 AO2_ALLOC_OPT_LOCK_NOLOCK, ENDPOINT_BUCKETS);
220         if (!endpoint->channel_ids) {
221                 return NULL;
222         }
223
224         endpoint->topic = stasis_topic_create(endpoint->id);
225         if (!endpoint->topic) {
226                 return NULL;
227         }
228
229         endpoint->forward =
230                 stasis_forward_all(endpoint->topic, ast_endpoint_topic_all());
231         if (!endpoint->forward) {
232                 return NULL;
233         }
234
235         endpoint->router = stasis_message_router_create(endpoint->topic);
236         if (!endpoint->router) {
237                 return NULL;
238         }
239         r |= stasis_message_router_add(endpoint->router,
240                 ast_channel_snapshot_type(), endpoint_channel_snapshot,
241                 endpoint);
242         r |= stasis_message_router_add(endpoint->router,
243                 stasis_cache_clear_type(), endpoint_cache_clear,
244                 endpoint);
245         r |= stasis_message_router_set_default(endpoint->router,
246                 endpoint_default, endpoint);
247
248         endpoint_publish_snapshot(endpoint);
249
250         ao2_ref(endpoint, +1);
251         return endpoint;
252 }
253
254 const char *ast_endpoint_get_tech(const struct ast_endpoint *endpoint)
255 {
256         ast_assert(endpoint != NULL);
257         return endpoint->tech;
258 }
259
260 static struct stasis_message *create_endpoint_snapshot_message(struct ast_endpoint *endpoint)
261 {
262         RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
263         snapshot = ast_endpoint_snapshot_create(endpoint);
264         if (!snapshot) {
265                 return NULL;
266         }
267
268         return stasis_message_create(ast_endpoint_snapshot_type(), snapshot);
269 }
270
271 void ast_endpoint_shutdown(struct ast_endpoint *endpoint)
272 {
273         RAII_VAR(struct stasis_message *, clear_msg, NULL, ao2_cleanup);
274
275         if (endpoint == NULL) {
276                 return;
277         }
278
279         clear_msg = create_endpoint_snapshot_message(endpoint);
280         if (clear_msg) {
281                 RAII_VAR(struct stasis_message *, message, NULL, ao2_cleanup);
282                 message = stasis_cache_clear_create(clear_msg);
283                 if (message) {
284                         stasis_publish(endpoint->topic, message);
285                 }
286         }
287
288         /* Bump refcount to hold on to the router */
289         ao2_ref(endpoint->router, +1);
290         stasis_message_router_unsubscribe(endpoint->router);
291 }
292
293 const char *ast_endpoint_get_resource(const struct ast_endpoint *endpoint)
294 {
295         return endpoint->resource;
296 }
297
298 struct stasis_topic *ast_endpoint_topic(struct ast_endpoint *endpoint)
299 {
300         return endpoint ? endpoint->topic : ast_endpoint_topic_all();
301 }
302
303 void ast_endpoint_set_state(struct ast_endpoint *endpoint,
304         enum ast_endpoint_state state)
305 {
306         ast_assert(endpoint != NULL);
307         ao2_lock(endpoint);
308         endpoint->state = state;
309         ao2_unlock(endpoint);
310         endpoint_publish_snapshot(endpoint);
311 }
312
313 void ast_endpoint_set_max_channels(struct ast_endpoint *endpoint,
314         int max_channels)
315 {
316         ast_assert(endpoint != NULL);
317         ao2_lock(endpoint);
318         endpoint->max_channels = max_channels;
319         ao2_unlock(endpoint);
320         endpoint_publish_snapshot(endpoint);
321 }
322
323 static void endpoint_snapshot_dtor(void *obj)
324 {
325         struct ast_endpoint_snapshot *snapshot = obj;
326
327         ast_assert(snapshot != NULL);
328         ast_string_field_free_memory(snapshot);
329 }
330
331 struct ast_endpoint_snapshot *ast_endpoint_snapshot_create(
332         struct ast_endpoint *endpoint)
333 {
334         RAII_VAR(struct ast_endpoint_snapshot *, snapshot, NULL, ao2_cleanup);
335         int channel_count;
336         struct ao2_iterator i;
337         void *obj;
338         SCOPED_AO2LOCK(lock, endpoint);
339
340         channel_count = ao2_container_count(endpoint->channel_ids);
341
342         snapshot = ao2_alloc(
343                 sizeof(*snapshot) + channel_count * sizeof(char *),
344                 endpoint_snapshot_dtor);
345
346         if (ast_string_field_init(snapshot, 80) != 0) {
347                 return NULL;
348         }
349
350         ast_string_field_build(snapshot, id, "%s/%s", endpoint->tech,
351                 endpoint->resource);
352         ast_string_field_set(snapshot, tech, endpoint->tech);
353         ast_string_field_set(snapshot, resource, endpoint->resource);
354
355         snapshot->state = endpoint->state;
356         snapshot->max_channels = endpoint->max_channels;
357
358         i = ao2_iterator_init(endpoint->channel_ids, 0);
359         while ((obj = ao2_iterator_next(&i))) {
360                 RAII_VAR(char *, channel_id, obj, ao2_cleanup);
361                 snapshot->channel_ids[snapshot->num_channels++] = channel_id;
362         }
363         ao2_iterator_destroy(&i);
364
365         ao2_ref(snapshot, +1);
366         return snapshot;
367 }