Add support for requiring that all queued messages on a caching topic have been handl...
[asterisk/asterisk.git] / main / stasis_cache.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
 * \brief Stasis caching topic implementation (message snapshot cache).
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/hashtab.h"
36 #include "asterisk/stasis.h"
37 #include "asterisk/utils.h"
38
39 #ifdef LOW_MEMORY
40 #define NUM_CACHE_BUCKETS 17
41 #else
42 #define NUM_CACHE_BUCKETS 563
43 #endif
44
/*!
 * \internal
 * \brief State for a topic that caches the snapshots seen on another topic.
 */
struct stasis_caching_topic {
        /*! Container of \c cache_entry objects, hashed by message type and id */
        struct ao2_container *cache;
        /*! Topic that cache updates and non-cached messages are published to */
        struct stasis_topic *topic;
        /*! Topic this caching topic is subscribed to */
        struct stasis_topic *original_topic;
        /*! Subscription to \c original_topic */
        struct stasis_subscription *sub;
        /*! Callback returning the cache id of a message, or NULL if it is not cached */
        snapshot_get_id id_fn;
};
53
54 static struct stasis_message_type *cache_guarantee_type(void);
55
56 static void stasis_caching_topic_dtor(void *obj) {
57         struct stasis_caching_topic *caching_topic = obj;
58         ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub));
59         ast_assert(stasis_subscription_is_done(caching_topic->sub));
60         ao2_cleanup(caching_topic->sub);
61         caching_topic->sub = NULL;
62         ao2_cleanup(caching_topic->cache);
63         caching_topic->cache = NULL;
64         ao2_cleanup(caching_topic->topic);
65         caching_topic->topic = NULL;
66         ao2_cleanup(caching_topic->original_topic);
67         caching_topic->original_topic = NULL;
68 }
69
70 struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
71 {
72         return caching_topic->topic;
73 }
74
75 struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
76 {
77         if (caching_topic) {
78                 if (stasis_subscription_is_subscribed(caching_topic->sub)) {
79                         /* Increment the reference to hold on to it past the
80                          * unsubscribe */
81                         ao2_ref(caching_topic->sub, +1);
82                         stasis_unsubscribe(caching_topic->sub);
83                 } else {
84                         ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
85                 }
86         }
87         return NULL;
88 }
89
90 struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
91 {
92         if (!caching_topic) {
93                 return NULL;
94         }
95
96         /* Hold a ref past the unsubscribe */
97         ao2_ref(caching_topic, +1);
98         stasis_caching_unsubscribe(caching_topic);
99         stasis_subscription_join(caching_topic->sub);
100         ao2_cleanup(caching_topic);
101         return NULL;
102 }
103
/*!
 * \internal
 * \brief One entry in a caching topic's cache; also used as a search key.
 *
 * Entries are hashed and compared on \c type and \c id.
 */
