Merge "rtp_engine/res_rtp_asterisk: Fix RTP struct reentrancy crashes."
[asterisk/asterisk.git] / res / res_stasis_device_state.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * Kevin Harwell <kharwell@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*** MODULEINFO
20         <depend type="module">res_stasis</depend>
21         <support_level>core</support_level>
22  ***/
23
24 #include "asterisk.h"
25
26 #include "asterisk/astdb.h"
27 #include "asterisk/astobj2.h"
28 #include "asterisk/module.h"
29 #include "asterisk/stasis_app_impl.h"
30 #include "asterisk/stasis_app_device_state.h"
31
/*! Size of the buffers that hold a full device name or a stored state value */
#define DEVICE_STATE_SIZE 64

/*! Family under which device states are kept in the astdb */
#define DEVICE_STATE_FAMILY "StasisDeviceState"

/*! Name the device state provider is registered under */
#define DEVICE_STATE_PROVIDER_STASIS "Stasis"

/*! Prefix carried by every custom device state name */
#define DEVICE_STATE_SCHEME_STASIS "Stasis:"

/*! Scheme of the event source used for device state subscriptions */
#define DEVICE_STATE_SCHEME_SUB "deviceState:"

/*! Bucket count of the subscription hash container */
#define DEVICE_STATE_BUCKETS 37

/*! Device name that stands for a subscription to all device states */
#define DEVICE_STATE_ALL "__AST_DEVICE_STATE_ALL_TOPIC"

