stasis: Add internal filtering of messages.
[asterisk/asterisk.git] / main / stasis_cache.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 #include "asterisk/astobj2.h"
33 #include "asterisk/hashtab.h"
34 #include "asterisk/stasis_internal.h"
35 #include "asterisk/stasis.h"
36 #include "asterisk/utils.h"
37 #include "asterisk/vector.h"
38
39 #ifdef LOW_MEMORY
40 #define NUM_CACHE_BUCKETS 17
41 #else
42 #define NUM_CACHE_BUCKETS 563
43 #endif
44
45 /*! \internal */
46 struct stasis_cache {
47         struct ao2_container *entries;
48         snapshot_get_id id_fn;
49         cache_aggregate_calc_fn aggregate_calc_fn;
50         cache_aggregate_publish_fn aggregate_publish_fn;
51         int registered;
52 };
53
54 /*! \internal */
55 struct stasis_caching_topic {
56         struct stasis_cache *cache;
57         struct stasis_topic *topic;
58         struct stasis_topic *original_topic;
59         struct stasis_subscription *sub;
60 };
61
62 static void stasis_caching_topic_dtor(void *obj)
63 {
64         struct stasis_caching_topic *caching_topic = obj;
65
66         /* Caching topics contain subscriptions, and must be manually
67          * unsubscribed. */
68         ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub));
69         /* If there are any messages in flight to this subscription; that would
70          * be bad. */
71         ast_assert(stasis_subscription_is_done(caching_topic->sub));
72
73         ao2_container_unregister(stasis_topic_name(caching_topic->topic));
74
75         ao2_cleanup(caching_topic->sub);
76         caching_topic->sub = NULL;
77         ao2_cleanup(caching_topic->cache);
78         caching_topic->cache = NULL;
79         ao2_cleanup(caching_topic->topic);
80         caching_topic->topic = NULL;
81         ao2_cleanup(caching_topic->original_topic);
82         caching_topic->original_topic = NULL;
83 }
84
85 struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
86 {
87         return caching_topic->topic;
88 }
89
90 int stasis_caching_accept_message_type(struct stasis_caching_topic *caching_topic,
91         struct stasis_message_type *type)
92 {
93         int res;
94
95         if (!caching_topic) {
96                 return -1;
97         }
98
99         /* We wait to accept the stasis specific message types until now so that by default everything
100          * will flow to us.
101          */
102         res = stasis_subscription_accept_message_type(caching_topic->sub, stasis_cache_clear_type());
103         res |= stasis_subscription_accept_message_type(caching_topic->sub, stasis_subscription_change_type());
104         res |= stasis_subscription_accept_message_type(caching_topic->sub, type);
105
106         return res;
107 }
108
109 int stasis_caching_set_filter(struct stasis_caching_topic *caching_topic,
110         enum stasis_subscription_message_filter filter)
111 {
112         if (!caching_topic) {
113                 return -1;
114         }
115         return stasis_subscription_set_filter(caching_topic->sub, filter);
116 }
117
118
119 struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
120 {
121         if (!caching_topic) {
122                 return NULL;
123         }
124
125         /*
126          * The subscription may hold the last reference to this caching
127          * topic, but we want to make sure the unsubscribe finishes
128          * before kicking of the caching topic's dtor.
129          */
130         ao2_ref(caching_topic, +1);
131
132         if (stasis_subscription_is_subscribed(caching_topic->sub)) {
133                 /*
134                  * Increment the reference to hold on to it past the
135                  * unsubscribe. Will be cleaned up in dtor.
136                  */
137                 ao2_ref(caching_topic->sub, +1);
138                 stasis_unsubscribe(caching_topic->sub);
139         } else {
140                 ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
141         }
142         ao2_cleanup(caching_topic);
143         return NULL;
144 }
145
146 struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
147 {
148         if (!caching_topic) {
149                 return NULL;
150         }
151
152         /* Hold a ref past the unsubscribe */
153         ao2_ref(caching_topic, +1);
154         stasis_caching_unsubscribe(caching_topic);
155         stasis_subscription_join(caching_topic->sub);
156         ao2_cleanup(caching_topic);
157         return NULL;
158 }
159
160 /*!
161  * \brief The key for an entry in the cache
162  * \note The items in this struct must be immutable for the item in the cache
163  */
164 struct cache_entry_key {
165         /*! The message type of the item stored in the cache */
166         struct stasis_message_type *type;
167         /*! The unique ID of the item stored in the cache */
168         const char *id;
169         /*! The hash, computed from \c type and \c id */
170         unsigned int hash;
171 };
172
173 struct stasis_cache_entry {
174         struct cache_entry_key key;
175         /*! Aggregate snapshot of the stasis cache. */
176         struct stasis_message *aggregate;
177         /*! Local entity snapshot of the stasis event. */
178         struct stasis_message *local;
179         /*! Remote entity snapshots of the stasis event. */
180         AST_VECTOR(, struct stasis_message *) remote;
181 };
182
183 static void cache_entry_dtor(void *obj)
184 {
185         struct stasis_cache_entry *entry = obj;
186         size_t idx;
187
188         entry->key.type = NULL;
189         ast_free((char *) entry->key.id);
190         entry->key.id = NULL;
191
192         ao2_cleanup(entry->aggregate);
193         entry->aggregate = NULL;
194         ao2_cleanup(entry->local);
195         entry->local = NULL;
196
197         for (idx = 0; idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
198                 struct stasis_message *remote;
199
200                 remote = AST_VECTOR_GET(&entry->remote, idx);
201                 ao2_cleanup(remote);
202         }
203         AST_VECTOR_FREE(&entry->remote);
204 }
205
206 static void cache_entry_compute_hash(struct cache_entry_key *key)
207 {
208         key->hash = stasis_message_type_hash(key->type);
209         key->hash += ast_hashtab_hash_string(key->id);
210 }
211
212 static struct stasis_cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
213 {
214         struct stasis_cache_entry *entry;
215         int is_remote;
216
217         ast_assert(id != NULL);
218         ast_assert(snapshot != NULL);
219
220         if (!type) {
221                 return NULL;
222         }
223
224         entry = ao2_alloc_options(sizeof(*entry), cache_entry_dtor,
225                 AO2_ALLOC_OPT_LOCK_NOLOCK);
226         if (!entry) {
227                 return NULL;
228         }
229
230         entry->key.id = ast_strdup(id);
231         if (!entry->key.id) {
232                 ao2_cleanup(entry);
233                 return NULL;
234         }
235         /*
236          * Normal ao2 ref counting rules says we should increment the message
237          * type ref here and decrement it in cache_entry_dtor().  However, the
238          * stasis message snapshot is cached here, will always have the same type
239          * as the cache entry, and can legitimately cause the type ref count to
240          * hit the excessive ref count assertion.  Since the cache entry will
241          * always have a snapshot we can get away with not holding a ref here.
242          */
243         ast_assert(type == stasis_message_type(snapshot));
244         entry->key.type = type;
245         cache_entry_compute_hash(&entry->key);
246
247         is_remote = ast_eid_cmp(&ast_eid_default, stasis_message_eid(snapshot)) ? 1 : 0;
248         if (AST_VECTOR_INIT(&entry->remote, is_remote)) {
249                 ao2_cleanup(entry);
250                 return NULL;
251         }
252
253         if (is_remote) {
254                 if (AST_VECTOR_APPEND(&entry->remote, snapshot)) {
255                         ao2_cleanup(entry);
256                         return NULL;
257                 }
258         } else {
259                 entry->local = snapshot;
260         }
261         ao2_bump(snapshot);
262
263         return entry;
264 }
265
266 static int cache_entry_hash(const void *obj, int flags)
267 {
268         const struct stasis_cache_entry *object;
269         const struct cache_entry_key *key;
270
271         switch (flags & OBJ_SEARCH_MASK) {
272         case OBJ_SEARCH_KEY:
273                 key = obj;
274                 break;
275         case OBJ_SEARCH_OBJECT:
276                 object = obj;
277                 key = &object->key;
278                 break;
279         default:
280                 /* Hash can only work on something with a full key. */
281                 ast_assert(0);
282                 return 0;
283         }
284
285         return (int)key->hash;
286 }
287
288 static int cache_entry_cmp(void *obj, void *arg, int flags)
289 {
290         const struct stasis_cache_entry *object_left = obj;
291         const struct stasis_cache_entry *object_right = arg;
292         const struct cache_entry_key *right_key = arg;
293         int cmp;
294
295         switch (flags & OBJ_SEARCH_MASK) {
296         case OBJ_SEARCH_OBJECT:
297                 right_key = &object_right->key;
298                 /* Fall through */
299         case OBJ_SEARCH_KEY:
300                 cmp = object_left->key.type != right_key->type
301                         || strcmp(object_left->key.id, right_key->id);
302                 break;
303         case OBJ_SEARCH_PARTIAL_KEY:
304                 /* Not supported by container */
305                 ast_assert(0);
306                 cmp = -1;
307                 break;
308         default:
309                 /*
310                  * What arg points to is specific to this traversal callback
311                  * and has no special meaning to astobj2.
312                  */
313                 cmp = 0;
314                 break;
315         }
316         if (cmp) {
317                 return 0;
318         }
319         /*
320          * At this point the traversal callback is identical to a sorted
321          * container.
322          */
323         return CMP_MATCH;
324 }
325
326 static void cache_dtor(void *obj)
327 {
328         struct stasis_cache *cache = obj;
329
330         ao2_cleanup(cache->entries);
331         cache->entries = NULL;
332 }
333
334 struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn,
335         cache_aggregate_calc_fn aggregate_calc_fn,
336         cache_aggregate_publish_fn aggregate_publish_fn)
337 {
338         struct stasis_cache *cache;
339
340         cache = ao2_alloc_options(sizeof(*cache), cache_dtor,
341                 AO2_ALLOC_OPT_LOCK_NOLOCK);
342         if (!cache) {
343                 return NULL;
344         }
345
346         cache->entries = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0,
347                 NUM_CACHE_BUCKETS, cache_entry_hash, NULL, cache_entry_cmp);
348         if (!cache->entries) {
349                 ao2_cleanup(cache);
350                 return NULL;
351         }
352
353         cache->id_fn = id_fn;
354         cache->aggregate_calc_fn = aggregate_calc_fn;
355         cache->aggregate_publish_fn = aggregate_publish_fn;
356
357         return cache;
358 }
359
360 struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn)
361 {
362         return stasis_cache_create_full(id_fn, NULL, NULL);
363 }
364
365 struct stasis_message *stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry)
366 {
367         return entry->aggregate;
368 }
369
370 struct stasis_message *stasis_cache_entry_get_local(struct stasis_cache_entry *entry)
371 {
372         return entry->local;
373 }
374
375 struct stasis_message *stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx)
376 {
377         if (idx < AST_VECTOR_SIZE(&entry->remote)) {
378                 return AST_VECTOR_GET(&entry->remote, idx);
379         }
380         return NULL;
381 }
382
383 /*!
384  * \internal
385  * \brief Find the cache entry in the cache entries container.
386  *
387  * \param entries Container of cached entries.
388  * \param type Type of message to retrieve the cache entry.
389  * \param id Identity of the snapshot to retrieve the cache entry.
390  *
391  * \note The entries container is already locked.
392  *
393  * \retval Cache-entry on success.
394  * \retval NULL Not in cache.
395  */
396 static struct stasis_cache_entry *cache_find(struct ao2_container *entries, struct stasis_message_type *type, const char *id)
397 {
398         struct cache_entry_key search_key;
399         struct stasis_cache_entry *entry;
400
401         search_key.type = type;
402         search_key.id = id;
403         cache_entry_compute_hash(&search_key);
404         entry = ao2_find(entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
405
406         /* Ensure that what we looked for is what we found. */
407         ast_assert(!entry
408                 || (!strcmp(stasis_message_type_name(entry->key.type),
409                         stasis_message_type_name(type)) && !strcmp(entry->key.id, id)));
410         return entry;
411 }
412
413 /*!
414  * \internal
415  * \brief Remove the stasis snapshot in the cache entry determined by eid.
416  *
417  * \param entries Container of cached entries.
418  * \param cached_entry The entry to remove the snapshot from.
419  * \param eid Which snapshot in the cached entry.
420  *
421  * \note The entries container is already locked.
422  *
423  * \return Previous stasis entry snapshot.
424  */
425 static struct stasis_message *cache_remove(struct ao2_container *entries, struct stasis_cache_entry *cached_entry, const struct ast_eid *eid)
426 {
427         struct stasis_message *old_snapshot;
428         int is_remote;
429
430         is_remote = ast_eid_cmp(eid, &ast_eid_default);
431         if (!is_remote) {
432                 old_snapshot = cached_entry->local;
433                 cached_entry->local = NULL;
434         } else {
435                 int idx;
436
437                 old_snapshot = NULL;
438                 for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->remote); ++idx) {
439                         struct stasis_message *cur;
440
441                         cur = AST_VECTOR_GET(&cached_entry->remote, idx);
442                         if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
443                                 old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->remote, idx);
444                                 break;
445                         }
446                 }
447         }
448
449         if (!cached_entry->local && !AST_VECTOR_SIZE(&cached_entry->remote)) {
450                 ao2_unlink_flags(entries, cached_entry, OBJ_NOLOCK);
451         }
452
453         return old_snapshot;
454 }
455
456 /*!
457  * \internal
458  * \brief Update the stasis snapshot in the cache entry determined by eid.
459  *
460  * \param cached_entry The entry to remove the snapshot from.
461  * \param eid Which snapshot in the cached entry.
462  * \param new_snapshot Snapshot to replace the old snapshot.
463  *
464  * \return Previous stasis entry snapshot.
465  */
466 static struct stasis_message *cache_udpate(struct stasis_cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
467 {
468         struct stasis_message *old_snapshot;
469         int is_remote;
470         int idx;
471
472         is_remote = ast_eid_cmp(eid, &ast_eid_default);
473         if (!is_remote) {
474                 old_snapshot = cached_entry->local;
475                 cached_entry->local = ao2_bump(new_snapshot);
476                 return old_snapshot;
477         }
478
479         old_snapshot = NULL;
480         for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->remote); ++idx) {
481                 struct stasis_message *cur;
482
483                 cur = AST_VECTOR_GET(&cached_entry->remote, idx);
484                 if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
485                         old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->remote, idx);
486                         break;
487                 }
488         }
489         if (!AST_VECTOR_APPEND(&cached_entry->remote, new_snapshot)) {
490                 ao2_bump(new_snapshot);
491         }
492
493         return old_snapshot;
494 }
495
496 struct cache_put_snapshots {
497         /*! Old cache eid snapshot. */
498         struct stasis_message *old;
499         /*! Old cache aggregate snapshot. */
500         struct stasis_message *aggregate_old;
501         /*! New cache aggregate snapshot. */
502         struct stasis_message *aggregate_new;
503 };
504
505 static struct cache_put_snapshots cache_put(struct stasis_cache *cache,
506         struct stasis_message_type *type, const char *id, const struct ast_eid *eid,
507         struct stasis_message *new_snapshot)
508 {
509         struct stasis_cache_entry *cached_entry;
510         struct cache_put_snapshots snapshots;
511
512         ast_assert(cache->entries != NULL);
513         ast_assert(eid != NULL);/* Aggregate snapshots not allowed to be put directly. */
514         ast_assert(new_snapshot == NULL ||
515                 type == stasis_message_type(new_snapshot));
516
517         memset(&snapshots, 0, sizeof(snapshots));
518
519         ao2_wrlock(cache->entries);
520
521         cached_entry = cache_find(cache->entries, type, id);
522
523         /* Update the eid snapshot. */
524         if (!new_snapshot) {
525                 /* Remove snapshot from cache */
526                 if (cached_entry) {
527                         snapshots.old = cache_remove(cache->entries, cached_entry, eid);
528                 }
529         } else if (cached_entry) {
530                 /* Update snapshot in cache */
531                 snapshots.old = cache_udpate(cached_entry, eid, new_snapshot);
532         } else {
533                 /* Insert into the cache */
534                 cached_entry = cache_entry_create(type, id, new_snapshot);
535                 if (cached_entry) {
536                         ao2_link_flags(cache->entries, cached_entry, OBJ_NOLOCK);
537                 }
538         }
539
540         /* Update the aggregate snapshot. */
541         if (cache->aggregate_calc_fn && cached_entry) {
542                 snapshots.aggregate_new = cache->aggregate_calc_fn(cached_entry, new_snapshot);
543                 snapshots.aggregate_old = cached_entry->aggregate;
544                 cached_entry->aggregate = ao2_bump(snapshots.aggregate_new);
545         }
546
547         ao2_unlock(cache->entries);
548
549         ao2_cleanup(cached_entry);
550         return snapshots;
551 }
552
553 /*!
554  * \internal
555  * \brief Dump all entity snapshots in the cache entry into the given container.
556  *
557  * \param snapshots Container to put all snapshots in the cache entry.
558  * \param entry Cache entry to use.
559  *
560  * \retval 0 on success.
561  * \retval non-zero on error.
562  */
563 static int cache_entry_dump(struct ao2_container *snapshots, const struct stasis_cache_entry *entry)
564 {
565         int idx;
566         int err = 0;
567
568         ast_assert(snapshots != NULL);
569         ast_assert(entry != NULL);
570
571         /* The aggregate snapshot is not a snapshot from an entity. */
572
573         if (entry->local) {
574                 err |= !ao2_link(snapshots, entry->local);
575         }
576
577         for (idx = 0; !err && idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
578                 struct stasis_message *snapshot;
579
580                 snapshot = AST_VECTOR_GET(&entry->remote, idx);
581                 err |= !ao2_link(snapshots, snapshot);
582         }
583
584         return err;
585 }
586
587 struct ao2_container *stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
588 {
589         struct stasis_cache_entry *cached_entry;
590         struct ao2_container *found;
591
592         ast_assert(cache != NULL);
593         ast_assert(cache->entries != NULL);
594         ast_assert(id != NULL);
595
596         if (!type) {
597                 return NULL;
598         }
599
600         found = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
601         if (!found) {
602                 return NULL;
603         }
604
605         ao2_rdlock(cache->entries);
606
607         cached_entry = cache_find(cache->entries, type, id);
608         if (cached_entry && cache_entry_dump(found, cached_entry)) {
609                 ao2_cleanup(found);
610                 found = NULL;
611         }
612
613         ao2_unlock(cache->entries);
614
615         ao2_cleanup(cached_entry);
616         return found;
617 }
618
619 /*!
620  * \internal
621  * \brief Retrieve an item from the cache entry for a specific eid.
622  *
623  * \param entry Cache entry to use.
624  * \param eid Specific entity id to retrieve.  NULL for aggregate.
625  *
626  * \note The returned snapshot has not had its reference bumped.
627  *
628  * \retval Snapshot from the cache.
629  * \retval \c NULL if snapshot is not found.
630  */
631 static struct stasis_message *cache_entry_by_eid(const struct stasis_cache_entry *entry, const struct ast_eid *eid)
632 {
633         int is_remote;
634         int idx;
635
636         if (!eid) {
637                 /* Get aggregate. */
638                 return entry->aggregate;
639         }
640
641         /* Get snapshot with specific eid. */
642         is_remote = ast_eid_cmp(eid, &ast_eid_default);
643         if (!is_remote) {
644                 return entry->local;
645         }
646
647         for (idx = 0; idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
648                 struct stasis_message *cur;
649
650                 cur = AST_VECTOR_GET(&entry->remote, idx);
651                 if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
652                         return cur;
653                 }
654         }
655
656         return NULL;
657 }
658
659 struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
660 {
661         struct stasis_cache_entry *cached_entry;
662         struct stasis_message *snapshot = NULL;
663
664         ast_assert(cache != NULL);
665         ast_assert(cache->entries != NULL);
666         ast_assert(id != NULL);
667
668         if (!type) {
669                 return NULL;
670         }
671
672         ao2_rdlock(cache->entries);
673
674         cached_entry = cache_find(cache->entries, type, id);
675         if (cached_entry) {
676                 snapshot = cache_entry_by_eid(cached_entry, eid);
677                 ao2_bump(snapshot);
678         }
679
680         ao2_unlock(cache->entries);
681
682         ao2_cleanup(cached_entry);
683         return snapshot;
684 }
685
686 struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
687 {
688         return stasis_cache_get_by_eid(cache, type, id, &ast_eid_default);
689 }
690
691 struct cache_dump_data {
692         struct ao2_container *container;
693         struct stasis_message_type *type;
694         const struct ast_eid *eid;
695 };
696
697 static int cache_dump_by_eid_cb(void *obj, void *arg, int flags)
698 {
699         struct cache_dump_data *cache_dump = arg;
700         struct stasis_cache_entry *entry = obj;
701
702         if (!cache_dump->type || entry->key.type == cache_dump->type) {
703                 struct stasis_message *snapshot;
704
705                 snapshot = cache_entry_by_eid(entry, cache_dump->eid);
706                 if (snapshot) {
707                         if (!ao2_link(cache_dump->container, snapshot)) {
708                                 ao2_cleanup(cache_dump->container);
709                                 cache_dump->container = NULL;
710                                 return CMP_STOP;
711                         }
712                 }
713         }
714
715         return 0;
716 }
717
718 struct ao2_container *stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
719 {
720         struct cache_dump_data cache_dump;
721
722         ast_assert(cache != NULL);
723         ast_assert(cache->entries != NULL);
724
725         cache_dump.eid = eid;
726         cache_dump.type = type;
727         cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
728         if (!cache_dump.container) {
729                 return NULL;
730         }
731
732         ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_by_eid_cb, &cache_dump);
733         return cache_dump.container;
734 }
735
736 struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
737 {
738         return stasis_cache_dump_by_eid(cache, type, &ast_eid_default);
739 }
740
741 static int cache_dump_all_cb(void *obj, void *arg, int flags)
742 {
743         struct cache_dump_data *cache_dump = arg;
744         struct stasis_cache_entry *entry = obj;
745
746         if (!cache_dump->type || entry->key.type == cache_dump->type) {
747                 if (cache_entry_dump(cache_dump->container, entry)) {
748                         ao2_cleanup(cache_dump->container);
749                         cache_dump->container = NULL;
750                         return CMP_STOP;
751                 }
752         }
753
754         return 0;
755 }
756
757 struct ao2_container *stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
758 {
759         struct cache_dump_data cache_dump;
760
761         ast_assert(cache != NULL);
762         ast_assert(cache->entries != NULL);
763
764         cache_dump.eid = NULL;
765         cache_dump.type = type;
766         cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
767         if (!cache_dump.container) {
768                 return NULL;
769         }
770
771         ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_all_cb, &cache_dump);
772         return cache_dump.container;
773 }
774
775 STASIS_MESSAGE_TYPE_DEFN(stasis_cache_clear_type);
776 STASIS_MESSAGE_TYPE_DEFN(stasis_cache_update_type);
777
778 struct stasis_message *stasis_cache_clear_create(struct stasis_message *id_message)
779 {
780         return stasis_message_create(stasis_cache_clear_type(), id_message);
781 }
782
783 static void stasis_cache_update_dtor(void *obj)
784 {
785         struct stasis_cache_update *update = obj;
786
787         ao2_cleanup(update->old_snapshot);
788         update->old_snapshot = NULL;
789         ao2_cleanup(update->new_snapshot);
790         update->new_snapshot = NULL;
791         ao2_cleanup(update->type);
792         update->type = NULL;
793 }
794
795 static struct stasis_message *update_create(struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
796 {
797         struct stasis_cache_update *update;
798         struct stasis_message *msg;
799
800         ast_assert(old_snapshot != NULL || new_snapshot != NULL);
801
802         if (!stasis_cache_update_type()) {
803                 return NULL;
804         }
805
806         update = ao2_alloc_options(sizeof(*update), stasis_cache_update_dtor,
807                 AO2_ALLOC_OPT_LOCK_NOLOCK);
808         if (!update) {
809                 return NULL;
810         }
811
812         if (old_snapshot) {
813                 ao2_ref(old_snapshot, +1);
814                 update->old_snapshot = old_snapshot;
815                 if (!new_snapshot) {
816                         ao2_ref(stasis_message_type(old_snapshot), +1);
817                         update->type = stasis_message_type(old_snapshot);
818                 }
819         }
820         if (new_snapshot) {
821                 ao2_ref(new_snapshot, +1);
822                 update->new_snapshot = new_snapshot;
823                 ao2_ref(stasis_message_type(new_snapshot), +1);
824                 update->type = stasis_message_type(new_snapshot);
825         }
826
827         msg = stasis_message_create(stasis_cache_update_type(), update);
828
829         ao2_cleanup(update);
830         return msg;
831 }
832
833 static void caching_topic_exec(void *data, struct stasis_subscription *sub,
834         struct stasis_message *message)
835 {
836         struct stasis_caching_topic *caching_topic_needs_unref;
837         struct stasis_caching_topic *caching_topic = data;
838         struct stasis_message *msg;
839         struct stasis_message *msg_put;
840         struct stasis_message_type *msg_type;
841         const struct ast_eid *msg_eid;
842         const char *msg_id;
843
844         ast_assert(caching_topic != NULL);
845         ast_assert(caching_topic->topic != NULL);
846         ast_assert(caching_topic->cache != NULL);
847         ast_assert(caching_topic->cache->id_fn != NULL);
848
849         if (stasis_subscription_final_message(sub, message)) {
850                 caching_topic_needs_unref = caching_topic;
851         } else {
852                 caching_topic_needs_unref = NULL;
853         }
854
855         msg_type = stasis_message_type(message);
856
857         /*
858          * app_voicemail used to rely on the cache containing every topic subscribe and
859          * unsubscribe in order to determine if anyone was currently subscribed to a
860          * particular mailbox.  This caused the cache to grow unabated for the life of
861          * the asterisk instance.  Since it no longer needs the cache of these message
862          * types, and no other function needs them, we no longer cache them.
863          */
864         if (stasis_subscription_change_type() == msg_type) {
865                 ao2_cleanup(caching_topic_needs_unref);
866                 return;
867         } else if (stasis_cache_clear_type() == msg_type) {
868                 /* Cache clear event. */
869                 msg_put = NULL;
870                 msg = stasis_message_data(message);
871                 msg_type = stasis_message_type(msg);
872         } else {
873                 /* Normal cache update event. */
874                 msg_put = message;
875                 msg = message;
876         }
877         ast_assert(msg_type != NULL);
878
879         msg_eid = stasis_message_eid(msg);/* msg_eid is NULL for aggregate message. */
880         msg_id = caching_topic->cache->id_fn(msg);
881         if (msg_id && msg_eid) {
882                 struct stasis_message *update;
883                 struct cache_put_snapshots snapshots;
884
885                 /* Update the cache */
886                 snapshots = cache_put(caching_topic->cache, msg_type, msg_id, msg_eid, msg_put);
887                 if (snapshots.old || msg_put) {
888                         if (stasis_topic_subscribers(caching_topic->topic)) {
889                                 update = update_create(snapshots.old, msg_put);
890                                 if (update) {
891                                         stasis_publish(caching_topic->topic, update);
892                                         ao2_ref(update, -1);
893                                 }
894                         }
895                 } else {
896                         ast_debug(1,
897                                 "Attempting to remove an item from the %s cache that isn't there: %s %s\n",
898                                 stasis_topic_name(caching_topic->topic),
899                                 stasis_message_type_name(msg_type), msg_id);
900                 }
901
902                 if (snapshots.aggregate_old != snapshots.aggregate_new) {
903                         if (snapshots.aggregate_new && caching_topic->cache->aggregate_publish_fn) {
904                                 caching_topic->cache->aggregate_publish_fn(caching_topic->original_topic,
905                                         snapshots.aggregate_new);
906                         }
907                         if (stasis_topic_subscribers(caching_topic->topic)) {
908                                 update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
909                                 if (update) {
910                                         stasis_publish(caching_topic->topic, update);
911                                         ao2_ref(update, -1);
912                                 }
913                         }
914                 }
915
916                 ao2_cleanup(snapshots.old);
917                 ao2_cleanup(snapshots.aggregate_old);
918                 ao2_cleanup(snapshots.aggregate_new);
919         }
920
921         ao2_cleanup(caching_topic_needs_unref);
922 }
923
924 static void print_cache_entry(void *v_obj, void *where, ao2_prnt_fn *prnt)
925 {
926         struct stasis_cache_entry *entry = v_obj;
927
928         if (!entry) {
929                 return;
930         }
931         prnt(where, "Type: %s  ID: %s  Hash: %u", stasis_message_type_name(entry->key.type),
932                 entry->key.id, entry->key.hash);
933 }
934
935 struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
936 {
937         struct stasis_caching_topic *caching_topic;
938         char *new_name;
939         int ret;
940
941         ret = ast_asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic));
942         if (ret < 0) {
943                 return NULL;
944         }
945
946         caching_topic = ao2_alloc_options(sizeof(*caching_topic),
947                 stasis_caching_topic_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
948         if (caching_topic == NULL) {
949                 ast_free(new_name);
950
951                 return NULL;
952         }
953
954         caching_topic->topic = stasis_topic_create(new_name);
955         if (caching_topic->topic == NULL) {
956                 ao2_ref(caching_topic, -1);
957                 ast_free(new_name);
958
959                 return NULL;
960         }
961
962         ao2_ref(cache, +1);
963         caching_topic->cache = cache;
964         if (!cache->registered) {
965                 if (ao2_container_register(new_name, cache->entries, print_cache_entry)) {
966                         ast_log(LOG_ERROR, "Stasis cache container '%p' for '%s' did not register\n",
967                                 cache->entries, new_name);
968                 } else {
969                         cache->registered = 1;
970                 }
971         }
972         ast_free(new_name);
973
974         caching_topic->sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0);
975         if (caching_topic->sub == NULL) {
976                 ao2_ref(caching_topic, -1);
977
978                 return NULL;
979         }
980
981         ao2_ref(original_topic, +1);
982         caching_topic->original_topic = original_topic;
983
984         /* The subscription holds the reference, so no additional ref bump. */
985         return caching_topic;
986 }
987
988 static void stasis_cache_cleanup(void)
989 {
990         STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_clear_type);
991         STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_update_type);
992 }
993
994 int stasis_cache_init(void)
995 {
996         ast_register_cleanup(stasis_cache_cleanup);
997
998         if (STASIS_MESSAGE_TYPE_INIT(stasis_cache_clear_type) != 0) {
999                 return -1;
1000         }
1001
1002         if (STASIS_MESSAGE_TYPE_INIT(stasis_cache_update_type) != 0) {
1003                 return -1;
1004         }
1005
1006         return 0;
1007 }