2 * Asterisk -- An open source telephony toolkit.
4 * Copyright (C) 2013, Digium, Inc.
6 * David M. Lee, II <dlee@digium.com>
8 * See http://www.asterisk.org for more information about
9 * the Asterisk project. Please do not directly contact
10 * any of the maintainers of this project for assistance;
11 * the project provides a web site, mailing lists and IRC
12 * channels for your use.
14 * This program is free software, distributed under the terms of
15 * the GNU General Public License Version 2. See the LICENSE file
16 * at the top of the source tree.
21 * \brief Stasis Message API.
23 * \author David M. Lee, II <dlee@digium.com>
27 <support_level>core</support_level>
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
34 #include "asterisk/astobj2.h"
35 #include "asterisk/hashtab.h"
36 #include "asterisk/stasis.h"
37 #include "asterisk/utils.h"
40 #define NUM_CACHE_BUCKETS 17
42 #define NUM_CACHE_BUCKETS 563
46 struct stasis_caching_topic {
47 struct ao2_container *cache;
48 struct stasis_topic *topic;
49 struct stasis_subscription *sub;
50 snapshot_get_id id_fn;
53 static void stasis_caching_topic_dtor(void *obj) {
54 struct stasis_caching_topic *caching_topic = obj;
55 ast_assert(!stasis_subscription_is_subscribed(caching_topic->sub));
56 ast_assert(stasis_subscription_is_done(caching_topic->sub));
57 ao2_cleanup(caching_topic->sub);
58 caching_topic->sub = NULL;
59 ao2_cleanup(caching_topic->cache);
60 caching_topic->cache = NULL;
61 ao2_cleanup(caching_topic->topic);
62 caching_topic->topic = NULL;
65 struct stasis_topic *stasis_caching_get_topic(struct stasis_caching_topic *caching_topic)
67 return caching_topic->topic;
70 struct stasis_caching_topic *stasis_caching_unsubscribe(struct stasis_caching_topic *caching_topic)
73 if (stasis_subscription_is_subscribed(caching_topic->sub)) {
74 /* Increment the reference to hold on to it past the
76 ao2_ref(caching_topic->sub, +1);
77 stasis_unsubscribe(caching_topic->sub);
79 ast_log(LOG_ERROR, "stasis_caching_topic unsubscribed multiple times\n");
85 struct stasis_caching_topic *stasis_caching_unsubscribe_and_join(struct stasis_caching_topic *caching_topic)
91 /* Hold a ref past the unsubscribe */
92 ao2_ref(caching_topic, +1);
93 stasis_caching_unsubscribe(caching_topic);
94 stasis_subscription_join(caching_topic->sub);
95 ao2_cleanup(caching_topic);
100 struct stasis_message_type *type;
102 struct stasis_message *snapshot;
105 static void cache_entry_dtor(void *obj)
107 struct cache_entry *entry = obj;
108 ao2_cleanup(entry->type);
112 ao2_cleanup(entry->snapshot);
113 entry->snapshot = NULL;
116 static struct cache_entry *cache_entry_create(struct stasis_message_type *type, const char *id, struct stasis_message *snapshot)
118 RAII_VAR(struct cache_entry *, entry, NULL, ao2_cleanup);
120 ast_assert(type != NULL);
121 ast_assert(id != NULL);
123 entry = ao2_alloc(sizeof(*entry), cache_entry_dtor);
128 entry->id = ast_strdup(id);
135 if (snapshot != NULL) {
136 ao2_ref(snapshot, +1);
137 entry->snapshot = snapshot;
144 static int cache_entry_hash(const void *obj, int flags)
146 const struct cache_entry *entry = obj;
149 ast_assert(!(flags & OBJ_KEY));
151 hash += ast_hashtab_hash_string(stasis_message_type_name(entry->type));
152 hash += ast_hashtab_hash_string(entry->id);
156 static int cache_entry_cmp(void *obj, void *arg, int flags)
158 const struct cache_entry *left = obj;
159 const struct cache_entry *right = arg;
161 ast_assert(!(flags & OBJ_KEY));
163 if (left->type == right->type && strcmp(left->id, right->id) == 0) {
164 return CMP_MATCH | CMP_STOP;
170 static struct stasis_message *cache_put(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id, struct stasis_message *new_snapshot)
172 RAII_VAR(struct cache_entry *, new_entry, NULL, ao2_cleanup);
173 RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
174 struct stasis_message *old_snapshot = NULL;
176 ast_assert(caching_topic->cache != NULL);
178 new_entry = cache_entry_create(type, id, new_snapshot);
180 if (new_snapshot == NULL) {
181 /* Remove entry from cache */
182 cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_UNLINK);
184 old_snapshot = cached_entry->snapshot;
185 cached_entry->snapshot = NULL;
188 /* Insert/update cache */
189 SCOPED_AO2LOCK(lock, caching_topic->cache);
191 cached_entry = ao2_find(caching_topic->cache, new_entry, OBJ_POINTER | OBJ_NOLOCK);
193 /* Update cache. Because objects are moving, no need to update refcounts. */
194 old_snapshot = cached_entry->snapshot;
195 cached_entry->snapshot = new_entry->snapshot;
196 new_entry->snapshot = NULL;
198 /* Insert into the cache */
199 ao2_link_flags(caching_topic->cache, new_entry, OBJ_NOLOCK);
207 struct stasis_message *stasis_cache_get(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type, const char *id)
209 RAII_VAR(struct cache_entry *, search_entry, NULL, ao2_cleanup);
210 RAII_VAR(struct cache_entry *, cached_entry, NULL, ao2_cleanup);
212 ast_assert(caching_topic->cache != NULL);
214 search_entry = cache_entry_create(type, id, NULL);
215 if (search_entry == NULL) {
219 cached_entry = ao2_find(caching_topic->cache, search_entry, OBJ_POINTER);
220 if (cached_entry == NULL) {
224 ast_assert(cached_entry->snapshot != NULL);
225 ao2_ref(cached_entry->snapshot, +1);
226 return cached_entry->snapshot;
229 struct cache_dump_data {
230 struct ao2_container *cached;
231 struct stasis_message_type *type;
234 static int cache_dump_cb(void *obj, void *arg, int flags)
236 struct cache_dump_data *cache_dump = arg;
237 struct cache_entry *entry = obj;
239 if (!cache_dump->type || entry->type == cache_dump->type) {
240 ao2_link(cache_dump->cached, entry->snapshot);
246 struct ao2_container *stasis_cache_dump(struct stasis_caching_topic *caching_topic, struct stasis_message_type *type)
248 struct cache_dump_data cache_dump;
250 ast_assert(caching_topic->cache != NULL);
252 cache_dump.type = type;
253 cache_dump.cached = ao2_container_alloc(1, NULL, NULL);
254 if (!cache_dump.cached) {
258 ao2_callback(caching_topic->cache, OBJ_MULTIPLE | OBJ_NODATA, cache_dump_cb, &cache_dump);
259 return cache_dump.cached;
262 STASIS_MESSAGE_TYPE_DEFN(stasis_cache_clear_type);
263 STASIS_MESSAGE_TYPE_DEFN(stasis_cache_update_type);
265 static void cache_clear_dtor(void *obj)
267 struct stasis_cache_clear *ev = obj;
268 ao2_cleanup(ev->type);
272 struct stasis_message *stasis_cache_clear_create(struct stasis_message_type *type, const char *id)
274 RAII_VAR(struct stasis_cache_clear *, ev, NULL, ao2_cleanup);
275 RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
277 ev = ao2_alloc(sizeof(*ev) + strlen(id) + 1, cache_clear_dtor);
287 msg = stasis_message_create(stasis_cache_clear_type(), ev);
297 static void stasis_cache_update_dtor(void *obj)
299 struct stasis_cache_update *update = obj;
300 ao2_cleanup(update->topic);
301 update->topic = NULL;
302 ao2_cleanup(update->old_snapshot);
303 update->old_snapshot = NULL;
304 ao2_cleanup(update->new_snapshot);
305 update->new_snapshot = NULL;
306 ao2_cleanup(update->type);
310 static struct stasis_message *update_create(struct stasis_topic *topic, struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
312 RAII_VAR(struct stasis_cache_update *, update, NULL, ao2_cleanup);
313 RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
315 ast_assert(topic != NULL);
316 ast_assert(old_snapshot != NULL || new_snapshot != NULL);
318 update = ao2_alloc(sizeof(*update), stasis_cache_update_dtor);
324 update->topic = topic;
326 ao2_ref(old_snapshot, +1);
327 update->old_snapshot = old_snapshot;
329 ao2_ref(stasis_message_type(old_snapshot), +1);
330 update->type = stasis_message_type(old_snapshot);
334 ao2_ref(new_snapshot, +1);
335 update->new_snapshot = new_snapshot;
336 ao2_ref(stasis_message_type(new_snapshot), +1);
337 update->type = stasis_message_type(new_snapshot);
340 msg = stasis_message_create(stasis_cache_update_type(), update);
349 static void caching_topic_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
351 RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
352 struct stasis_caching_topic *caching_topic = data;
353 const char *id = NULL;
355 ast_assert(caching_topic->topic != NULL);
356 ast_assert(caching_topic->id_fn != NULL);
358 if (stasis_subscription_final_message(sub, message)) {
359 caching_topic_needs_unref = caching_topic;
362 /* Handle cache clear event */
363 if (stasis_cache_clear_type() == stasis_message_type(message)) {
364 RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
365 RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
366 struct stasis_cache_clear *clear = stasis_message_data(message);
367 ast_assert(clear->type != NULL);
368 ast_assert(clear->id != NULL);
369 old_snapshot = cache_put(caching_topic, clear->type, clear->id, NULL);
371 update = update_create(topic, old_snapshot, NULL);
372 stasis_publish(caching_topic->topic, update);
374 /* While this could be a problem, it's very likely to
375 * happen with message forwarding */
377 "Attempting to remove an item from the cache that isn't there: %s %s\n",
378 stasis_message_type_name(clear->type), clear->id);
383 id = caching_topic->id_fn(message);
385 /* Object isn't cached; forward */
386 stasis_forward_message(caching_topic->topic, topic, message);
388 /* Update the cache */
389 RAII_VAR(struct stasis_message *, old_snapshot, NULL, ao2_cleanup);
390 RAII_VAR(struct stasis_message *, update, NULL, ao2_cleanup);
392 old_snapshot = cache_put(caching_topic, stasis_message_type(message), id, message);
394 update = update_create(topic, old_snapshot, message);
395 if (update == NULL) {
399 stasis_publish(caching_topic->topic, update);
402 if (stasis_subscription_final_message(sub, message)) {
403 ao2_cleanup(caching_topic);
407 struct stasis_caching_topic *stasis_caching_topic_create(struct stasis_topic *original_topic, snapshot_get_id id_fn)
409 RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, ao2_cleanup);
410 struct stasis_subscription *sub;
411 RAII_VAR(char *, new_name, NULL, free);
414 ret = asprintf(&new_name, "%s-cached", stasis_topic_name(original_topic));
419 caching_topic = ao2_alloc(sizeof(*caching_topic), stasis_caching_topic_dtor);
420 if (caching_topic == NULL) {
424 caching_topic->cache = ao2_container_alloc(NUM_CACHE_BUCKETS, cache_entry_hash, cache_entry_cmp);
425 if (!caching_topic->cache) {
426 ast_log(LOG_ERROR, "Stasis cache allocation failed\n");
430 caching_topic->topic = stasis_topic_create(new_name);
431 if (caching_topic->topic == NULL) {
435 caching_topic->id_fn = id_fn;
437 sub = stasis_subscribe(original_topic, caching_topic_exec, caching_topic);
441 /* This is for the reference contained in the subscription above */
442 ao2_ref(caching_topic, +1);
443 caching_topic->sub = sub;
445 ao2_ref(caching_topic, +1);
446 return caching_topic;
449 static void stasis_cache_exit(void)
451 STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_clear_type);
452 STASIS_MESSAGE_TYPE_CLEANUP(stasis_cache_update_type);
455 int stasis_cache_init(void)
457 ast_register_atexit(stasis_cache_exit);
459 if (STASIS_MESSAGE_TYPE_INIT(stasis_cache_clear_type) != 0) {
463 if (STASIS_MESSAGE_TYPE_INIT(stasis_cache_update_type) != 0) {