Add message dump capability to stasis cache layer
[asterisk/asterisk.git] / main / stasis_cache.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/hashtab.h"
36 #include "asterisk/stasis.h"
37 #include "asterisk/utils.h"
38
39 #ifdef LOW_MEMORY
40 #define NUM_CACHE_BUCKETS 17
41 #else
42 #define NUM_CACHE_BUCKETS 563
43 #endif
44
45 struct stasis_caching_topic {
46         struct ao2_container *cache;
47         struct stasis_topic *topic;
48         struct stasis_subscription *sub;
49         snapshot_get_id id_fn;
50 };
51
52 static void stasis_caching_topic_dtor(void *obj) {
53         struct stasis_caching_topic *caching_topic = obj;
54         ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub));
55         caching_topic->sub = NULL;
56         ao2_cleanup(caching_topic->cache);
57         caching_topic->cache = NULL;
58         ao2_cleanup(caching_topic->topic);
59         caching_topic->topic = NULL;
60 }
61
62 struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
63 {
64         return caching_topic->topic;
65 }
66
67 void stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
68 {
69         if (caching_topic) {
70                 if (stasis_subscription_is_subscribed(caching_topic->sub)) {
71                         stasis_unsubscribe(caching_topic->sub);
72                 } else {
73                         ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
74                 }
75         }
76 }
77
78 struct cache_entry {
79         struct stasis_message_type *type;
80         char *id;
81         struct stasis_message *snapshot;
82 };
83
84 static void cache_entry_dtor(void *obj)
85 {
86         struct cache_entry *entry = obj;
87         ao2_cleanup(entry->type);
88         entry->type = NULL;
89         ast_free(entry->id);
90         entry->id = NULL;
91         ao2_cleanup(entry->snapshot);
92         entry->snapshot = NULL;
93 }
94
95 static struct cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
96 {
97         RAII_VAR(struct cache_entry *, entry, NULL, ao2_cleanup);
98
99         ast_assert(type != NULL);
100         ast_assert(id != NULL);
101
102         entry = ao2_alloc(sizeof(*entry), cache_entry_dtor);
103         if (!entry) {
104                 return NULL;
105         }
106
107         entry->id = ast_strdup(id);
108         if (!entry->id) {
109                 return NULL;
110         }
111
112         ao2_ref(type, +1);
113         entry->type = type;
114         if (snapshot != NULL) {
115                 ao2_ref(snapshot, +1);
116                 entry->snapshot = snapshot;
117         }
118
119         ao2_ref(entry, +1);
120         return entry;
121 }
122
123 static int cache_entry_hash(const void *obj, int flags)
124 {
125         const struct cache_entry *entry = obj;
126         int hash = 0;
127
128         ast_assert(!(flags & OBJ_KEY));
129
130         hash += ast_hashtab_hash_string(stasis_message_type_name(entry->type));
131         hash += ast_hashtab_hash_string(entry->id);
132         return hash;
133 }
134
135 static int cache_entry_cmp(void *obj, void *arg, int flags)
136 {
137         const struct cache_entry *left = obj;
138         const struct cache_entry *right = arg;
139
140         ast_assert(!(flags & OBJ_KEY));
141
142         if (left->type == right->type && strcmp(left->id, right->id) == 0) {
143                 return CMP_MATCH | CMP_STOP;
144         }
145
146         return 0;
147 }
148
149 static struct stasis_message *cache_put(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id, struct stasis_message *new_snapshot)
150 {
151         RAII_VAR(struct cache_entry *, new_entry, NULL, ao2_cleanup);
152         RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
153         struct stasis_message *old_snapshot = NULL;
154
155         ast_assert(caching_topic->cache != NULL);
156
157         new_entry = cache_entry_create(type, id, new_snapshot);
158
159         if (new_snapshot == NULL) {
160                 /* Remove entry from cache */
161                 cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_UNLINK);
162                 if (cached_entry) {
163                         old_snapshot = cached_entry->snapshot;
164                         cached_entry->snapshot = NULL;
165                 }
166         } else {
167                 /* Insert/update cache */
168                 SCOPED_AO2LOCK(lock, caching_topic->cache);
169
170                 cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_NOLOCK);
171                 if (cached_entry) {
172                         /* Update cache. Because objects are moving, no need to update refcounts. */
173                         old_snapshot = cached_entry->snapshot;
174                         cached_entry->snapshot = new_entry->snapshot;
175                         new_entry->snapshot = NULL;
176                 } else {
177                         /* Insert into the cache */
178                         ao2_link_flags(caching_topic->cache, new_entry, OBJ_NOLOCK);
179                 }
180
181         }
182
183         return old_snapshot;
184 }
185
186 struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id)
187 {
188         RAII_VAR(struct cache_entry *, search_entry, NULL, ao2_cleanup);
189         RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
190
191         ast_assert(caching_topic->cache != NULL);
192
193         search_entry = cache_entry_create(type, id, NULL);
194         if (search_entry == NULL) {
195                 return NULL;
196         }
197
198         cached_entry = ao2_find(caching_topic->cache, search_entry, OBJ_POINTER);
199         if (cached_entry == NULL) {
200                 return NULL;
201         }
202
203         ast_assert(cached_entry->snapshot != NULL);
204         ao2_ref(cached_entry->snapshot, +1);
205         return cached_entry->snapshot;
206 }
207
208 struct cache_dump_data {
209         struct ao2_container *cached;
210         struct stasis_message_type *type;
211 };
212
213 static int cache_dump_cb(void *obj, void *arg, int flags)
214 {
215         struct cache_dump_data *cache_dump = arg;
216         struct cache_entry *entry = obj;
217
218         if (!cache_dump->type || entry->type == cache_dump->type) {
219                 ao2_link(cache_dump->cached, entry->snapshot);
220         }
221
222         return 0;
223 }
224
225 struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
226 {
227         struct cache_dump_data cache_dump;
228
229         ast_assert(caching_topic->cache != NULL);
230
231         cache_dump.type = type;
232         cache_dump.cached = ao2_container_alloc(1, NULL, NULL);
233         if (!cache_dump.cached) {
234                 return NULL;
235         }
236
237         ao2_callback(caching_topic->cache, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_cb, &cache_dump);
238         return cache_dump.cached;
239 }
240
241 static struct stasis_message_type *__cache_clear_data;
242
243 static struct stasis_message_type *cache_clear_data(void)
244 {
245         ast_assert(__cache_clear_data != NULL);
246         return __cache_clear_data;
247 }
248
249 static struct stasis_message_type *__cache_update;
250
251 struct stasis_message_type *stasis_cache_update(void)
252 {
253         ast_assert(__cache_update != NULL);
254         return __cache_update;
255 }
256
257 struct cache_clear_data {
258         struct stasis_message_type *type;
259         char *id;
260 };
261
262 static void cache_clear_data_dtor(void *obj)
263 {
264         struct cache_clear_data *ev = obj;
265         ast_free(ev->id);
266         ev->id = NULL;
267         ao2_cleanup(ev->type);
268         ev->type = NULL;
269 }
270
271 struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id)
272 {
273         RAII_VAR(struct cache_clear_data *, ev, NULL, ao2_cleanup);
274         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
275
276         ev = ao2_alloc(sizeof(*ev), cache_clear_data_dtor);
277         if (!ev) {
278                 return NULL;
279         }
280
281         ev->id = ast_strdup(id);
282         if (!ev->id) {
283                 return NULL;
284         }
285         ao2_ref(type, +1);
286         ev->type = type;
287
288         msg = stasis_message_create(cache_clear_data(), ev);
289
290         if (!msg) {
291                 return NULL;
292         }
293
294         ao2_ref(msg, +1);
295         return msg;
296 }
297
298 static void stasis_cache_update_dtor(void *obj)
299 {
300         struct stasis_cache_update *update = obj;
301         ao2_cleanup(update->topic);
302         update->topic = NULL;
303         ao2_cleanup(update->old_snapshot);
304         update->old_snapshot = NULL;
305         ao2_cleanup(update->new_snapshot);
306         update->new_snapshot = NULL;
307         ao2_cleanup(update->type);
308         update->type = NULL;
309 }
310
311 static struct stasis_message *update_create(struct stasis_topic *topic, struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
312 {
313         RAII_VAR(struct stasis_cache_update *, update, NULL, ao2_cleanup);
314         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
315
316         ast_assert(topic != NULL);
317         ast_assert(old_snapshot != NULL || new_snapshot != NULL);
318
319         update = ao2_alloc(sizeof(*update), stasis_cache_update_dtor);
320         if (!update) {
321                 return NULL;
322         }
323
324         ao2_ref(topic, +1);
325         update->topic = topic;
326         if (old_snapshot) {
327                 ao2_ref(old_snapshot, +1);
328                 update->old_snapshot = old_snapshot;
329                 if (!new_snapshot) {
330                         ao2_ref(stasis_message_type(old_snapshot), +1);
331                         update->type = stasis_message_type(old_snapshot);
332                 }
333         }
334         if (new_snapshot) {
335                 ao2_ref(new_snapshot, +1);
336                 update->new_snapshot = new_snapshot;
337                 ao2_ref(stasis_message_type(new_snapshot), +1);
338                 update->type = stasis_message_type(new_snapshot);
339         }
340
341         msg = stasis_message_create(stasis_cache_update(), update);
342         if (!msg) {
343                 return NULL;
344         }
345
346         ao2_ref(msg, +1);
347         return msg;
348 }
349
350 static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
351 {
352         RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
353         struct stasis_caching_topic *caching_topic = data;
354         const char *id = NULL;
355
356         ast_assert(caching_topic->topic != NULL);
357         ast_assert(caching_topic->id_fn != NULL);
358
359         if (stasis_subscription_final_message(sub, message)) {
360                 caching_topic_needs_unref = caching_topic;
361         }
362
363         /* Handle cache clear event */
364         if (cache_clear_data() == stasis_message_type(message)) {
365                 RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
366                 RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
367                 struct cache_clear_data *clear = stasis_message_data(message);
368                 ast_assert(clear->type != NULL);
369                 ast_assert(clear->id != NULL);
370                 old_snapshot = cache_put(caching_topic, clear->type, clear->id, NULL);
371                 if (old_snapshot) {
372                         update = update_create(topic, old_snapshot, NULL);
373                         stasis_publish(caching_topic->topic, update);
374                 } else {
375                         ast_log(LOG_ERROR,
376                                 "Attempting to remove an item from the cache that isn't there: %s %s\n",
377                                 stasis_message_type_name(clear->type), clear->id);
378                 }
379                 return;
380         }
381
382         id = caching_topic->id_fn(message);
383         if (id == NULL) {
384                 /* Object isn't cached; forward */
385                 stasis_forward_message(caching_topic->topic, topic, message);
386         } else {
387                 /* Update the cache */
388                 RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
389                 RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
390
391                 old_snapshot = cache_put(caching_topic, stasis_message_type(message), id, message);
392
393                 update = update_create(topic, old_snapshot, message);
394                 if (update == NULL) {
395                         return;
396                 }
397
398                 stasis_publish(caching_topic->topic, update);
399         }
400
401         if (stasis_subscription_final_message(sub, message)) {
402                 ao2_cleanup(caching_topic);
403         }
404 }
405
406 struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn)
407 {
408         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
409         struct stasis_subscription *sub;
410         RAII_VAR(char *, new_name, NULL, free);
411         int ret;
412
413         ret = asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic));
414         if (ret < 0) {
415                 return NULL;
416         }
417
418         caching_topic = ao2_alloc(sizeof(*caching_topic), stasis_caching_topic_dtor);
419         if (caching_topic == NULL) {
420                 return NULL;
421         }
422
423         caching_topic->cache = ao2_container_alloc(NUM_CACHE_BUCKETS, cache_entry_hash, cache_entry_cmp);
424         if (!caching_topic->cache) {
425                 ast_log(LOG_ERROR, "Stasis cache allocation failed\n");
426                 return NULL;
427         }
428
429         caching_topic->topic = stasis_topic_create(new_name);
430         if (caching_topic->topic == NULL) {
431                 return NULL;
432         }
433
434         caching_topic->id_fn = id_fn;
435
436         sub = stasis_subscribe(original_topic, caching_topic_exec, caching_topic);
437         if (sub == NULL) {
438                 return NULL;
439         }
440         /* This is for the reference contained in the subscription above */
441         ao2_ref(caching_topic, +1);
442         caching_topic->sub = sub;
443
444         ao2_ref(caching_topic, +1);
445         return caching_topic;
446 }
447
448 static void stasis_cache_exit(void)
449 {
450         ao2_cleanup(__cache_clear_data);
451         __cache_clear_data = NULL;
452         ao2_cleanup(__cache_update);
453         __cache_update = NULL;
454 }
455
456 int stasis_cache_init(void)
457 {
458         ast_register_atexit(stasis_cache_exit);
459
460         if (__cache_clear_data || __cache_update) {
461                 ast_log(LOG_ERROR, "Stasis cache double initialized\n");
462                 return -1;
463         }
464
465         __cache_update = stasis_message_type_create("stasis_cache_update");
466         if (!__cache_update) {
467                 return -1;
468         }
469
470         __cache_clear_data = stasis_message_type_create("StasisCacheClear");
471         if (!__cache_clear_data) {
472                 return -1;
473         }
474         return 0;
475 }
476