Stasis: Fix unsafe use of stasis_unsubscribe in modules.
[asterisk/asterisk.git] / res / res_stasis_device_state.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Kevin Harwell <kharwell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*** MODULEINFO
20         <depend type="module">res_stasis</depend>
21         <support_level>core</support_level>
22  ***/
23
24 #include "asterisk.h"
25
26 ASTERISK_REGISTER_FILE()
27
28 #include "asterisk/astdb.h"
29 #include "asterisk/astobj2.h"
30 #include "asterisk/module.h"
31 #include "asterisk/stasis_app_impl.h"
32 #include "asterisk/stasis_app_device_state.h"
33
/*!
 * Size, in bytes, of the buffers this file uses for a full device name
 * (scheme plus name) and for a state value read back from the astdb.
 */
#define DEVICE_STATE_SIZE 64
/*! astdb family name */
#define DEVICE_STATE_FAMILY "StasisDeviceState"
/*! Stasis device state provider */
#define DEVICE_STATE_PROVIDER_STASIS "Stasis"
/*! Scheme for custom device states */
#define DEVICE_STATE_SCHEME_STASIS "Stasis:"
/*! Scheme for device state subscriptions */
#define DEVICE_STATE_SCHEME_SUB "deviceState:"

/*! Number of hash buckets for device state subscriptions */
#define DEVICE_STATE_BUCKETS 37

/*!
 * Container for subscribed device states.
 * Allocated in load_module() and released in unload_module().
 */
static struct ao2_container *device_state_subscriptions;
49
/*!
 * \brief Device state subscription object.
 *
 * Pairs one Stasis application with one device name.  Instances are ao2
 * objects kept in \c device_state_subscriptions; the container hashes on
 * \c device_name and compares on both \c device_name and \c app_name.
 */