/*! Every device state subscription currently linked by this module */
static struct ao2_container *device_state_subscriptions;
50
51 /*!
52  * \brief Device state subscription object.
53  */
54 struct device_state_subscription {
55         AST_DECLARE_STRING_FIELDS(
56                 AST_STRING_FIELD(app_name);
57                 AST_STRING_FIELD(device_name);
58         );
59         /*! The subscription object */
60         struct stasis_subscription *sub;
61 };
62
63 static int device_state_subscriptions_hash(const void *obj, const int flags)
64 {
65         const struct device_state_subscription *object;
66
67         switch (flags & OBJ_SEARCH_MASK) {
68         case OBJ_SEARCH_OBJECT:
69                 object = obj;
70                 return ast_str_hash(object->device_name);
71         case OBJ_SEARCH_KEY:
72         default:
73                 /* Hash can only work on something with a full key. */
74                 ast_assert(0);
75                 return 0;
76         }
77 }
78
79 static int device_state_subscriptions_cmp(void *obj, void *arg, int flags)
80 {
81         const struct device_state_subscription *object_left = obj;
82         const struct device_state_subscription *object_right = arg;
83         int cmp;
84
85         switch (flags & OBJ_SEARCH_MASK) {
86         case OBJ_SEARCH_OBJECT:
87                 /* find objects matching both device and app names */
88                 if (strcmp(object_left->device_name,
89                            object_right->device_name)) {
90                         return 0;
91                 }
92                 cmp = strcmp(object_left->app_name, object_right->app_name);
93                 break;
94         case OBJ_SEARCH_KEY:
95         case OBJ_SEARCH_PARTIAL_KEY:
96                 ast_assert(0); /* not supported by container */
97                 /* fall through */
98         default:
99                 cmp = 0;
100                 break;
101         }
102
103         return cmp ? 0 : CMP_MATCH | CMP_STOP;
104 }
105
106 static void device_state_subscription_destroy(void *obj)
107 {
108         struct device_state_subscription *sub = obj;
109         sub->sub = stasis_unsubscribe_and_join(sub->sub);
110         ast_string_field_free_memory(sub);
111 }
112
113 static struct device_state_subscription *device_state_subscription_create(
114         const struct stasis_app *app, const char *device_name)
115 {
116         struct device_state_subscription *sub;
117         const char *app_name = stasis_app_name(app);
118         size_t size;
119
120         if (ast_strlen_zero(device_name)) {
121                 device_name = DEVICE_STATE_ALL;
122         }
123
124         size = strlen(device_name) + strlen(app_name) + 2;
125
126         sub = ao2_alloc(sizeof(*sub), device_state_subscription_destroy);
127         if (!sub) {
128                 return NULL;
129         }
130
131         if (ast_string_field_init(sub, size)) {
132                 ao2_ref(sub, -1);
133                 return NULL;
134         }
135
136         ast_string_field_set(sub, app_name, app_name);
137         ast_string_field_set(sub, device_name, device_name);
138         return sub;
139 }
140
141 static struct device_state_subscription *find_device_state_subscription(
142         struct stasis_app *app, const char *name)
143 {
144         struct device_state_subscription dummy_sub = {
145                 .app_name = stasis_app_name(app),
146                 .device_name = name
147         };
148
149         return ao2_find(device_state_subscriptions, &dummy_sub, OBJ_SEARCH_OBJECT | OBJ_NOLOCK);
150 }
151
152 static void remove_device_state_subscription(
153         struct device_state_subscription *sub)
154 {
155         ao2_unlink_flags(device_state_subscriptions, sub, OBJ_NOLOCK);
156 }
157
158 struct ast_json *stasis_app_device_state_to_json(
159         const char *name, enum ast_device_state state)
160 {
161         return ast_json_pack("{s: s, s: s}",
162                              "name", name,
163                              "state", ast_devstate_str(state));
164 }
165
166 struct ast_json *stasis_app_device_states_to_json(void)
167 {
168         struct ast_json *array = ast_json_array_create();
169         RAII_VAR(struct ast_db_entry *, tree,
170                  ast_db_gettree(DEVICE_STATE_FAMILY, NULL), ast_db_freetree);
171         struct ast_db_entry *entry;
172
173         for (entry = tree; entry; entry = entry->next) {
174                 const char *name = strrchr(entry->key, '/');
175                 if (!ast_strlen_zero(name)) {
176                         struct ast_str *device = ast_str_alloca(DEVICE_STATE_SIZE);
177                         ast_str_set(&device, 0, "%s%s",
178                                     DEVICE_STATE_SCHEME_STASIS, ++name);
179                         ast_json_array_append(
180                                 array, stasis_app_device_state_to_json(
181                                         ast_str_buffer(device),
182                                         ast_device_state(ast_str_buffer(device))));
183                 }
184         }
185
186         return array;
187 }
188
189 static void send_device_state(struct device_state_subscription *sub,
190                               const char *name, enum ast_device_state state)
191 {
192         RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
193
194         json = ast_json_pack("{s:s, s:s, s:o, s:o}",
195                              "type", "DeviceStateChanged",
196                              "application", sub->app_name,
197                              "timestamp", ast_json_timeval(ast_tvnow(), NULL),
198                              "device_state", stasis_app_device_state_to_json(
199                                      name, state));
200
201         if (!json) {
202                 ast_log(LOG_ERROR, "Unable to create device state json object\n");
203                 return;
204         }
205
206         stasis_app_send(sub->app_name, json);
207 }
208
209 enum stasis_device_state_result stasis_app_device_state_update(
210         const char *name, const char *value)
211 {
212         size_t size = strlen(DEVICE_STATE_SCHEME_STASIS);
213         enum ast_device_state state;
214
215         ast_debug(3, "Updating device name = %s, value = %s", name, value);
216
217         if (strncasecmp(name, DEVICE_STATE_SCHEME_STASIS, size)) {
218                 ast_log(LOG_ERROR, "Update can only be used to set "
219                         "'%s' device state!\n", DEVICE_STATE_SCHEME_STASIS);
220                 return STASIS_DEVICE_STATE_NOT_CONTROLLED;
221         }
222
223         name += size;
224         if (ast_strlen_zero(name)) {
225                 ast_log(LOG_ERROR, "Update requires custom device name!\n");
226                 return STASIS_DEVICE_STATE_MISSING;
227         }
228
229         if (!value || (state = ast_devstate_val(value)) == AST_DEVICE_UNKNOWN) {
230                 ast_log(LOG_ERROR, "Unknown device state "
231                         "value '%s'\n", value);
232                 return STASIS_DEVICE_STATE_UNKNOWN;
233         }
234
235         ast_db_put(DEVICE_STATE_FAMILY, name, value);
236         ast_devstate_changed(state, AST_DEVSTATE_CACHABLE, "%s%s",
237                              DEVICE_STATE_SCHEME_STASIS, name);
238
239         return STASIS_DEVICE_STATE_OK;
240 }
241
242 enum stasis_device_state_result stasis_app_device_state_delete(const char *name)
243 {
244         const char *full_name = name;
245         size_t size = strlen(DEVICE_STATE_SCHEME_STASIS);
246
247         if (strncasecmp(name, DEVICE_STATE_SCHEME_STASIS, size)) {
248                 ast_log(LOG_ERROR, "Can only delete '%s' device states!\n",
249                         DEVICE_STATE_SCHEME_STASIS);
250                 return STASIS_DEVICE_STATE_NOT_CONTROLLED;
251         }
252
253         name += size;
254         if (ast_strlen_zero(name)) {
255                 ast_log(LOG_ERROR, "Delete requires a device name!\n");
256                 return STASIS_DEVICE_STATE_MISSING;
257         }
258
259         if (ast_device_state_clear_cache(full_name)) {
260                 return STASIS_DEVICE_STATE_UNKNOWN;
261         }
262
263         ast_db_del(DEVICE_STATE_FAMILY, name);
264
265         /* send state change for delete */
266         ast_devstate_changed(
267                 AST_DEVICE_UNKNOWN, AST_DEVSTATE_CACHABLE, "%s%s",
268                 DEVICE_STATE_SCHEME_STASIS, name);
269
270         return STASIS_DEVICE_STATE_OK;
271 }
272
273 static void populate_cache(void)
274 {
275         RAII_VAR(struct ast_db_entry *, tree,
276                  ast_db_gettree(DEVICE_STATE_FAMILY, NULL), ast_db_freetree);
277         struct ast_db_entry *entry;
278
279         for (entry = tree; entry; entry = entry->next) {
280                 const char *name = strrchr(entry->key, '/');
281                 if (!ast_strlen_zero(name)) {
282                         ast_devstate_changed(
283                                 ast_devstate_val(entry->data),
284                                 AST_DEVSTATE_CACHABLE, "%s%s\n",
285                                 DEVICE_STATE_SCHEME_STASIS, name + 1);
286                 }
287         }
288 }
289
290 static enum ast_device_state stasis_device_state_cb(const char *data)
291 {
292         char buf[DEVICE_STATE_SIZE] = "";
293
294         ast_db_get(DEVICE_STATE_FAMILY, data, buf, sizeof(buf));
295
296         return ast_devstate_val(buf);
297 }
298
299 static void device_state_cb(void *data, struct stasis_subscription *sub,
300                             struct stasis_message *msg)
301 {
302         struct ast_device_state_message *device_state;
303
304         if (stasis_subscription_final_message(sub, msg)) {
305                 /* Remove stasis subscription's reference to device_state_subscription */
306                 ao2_ref(data, -1);
307                 return;
308         }
309
310         if (ast_device_state_message_type() != stasis_message_type(msg)) {
311                 return;
312         }
313
314         device_state = stasis_message_data(msg);
315         if (device_state->eid) {
316                 /* ignore non-aggregate states */
317                 return;
318         }
319
320         send_device_state(data, device_state->device, device_state->state);
321 }
322
/*!
 * \brief Event source "find" callback.
 *
 * Nothing is looked up here; a new subscription object is created for
 * the application and device.
 *
 * \param app Application asking for the device
 * \param name Device name
 *
 * \return Newly created subscription object, or NULL on failure.
 */