struct cache_entry {
        /*! Message type of the cached snapshot (referenced) */
        struct stasis_message_type *type;
        /*! Heap-allocated copy of the snapshot's id */
        char *id;
        /*! The cached message (referenced); NULL for entries built only for lookup or removal */
        struct stasis_message *snapshot;
};
109
110 static void cache_entry_dtor(void *obj)
111 {
112         struct cache_entry *entry = obj;
113         ao2_cleanup(entry->type);
114         entry->type = NULL;
115         ast_free(entry->id);
116         entry->id = NULL;
117         ao2_cleanup(entry->snapshot);
118         entry->snapshot = NULL;
119 }
120
121 static struct cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
122 {
123         RAII_VAR(struct cache_entry *, entry, NULL, ao2_cleanup);
124
125         ast_assert(type != NULL);
126         ast_assert(id != NULL);
127
128         entry = ao2_alloc(sizeof(*entry), cache_entry_dtor);
129         if (!entry) {
130                 return NULL;
131         }
132
133         entry->id = ast_strdup(id);
134         if (!entry->id) {
135                 return NULL;
136         }
137
138         ao2_ref(type, +1);
139         entry->type = type;
140         if (snapshot != NULL) {
141                 ao2_ref(snapshot, +1);
142                 entry->snapshot = snapshot;
143         }
144
145         ao2_ref(entry, +1);
146         return entry;
147 }
148
149 static int cache_entry_hash(const void *obj, int flags)
150 {
151         const struct cache_entry *entry = obj;
152         int hash = 0;
153
154         ast_assert(!(flags & OBJ_KEY));
155
156         hash += ast_hashtab_hash_string(stasis_message_type_name(entry->type));
157         hash += ast_hashtab_hash_string(entry->id);
158         return hash;
159 }
160
161 static int cache_entry_cmp(void *obj, void *arg, int flags)
162 {
163         const struct cache_entry *left = obj;
164         const struct cache_entry *right = arg;
165
166         ast_assert(!(flags & OBJ_KEY));
167
168         if (left->type == right->type && strcmp(left->id, right->id) == 0) {
169                 return CMP_MATCH | CMP_STOP;
170         }
171
172         return 0;
173 }
174
175 static struct stasis_message *cache_put(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id, struct stasis_message *new_snapshot)
176 {
177         RAII_VAR(struct cache_entry *, new_entry, NULL, ao2_cleanup);
178         RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
179         struct stasis_message *old_snapshot = NULL;
180
181         ast_assert(caching_topic->cache != NULL);
182
183         new_entry = cache_entry_create(type, id, new_snapshot);
184
185         if (new_snapshot == NULL) {
186                 /* Remove entry from cache */
187                 cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_UNLINK);
188                 if (cached_entry) {
189                         old_snapshot = cached_entry->snapshot;
190                         cached_entry->snapshot = NULL;
191                 }
192         } else {
193                 /* Insert/update cache */
194                 SCOPED_AO2LOCK(lock, caching_topic->cache);
195
196                 cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_NOLOCK);
197                 if (cached_entry) {
198                         /* Update cache. Because objects are moving, no need to update refcounts. */
199                         old_snapshot = cached_entry->snapshot;
200                         cached_entry->snapshot = new_entry->snapshot;
201                         new_entry->snapshot = NULL;
202                 } else {
203                         /* Insert into the cache */
204                         ao2_link_flags(caching_topic->cache, new_entry, OBJ_NOLOCK);
205                 }
206
207         }
208
209         return old_snapshot;
210 }
211
/*!
 * \internal
 * \brief Payload of a cache guarantee message.
 *
 * Lets a publisher wait until the caching topic's subscription callback
 * has reached the guarantee message.
 */
struct caching_guarantee {
        /*! Protects \c done and is paired with \c cond */
        ast_mutex_t lock;
        /*! Signalled once \c done has been set */
        ast_cond_t cond;
        /*! Set to 1 when the guarantee message has been handled */
        unsigned int done:1;
};
218
219 static void caching_guarantee_dtor(void *obj)
220 {
221         struct caching_guarantee *guarantee = obj;
222
223         ast_assert(guarantee->done == 1);
224
225         ast_mutex_destroy(&guarantee->lock);
226         ast_cond_destroy(&guarantee->cond);
227 }
228
229 static struct stasis_message *caching_guarantee_create(void)
230 {
231         RAII_VAR(struct caching_guarantee *, guarantee, NULL, ao2_cleanup);
232         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
233
234         if (!(guarantee = ao2_alloc(sizeof(*guarantee), caching_guarantee_dtor))) {
235                 return NULL;
236         }
237
238         ast_mutex_init(&guarantee->lock);
239         ast_cond_init(&guarantee->cond, NULL);
240
241         if (!(msg = stasis_message_create(cache_guarantee_type(), guarantee))) {
242                 return NULL;
243         }
244
245         ao2_ref(msg, +1);
246         return msg;
247 }
248
249 struct stasis_message *stasis_cache_get_extended(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id, unsigned int guaranteed)
250 {
251         RAII_VAR(struct cache_entry *, search_entry, NULL, ao2_cleanup);
252         RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
253
254         ast_assert(caching_topic->cache != NULL);
255
256         if (guaranteed) {
257                 RAII_VAR(struct stasis_message *, msg, caching_guarantee_create(), ao2_cleanup);
258                 struct caching_guarantee *guarantee = stasis_message_data(msg);
259
260                 ast_mutex_lock(&guarantee->lock);
261                 stasis_publish(caching_topic->original_topic, msg);
262                 while (!guarantee->done) {
263                         ast_cond_wait(&guarantee->cond, &guarantee->lock);
264                 }
265                 ast_mutex_unlock(&guarantee->lock);
266         }
267
268         search_entry = cache_entry_create(type, id, NULL);
269         if (search_entry == NULL) {
270                 return NULL;
271         }
272
273         cached_entry = ao2_find(caching_topic->cache, search_entry, OBJ_POINTER);
274         if (cached_entry == NULL) {
275                 return NULL;
276         }
277
278         ast_assert(cached_entry->snapshot != NULL);
279         ao2_ref(cached_entry->snapshot, +1);
280         return cached_entry->snapshot;
281 }
282
/*!
 * \internal
 * \brief Arguments passed to \c cache_dump_cb while dumping a cache.
 */
