stasis_cache: Stop caching stasis subscription change messages
[asterisk/asterisk.git] / main / stasis_cache.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 #include "asterisk/astobj2.h"
33 #include "asterisk/hashtab.h"
34 #include "asterisk/stasis_internal.h"
35 #include "asterisk/stasis.h"
36 #include "asterisk/utils.h"
37 #include "asterisk/vector.h"
38
39 #ifdef LOW_MEMORY
40 #define NUM_CACHE_BUCKETS 17
41 #else
42 #define NUM_CACHE_BUCKETS 563
43 #endif
44
45 /*! \internal */
46 struct stasis_cache {
47         struct ao2_container *entries;
48         snapshot_get_id id_fn;
49         cache_aggregate_calc_fn aggregate_calc_fn;
50         cache_aggregate_publish_fn aggregate_publish_fn;
51         int registered;
52 };
53
54 /*! \internal */
55 struct stasis_caching_topic {
56         struct stasis_cache *cache;
57         struct stasis_topic *topic;
58         struct stasis_topic *original_topic;
59         struct stasis_subscription *sub;
60 };
61
62 static void stasis_caching_topic_dtor(void *obj)
63 {
64         struct stasis_caching_topic *caching_topic = obj;
65
66         /* Caching topics contain subscriptions, and must be manually
67          * unsubscribed. */
68         ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub));
69         /* If there are any messages in flight to this subscription; that would
70          * be bad. */
71         ast_assert(stasis_subscription_is_done(caching_topic->sub));
72
73         ao2_container_unregister(stasis_topic_name(caching_topic->topic));
74
75         ao2_cleanup(caching_topic->sub);
76         caching_topic->sub = NULL;
77         ao2_cleanup(caching_topic->cache);
78         caching_topic->cache = NULL;
79         ao2_cleanup(caching_topic->topic);
80         caching_topic->topic = NULL;
81         ao2_cleanup(caching_topic->original_topic);
82         caching_topic->original_topic = NULL;
83 }
84
85 struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
86 {
87         return caching_topic->topic;
88 }
89
90 struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
91 {
92         if (!caching_topic) {
93                 return NULL;
94         }
95
96         /*
97          * The subscription may hold the last reference to this caching
98          * topic, but we want to make sure the unsubscribe finishes
99          * before kicking of the caching topic's dtor.
100          */
101         ao2_ref(caching_topic, +1);
102
103         if (stasis_subscription_is_subscribed(caching_topic->sub)) {
104                 /*
105                  * Increment the reference to hold on to it past the
106                  * unsubscribe. Will be cleaned up in dtor.
107                  */
108                 ao2_ref(caching_topic->sub, +1);
109                 stasis_unsubscribe(caching_topic->sub);
110         } else {
111                 ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
112         }
113         ao2_cleanup(caching_topic);
114         return NULL;
115 }
116
117 struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
118 {
119         if (!caching_topic) {
120                 return NULL;
121         }
122
123         /* Hold a ref past the unsubscribe */
124         ao2_ref(caching_topic, +1);
125         stasis_caching_unsubscribe(caching_topic);
126         stasis_subscription_join(caching_topic->sub);
127         ao2_cleanup(caching_topic);
128         return NULL;
129 }
130
131 /*!
132  * \brief The key for an entry in the cache
133  * \note The items in this struct must be immutable for the item in the cache
134  */
135 struct cache_entry_key {
136         /*! The message type of the item stored in the cache */
137         struct stasis_message_type *type;
138         /*! The unique ID of the item stored in the cache */
139         const char *id;
140         /*! The hash, computed from \c type and \c id */
141         unsigned int hash;
142 };
143
144 struct stasis_cache_entry {
145         struct cache_entry_key key;
146         /*! Aggregate snapshot of the stasis cache. */
147         struct stasis_message *aggregate;
148         /*! Local entity snapshot of the stasis event. */
149         struct stasis_message *local;
150         /*! Remote entity snapshots of the stasis event. */
151         AST_VECTOR(, struct stasis_message *) remote;
152 };
153
154 static void cache_entry_dtor(void *obj)
155 {
156         struct stasis_cache_entry *entry = obj;
157         size_t idx;
158
159         ao2_cleanup(entry->key.type);
160         entry->key.type = NULL;
161         ast_free((char *) entry->key.id);
162         entry->key.id = NULL;
163
164         ao2_cleanup(entry->aggregate);
165         entry->aggregate = NULL;
166         ao2_cleanup(entry->local);
167         entry->local = NULL;
168
169         for (idx = 0; idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
170                 struct stasis_message *remote;
171
172                 remote = AST_VECTOR_GET(&entry->remote, idx);
173                 ao2_cleanup(remote);
174         }
175         AST_VECTOR_FREE(&entry->remote);
176 }
177
178 static void cache_entry_compute_hash(struct cache_entry_key *key)
179 {
180         key->hash = stasis_message_type_hash(key->type);
181         key->hash += ast_hashtab_hash_string(key->id);
182 }
183
184 static struct stasis_cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
185 {
186         struct stasis_cache_entry *entry;
187         int is_remote;
188
189         ast_assert(id != NULL);
190         ast_assert(snapshot != NULL);
191
192         if (!type) {
193                 return NULL;
194         }
195
196         entry = ao2_alloc_options(sizeof(*entry), cache_entry_dtor,
197                 AO2_ALLOC_OPT_LOCK_NOLOCK);
198         if (!entry) {
199                 return NULL;
200         }
201
202         entry->key.id = ast_strdup(id);
203         if (!entry->key.id) {
204                 ao2_cleanup(entry);
205                 return NULL;
206         }
207         entry->key.type = ao2_bump(type);
208         cache_entry_compute_hash(&entry->key);
209
210         is_remote = ast_eid_cmp(&ast_eid_default, stasis_message_eid(snapshot)) ? 1 : 0;
211         if (AST_VECTOR_INIT(&entry->remote, is_remote)) {
212                 ao2_cleanup(entry);
213                 return NULL;
214         }
215
216         if (is_remote) {
217                 if (AST_VECTOR_APPEND(&entry->remote, snapshot)) {
218                         ao2_cleanup(entry);
219                         return NULL;
220                 }
221         } else {
222                 entry->local = snapshot;
223         }
224         ao2_bump(snapshot);
225
226         return entry;
227 }
228
229 static int cache_entry_hash(const void *obj, int flags)
230 {
231         const struct stasis_cache_entry *object;
232         const struct cache_entry_key *key;
233
234         switch (flags & OBJ_SEARCH_MASK) {
235         case OBJ_SEARCH_KEY:
236                 key = obj;
237                 break;
238         case OBJ_SEARCH_OBJECT:
239                 object = obj;
240                 key = &object->key;
241                 break;
242         default:
243                 /* Hash can only work on something with a full key. */
244                 ast_assert(0);
245                 return 0;
246         }
247
248         return (int)key->hash;
249 }
250
251 static int cache_entry_cmp(void *obj, void *arg, int flags)
252 {
253         const struct stasis_cache_entry *object_left = obj;
254         const struct stasis_cache_entry *object_right = arg;
255         const struct cache_entry_key *right_key = arg;
256         int cmp;
257
258         switch (flags & OBJ_SEARCH_MASK) {
259         case OBJ_SEARCH_OBJECT:
260                 right_key = &object_right->key;
261                 /* Fall through */
262         case OBJ_SEARCH_KEY:
263                 cmp = object_left->key.type != right_key->type
264                         || strcmp(object_left->key.id, right_key->id);
265                 break;
266         case OBJ_SEARCH_PARTIAL_KEY:
267                 /* Not supported by container */
268                 ast_assert(0);
269                 cmp = -1;
270                 break;
271         default:
272                 /*
273                  * What arg points to is specific to this traversal callback
274                  * and has no special meaning to astobj2.
275                  */
276                 cmp = 0;
277                 break;
278         }
279         if (cmp) {
280                 return 0;
281         }
282         /*
283          * At this point the traversal callback is identical to a sorted
284          * container.
285          */
286         return CMP_MATCH;
287 }
288
289 static void cache_dtor(void *obj)
290 {
291         struct stasis_cache *cache = obj;
292
293         ao2_cleanup(cache->entries);
294         cache->entries = NULL;
295 }
296
297 struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn,
298         cache_aggregate_calc_fn aggregate_calc_fn,
299         cache_aggregate_publish_fn aggregate_publish_fn)
300 {
301         struct stasis_cache *cache;
302
303         cache = ao2_alloc_options(sizeof(*cache), cache_dtor,
304                 AO2_ALLOC_OPT_LOCK_NOLOCK);
305         if (!cache) {
306                 return NULL;
307         }
308
309         cache->entries = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0,
310                 NUM_CACHE_BUCKETS, cache_entry_hash, NULL, cache_entry_cmp);
311         if (!cache->entries) {
312                 ao2_cleanup(cache);
313                 return NULL;
314         }
315
316         cache->id_fn = id_fn;
317         cache->aggregate_calc_fn = aggregate_calc_fn;
318         cache->aggregate_publish_fn = aggregate_publish_fn;
319
320         return cache;
321 }
322
323 struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn)
324 {
325         return stasis_cache_create_full(id_fn, NULL, NULL);
326 }
327
328 struct stasis_message *stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry)
329 {
330         return entry->aggregate;
331 }
332
333 struct stasis_message *stasis_cache_entry_get_local(struct stasis_cache_entry *entry)
334 {
335         return entry->local;
336 }
337
338 struct stasis_message *stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx)
339 {
340         if (idx < AST_VECTOR_SIZE(&entry->remote)) {
341                 return AST_VECTOR_GET(&entry->remote, idx);
342         }
343         return NULL;
344 }
345
346 /*!
347  * \internal
348  * \brief Find the cache entry in the cache entries container.
349  *
350  * \param entries Container of cached entries.
351  * \param type Type of message to retrieve the cache entry.
352  * \param id Identity of the snapshot to retrieve the cache entry.
353  *
354  * \note The entries container is already locked.
355  *
356  * \retval Cache-entry on success.
357  * \retval NULL Not in cache.
358  */
359 static struct stasis_cache_entry *cache_find(struct ao2_container *entries, struct stasis_message_type *type, const char *id)
360 {
361         struct cache_entry_key search_key;
362         struct stasis_cache_entry *entry;
363
364         search_key.type = type;
365         search_key.id = id;
366         cache_entry_compute_hash(&search_key);
367         entry = ao2_find(entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
368
369         /* Ensure that what we looked for is what we found. */
370         ast_assert(!entry
371                 || (!strcmp(stasis_message_type_name(entry->key.type),
372                         stasis_message_type_name(type)) && !strcmp(entry->key.id, id)));
373         return entry;
374 }
375
376 /*!
377  * \internal
378  * \brief Remove the stasis snapshot in the cache entry determined by eid.
379  *
380  * \param entries Container of cached entries.
381  * \param cached_entry The entry to remove the snapshot from.
382  * \param eid Which snapshot in the cached entry.
383  *
384  * \note The entries container is already locked.
385  *
386  * \return Previous stasis entry snapshot.
387  */
388 static struct stasis_message *cache_remove(struct ao2_container *entries, struct stasis_cache_entry *cached_entry, const struct ast_eid *eid)
389 {
390         struct stasis_message *old_snapshot;
391         int is_remote;
392
393         is_remote = ast_eid_cmp(eid, &ast_eid_default);
394         if (!is_remote) {
395                 old_snapshot = cached_entry->local;
396                 cached_entry->local = NULL;
397         } else {
398                 int idx;
399
400                 old_snapshot = NULL;
401                 for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->remote); ++idx) {
402                         struct stasis_message *cur;
403
404                         cur = AST_VECTOR_GET(&cached_entry->remote, idx);
405                         if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
406                                 old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->remote, idx);
407                                 break;
408                         }
409                 }
410         }
411
412         if (!cached_entry->local && !AST_VECTOR_SIZE(&cached_entry->remote)) {
413                 ao2_unlink_flags(entries, cached_entry, OBJ_NOLOCK);
414         }
415
416         return old_snapshot;
417 }
418
419 /*!
420  * \internal
421  * \brief Update the stasis snapshot in the cache entry determined by eid.
422  *
423  * \param cached_entry The entry to remove the snapshot from.
424  * \param eid Which snapshot in the cached entry.
425  * \param new_snapshot Snapshot to replace the old snapshot.
426  *
427  * \return Previous stasis entry snapshot.
428  */
429 static struct stasis_message *cache_udpate(struct stasis_cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
430 {
431         struct stasis_message *old_snapshot;
432         int is_remote;
433         int idx;
434
435         is_remote = ast_eid_cmp(eid, &ast_eid_default);
436         if (!is_remote) {
437                 old_snapshot = cached_entry->local;
438                 cached_entry->local = ao2_bump(new_snapshot);
439                 return old_snapshot;
440         }
441
442         old_snapshot = NULL;
443         for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->remote); ++idx) {
444                 struct stasis_message *cur;
445
446                 cur = AST_VECTOR_GET(&cached_entry->remote, idx);
447                 if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
448                         old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->remote, idx);
449                         break;
450                 }
451         }
452         if (!AST_VECTOR_APPEND(&cached_entry->remote, new_snapshot)) {
453                 ao2_bump(new_snapshot);
454         }
455
456         return old_snapshot;
457 }
458
459 struct cache_put_snapshots {
460         /*! Old cache eid snapshot. */
461         struct stasis_message *old;
462         /*! Old cache aggregate snapshot. */
463         struct stasis_message *aggregate_old;
464         /*! New cache aggregate snapshot. */
465         struct stasis_message *aggregate_new;
466 };
467
468 static struct cache_put_snapshots cache_put(struct stasis_cache *cache,
469         struct stasis_message_type *type, const char *id, const struct ast_eid *eid,
470         struct stasis_message *new_snapshot)
471 {
472         struct stasis_cache_entry *cached_entry;
473         struct cache_put_snapshots snapshots;
474
475         ast_assert(cache->entries != NULL);
476         ast_assert(eid != NULL);/* Aggregate snapshots not allowed to be put directly. */
477         ast_assert(new_snapshot == NULL ||
478                 type == stasis_message_type(new_snapshot));
479
480         memset(&snapshots, 0, sizeof(snapshots));
481
482         ao2_wrlock(cache->entries);
483
484         cached_entry = cache_find(cache->entries, type, id);
485
486         /* Update the eid snapshot. */
487         if (!new_snapshot) {
488                 /* Remove snapshot from cache */
489                 if (cached_entry) {
490                         snapshots.old = cache_remove(cache->entries, cached_entry, eid);
491                 }
492         } else if (cached_entry) {
493                 /* Update snapshot in cache */
494                 snapshots.old = cache_udpate(cached_entry, eid, new_snapshot);
495         } else {
496                 /* Insert into the cache */
497                 cached_entry = cache_entry_create(type, id, new_snapshot);
498                 if (cached_entry) {
499                         ao2_link_flags(cache->entries, cached_entry, OBJ_NOLOCK);
500                 }
501         }
502
503         /* Update the aggregate snapshot. */
504         if (cache->aggregate_calc_fn && cached_entry) {
505                 snapshots.aggregate_new = cache->aggregate_calc_fn(cached_entry, new_snapshot);
506                 snapshots.aggregate_old = cached_entry->aggregate;
507                 cached_entry->aggregate = ao2_bump(snapshots.aggregate_new);
508         }
509
510         ao2_unlock(cache->entries);
511
512         ao2_cleanup(cached_entry);
513         return snapshots;
514 }
515
516 /*!
517  * \internal
518  * \brief Dump all entity snapshots in the cache entry into the given container.
519  *
520  * \param snapshots Container to put all snapshots in the cache entry.
521  * \param entry Cache entry to use.
522  *
523  * \retval 0 on success.
524  * \retval non-zero on error.
525  */
526 static int cache_entry_dump(struct ao2_container *snapshots, const struct stasis_cache_entry *entry)
527 {
528         int idx;
529         int err = 0;
530
531         ast_assert(snapshots != NULL);
532         ast_assert(entry != NULL);
533
534         /* The aggregate snapshot is not a snapshot from an entity. */
535
536         if (entry->local) {
537                 err |= !ao2_link(snapshots, entry->local);
538         }
539
540         for (idx = 0; !err && idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
541                 struct stasis_message *snapshot;
542
543                 snapshot = AST_VECTOR_GET(&entry->remote, idx);
544                 err |= !ao2_link(snapshots, snapshot);
545         }
546
547         return err;
548 }
549
550 struct ao2_container *stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
551 {
552         struct stasis_cache_entry *cached_entry;
553         struct ao2_container *found;
554
555         ast_assert(cache != NULL);
556         ast_assert(cache->entries != NULL);
557         ast_assert(id != NULL);
558
559         if (!type) {
560                 return NULL;
561         }
562
563         found = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
564         if (!found) {
565                 return NULL;
566         }
567
568         ao2_rdlock(cache->entries);
569
570         cached_entry = cache_find(cache->entries, type, id);
571         if (cached_entry && cache_entry_dump(found, cached_entry)) {
572                 ao2_cleanup(found);
573                 found = NULL;
574         }
575
576         ao2_unlock(cache->entries);
577
578         ao2_cleanup(cached_entry);
579         return found;
580 }
581
582 /*!
583  * \internal
584  * \brief Retrieve an item from the cache entry for a specific eid.
585  *
586  * \param entry Cache entry to use.
587  * \param eid Specific entity id to retrieve.  NULL for aggregate.
588  *
589  * \note The returned snapshot has not had its reference bumped.
590  *
591  * \retval Snapshot from the cache.
592  * \retval \c NULL if snapshot is not found.
593  */
594 static struct stasis_message *cache_entry_by_eid(const struct stasis_cache_entry *entry, const struct ast_eid *eid)
595 {
596         int is_remote;
597         int idx;
598
599         if (!eid) {
600                 /* Get aggregate. */
601                 return entry->aggregate;
602         }
603
604         /* Get snapshot with specific eid. */
605         is_remote = ast_eid_cmp(eid, &ast_eid_default);
606         if (!is_remote) {
607                 return entry->local;
608         }
609
610         for (idx = 0; idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
611                 struct stasis_message *cur;
612
613                 cur = AST_VECTOR_GET(&entry->remote, idx);
614                 if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
615                         return cur;
616                 }
617         }
618
619         return NULL;
620 }
621
622 struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
623 {
624         struct stasis_cache_entry *cached_entry;
625         struct stasis_message *snapshot = NULL;
626
627         ast_assert(cache != NULL);
628         ast_assert(cache->entries != NULL);
629         ast_assert(id != NULL);
630
631         if (!type) {
632                 return NULL;
633         }
634
635         ao2_rdlock(cache->entries);
636
637         cached_entry = cache_find(cache->entries, type, id);
638         if (cached_entry) {
639                 snapshot = cache_entry_by_eid(cached_entry, eid);
640                 ao2_bump(snapshot);
641         }
642
643         ao2_unlock(cache->entries);
644
645         ao2_cleanup(cached_entry);
646         return snapshot;
647 }
648
649 struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
650 {
651         return stasis_cache_get_by_eid(cache, type, id, &ast_eid_default);
652 }
653
654 struct cache_dump_data {
655         struct ao2_container *container;
656         struct stasis_message_type *type;
657         const struct ast_eid *eid;
658 };
659
660 static int cache_dump_by_eid_cb(void *obj, void *arg, int flags)
661 {
662         struct cache_dump_data *cache_dump = arg;
663         struct stasis_cache_entry *entry = obj;
664
665         if (!cache_dump->type || entry->key.type == cache_dump->type) {
666                 struct stasis_message *snapshot;
667
668                 snapshot = cache_entry_by_eid(entry, cache_dump->eid);
669                 if (snapshot) {
670                         if (!ao2_link(cache_dump->container, snapshot)) {
671                                 ao2_cleanup(cache_dump->container);
672                                 cache_dump->container = NULL;
673                                 return CMP_STOP;
674                         }
675                 }
676         }
677
678         return 0;
679 }
680
681 struct ao2_container *stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
682 {
683         struct cache_dump_data cache_dump;
684
685         ast_assert(cache != NULL);
686         ast_assert(cache->entries != NULL);
687
688         cache_dump.eid = eid;
689         cache_dump.type = type;
690         cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
691         if (!cache_dump.container) {
692                 return NULL;
693         }
694
695         ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_by_eid_cb, &cache_dump);
696         return cache_dump.container;
697 }
698
699 struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
700 {
701         return stasis_cache_dump_by_eid(cache, type, &ast_eid_default);
702 }
703
704 static int cache_dump_all_cb(void *obj, void *arg, int flags)
705 {
706         struct cache_dump_data *cache_dump = arg;
707         struct stasis_cache_entry *entry = obj;
708
709         if (!cache_dump->type || entry->key.type == cache_dump->type) {
710                 if (cache_entry_dump(cache_dump->container, entry)) {
711                         ao2_cleanup(cache_dump->container);
712                         cache_dump->container = NULL;
713                         return CMP_STOP;
714                 }
715         }
716
717         return 0;
718 }
719
720 struct ao2_container *stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
721 {
722         struct cache_dump_data cache_dump;
723
724         ast_assert(cache != NULL);
725         ast_assert(cache->entries != NULL);
726
727         cache_dump.eid = NULL;
728         cache_dump.type = type;
729         cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
730         if (!cache_dump.container) {
731                 return NULL;
732         }
733
734         ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_all_cb, &cache_dump);
735         return cache_dump.container;
736 }
737
738 STASIS_MESSAGE_TYPE_DEFN(stasis_cache_clear_type);
739 STASIS_MESSAGE_TYPE_DEFN(stasis_cache_update_type);
740
741 struct stasis_message *stasis_cache_clear_create(struct stasis_message *id_message)
742 {
743         return stasis_message_create(stasis_cache_clear_type(), id_message);
744 }
745
746 static void stasis_cache_update_dtor(void *obj)
747 {
748         struct stasis_cache_update *update = obj;
749
750         ao2_cleanup(update->old_snapshot);
751         update->old_snapshot = NULL;
752         ao2_cleanup(update->new_snapshot);
753         update->new_snapshot = NULL;
754         ao2_cleanup(update->type);
755         update->type = NULL;
756 }
757
758 static struct stasis_message *update_create(struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
759 {
760         struct stasis_cache_update *update;
761         struct stasis_message *msg;
762
763         ast_assert(old_snapshot != NULL || new_snapshot != NULL);
764
765         if (!stasis_cache_update_type()) {
766                 return NULL;
767         }
768
769         update = ao2_alloc_options(sizeof(*update), stasis_cache_update_dtor,
770                 AO2_ALLOC_OPT_LOCK_NOLOCK);
771         if (!update) {
772                 return NULL;
773         }
774
775         if (old_snapshot) {
776                 ao2_ref(old_snapshot, +1);
777                 update->old_snapshot = old_snapshot;
778                 if (!new_snapshot) {
779                         ao2_ref(stasis_message_type(old_snapshot), +1);
780                         update->type = stasis_message_type(old_snapshot);
781                 }
782         }
783         if (new_snapshot) {
784                 ao2_ref(new_snapshot, +1);
785                 update->new_snapshot = new_snapshot;
786                 ao2_ref(stasis_message_type(new_snapshot), +1);
787                 update->type = stasis_message_type(new_snapshot);
788         }
789
790         msg = stasis_message_create(stasis_cache_update_type(), update);
791
792         ao2_cleanup(update);
793         return msg;
794 }
795
796 static void caching_topic_exec(void *data, struct stasis_subscription *sub,
797         struct stasis_message *message)
798 {
799         struct stasis_caching_topic *caching_topic_needs_unref;
800         struct stasis_caching_topic *caching_topic = data;
801         struct stasis_message *msg;
802         struct stasis_message *msg_put;
803         struct stasis_message_type *msg_type;
804         const struct ast_eid *msg_eid;
805         const char *msg_id;
806
807         ast_assert(caching_topic != NULL);
808         ast_assert(caching_topic->topic != NULL);
809         ast_assert(caching_topic->cache != NULL);
810         ast_assert(caching_topic->cache->id_fn != NULL);
811
812         if (stasis_subscription_final_message(sub, message)) {
813                 caching_topic_needs_unref = caching_topic;
814         } else {
815                 caching_topic_needs_unref = NULL;
816         }
817
818         msg_type = stasis_message_type(message);
819
820         /*
821          * app_voicemail used to rely on the cache containing every topic subscribe and
822          * unsubscribe in order to determine if anyone was currently subscribed to a
823          * particular mailbox.  This caused the cache to grow unabated for the life of
824          * the asterisk instance.  Since it no longer needs the cache of these message
825          * types, and no other function needs them, we no longer cache them.
826          */
827         if (stasis_subscription_change_type() == msg_type) {
828                 ao2_cleanup(caching_topic_needs_unref);
829                 return;
830         } else if (stasis_cache_clear_type() == msg_type) {
831                 /* Cache clear event. */
832                 msg_put = NULL;
833                 msg = stasis_message_data(message);
834                 msg_type = stasis_message_type(msg);
835         } else {
836                 /* Normal cache update event. */
837                 msg_put = message;
838                 msg = message;
839         }
840         ast_assert(msg_type != NULL);
841
842         msg_eid = stasis_message_eid(msg);/* msg_eid is NULL for aggregate message. */
843         msg_id = caching_topic->cache->id_fn(msg);
844         if (msg_id && msg_eid) {
845                 struct stasis_message *update;
846                 struct cache_put_snapshots snapshots;
847
848                 /* Update the cache */
849                 snapshots = cache_put(caching_topic->cache, msg_type, msg_id, msg_eid, msg_put);
850                 if (snapshots.old || msg_put) {
851                         update = update_create(snapshots.old, msg_put);
852                         if (update) {
853                                 stasis_publish(caching_topic->topic, update);
854                         }
855                         ao2_cleanup(update);
856                 } else {
857                         ast_debug(1,
858                                 "Attempting to remove an item from the %s cache that isn't there: %s %s\n",
859                                 stasis_topic_name(caching_topic->topic),
860                                 stasis_message_type_name(msg_type), msg_id);
861                 }
862
863                 if (snapshots.aggregate_old != snapshots.aggregate_new) {
864                         if (snapshots.aggregate_new && caching_topic->cache->aggregate_publish_fn) {
865                                 caching_topic->cache->aggregate_publish_fn(caching_topic->original_topic,
866                                         snapshots.aggregate_new);
867                         }
868                         update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
869                         if (update) {
870                                 stasis_publish(caching_topic->topic, update);
871                         }
872                         ao2_cleanup(update);
873                 }
874
875                 ao2_cleanup(snapshots.old);
876                 ao2_cleanup(snapshots.aggregate_old);
877                 ao2_cleanup(snapshots.aggregate_new);
878         }
879
880         ao2_cleanup(caching_topic_needs_unref);
881 }
882
883 static void print_cache_entry(void *v_obj, void *where, ao2_prnt_fn *prnt)
884 {
885         struct stasis_cache_entry *entry = v_obj;
886
887         if (!entry) {
888                 return;
889         }
890         prnt(where, "Type: %s  ID: %s  Hash: %u", stasis_message_type_name(entry->key.type),
891                 entry->key.id, entry->key.hash);
892 }
893
894 struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
895 {
896         struct stasis_caching_topic *caching_topic;
897         char *new_name;
898         int ret;
899
900         ret = ast_asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic));
901         if (ret < 0) {
902                 return NULL;
903         }
904
905         caching_topic = ao2_alloc_options(sizeof(*caching_topic),
906                 stasis_caching_topic_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
907         if (caching_topic == NULL) {
908                 ast_free(new_name);
909
910                 return NULL;
911         }
912
913         caching_topic->topic = stasis_topic_create(new_name);
914         if (caching_topic->topic == NULL) {
915                 ao2_ref(caching_topic, -1);
916                 ast_free(new_name);
917
918                 return NULL;
919         }
920
921         ao2_ref(cache, +1);
922         caching_topic->cache = cache;
923         if (!cache->registered) {
924                 if (ao2_container_register(new_name, cache->entries, print_cache_entry)) {
925                         ast_log(LOG_ERROR, "Stasis cache container '%p' for '%s' did not register\n",
926                                 cache->entries, new_name);
927                 } else {
928                         cache->registered = 1;
929                 }
930         }
931         ast_free(new_name);
932
933         caching_topic->sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0);
934         if (caching_topic->sub == NULL) {
935                 ao2_ref(caching_topic, -1);
936
937                 return NULL;
938         }
939
940         ao2_ref(original_topic, +1);
941         caching_topic->original_topic = original_topic;
942
943         /* The subscription holds the reference, so no additional ref bump. */
944         return caching_topic;
945 }
946
947 static void stasis_cache_cleanup(void)
948 {
949         STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_clear_type);
950         STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_update_type);
951 }
952
953 int stasis_cache_init(void)
954 {
955         ast_register_cleanup(stasis_cache_cleanup);
956
957         if (STASIS_MESSAGE_TYPE_INIT(stasis_cache_clear_type) != 0) {
958                 return -1;
959         }
960
961         if (STASIS_MESSAGE_TYPE_INIT(stasis_cache_update_type) != 0) {
962                 return -1;
963         }
964
965         return 0;
966 }