struct device_state_subscription {
        AST_DECLARE_STRING_FIELDS(
                /*! Name of the Stasis application that owns the subscription */
                AST_STRING_FIELD(app_name);
                /*! Name of the device whose state is being watched */
                AST_STRING_FIELD(device_name);
        );
        /*!
         * The subscription object.
         * Set by subscribe_device_state() and handed to
         * stasis_unsubscribe_and_join() by the destructor.
         */
        struct stasis_subscription *sub;
};
61
62 static int device_state_subscriptions_hash(const void *obj, const int flags)
63 {
64         const struct device_state_subscription *object;
65
66         switch (flags & OBJ_SEARCH_MASK) {
67         case OBJ_SEARCH_OBJECT:
68                 object = obj;
69                 return ast_str_hash(object->device_name);
70         case OBJ_SEARCH_KEY:
71         default:
72                 /* Hash can only work on something with a full key. */
73                 ast_assert(0);
74                 return 0;
75         }
76 }
77
78 static int device_state_subscriptions_cmp(void *obj, void *arg, int flags)
79 {
80         const struct device_state_subscription *object_left = obj;
81         const struct device_state_subscription *object_right = arg;
82         int cmp;
83
84         switch (flags & OBJ_SEARCH_MASK) {
85         case OBJ_SEARCH_OBJECT:
86                 /* find objects matching both device and app names */
87                 if (strcmp(object_left->device_name,
88                            object_right->device_name)) {
89                         return 0;
90                 }
91                 cmp = strcmp(object_left->app_name, object_right->app_name);
92                 break;
93         case OBJ_SEARCH_KEY:
94         case OBJ_SEARCH_PARTIAL_KEY:
95                 ast_assert(0); /* not supported by container */
96                 /* fall through */
97         default:
98                 cmp = 0;
99                 break;
100         }
101
102         return cmp ? 0 : CMP_MATCH | CMP_STOP;
103 }
104
105 static void device_state_subscription_destroy(void *obj)
106 {
107         struct device_state_subscription *sub = obj;
108         sub->sub = stasis_unsubscribe_and_join(sub->sub);
109         ast_string_field_free_memory(sub);
110 }
111
112 static struct device_state_subscription *device_state_subscription_create(
113         const struct stasis_app *app, const char *device_name)
114 {
115         struct device_state_subscription *sub = ao2_alloc(
116                 sizeof(*sub), device_state_subscription_destroy);
117         const char *app_name = stasis_app_name(app);
118         size_t size = strlen(device_name) + strlen(app_name) + 2;
119
120         if (!sub) {
121                 return NULL;
122         }
123
124         if (ast_string_field_init(sub, size)) {
125                 ao2_ref(sub, -1);
126                 return NULL;
127         }
128
129         ast_string_field_set(sub, app_name, app_name);
130         ast_string_field_set(sub, device_name, device_name);
131         return sub;
132 }
133
134 static struct device_state_subscription *find_device_state_subscription(
135         struct stasis_app *app, const char *name)
136 {
137         struct device_state_subscription dummy_sub = {
138                 .app_name = stasis_app_name(app),
139                 .device_name = name
140         };
141
142         return ao2_find(device_state_subscriptions, &dummy_sub, OBJ_SEARCH_OBJECT);
143 }
144
145 static void remove_device_state_subscription(
146         struct device_state_subscription *sub)
147 {
148         ao2_unlink(device_state_subscriptions, sub);
149 }
150
151 struct ast_json *stasis_app_device_state_to_json(
152         const char *name, enum ast_device_state state)
153 {
154         return ast_json_pack("{s: s, s: s}",
155                              "name", name,
156                              "state", ast_devstate_str(state));
157 }
158
159 struct ast_json *stasis_app_device_states_to_json(void)
160 {
161         struct ast_json *array = ast_json_array_create();
162         RAII_VAR(struct ast_db_entry *, tree,
163                  ast_db_gettree(DEVICE_STATE_FAMILY, NULL), ast_db_freetree);
164         struct ast_db_entry *entry;
165
166         for (entry = tree; entry; entry = entry->next) {
167                 const char *name = strrchr(entry->key, '/');
168                 if (!ast_strlen_zero(name)) {
169                         struct ast_str *device = ast_str_alloca(DEVICE_STATE_SIZE);
170                         ast_str_set(&device, 0, "%s%s",
171                                     DEVICE_STATE_SCHEME_STASIS, ++name);
172                         ast_json_array_append(
173                                 array, stasis_app_device_state_to_json(
174                                         ast_str_buffer(device),
175                                         ast_device_state(ast_str_buffer(device))));
176                 }
177         }
178
179         return array;
180 }
181
182 static void send_device_state(struct device_state_subscription *sub,
183                               const char *name, enum ast_device_state state)
184 {
185         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
186
187         json = ast_json_pack("{s:s, s:s, s:o, s:o}",
188                              "type", "DeviceStateChanged",
189                              "application", sub->app_name,
190                              "timestamp", ast_json_timeval(ast_tvnow(), NULL),
191                              "device_state", stasis_app_device_state_to_json(
192                                      name, state));
193
194         if (!json) {
195                 ast_log(LOG_ERROR, "Unable to create device state json object\n");
196                 return;
197         }
198
199         stasis_app_send(sub->app_name, json);
200 }
201
202 enum stasis_device_state_result stasis_app_device_state_update(
203         const char *name, const char *value)
204 {
205         size_t size = strlen(DEVICE_STATE_SCHEME_STASIS);
206         enum ast_device_state state;
207
208         ast_debug(3, "Updating device name = %s, value = %s", name, value);
209
210         if (strncasecmp(name, DEVICE_STATE_SCHEME_STASIS, size)) {
211                 ast_log(LOG_ERROR, "Update can only be used to set "
212                         "'%s' device state!\n", DEVICE_STATE_SCHEME_STASIS);
213                 return STASIS_DEVICE_STATE_NOT_CONTROLLED;
214         }
215
216         name += size;
217         if (ast_strlen_zero(name)) {
218                 ast_log(LOG_ERROR, "Update requires custom device name!\n");
219                 return STASIS_DEVICE_STATE_MISSING;
220         }
221
222         if (!value || (state = ast_devstate_val(value)) == AST_DEVICE_UNKNOWN) {
223                 ast_log(LOG_ERROR, "Unknown device state "
224                         "value '%s'\n", value);
225                 return STASIS_DEVICE_STATE_UNKNOWN;
226         }
227
228         ast_db_put(DEVICE_STATE_FAMILY, name, value);
229         ast_devstate_changed(state, AST_DEVSTATE_CACHABLE, "%s%s",
230                              DEVICE_STATE_SCHEME_STASIS, name);
231
232         return STASIS_DEVICE_STATE_OK;
233 }
234
235 enum stasis_device_state_result stasis_app_device_state_delete(const char *name)
236 {
237         const char *full_name = name;
238         size_t size = strlen(DEVICE_STATE_SCHEME_STASIS);
239
240         if (strncasecmp(name, DEVICE_STATE_SCHEME_STASIS, size)) {
241                 ast_log(LOG_ERROR, "Can only delete '%s' device states!\n",
242                         DEVICE_STATE_SCHEME_STASIS);
243                 return STASIS_DEVICE_STATE_NOT_CONTROLLED;
244         }
245
246         name += size;
247         if (ast_strlen_zero(name)) {
248                 ast_log(LOG_ERROR, "Delete requires a device name!\n");
249                 return STASIS_DEVICE_STATE_MISSING;
250         }
251
252         if (ast_device_state_clear_cache(full_name)) {
253                 return STASIS_DEVICE_STATE_UNKNOWN;
254         }
255
256         ast_db_del(DEVICE_STATE_FAMILY, name);
257
258         /* send state change for delete */
259         ast_devstate_changed(
260                 AST_DEVICE_UNKNOWN, AST_DEVSTATE_CACHABLE, "%s%s",
261                 DEVICE_STATE_SCHEME_STASIS, name);
262
263         return STASIS_DEVICE_STATE_OK;
264 }
265
266 static void populate_cache(void)
267 {
268         RAII_VAR(struct ast_db_entry *, tree,
269                  ast_db_gettree(DEVICE_STATE_FAMILY, NULL), ast_db_freetree);
270         struct ast_db_entry *entry;
271
272         for (entry = tree; entry; entry = entry->next) {
273                 const char *name = strrchr(entry->key, '/');
274                 if (!ast_strlen_zero(name)) {
275                         ast_devstate_changed(
276                                 ast_devstate_val(entry->data),
277                                 AST_DEVSTATE_CACHABLE, "%s%s\n",
278                                 DEVICE_STATE_SCHEME_STASIS, name + 1);
279                 }
280         }
281 }
282
283 static enum ast_device_state stasis_device_state_cb(const char *data)
284 {
285         char buf[DEVICE_STATE_SIZE] = "";
286
287         ast_db_get(DEVICE_STATE_FAMILY, data, buf, sizeof(buf));
288
289         return ast_devstate_val(buf);
290 }
291
292 static void device_state_cb(void *data, struct stasis_subscription *sub,
293                             struct stasis_message *msg)
294 {
295         struct ast_device_state_message *device_state;
296
297         if (ast_device_state_message_type() != stasis_message_type(msg)) {
298                 return;
299         }
300
301         device_state = stasis_message_data(msg);
302         if (device_state->eid) {
303                 /* ignore non-aggregate states */
304                 return;
305         }
306
307         send_device_state(data, device_state->device, device_state->state);
308 }
309
/*!
 * \brief Event source "find" callback.
 *
 * Always builds a fresh subscription object for the requested device; it
 * does not consult the subscription container.
 *
 * \param app Application requesting the device.
 * \param name Device name.
 *
 * \return New subscription object, or NULL on allocation failure.
 */