static void *find_device_state(const struct stasis_app *app, const char *name)
{
	struct device_state_subscription *fresh;

	fresh = device_state_subscription_create(app, name);

	return fresh;
}
327
328 static int is_subscribed_device_state(struct stasis_app *app, const char *name)
329 {
330         struct device_state_subscription *sub;
331
332         sub = find_device_state_subscription(app, DEVICE_STATE_ALL);
333         if (sub) {
334                 ao2_ref(sub, -1);
335                 return 1;
336         }
337
338         sub = find_device_state_subscription(app, name);
339         if (sub) {
340                 ao2_ref(sub, -1);
341                 return 1;
342         }
343
344         return 0;
345 }
346
347 static int is_subscribed_device_state_lock(struct stasis_app *app, const char *name)
348 {
349         int is_subscribed;
350
351         ao2_lock(device_state_subscriptions);
352         is_subscribed = is_subscribed_device_state(app, name);
353         ao2_unlock(device_state_subscriptions);
354
355         return is_subscribed;
356 }
357
358 static int subscribe_device_state(struct stasis_app *app, void *obj)
359 {
360         struct device_state_subscription *sub = obj;
361         struct stasis_topic *topic;
362
363         if (!sub) {
364                 sub = device_state_subscription_create(app, NULL);
365                 if (!sub) {
366                         return -1;
367                 }
368         }
369
370         if (strcmp(sub->device_name, DEVICE_STATE_ALL)) {
371                 topic = ast_device_state_topic(sub->device_name);
372         } else {
373                 topic = ast_device_state_topic_all();
374         }
375
376         ao2_lock(device_state_subscriptions);
377
378         if (is_subscribed_device_state(app, sub->device_name)) {
379                 ao2_unlock(device_state_subscriptions);
380                 ast_debug(3, "App %s is already subscribed to %s\n", stasis_app_name(app), sub->device_name);
381                 return 0;
382         }
383
384         ast_debug(3, "Subscribing to device %s\n", sub->device_name);
385
386         sub->sub = stasis_subscribe_pool(topic, device_state_cb, ao2_bump(sub));
387         if (!sub->sub) {
388                 ao2_unlock(device_state_subscriptions);
389                 ast_log(LOG_ERROR, "Unable to subscribe to device %s\n",
390                         sub->device_name);
391                 /* Reference we added when attempting to stasis_subscribe_pool */
392                 ao2_ref(sub, -1);
393                 return -1;
394         }
395
396         ao2_link_flags(device_state_subscriptions, sub, OBJ_NOLOCK);
397         ao2_unlock(device_state_subscriptions);
398
399         return 0;
400 }
401
402 static int unsubscribe_device_state(struct stasis_app *app, const char *name)
403 {
404         struct device_state_subscription *sub;
405
406         ao2_lock(device_state_subscriptions);
407         sub = find_device_state_subscription(app, name);
408         if (sub) {
409                 remove_device_state_subscription(sub);
410         }
411         ao2_unlock(device_state_subscriptions);
412
413         ao2_cleanup(sub);
414
415         return 0;
416 }
417
418 static int device_to_json_cb(void *obj, void *arg, void *data, int flags)
419 {
420         struct device_state_subscription *sub = obj;
421         const char *app_name = arg;
422         struct ast_json *array = data;
423
424         if (strcmp(sub->app_name, app_name)) {
425                 return 0;
426         }
427
428         ast_json_array_append(
429                 array, ast_json_string_create(sub->device_name));
430         return 0;
431
432 }
433
434 static void devices_to_json(const struct stasis_app *app, struct ast_json *json)
435 {
436         struct ast_json *array = ast_json_array_create();
437         ao2_callback_data(device_state_subscriptions, OBJ_NODATA,
438                           device_to_json_cb, (void *)stasis_app_name(app), array);
439         ast_json_object_set(json, "device_names", array);
440 }
441
442 struct stasis_app_event_source device_state_event_source = {
443         .scheme = DEVICE_STATE_SCHEME_SUB,
444         .find = find_device_state,
445         .subscribe = subscribe_device_state,
446         .unsubscribe = unsubscribe_device_state,
447         .is_subscribed = is_subscribed_device_state_lock,
448         .to_json = devices_to_json
449 };
450
451 static int load_module(void)
452 {
453         populate_cache();
454         if (ast_devstate_prov_add(DEVICE_STATE_PROVIDER_STASIS,
455                                   stasis_device_state_cb)) {
456                 return AST_MODULE_LOAD_DECLINE;
457         }
458
459         if (!(device_state_subscriptions = ao2_container_alloc(
460                       DEVICE_STATE_BUCKETS, device_state_subscriptions_hash,
461                       device_state_subscriptions_cmp))) {
462                 ast_devstate_prov_del(DEVICE_STATE_PROVIDER_STASIS);
463                 return AST_MODULE_LOAD_DECLINE;
464         }
465
466         stasis_app_register_event_source(&device_state_event_source);
467         return AST_MODULE_LOAD_SUCCESS;
468 }
469
470 static int unload_module(void)
471 {
472         ast_devstate_prov_del(DEVICE_STATE_PROVIDER_STASIS);
473         stasis_app_unregister_event_source(&device_state_event_source);
474         ao2_cleanup(device_state_subscriptions);
475         device_state_subscriptions = NULL;
476         return 0;
477 }
478
479 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS, "Stasis application device state support",
480         .support_level = AST_MODULE_SUPPORT_CORE,
481         .load = load_module,
482         .unload = unload_module,
483         .nonoptreq = "res_stasis"
484 );