Rework stasis cache clear events
[asterisk/asterisk.git] / main / stasis_cache.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/hashtab.h"
36 #include "asterisk/stasis.h"
37 #include "asterisk/utils.h"
38
39 #ifdef LOW_MEMORY
40 #define NUM_CACHE_BUCKETS 17
41 #else
42 #define NUM_CACHE_BUCKETS 563
43 #endif
44
45 /*! \internal */
46 struct stasis_caching_topic {
47         struct ao2_container *cache;
48         struct stasis_topic *topic;
49         struct stasis_subscription *sub;
50         snapshot_get_id id_fn;
51 };
52
53 static void stasis_caching_topic_dtor(void *obj) {
54         struct stasis_caching_topic *caching_topic = obj;
55         ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub));
56         ast_assert(stasis_subscription_is_done(caching_topic->sub));
57         ao2_cleanup(caching_topic->sub);
58         caching_topic->sub = NULL;
59         ao2_cleanup(caching_topic->cache);
60         caching_topic->cache = NULL;
61         ao2_cleanup(caching_topic->topic);
62         caching_topic->topic = NULL;
63 }
64
65 struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
66 {
67         return caching_topic->topic;
68 }
69
70 struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
71 {
72         if (caching_topic) {
73                 if (stasis_subscription_is_subscribed(caching_topic->sub)) {
74                         /* Increment the reference to hold on to it past the
75                          * unsubscribe */
76                         ao2_ref(caching_topic->sub, +1);
77                         stasis_unsubscribe(caching_topic->sub);
78                 } else {
79                         ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
80                 }
81         }
82         return NULL;
83 }
84
85 struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
86 {
87         if (!caching_topic) {
88                 return NULL;
89         }
90
91         /* Hold a ref past the unsubscribe */
92         ao2_ref(caching_topic, +1);
93         stasis_caching_unsubscribe(caching_topic);
94         stasis_subscription_join(caching_topic->sub);
95         ao2_cleanup(caching_topic);
96         return NULL;
97 }
98
99 struct cache_entry {
100         struct stasis_message_type *type;
101         char *id;
102         struct stasis_message *snapshot;
103 };
104
105 static void cache_entry_dtor(void *obj)
106 {
107         struct cache_entry *entry = obj;
108         ao2_cleanup(entry->type);
109         entry->type = NULL;
110         ast_free(entry->id);
111         entry->id = NULL;
112         ao2_cleanup(entry->snapshot);
113         entry->snapshot = NULL;
114 }
115
116 static struct cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
117 {
118         RAII_VAR(struct cache_entry *, entry, NULL, ao2_cleanup);
119
120         ast_assert(type != NULL);
121         ast_assert(id != NULL);
122
123         entry = ao2_alloc(sizeof(*entry), cache_entry_dtor);
124         if (!entry) {
125                 return NULL;
126         }
127
128         entry->id = ast_strdup(id);
129         if (!entry->id) {
130                 return NULL;
131         }
132
133         ao2_ref(type, +1);
134         entry->type = type;
135         if (snapshot != NULL) {
136                 ao2_ref(snapshot, +1);
137                 entry->snapshot = snapshot;
138         }
139
140         ao2_ref(entry, +1);
141         return entry;
142 }
143
144 static int cache_entry_hash(const void *obj, int flags)
145 {
146         const struct cache_entry *entry = obj;
147         int hash = 0;
148
149         ast_assert(!(flags & OBJ_KEY));
150
151         hash += ast_hashtab_hash_string(stasis_message_type_name(entry->type));
152         hash += ast_hashtab_hash_string(entry->id);
153         return hash;
154 }
155
156 static int cache_entry_cmp(void *obj, void *arg, int flags)
157 {
158         const struct cache_entry *left = obj;
159         const struct cache_entry *right = arg;
160
161         ast_assert(!(flags & OBJ_KEY));
162
163         if (left->type == right->type && strcmp(left->id, right->id) == 0) {
164                 return CMP_MATCH | CMP_STOP;
165         }
166
167         return 0;
168 }
169
170 static struct stasis_message *cache_put(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id, struct stasis_message *new_snapshot)
171 {
172         RAII_VAR(struct cache_entry *, new_entry, NULL, ao2_cleanup);
173         RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
174         struct stasis_message *old_snapshot = NULL;
175
176         ast_assert(caching_topic->cache != NULL);
177
178         new_entry = cache_entry_create(type, id, new_snapshot);
179
180         if (new_snapshot == NULL) {
181                 /* Remove entry from cache */
182                 cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_UNLINK);
183                 if (cached_entry) {
184                         old_snapshot = cached_entry->snapshot;
185                         cached_entry->snapshot = NULL;
186                 }
187         } else {
188                 /* Insert/update cache */
189                 SCOPED_AO2LOCK(lock, caching_topic->cache);
190
191                 cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_NOLOCK);
192                 if (cached_entry) {
193                         /* Update cache. Because objects are moving, no need to update refcounts. */
194                         old_snapshot = cached_entry->snapshot;
195                         cached_entry->snapshot = new_entry->snapshot;
196                         new_entry->snapshot = NULL;
197                 } else {
198                         /* Insert into the cache */
199                         ao2_link_flags(caching_topic->cache, new_entry, OBJ_NOLOCK);
200                 }
201
202         }
203
204         return old_snapshot;
205 }
206
207 struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id)
208 {
209         RAII_VAR(struct cache_entry *, search_entry, NULL, ao2_cleanup);
210         RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
211
212         ast_assert(caching_topic->cache != NULL);
213
214         search_entry = cache_entry_create(type, id, NULL);
215         if (search_entry == NULL) {
216                 return NULL;
217         }
218
219         cached_entry = ao2_find(caching_topic->cache, search_entry, OBJ_POINTER);
220         if (cached_entry == NULL) {
221                 return NULL;
222         }
223
224         ast_assert(cached_entry->snapshot != NULL);
225         ao2_ref(cached_entry->snapshot, +1);
226         return cached_entry->snapshot;
227 }
228
229 struct cache_dump_data {
230         struct ao2_container *cached;
231         struct stasis_message_type *type;
232 };
233
234 static int cache_dump_cb(void *obj, void *arg, int flags)
235 {
236         struct cache_dump_data *cache_dump = arg;
237         struct cache_entry *entry = obj;
238
239         if (!cache_dump->type || entry->type == cache_dump->type) {
240                 ao2_link(cache_dump->cached, entry->snapshot);
241         }
242
243         return 0;
244 }
245
246 struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
247 {
248         struct cache_dump_data cache_dump;
249
250         ast_assert(caching_topic->cache != NULL);
251
252         cache_dump.type = type;
253         cache_dump.cached = ao2_container_alloc(1, NULL, NULL);
254         if (!cache_dump.cached) {
255                 return NULL;
256         }
257
258         ao2_callback(caching_topic->cache, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_cb, &cache_dump);
259         return cache_dump.cached;
260 }
261
262 STASIS_MESSAGE_TYPE_DEFN(stasis_cache_clear_type);
263 STASIS_MESSAGE_TYPE_DEFN(stasis_cache_update_type);
264
265 struct stasis_message *stasis_cache_clear_create(struct stasis_message *id_message)
266 {
267         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
268
269         msg = stasis_message_create(stasis_cache_clear_type(), id_message);
270         if (!msg) {
271                 return NULL;
272         }
273
274         ao2_ref(msg, +1);
275         return msg;
276 }
277
278 static void stasis_cache_update_dtor(void *obj)
279 {
280         struct stasis_cache_update *update = obj;
281         ao2_cleanup(update->topic);
282         update->topic = NULL;
283         ao2_cleanup(update->old_snapshot);
284         update->old_snapshot = NULL;
285         ao2_cleanup(update->new_snapshot);
286         update->new_snapshot = NULL;
287         ao2_cleanup(update->type);
288         update->type = NULL;
289 }
290
291 static struct stasis_message *update_create(struct stasis_topic *topic, struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
292 {
293         RAII_VAR(struct stasis_cache_update *, update, NULL, ao2_cleanup);
294         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
295
296         ast_assert(topic != NULL);
297         ast_assert(old_snapshot != NULL || new_snapshot != NULL);
298
299         update = ao2_alloc(sizeof(*update), stasis_cache_update_dtor);
300         if (!update) {
301                 return NULL;
302         }
303
304         ao2_ref(topic, +1);
305         update->topic = topic;
306         if (old_snapshot) {
307                 ao2_ref(old_snapshot, +1);
308                 update->old_snapshot = old_snapshot;
309                 if (!new_snapshot) {
310                         ao2_ref(stasis_message_type(old_snapshot), +1);
311                         update->type = stasis_message_type(old_snapshot);
312                 }
313         }
314         if (new_snapshot) {
315                 ao2_ref(new_snapshot, +1);
316                 update->new_snapshot = new_snapshot;
317                 ao2_ref(stasis_message_type(new_snapshot), +1);
318                 update->type = stasis_message_type(new_snapshot);
319         }
320
321         msg = stasis_message_create(stasis_cache_update_type(), update);
322         if (!msg) {
323                 return NULL;
324         }
325
326         ao2_ref(msg, +1);
327         return msg;
328 }
329
330 static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
331 {
332         RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
333         struct stasis_caching_topic *caching_topic = data;
334         const char *id = NULL;
335
336         ast_assert(caching_topic->topic != NULL);
337         ast_assert(caching_topic->id_fn != NULL);
338
339         if (stasis_subscription_final_message(sub, message)) {
340                 caching_topic_needs_unref = caching_topic;
341         }
342
343         /* Handle cache clear event */
344         if (stasis_cache_clear_type() == stasis_message_type(message)) {
345                 RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
346                 RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
347                 struct stasis_message *clear_msg = stasis_message_data(message);
348                 const char *clear_id = caching_topic->id_fn(clear_msg);
349                 struct stasis_message_type *clear_type = stasis_message_type(clear_msg);
350
351                 ast_assert(clear_type != NULL);
352
353                 if (clear_id) {
354                         old_snapshot = cache_put(caching_topic, clear_type, clear_id, NULL);
355                         if (old_snapshot) {
356                                 update = update_create(topic, old_snapshot, NULL);
357                                 stasis_publish(caching_topic->topic, update);
358                                 return;
359                         }
360
361                         ast_log(LOG_ERROR,
362                                 "Attempting to remove an item from the %s cache that isn't there: %s %s\n",
363                                 stasis_topic_name(caching_topic->topic), stasis_message_type_name(clear_type), clear_id);
364                         return;
365                 }
366         }
367
368         id = caching_topic->id_fn(message);
369         if (id == NULL) {
370                 /* Object isn't cached; forward */
371                 stasis_forward_message(caching_topic->topic, topic, message);
372         } else {
373                 /* Update the cache */
374                 RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
375                 RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
376
377                 old_snapshot = cache_put(caching_topic, stasis_message_type(message), id, message);
378
379                 update = update_create(topic, old_snapshot, message);
380                 if (update == NULL) {
381                         return;
382                 }
383
384                 stasis_publish(caching_topic->topic, update);
385         }
386
387         if (stasis_subscription_final_message(sub, message)) {
388                 ao2_cleanup(caching_topic);
389         }
390 }
391
392 struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn)
393 {
394         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
395         struct stasis_subscription *sub;
396         RAII_VAR(char *, new_name, NULL, free);
397         int ret;
398
399         ret = asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic));
400         if (ret < 0) {
401                 return NULL;
402         }
403
404         caching_topic = ao2_alloc(sizeof(*caching_topic), stasis_caching_topic_dtor);
405         if (caching_topic == NULL) {
406                 return NULL;
407         }
408
409         caching_topic->cache = ao2_container_alloc(NUM_CACHE_BUCKETS, cache_entry_hash, cache_entry_cmp);
410         if (!caching_topic->cache) {
411                 ast_log(LOG_ERROR, "Stasis cache allocation failed\n");
412                 return NULL;
413         }
414
415         caching_topic->topic = stasis_topic_create(new_name);
416         if (caching_topic->topic == NULL) {
417                 return NULL;
418         }
419
420         caching_topic->id_fn = id_fn;
421
422         sub = stasis_subscribe(original_topic, caching_topic_exec, caching_topic);
423         if (sub == NULL) {
424                 return NULL;
425         }
426         /* This is for the reference contained in the subscription above */
427         ao2_ref(caching_topic, +1);
428         caching_topic->sub = sub;
429
430         ao2_ref(caching_topic, +1);
431         return caching_topic;
432 }
433
434 static void stasis_cache_cleanup(void)
435 {
436         STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_clear_type);
437         STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_update_type);
438 }
439
440 int stasis_cache_init(void)
441 {
442         ast_register_cleanup(stasis_cache_cleanup);
443
444         if (STASIS_MESSAGE_TYPE_INIT(stasis_cache_clear_type) != 0) {
445                 return -1;
446         }
447
448         if (STASIS_MESSAGE_TYPE_INIT(stasis_cache_update_type) != 0) {
449                 return -1;
450         }
451
452         return 0;
453 }
454