static void *find_device_state(const struct stasis_app *app, const char *name)
{
        struct device_state_subscription *fresh;

        fresh = device_state_subscription_create(app, name);

        return fresh;
}
314
315 static int is_subscribed_device_state(struct stasis_app *app, const char *name)
316 {
317         RAII_VAR(struct device_state_subscription *, sub,
318                  find_device_state_subscription(app, name), ao2_cleanup);
319         return sub != NULL;
320 }
321
322 static int subscribe_device_state(struct stasis_app *app, void *obj)
323 {
324         struct device_state_subscription *sub = obj;
325
326         ast_debug(3, "Subscribing to device %s", sub->device_name);
327
328         if (is_subscribed_device_state(app, sub->device_name)) {
329                 ast_debug(3, "App %s is already subscribed to %s\n", stasis_app_name(app), sub->device_name);
330                 return 0;
331         }
332
333         if (!(sub->sub = stasis_subscribe_pool(
334                         ast_device_state_topic(sub->device_name),
335                         device_state_cb, sub))) {
336                 ast_log(LOG_ERROR, "Unable to subscribe to device %s\n",
337                         sub->device_name);
338                 return -1;
339         }
340
341         ao2_link(device_state_subscriptions, sub);
342         return 0;
343 }
344
345 static int unsubscribe_device_state(struct stasis_app *app, const char *name)
346 {
347         RAII_VAR(struct device_state_subscription *, sub,
348                  find_device_state_subscription(app, name), ao2_cleanup);
349         remove_device_state_subscription(sub);
350         return 0;
351 }
352
353 static int device_to_json_cb(void *obj, void *arg, void *data, int flags)
354 {
355         struct device_state_subscription *sub = obj;
356         const char *app_name = arg;
357         struct ast_json *array = data;
358
359         if (strcmp(sub->app_name, app_name)) {
360                 return 0;
361         }
362
363         ast_json_array_append(
364                 array, ast_json_string_create(sub->device_name));
365         return 0;
366
367 }
368
369 static void devices_to_json(const struct stasis_app *app, struct ast_json *json)
370 {
371         struct ast_json *array = ast_json_array_create();
372         ao2_callback_data(device_state_subscriptions, OBJ_NODATA,
373                           device_to_json_cb, (void *)stasis_app_name(app), array);
374         ast_json_object_set(json, "device_names", array);
375 }
376
/*!
 * \brief Event source for the "deviceState:" scheme.
 *
 * Registered with stasis_app_register_event_source() in load_module() and
 * unregistered in unload_module().
 */
