f8518ce09d2a5935f50d324fca5feeb78055b159
[asterisk/asterisk.git] / main / stasis_cache.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/hashtab.h"
36 #include "asterisk/stasis.h"
37 #include "asterisk/utils.h"
38
39 #ifdef LOW_MEMORY
40 #define NUM_CACHE_BUCKETS 17
41 #else
42 #define NUM_CACHE_BUCKETS 563
43 #endif
44
45 /*! \private */
46 struct stasis_caching_topic {
47         struct ao2_container *cache;
48         struct stasis_topic *topic;
49         struct stasis_subscription *sub;
50         snapshot_get_id id_fn;
51 };
52
53 static void stasis_caching_topic_dtor(void *obj) {
54         struct stasis_caching_topic *caching_topic = obj;
55         ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub));
56         caching_topic->sub = NULL;
57         ao2_cleanup(caching_topic->cache);
58         caching_topic->cache = NULL;
59         ao2_cleanup(caching_topic->topic);
60         caching_topic->topic = NULL;
61 }
62
63 struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
64 {
65         return caching_topic->topic;
66 }
67
68 void stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
69 {
70         if (caching_topic) {
71                 if (stasis_subscription_is_subscribed(caching_topic->sub)) {
72                         stasis_unsubscribe(caching_topic->sub);
73                 } else {
74                         ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
75                 }
76         }
77 }
78
79 struct cache_entry {
80         struct stasis_message_type *type;
81         char *id;
82         struct stasis_message *snapshot;
83 };
84
85 static void cache_entry_dtor(void *obj)
86 {
87         struct cache_entry *entry = obj;
88         ao2_cleanup(entry->type);
89         entry->type = NULL;
90         ast_free(entry->id);
91         entry->id = NULL;
92         ao2_cleanup(entry->snapshot);
93         entry->snapshot = NULL;
94 }
95
96 static struct cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
97 {
98         RAII_VAR(struct cache_entry *, entry, NULL, ao2_cleanup);
99
100         ast_assert(type != NULL);
101         ast_assert(id != NULL);
102
103         entry = ao2_alloc(sizeof(*entry), cache_entry_dtor);
104         if (!entry) {
105                 return NULL;
106         }
107
108         entry->id = ast_strdup(id);
109         if (!entry->id) {
110                 return NULL;
111         }
112
113         ao2_ref(type, +1);
114         entry->type = type;
115         if (snapshot != NULL) {
116                 ao2_ref(snapshot, +1);
117                 entry->snapshot = snapshot;
118         }
119
120         ao2_ref(entry, +1);
121         return entry;
122 }
123
124 static int cache_entry_hash(const void *obj, int flags)
125 {
126         const struct cache_entry *entry = obj;
127         int hash = 0;
128
129         ast_assert(!(flags & OBJ_KEY));
130
131         hash += ast_hashtab_hash_string(stasis_message_type_name(entry->type));
132         hash += ast_hashtab_hash_string(entry->id);
133         return hash;
134 }
135
136 static int cache_entry_cmp(void *obj, void *arg, int flags)
137 {
138         const struct cache_entry *left = obj;
139         const struct cache_entry *right = arg;
140
141         ast_assert(!(flags & OBJ_KEY));
142
143         if (left->type == right->type && strcmp(left->id, right->id) == 0) {
144                 return CMP_MATCH | CMP_STOP;
145         }
146
147         return 0;
148 }
149
150 static struct stasis_message *cache_put(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id, struct stasis_message *new_snapshot)
151 {
152         RAII_VAR(struct cache_entry *, new_entry, NULL, ao2_cleanup);
153         RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
154         struct stasis_message *old_snapshot = NULL;
155
156         ast_assert(caching_topic->cache != NULL);
157
158         new_entry = cache_entry_create(type, id, new_snapshot);
159
160         if (new_snapshot == NULL) {
161                 /* Remove entry from cache */
162                 cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_UNLINK);
163                 if (cached_entry) {
164                         old_snapshot = cached_entry->snapshot;
165                         cached_entry->snapshot = NULL;
166                 }
167         } else {
168                 /* Insert/update cache */
169                 SCOPED_AO2LOCK(lock, caching_topic->cache);
170
171                 cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_NOLOCK);
172                 if (cached_entry) {
173                         /* Update cache. Because objects are moving, no need to update refcounts. */
174                         old_snapshot = cached_entry->snapshot;
175                         cached_entry->snapshot = new_entry->snapshot;
176                         new_entry->snapshot = NULL;
177                 } else {
178                         /* Insert into the cache */
179                         ao2_link_flags(caching_topic->cache, new_entry, OBJ_NOLOCK);
180                 }
181
182         }
183
184         return old_snapshot;
185 }
186
187 struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id)
188 {
189         RAII_VAR(struct cache_entry *, search_entry, NULL, ao2_cleanup);
190         RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
191
192         ast_assert(caching_topic->cache != NULL);
193
194         search_entry = cache_entry_create(type, id, NULL);
195         if (search_entry == NULL) {
196                 return NULL;
197         }
198
199         cached_entry = ao2_find(caching_topic->cache, search_entry, OBJ_POINTER);
200         if (cached_entry == NULL) {
201                 return NULL;
202         }
203
204         ast_assert(cached_entry->snapshot != NULL);
205         ao2_ref(cached_entry->snapshot, +1);
206         return cached_entry->snapshot;
207 }
208
209 struct cache_dump_data {
210         struct ao2_container *cached;
211         struct stasis_message_type *type;
212 };
213
214 static int cache_dump_cb(void *obj, void *arg, int flags)
215 {
216         struct cache_dump_data *cache_dump = arg;
217         struct cache_entry *entry = obj;
218
219         if (!cache_dump->type || entry->type == cache_dump->type) {
220                 ao2_link(cache_dump->cached, entry->snapshot);
221         }
222
223         return 0;
224 }
225
226 struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
227 {
228         struct cache_dump_data cache_dump;
229
230         ast_assert(caching_topic->cache != NULL);
231
232         cache_dump.type = type;
233         cache_dump.cached = ao2_container_alloc(1, NULL, NULL);
234         if (!cache_dump.cached) {
235                 return NULL;
236         }
237
238         ao2_callback(caching_topic->cache, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_cb, &cache_dump);
239         return cache_dump.cached;
240 }
241
242 static struct stasis_message_type *__cache_clear_data;
243
244 static struct stasis_message_type *cache_clear_data(void)
245 {
246         ast_assert(__cache_clear_data != NULL);
247         return __cache_clear_data;
248 }
249
250 static struct stasis_message_type *__cache_update;
251
252 struct stasis_message_type *stasis_cache_update(void)
253 {
254         ast_assert(__cache_update != NULL);
255         return __cache_update;
256 }
257
258 struct cache_clear_data {
259         struct stasis_message_type *type;
260         char *id;
261 };
262
263 static void cache_clear_data_dtor(void *obj)
264 {
265         struct cache_clear_data *ev = obj;
266         ast_free(ev->id);
267         ev->id = NULL;
268         ao2_cleanup(ev->type);
269         ev->type = NULL;
270 }
271
272 struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id)
273 {
274         RAII_VAR(struct cache_clear_data *, ev, NULL, ao2_cleanup);
275         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
276
277         ev = ao2_alloc(sizeof(*ev), cache_clear_data_dtor);
278         if (!ev) {
279                 return NULL;
280         }
281
282         ev->id = ast_strdup(id);
283         if (!ev->id) {
284                 return NULL;
285         }
286         ao2_ref(type, +1);
287         ev->type = type;
288
289         msg = stasis_message_create(cache_clear_data(), ev);
290
291         if (!msg) {
292                 return NULL;
293         }
294
295         ao2_ref(msg, +1);
296         return msg;
297 }
298
299 static void stasis_cache_update_dtor(void *obj)
300 {
301         struct stasis_cache_update *update = obj;
302         ao2_cleanup(update->topic);
303         update->topic = NULL;
304         ao2_cleanup(update->old_snapshot);
305         update->old_snapshot = NULL;
306         ao2_cleanup(update->new_snapshot);
307         update->new_snapshot = NULL;
308         ao2_cleanup(update->type);
309         update->type = NULL;
310 }
311
312 static struct stasis_message *update_create(struct stasis_topic *topic, struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
313 {
314         RAII_VAR(struct stasis_cache_update *, update, NULL, ao2_cleanup);
315         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
316
317         ast_assert(topic != NULL);
318         ast_assert(old_snapshot != NULL || new_snapshot != NULL);
319
320         update = ao2_alloc(sizeof(*update), stasis_cache_update_dtor);
321         if (!update) {
322                 return NULL;
323         }
324
325         ao2_ref(topic, +1);
326         update->topic = topic;
327         if (old_snapshot) {
328                 ao2_ref(old_snapshot, +1);
329                 update->old_snapshot = old_snapshot;
330                 if (!new_snapshot) {
331                         ao2_ref(stasis_message_type(old_snapshot), +1);
332                         update->type = stasis_message_type(old_snapshot);
333                 }
334         }
335         if (new_snapshot) {
336                 ao2_ref(new_snapshot, +1);
337                 update->new_snapshot = new_snapshot;
338                 ao2_ref(stasis_message_type(new_snapshot), +1);
339                 update->type = stasis_message_type(new_snapshot);
340         }
341
342         msg = stasis_message_create(stasis_cache_update(), update);
343         if (!msg) {
344                 return NULL;
345         }
346
347         ao2_ref(msg, +1);
348         return msg;
349 }
350
351 static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
352 {
353         RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
354         struct stasis_caching_topic *caching_topic = data;
355         const char *id = NULL;
356
357         ast_assert(caching_topic->topic != NULL);
358         ast_assert(caching_topic->id_fn != NULL);
359
360         if (stasis_subscription_final_message(sub, message)) {
361                 caching_topic_needs_unref = caching_topic;
362         }
363
364         /* Handle cache clear event */
365         if (cache_clear_data() == stasis_message_type(message)) {
366                 RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
367                 RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
368                 struct cache_clear_data *clear = stasis_message_data(message);
369                 ast_assert(clear->type != NULL);
370                 ast_assert(clear->id != NULL);
371                 old_snapshot = cache_put(caching_topic, clear->type, clear->id, NULL);
372                 if (old_snapshot) {
373                         update = update_create(topic, old_snapshot, NULL);
374                         stasis_publish(caching_topic->topic, update);
375                 } else {
376                         ast_log(LOG_ERROR,
377                                 "Attempting to remove an item from the cache that isn't there: %s %s\n",
378                                 stasis_message_type_name(clear->type), clear->id);
379                 }
380                 return;
381         }
382
383         id = caching_topic->id_fn(message);
384         if (id == NULL) {
385                 /* Object isn't cached; forward */
386                 stasis_forward_message(caching_topic->topic, topic, message);
387         } else {
388                 /* Update the cache */
389                 RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
390                 RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
391
392                 old_snapshot = cache_put(caching_topic, stasis_message_type(message), id, message);
393
394                 update = update_create(topic, old_snapshot, message);
395                 if (update == NULL) {
396                         return;
397                 }
398
399                 stasis_publish(caching_topic->topic, update);
400         }
401
402         if (stasis_subscription_final_message(sub, message)) {
403                 ao2_cleanup(caching_topic);
404         }
405 }
406
407 struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn)
408 {
409         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
410         struct stasis_subscription *sub;
411         RAII_VAR(char *, new_name, NULL, free);
412         int ret;
413
414         ret = asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic));
415         if (ret < 0) {
416                 return NULL;
417         }
418
419         caching_topic = ao2_alloc(sizeof(*caching_topic), stasis_caching_topic_dtor);
420         if (caching_topic == NULL) {
421                 return NULL;
422         }
423
424         caching_topic->cache = ao2_container_alloc(NUM_CACHE_BUCKETS, cache_entry_hash, cache_entry_cmp);
425         if (!caching_topic->cache) {
426                 ast_log(LOG_ERROR, "Stasis cache allocation failed\n");
427                 return NULL;
428         }
429
430         caching_topic->topic = stasis_topic_create(new_name);
431         if (caching_topic->topic == NULL) {
432                 return NULL;
433         }
434
435         caching_topic->id_fn = id_fn;
436
437         sub = stasis_subscribe(original_topic, caching_topic_exec, caching_topic);
438         if (sub == NULL) {
439                 return NULL;
440         }
441         /* This is for the reference contained in the subscription above */
442         ao2_ref(caching_topic, +1);
443         caching_topic->sub = sub;
444
445         ao2_ref(caching_topic, +1);
446         return caching_topic;
447 }
448
449 static void stasis_cache_exit(void)
450 {
451         ao2_cleanup(__cache_clear_data);
452         __cache_clear_data = NULL;
453         ao2_cleanup(__cache_update);
454         __cache_update = NULL;
455 }
456
457 int stasis_cache_init(void)
458 {
459         ast_register_atexit(stasis_cache_exit);
460
461         if (__cache_clear_data || __cache_update) {
462                 ast_log(LOG_ERROR, "Stasis cache double initialized\n");
463                 return -1;
464         }
465
466         __cache_update = stasis_message_type_create("stasis_cache_update");
467         if (!__cache_update) {
468                 return -1;
469         }
470
471         __cache_clear_data = stasis_message_type_create("StasisCacheClear");
472         if (!__cache_clear_data) {
473                 return -1;
474         }
475         return 0;
476 }
477