struct cache_dump_data {
        /*! Container that matching snapshots are linked into */
        struct ao2_container *cached;
        /*! Message type to filter on, or NULL to include every entry */
        struct stasis_message_type *type;
};
287
288 static int cache_dump_cb(void *obj, void *arg, int flags)
289 {
290         struct cache_dump_data *cache_dump = arg;
291         struct cache_entry *entry = obj;
292
293         if (!cache_dump->type || entry->type == cache_dump->type) {
294                 ao2_link(cache_dump->cached, entry->snapshot);
295         }
296
297         return 0;
298 }
299
300 struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
301 {
302         struct cache_dump_data cache_dump;
303
304         ast_assert(caching_topic->cache != NULL);
305
306         cache_dump.type = type;
307         cache_dump.cached = ao2_container_alloc(1, NULL, NULL);
308         if (!cache_dump.cached) {
309                 return NULL;
310         }
311
312         ao2_callback(caching_topic->cache, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_cb, &cache_dump);
313         return cache_dump.cached;
314 }
315
/* Message types used by the cache: clear requests, update notifications,
 * and the guarantee marker handled in caching_topic_exec(). */
STASIS_MESSAGE_TYPE_DEFN(stasis_cache_clear_type);
STASIS_MESSAGE_TYPE_DEFN(stasis_cache_update_type);
STASIS_MESSAGE_TYPE_DEFN(cache_guarantee_type);
319
/*!
 * \brief Create a message asking a caching topic to drop a cached snapshot.
 *
 * \param id_message Message carried as the payload of the clear message;
 *        the caching topic derives the type and id to remove from it.
 *
 * \return A new message of type \c stasis_cache_clear_type, owned by the caller.
 * \return NULL if the message could not be created.
 */