struct stasis_app_event_source device_state_event_source = {
        .scheme = DEVICE_STATE_SCHEME_SUB,
        .find = find_device_state,
        .subscribe = subscribe_device_state,
        .unsubscribe = unsubscribe_device_state,
        .is_subscribed = is_subscribed_device_state,
        .to_json = devices_to_json
};
385
386 static int load_module(void)
387 {
388         populate_cache();
389         if (ast_devstate_prov_add(DEVICE_STATE_PROVIDER_STASIS,
390                                   stasis_device_state_cb)) {
391                 return AST_MODULE_LOAD_FAILURE;
392         }
393
394         if (!(device_state_subscriptions = ao2_container_alloc(
395                       DEVICE_STATE_BUCKETS, device_state_subscriptions_hash,
396                       device_state_subscriptions_cmp))) {
397                 return AST_MODULE_LOAD_FAILURE;
398         }
399
400         stasis_app_register_event_source(&device_state_event_source);
401         return AST_MODULE_LOAD_SUCCESS;
402 }
403
/*!
 * \brief Module unload callback.
 *
 * Reverses load_module(): removes the "Stasis" device state provider,
 * unregisters the event source, and releases the subscription container.
 *
 * \return Always 0.
 */
static int unload_module(void)
{
        ast_devstate_prov_del(DEVICE_STATE_PROVIDER_STASIS);
        stasis_app_unregister_event_source(&device_state_event_source);
        /* Drop the module's reference, then clear the now-stale global. */
        ao2_cleanup(device_state_subscriptions);
        device_state_subscriptions = NULL;
        return 0;
}
412
/*
 * Module registration: wires load_module()/unload_module() in as the
 * module's load and unload callbacks and names res_stasis in .nonoptreq
 * (presumably a required module; confirm against the module loader docs).
 */
AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application device state support",
        .support_level = AST_MODULE_SUPPORT_CORE,
        .load = load_module,
        .unload = unload_module,
        .nonoptreq = "res_stasis"
);