PJPROJECT logging: Made easier to get available logging levels.
[asterisk/asterisk.git] / main / stasis_cache.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*! \file
20  *
21  * \brief Stasis Message API.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 #include "asterisk/astobj2.h"
33 #include "asterisk/hashtab.h"
34 #include "asterisk/stasis_internal.h"
35 #include "asterisk/stasis.h"
36 #include "asterisk/utils.h"
37 #include "asterisk/vector.h"
38
39 #ifdef LOW_MEMORY
40 #define NUM_CACHE_BUCKETS 17
41 #else
42 #define NUM_CACHE_BUCKETS 563
43 #endif
44
45 /*! \internal */
46 struct stasis_cache {
47         struct ao2_container *entries;
48         snapshot_get_id id_fn;
49         cache_aggregate_calc_fn aggregate_calc_fn;
50         cache_aggregate_publish_fn aggregate_publish_fn;
51 };
52
53 /*! \internal */
54 struct stasis_caching_topic {
55         struct stasis_cache *cache;
56         struct stasis_topic *topic;
57         struct stasis_topic *original_topic;
58         struct stasis_subscription *sub;
59 };
60
61 static void stasis_caching_topic_dtor(void *obj)
62 {
63         struct stasis_caching_topic *caching_topic = obj;
64
65         /* Caching topics contain subscriptions, and must be manually
66          * unsubscribed. */
67         ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub));
68         /* If there are any messages in flight to this subscription; that would
69          * be bad. */
70         ast_assert(stasis_subscription_is_done(caching_topic->sub));
71
72         ao2_cleanup(caching_topic->sub);
73         caching_topic->sub = NULL;
74         ao2_cleanup(caching_topic->cache);
75         caching_topic->cache = NULL;
76         ao2_cleanup(caching_topic->topic);
77         caching_topic->topic = NULL;
78         ao2_cleanup(caching_topic->original_topic);
79         caching_topic->original_topic = NULL;
80 }
81
82 struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
83 {
84         return caching_topic->topic;
85 }
86
87 struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
88 {
89         if (!caching_topic) {
90                 return NULL;
91         }
92
93         /*
94          * The subscription may hold the last reference to this caching
95          * topic, but we want to make sure the unsubscribe finishes
96          * before kicking of the caching topic's dtor.
97          */
98         ao2_ref(caching_topic, +1);
99
100         if (stasis_subscription_is_subscribed(caching_topic->sub)) {
101                 /*
102                  * Increment the reference to hold on to it past the
103                  * unsubscribe. Will be cleaned up in dtor.
104                  */
105                 ao2_ref(caching_topic->sub, +1);
106                 stasis_unsubscribe(caching_topic->sub);
107         } else {
108                 ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
109         }
110         ao2_cleanup(caching_topic);
111         return NULL;
112 }
113
114 struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
115 {
116         if (!caching_topic) {
117                 return NULL;
118         }
119
120         /* Hold a ref past the unsubscribe */
121         ao2_ref(caching_topic, +1);
122         stasis_caching_unsubscribe(caching_topic);
123         stasis_subscription_join(caching_topic->sub);
124         ao2_cleanup(caching_topic);
125         return NULL;
126 }
127
128 /*!
129  * \brief The key for an entry in the cache
130  * \note The items in this struct must be immutable for the item in the cache
131  */
132 struct cache_entry_key {
133         /*! The message type of the item stored in the cache */
134         struct stasis_message_type *type;
135         /*! The unique ID of the item stored in the cache */
136         const char *id;
137         /*! The hash, computed from \c type and \c id */
138         unsigned int hash;
139 };
140
141 struct stasis_cache_entry {
142         struct cache_entry_key key;
143         /*! Aggregate snapshot of the stasis cache. */
144         struct stasis_message *aggregate;
145         /*! Local entity snapshot of the stasis event. */
146         struct stasis_message *local;
147         /*! Remote entity snapshots of the stasis event. */
148         AST_VECTOR(, struct stasis_message *) remote;
149 };
150
151 static void cache_entry_dtor(void *obj)
152 {
153         struct stasis_cache_entry *entry = obj;
154         size_t idx;
155
156         ao2_cleanup(entry->key.type);
157         entry->key.type = NULL;
158         ast_free((char *) entry->key.id);
159         entry->key.id = NULL;
160
161         ao2_cleanup(entry->aggregate);
162         entry->aggregate = NULL;
163         ao2_cleanup(entry->local);
164         entry->local = NULL;
165
166         for (idx = 0; idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
167                 struct stasis_message *remote;
168
169                 remote = AST_VECTOR_GET(&entry->remote, idx);
170                 ao2_cleanup(remote);
171         }
172         AST_VECTOR_FREE(&entry->remote);
173 }
174
175 static void cache_entry_compute_hash(struct cache_entry_key *key)
176 {
177         key->hash = ast_hashtab_hash_string(stasis_message_type_name(key->type));
178         key->hash += ast_hashtab_hash_string(key->id);
179 }
180
181 static struct stasis_cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
182 {
183         struct stasis_cache_entry *entry;
184         int is_remote;
185
186         ast_assert(id != NULL);
187         ast_assert(snapshot != NULL);
188
189         if (!type) {
190                 return NULL;
191         }
192
193         entry = ao2_alloc_options(sizeof(*entry), cache_entry_dtor,
194                 AO2_ALLOC_OPT_LOCK_NOLOCK);
195         if (!entry) {
196                 return NULL;
197         }
198
199         entry->key.id = ast_strdup(id);
200         if (!entry->key.id) {
201                 ao2_cleanup(entry);
202                 return NULL;
203         }
204         entry->key.type = ao2_bump(type);
205         cache_entry_compute_hash(&entry->key);
206
207         is_remote = ast_eid_cmp(&ast_eid_default, stasis_message_eid(snapshot)) ? 1 : 0;
208         if (AST_VECTOR_INIT(&entry->remote, is_remote)) {
209                 ao2_cleanup(entry);
210                 return NULL;
211         }
212
213         if (is_remote) {
214                 if (AST_VECTOR_APPEND(&entry->remote, snapshot)) {
215                         ao2_cleanup(entry);
216                         return NULL;
217                 }
218         } else {
219                 entry->local = snapshot;
220         }
221         ao2_bump(snapshot);
222
223         return entry;
224 }
225
226 static int cache_entry_hash(const void *obj, int flags)
227 {
228         const struct stasis_cache_entry *object;
229         const struct cache_entry_key *key;
230
231         switch (flags & OBJ_SEARCH_MASK) {
232         case OBJ_SEARCH_KEY:
233                 key = obj;
234                 break;
235         case OBJ_SEARCH_OBJECT:
236                 object = obj;
237                 key = &object->key;
238                 break;
239         default:
240                 /* Hash can only work on something with a full key. */
241                 ast_assert(0);
242                 return 0;
243         }
244
245         return (int)key->hash;
246 }
247
248 static int cache_entry_cmp(void *obj, void *arg, int flags)
249 {
250         const struct stasis_cache_entry *object_left = obj;
251         const struct stasis_cache_entry *object_right = arg;
252         const struct cache_entry_key *right_key = arg;
253         int cmp;
254
255         switch (flags & OBJ_SEARCH_MASK) {
256         case OBJ_SEARCH_OBJECT:
257                 right_key = &object_right->key;
258                 /* Fall through */
259         case OBJ_SEARCH_KEY:
260                 cmp = object_left->key.type != right_key->type
261                         || strcmp(object_left->key.id, right_key->id);
262                 break;
263         case OBJ_SEARCH_PARTIAL_KEY:
264                 /* Not supported by container */
265                 ast_assert(0);
266                 cmp = -1;
267                 break;
268         default:
269                 /*
270                  * What arg points to is specific to this traversal callback
271                  * and has no special meaning to astobj2.
272                  */
273                 cmp = 0;
274                 break;
275         }
276         if (cmp) {
277                 return 0;
278         }
279         /*
280          * At this point the traversal callback is identical to a sorted
281          * container.
282          */
283         return CMP_MATCH;
284 }
285
286 static void cache_dtor(void *obj)
287 {
288         struct stasis_cache *cache = obj;
289
290         ao2_cleanup(cache->entries);
291         cache->entries = NULL;
292 }
293
294 struct stasis_cache *stasis_cache_create_full(snapshot_get_id id_fn,
295         cache_aggregate_calc_fn aggregate_calc_fn,
296         cache_aggregate_publish_fn aggregate_publish_fn)
297 {
298         struct stasis_cache *cache;
299
300         cache = ao2_alloc_options(sizeof(*cache), cache_dtor,
301                 AO2_ALLOC_OPT_LOCK_NOLOCK);
302         if (!cache) {
303                 return NULL;
304         }
305
306         cache->entries = ao2_container_alloc_hash(AO2_ALLOC_OPT_LOCK_RWLOCK, 0,
307                 NUM_CACHE_BUCKETS, cache_entry_hash, NULL, cache_entry_cmp);
308         if (!cache->entries) {
309                 ao2_cleanup(cache);
310                 return NULL;
311         }
312
313         cache->id_fn = id_fn;
314         cache->aggregate_calc_fn = aggregate_calc_fn;
315         cache->aggregate_publish_fn = aggregate_publish_fn;
316
317         return cache;
318 }
319
320 struct stasis_cache *stasis_cache_create(snapshot_get_id id_fn)
321 {
322         return stasis_cache_create_full(id_fn, NULL, NULL);
323 }
324
325 struct stasis_message *stasis_cache_entry_get_aggregate(struct stasis_cache_entry *entry)
326 {
327         return entry->aggregate;
328 }
329
330 struct stasis_message *stasis_cache_entry_get_local(struct stasis_cache_entry *entry)
331 {
332         return entry->local;
333 }
334
335 struct stasis_message *stasis_cache_entry_get_remote(struct stasis_cache_entry *entry, int idx)
336 {
337         if (idx < AST_VECTOR_SIZE(&entry->remote)) {
338                 return AST_VECTOR_GET(&entry->remote, idx);
339         }
340         return NULL;
341 }
342
343 /*!
344  * \internal
345  * \brief Find the cache entry in the cache entries container.
346  *
347  * \param entries Container of cached entries.
348  * \param type Type of message to retrieve the cache entry.
349  * \param id Identity of the snapshot to retrieve the cache entry.
350  *
351  * \note The entries container is already locked.
352  *
353  * \retval Cache-entry on success.
354  * \retval NULL Not in cache.
355  */
356 static struct stasis_cache_entry *cache_find(struct ao2_container *entries, struct stasis_message_type *type, const char *id)
357 {
358         struct cache_entry_key search_key;
359         struct stasis_cache_entry *entry;
360
361         search_key.type = type;
362         search_key.id = id;
363         cache_entry_compute_hash(&search_key);
364         entry = ao2_find(entries, &search_key, OBJ_SEARCH_KEY | OBJ_NOLOCK);
365
366         /* Ensure that what we looked for is what we found. */
367         ast_assert(!entry
368                 || (!strcmp(stasis_message_type_name(entry->key.type),
369                         stasis_message_type_name(type)) && !strcmp(entry->key.id, id)));
370         return entry;
371 }
372
373 /*!
374  * \internal
375  * \brief Remove the stasis snapshot in the cache entry determined by eid.
376  *
377  * \param entries Container of cached entries.
378  * \param cached_entry The entry to remove the snapshot from.
379  * \param eid Which snapshot in the cached entry.
380  *
381  * \note The entries container is already locked.
382  *
383  * \return Previous stasis entry snapshot.
384  */
385 static struct stasis_message *cache_remove(struct ao2_container *entries, struct stasis_cache_entry *cached_entry, const struct ast_eid *eid)
386 {
387         struct stasis_message *old_snapshot;
388         int is_remote;
389
390         is_remote = ast_eid_cmp(eid, &ast_eid_default);
391         if (!is_remote) {
392                 old_snapshot = cached_entry->local;
393                 cached_entry->local = NULL;
394         } else {
395                 int idx;
396
397                 old_snapshot = NULL;
398                 for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->remote); ++idx) {
399                         struct stasis_message *cur;
400
401                         cur = AST_VECTOR_GET(&cached_entry->remote, idx);
402                         if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
403                                 old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->remote, idx);
404                                 break;
405                         }
406                 }
407         }
408
409         if (!cached_entry->local && !AST_VECTOR_SIZE(&cached_entry->remote)) {
410                 ao2_unlink_flags(entries, cached_entry, OBJ_NOLOCK);
411         }
412
413         return old_snapshot;
414 }
415
416 /*!
417  * \internal
418  * \brief Update the stasis snapshot in the cache entry determined by eid.
419  *
420  * \param cached_entry The entry to remove the snapshot from.
421  * \param eid Which snapshot in the cached entry.
422  * \param new_snapshot Snapshot to replace the old snapshot.
423  *
424  * \return Previous stasis entry snapshot.
425  */
426 static struct stasis_message *cache_udpate(struct stasis_cache_entry *cached_entry, const struct ast_eid *eid, struct stasis_message *new_snapshot)
427 {
428         struct stasis_message *old_snapshot;
429         int is_remote;
430         int idx;
431
432         is_remote = ast_eid_cmp(eid, &ast_eid_default);
433         if (!is_remote) {
434                 old_snapshot = cached_entry->local;
435                 cached_entry->local = ao2_bump(new_snapshot);
436                 return old_snapshot;
437         }
438
439         old_snapshot = NULL;
440         for (idx = 0; idx < AST_VECTOR_SIZE(&cached_entry->remote); ++idx) {
441                 struct stasis_message *cur;
442
443                 cur = AST_VECTOR_GET(&cached_entry->remote, idx);
444                 if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
445                         old_snapshot = AST_VECTOR_REMOVE_UNORDERED(&cached_entry->remote, idx);
446                         break;
447                 }
448         }
449         if (!AST_VECTOR_APPEND(&cached_entry->remote, new_snapshot)) {
450                 ao2_bump(new_snapshot);
451         }
452
453         return old_snapshot;
454 }
455
456 struct cache_put_snapshots {
457         /*! Old cache eid snapshot. */
458         struct stasis_message *old;
459         /*! Old cache aggregate snapshot. */
460         struct stasis_message *aggregate_old;
461         /*! New cache aggregate snapshot. */
462         struct stasis_message *aggregate_new;
463 };
464
465 static struct cache_put_snapshots cache_put(struct stasis_cache *cache,
466         struct stasis_message_type *type, const char *id, const struct ast_eid *eid,
467         struct stasis_message *new_snapshot)
468 {
469         struct stasis_cache_entry *cached_entry;
470         struct cache_put_snapshots snapshots;
471
472         ast_assert(cache->entries != NULL);
473         ast_assert(eid != NULL);/* Aggregate snapshots not allowed to be put directly. */
474         ast_assert(new_snapshot == NULL ||
475                 type == stasis_message_type(new_snapshot));
476
477         memset(&snapshots, 0, sizeof(snapshots));
478
479         ao2_wrlock(cache->entries);
480
481         cached_entry = cache_find(cache->entries, type, id);
482
483         /* Update the eid snapshot. */
484         if (!new_snapshot) {
485                 /* Remove snapshot from cache */
486                 if (cached_entry) {
487                         snapshots.old = cache_remove(cache->entries, cached_entry, eid);
488                 }
489         } else if (cached_entry) {
490                 /* Update snapshot in cache */
491                 snapshots.old = cache_udpate(cached_entry, eid, new_snapshot);
492         } else {
493                 /* Insert into the cache */
494                 cached_entry = cache_entry_create(type, id, new_snapshot);
495                 if (cached_entry) {
496                         ao2_link_flags(cache->entries, cached_entry, OBJ_NOLOCK);
497                 }
498         }
499
500         /* Update the aggregate snapshot. */
501         if (cache->aggregate_calc_fn && cached_entry) {
502                 snapshots.aggregate_new = cache->aggregate_calc_fn(cached_entry, new_snapshot);
503                 snapshots.aggregate_old = cached_entry->aggregate;
504                 cached_entry->aggregate = ao2_bump(snapshots.aggregate_new);
505         }
506
507         ao2_unlock(cache->entries);
508
509         ao2_cleanup(cached_entry);
510         return snapshots;
511 }
512
513 /*!
514  * \internal
515  * \brief Dump all entity snapshots in the cache entry into the given container.
516  *
517  * \param snapshots Container to put all snapshots in the cache entry.
518  * \param entry Cache entry to use.
519  *
520  * \retval 0 on success.
521  * \retval non-zero on error.
522  */
523 static int cache_entry_dump(struct ao2_container *snapshots, const struct stasis_cache_entry *entry)
524 {
525         int idx;
526         int err = 0;
527
528         ast_assert(snapshots != NULL);
529         ast_assert(entry != NULL);
530
531         /* The aggregate snapshot is not a snapshot from an entity. */
532
533         if (entry->local) {
534                 err |= !ao2_link(snapshots, entry->local);
535         }
536
537         for (idx = 0; !err && idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
538                 struct stasis_message *snapshot;
539
540                 snapshot = AST_VECTOR_GET(&entry->remote, idx);
541                 err |= !ao2_link(snapshots, snapshot);
542         }
543
544         return err;
545 }
546
547 struct ao2_container *stasis_cache_get_all(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
548 {
549         struct stasis_cache_entry *cached_entry;
550         struct ao2_container *found;
551
552         ast_assert(cache != NULL);
553         ast_assert(cache->entries != NULL);
554         ast_assert(id != NULL);
555
556         if (!type) {
557                 return NULL;
558         }
559
560         found = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
561         if (!found) {
562                 return NULL;
563         }
564
565         ao2_rdlock(cache->entries);
566
567         cached_entry = cache_find(cache->entries, type, id);
568         if (cached_entry && cache_entry_dump(found, cached_entry)) {
569                 ao2_cleanup(found);
570                 found = NULL;
571         }
572
573         ao2_unlock(cache->entries);
574
575         ao2_cleanup(cached_entry);
576         return found;
577 }
578
579 /*!
580  * \internal
581  * \brief Retrieve an item from the cache entry for a specific eid.
582  *
583  * \param entry Cache entry to use.
584  * \param eid Specific entity id to retrieve.  NULL for aggregate.
585  *
586  * \note The returned snapshot has not had its reference bumped.
587  *
588  * \retval Snapshot from the cache.
589  * \retval \c NULL if snapshot is not found.
590  */
591 static struct stasis_message *cache_entry_by_eid(const struct stasis_cache_entry *entry, const struct ast_eid *eid)
592 {
593         int is_remote;
594         int idx;
595
596         if (!eid) {
597                 /* Get aggregate. */
598                 return entry->aggregate;
599         }
600
601         /* Get snapshot with specific eid. */
602         is_remote = ast_eid_cmp(eid, &ast_eid_default);
603         if (!is_remote) {
604                 return entry->local;
605         }
606
607         for (idx = 0; idx < AST_VECTOR_SIZE(&entry->remote); ++idx) {
608                 struct stasis_message *cur;
609
610                 cur = AST_VECTOR_GET(&entry->remote, idx);
611                 if (!ast_eid_cmp(eid, stasis_message_eid(cur))) {
612                         return cur;
613                 }
614         }
615
616         return NULL;
617 }
618
619 struct stasis_message *stasis_cache_get_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const char *id, const struct ast_eid *eid)
620 {
621         struct stasis_cache_entry *cached_entry;
622         struct stasis_message *snapshot = NULL;
623
624         ast_assert(cache != NULL);
625         ast_assert(cache->entries != NULL);
626         ast_assert(id != NULL);
627
628         if (!type) {
629                 return NULL;
630         }
631
632         ao2_rdlock(cache->entries);
633
634         cached_entry = cache_find(cache->entries, type, id);
635         if (cached_entry) {
636                 snapshot = cache_entry_by_eid(cached_entry, eid);
637                 ao2_bump(snapshot);
638         }
639
640         ao2_unlock(cache->entries);
641
642         ao2_cleanup(cached_entry);
643         return snapshot;
644 }
645
646 struct stasis_message *stasis_cache_get(struct stasis_cache *cache, struct stasis_message_type *type, const char *id)
647 {
648         return stasis_cache_get_by_eid(cache, type, id, &ast_eid_default);
649 }
650
651 struct cache_dump_data {
652         struct ao2_container *container;
653         struct stasis_message_type *type;
654         const struct ast_eid *eid;
655 };
656
657 static int cache_dump_by_eid_cb(void *obj, void *arg, int flags)
658 {
659         struct cache_dump_data *cache_dump = arg;
660         struct stasis_cache_entry *entry = obj;
661
662         if (!cache_dump->type || entry->key.type == cache_dump->type) {
663                 struct stasis_message *snapshot;
664
665                 snapshot = cache_entry_by_eid(entry, cache_dump->eid);
666                 if (snapshot) {
667                         if (!ao2_link(cache_dump->container, snapshot)) {
668                                 ao2_cleanup(cache_dump->container);
669                                 cache_dump->container = NULL;
670                                 return CMP_STOP;
671                         }
672                 }
673         }
674
675         return 0;
676 }
677
678 struct ao2_container *stasis_cache_dump_by_eid(struct stasis_cache *cache, struct stasis_message_type *type, const struct ast_eid *eid)
679 {
680         struct cache_dump_data cache_dump;
681
682         ast_assert(cache != NULL);
683         ast_assert(cache->entries != NULL);
684
685         cache_dump.eid = eid;
686         cache_dump.type = type;
687         cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
688         if (!cache_dump.container) {
689                 return NULL;
690         }
691
692         ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_by_eid_cb, &cache_dump);
693         return cache_dump.container;
694 }
695
696 struct ao2_container *stasis_cache_dump(struct stasis_cache *cache, struct stasis_message_type *type)
697 {
698         return stasis_cache_dump_by_eid(cache, type, &ast_eid_default);
699 }
700
701 static int cache_dump_all_cb(void *obj, void *arg, int flags)
702 {
703         struct cache_dump_data *cache_dump = arg;
704         struct stasis_cache_entry *entry = obj;
705
706         if (!cache_dump->type || entry->key.type == cache_dump->type) {
707                 if (cache_entry_dump(cache_dump->container, entry)) {
708                         ao2_cleanup(cache_dump->container);
709                         cache_dump->container = NULL;
710                         return CMP_STOP;
711                 }
712         }
713
714         return 0;
715 }
716
717 struct ao2_container *stasis_cache_dump_all(struct stasis_cache *cache, struct stasis_message_type *type)
718 {
719         struct cache_dump_data cache_dump;
720
721         ast_assert(cache != NULL);
722         ast_assert(cache->entries != NULL);
723
724         cache_dump.eid = NULL;
725         cache_dump.type = type;
726         cache_dump.container = ao2_container_alloc_list(AO2_ALLOC_OPT_LOCK_NOLOCK, 0, NULL, NULL);
727         if (!cache_dump.container) {
728                 return NULL;
729         }
730
731         ao2_callback(cache->entries, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_all_cb, &cache_dump);
732         return cache_dump.container;
733 }
734
735 STASIS_MESSAGE_TYPE_DEFN(stasis_cache_clear_type);
736 STASIS_MESSAGE_TYPE_DEFN(stasis_cache_update_type);
737
738 struct stasis_message *stasis_cache_clear_create(struct stasis_message *id_message)
739 {
740         return stasis_message_create(stasis_cache_clear_type(), id_message);
741 }
742
743 static void stasis_cache_update_dtor(void *obj)
744 {
745         struct stasis_cache_update *update = obj;
746
747         ao2_cleanup(update->old_snapshot);
748         update->old_snapshot = NULL;
749         ao2_cleanup(update->new_snapshot);
750         update->new_snapshot = NULL;
751         ao2_cleanup(update->type);
752         update->type = NULL;
753 }
754
755 static struct stasis_message *update_create(struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
756 {
757         struct stasis_cache_update *update;
758         struct stasis_message *msg;
759
760         ast_assert(old_snapshot != NULL || new_snapshot != NULL);
761
762         if (!stasis_cache_update_type()) {
763                 return NULL;
764         }
765
766         update = ao2_alloc_options(sizeof(*update), stasis_cache_update_dtor,
767                 AO2_ALLOC_OPT_LOCK_NOLOCK);
768         if (!update) {
769                 return NULL;
770         }
771
772         if (old_snapshot) {
773                 ao2_ref(old_snapshot, +1);
774                 update->old_snapshot = old_snapshot;
775                 if (!new_snapshot) {
776                         ao2_ref(stasis_message_type(old_snapshot), +1);
777                         update->type = stasis_message_type(old_snapshot);
778                 }
779         }
780         if (new_snapshot) {
781                 ao2_ref(new_snapshot, +1);
782                 update->new_snapshot = new_snapshot;
783                 ao2_ref(stasis_message_type(new_snapshot), +1);
784                 update->type = stasis_message_type(new_snapshot);
785         }
786
787         msg = stasis_message_create(stasis_cache_update_type(), update);
788
789         ao2_cleanup(update);
790         return msg;
791 }
792
793 static void caching_topic_exec(void *data, struct stasis_subscription *sub,
794         struct stasis_message *message)
795 {
796         struct stasis_caching_topic *caching_topic_needs_unref;
797         struct stasis_caching_topic *caching_topic = data;
798         struct stasis_message *msg;
799         struct stasis_message *msg_put;
800         struct stasis_message_type *msg_type;
801         const struct ast_eid *msg_eid;
802         const char *msg_id;
803
804         ast_assert(caching_topic != NULL);
805         ast_assert(caching_topic->topic != NULL);
806         ast_assert(caching_topic->cache != NULL);
807         ast_assert(caching_topic->cache->id_fn != NULL);
808
809         if (stasis_subscription_final_message(sub, message)) {
810                 caching_topic_needs_unref = caching_topic;
811         } else {
812                 caching_topic_needs_unref = NULL;
813         }
814
815         msg_type = stasis_message_type(message);
816         if (stasis_cache_clear_type() == msg_type) {
817                 /* Cache clear event. */
818                 msg_put = NULL;
819                 msg = stasis_message_data(message);
820                 msg_type = stasis_message_type(msg);
821         } else {
822                 /* Normal cache update event. */
823                 msg_put = message;
824                 msg = message;
825         }
826         ast_assert(msg_type != NULL);
827
828         msg_eid = stasis_message_eid(msg);/* msg_eid is NULL for aggregate message. */
829         msg_id = caching_topic->cache->id_fn(msg);
830         if (msg_id && msg_eid) {
831                 struct stasis_message *update;
832                 struct cache_put_snapshots snapshots;
833
834                 /* Update the cache */
835                 snapshots = cache_put(caching_topic->cache, msg_type, msg_id, msg_eid, msg_put);
836                 if (snapshots.old || msg_put) {
837                         update = update_create(snapshots.old, msg_put);
838                         if (update) {
839                                 stasis_publish(caching_topic->topic, update);
840                         }
841                         ao2_cleanup(update);
842                 } else {
843                         ast_log(LOG_ERROR,
844                                 "Attempting to remove an item from the %s cache that isn't there: %s %s\n",
845                                 stasis_topic_name(caching_topic->topic),
846                                 stasis_message_type_name(msg_type), msg_id);
847                 }
848
849                 if (snapshots.aggregate_old != snapshots.aggregate_new) {
850                         if (snapshots.aggregate_new && caching_topic->cache->aggregate_publish_fn) {
851                                 caching_topic->cache->aggregate_publish_fn(caching_topic->original_topic,
852                                         snapshots.aggregate_new);
853                         }
854                         update = update_create(snapshots.aggregate_old, snapshots.aggregate_new);
855                         if (update) {
856                                 stasis_publish(caching_topic->topic, update);
857                         }
858                         ao2_cleanup(update);
859                 }
860
861                 ao2_cleanup(snapshots.old);
862                 ao2_cleanup(snapshots.aggregate_old);
863                 ao2_cleanup(snapshots.aggregate_new);
864         }
865
866         ao2_cleanup(caching_topic_needs_unref);
867 }
868
869 struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, struct stasis_cache *cache)
870 {
871         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
872         struct stasis_subscription *sub;
873         RAII_VAR(char *, new_name, NULL, ast_free);
874         int ret;
875
876         ret = ast_asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic));
877         if (ret < 0) {
878                 return NULL;
879         }
880
881         caching_topic = ao2_alloc_options(sizeof(*caching_topic),
882                 stasis_caching_topic_dtor, AO2_ALLOC_OPT_LOCK_NOLOCK);
883         if (caching_topic == NULL) {
884                 return NULL;
885         }
886
887         caching_topic->topic = stasis_topic_create(new_name);
888         if (caching_topic->topic == NULL) {
889                 return NULL;
890         }
891
892         ao2_ref(cache, +1);
893         caching_topic->cache = cache;
894
895         sub = internal_stasis_subscribe(original_topic, caching_topic_exec, caching_topic, 0, 0);
896         if (sub == NULL) {
897                 return NULL;
898         }
899
900         ao2_ref(original_topic, +1);
901         caching_topic->original_topic = original_topic;
902
903         /* This is for the reference contained in the subscription above */
904         ao2_ref(caching_topic, +1);
905         caching_topic->sub = sub;
906
907         /* The subscription holds the reference, so no additional ref bump. */
908         return caching_topic;
909 }
910
911 static void stasis_cache_cleanup(void)
912 {
913         STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_clear_type);
914         STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_update_type);
915 }
916
917 int stasis_cache_init(void)
918 {
919         ast_register_cleanup(stasis_cache_cleanup);
920
921         if (STASIS_MESSAGE_TYPE_INIT(stasis_cache_clear_type) != 0) {
922                 return -1;
923         }
924
925         if (STASIS_MESSAGE_TYPE_INIT(stasis_cache_update_type) != 0) {
926                 return -1;
927         }
928
929         return 0;
930 }
931