struct stasis_message *stasis_cache_clear_create(struct stasis_message *id_message)
{
        /* The reference returned by the constructor goes straight to the caller */
        return stasis_message_create(stasis_cache_clear_type(), id_message);
}
332
333 static void stasis_cache_update_dtor(void *obj)
334 {
335         struct stasis_cache_update *update = obj;
336         ao2_cleanup(update->topic);
337         update->topic = NULL;
338         ao2_cleanup(update->old_snapshot);
339         update->old_snapshot = NULL;
340         ao2_cleanup(update->new_snapshot);
341         update->new_snapshot = NULL;
342         ao2_cleanup(update->type);
343         update->type = NULL;
344 }
345
346 static struct stasis_message *update_create(struct stasis_topic *topic, struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
347 {
348         RAII_VAR(struct stasis_cache_update *, update, NULL, ao2_cleanup);
349         RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
350
351         ast_assert(topic != NULL);
352         ast_assert(old_snapshot != NULL || new_snapshot != NULL);
353
354         update = ao2_alloc(sizeof(*update), stasis_cache_update_dtor);
355         if (!update) {
356                 return NULL;
357         }
358
359         ao2_ref(topic, +1);
360         update->topic = topic;
361         if (old_snapshot) {
362                 ao2_ref(old_snapshot, +1);
363                 update->old_snapshot = old_snapshot;
364                 if (!new_snapshot) {
365                         ao2_ref(stasis_message_type(old_snapshot), +1);
366                         update->type = stasis_message_type(old_snapshot);
367                 }
368         }
369         if (new_snapshot) {
370                 ao2_ref(new_snapshot, +1);
371                 update->new_snapshot = new_snapshot;
372                 ao2_ref(stasis_message_type(new_snapshot), +1);
373                 update->type = stasis_message_type(new_snapshot);
374         }
375
376         msg = stasis_message_create(stasis_cache_update_type(), update);
377         if (!msg) {
378                 return NULL;
379         }
380
381         ao2_ref(msg, +1);
382         return msg;
383 }
384
385 static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
386 {
387         RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
388         struct stasis_caching_topic *caching_topic = data;
389         const char *id = NULL;
390
391         ast_assert(caching_topic->topic != NULL);
392         ast_assert(caching_topic->id_fn != NULL);
393
394         if (stasis_subscription_final_message(sub, message)) {
395                 caching_topic_needs_unref = caching_topic;
396         }
397
398         /* Handle cache guarantee event */
399         if (cache_guarantee_type() == stasis_message_type(message)) {
400                 struct caching_guarantee *guarantee = stasis_message_data(message);
401
402                 ast_mutex_lock(&guarantee->lock);
403                 guarantee->done = 1;
404                 ast_cond_signal(&guarantee->cond);
405                 ast_mutex_unlock(&guarantee->lock);
406
407                 return;
408         }
409
410         /* Handle cache clear event */
411         if (stasis_cache_clear_type() == stasis_message_type(message)) {
412                 RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
413                 RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
414                 struct stasis_message *clear_msg = stasis_message_data(message);
415                 const char *clear_id = caching_topic->id_fn(clear_msg);
416                 struct stasis_message_type *clear_type = stasis_message_type(clear_msg);
417
418                 ast_assert(clear_type != NULL);
419
420                 if (clear_id) {
421                         old_snapshot = cache_put(caching_topic, clear_type, clear_id, NULL);
422                         if (old_snapshot) {
423                                 update = update_create(topic, old_snapshot, NULL);
424                                 stasis_publish(caching_topic->topic, update);
425                                 return;
426                         }
427
428                         ast_log(LOG_ERROR,
429                                 "Attempting to remove an item from the %s cache that isn't there: %s %s\n",
430                                 stasis_topic_name(caching_topic->topic), stasis_message_type_name(clear_type), clear_id);
431                         return;
432                 }
433         }
434
435         id = caching_topic->id_fn(message);
436         if (id == NULL) {
437                 /* Object isn't cached; forward */
438                 stasis_forward_message(caching_topic->topic, topic, message);
439         } else {
440                 /* Update the cache */
441                 RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
442                 RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
443
444                 old_snapshot = cache_put(caching_topic, stasis_message_type(message), id, message);
445
446                 update = update_create(topic, old_snapshot, message);
447                 if (update == NULL) {
448                         return;
449                 }
450
451                 stasis_publish(caching_topic->topic, update);
452         }
453
454         if (stasis_subscription_final_message(sub, message)) {
455                 ao2_cleanup(caching_topic);
456         }
457 }
458
459 struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn)
460 {
461         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
462         struct stasis_subscription *sub;
463         RAII_VAR(char *, new_name, NULL, free);
464         int ret;
465
466         ret = asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic));
467         if (ret < 0) {
468                 return NULL;
469         }
470
471         caching_topic = ao2_alloc(sizeof(*caching_topic), stasis_caching_topic_dtor);
472         if (caching_topic == NULL) {
473                 return NULL;
474         }
475
476         caching_topic->cache = ao2_container_alloc(NUM_CACHE_BUCKETS, cache_entry_hash, cache_entry_cmp);
477         if (!caching_topic->cache) {
478                 ast_log(LOG_ERROR, "Stasis cache allocation failed\n");
479                 return NULL;
480         }
481
482         caching_topic->topic = stasis_topic_create(new_name);
483         if (caching_topic->topic == NULL) {
484                 return NULL;
485         }
486
487         caching_topic->id_fn = id_fn;
488
489         sub = stasis_subscribe(original_topic, caching_topic_exec, caching_topic);
490         if (sub == NULL) {
491                 return NULL;
492         }
493
494         ao2_ref(original_topic, +1);
495         caching_topic->original_topic = original_topic;
496
497         /* This is for the reference contained in the subscription above */
498         ao2_ref(caching_topic, +1);
499         caching_topic->sub = sub;
500
501         ao2_ref(caching_topic, +1);
502         return caching_topic;
503 }
504
505 static void stasis_cache_cleanup(void)
506 {
507         STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_clear_type);
508         STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_update_type);
509         STASIS_MESSAGE_TYPE_CLEANUP(cache_guarantee_type);
510 }
511
512 int stasis_cache_init(void)
513 {
514         ast_register_cleanup(stasis_cache_cleanup);
515
516         if (STASIS_MESSAGE_TYPE_INIT(stasis_cache_clear_type) != 0) {
517                 return -1;
518         }
519
520         if (STASIS_MESSAGE_TYPE_INIT(stasis_cache_update_type) != 0) {
521                 return -1;
522         }
523
524         if (STASIS_MESSAGE_TYPE_INIT(cache_guarantee_type) != 0) {
525                 return -1;
526         }
527
528         return 0;
529 }
530