ao2_iterator: Mini-audit of the ao2_iterator loops in the new code files.
[asterisk/asterisk.git] / tests / test_stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file \brief Test Stasis message bus.
21  *
22  * \author\verbatim David M. Lee, II <dlee@digium.com> \endverbatim
23  *
24  * \ingroup tests
25  */
26
27 /*** MODULEINFO
28         <depend>TEST_FRAMEWORK</depend>
29         <support_level>core</support_level>
30  ***/
31
32 #include "asterisk.h"
33
34 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
35
36 #include "asterisk/astobj2.h"
37 #include "asterisk/module.h"
38 #include "asterisk/stasis.h"
39 #include "asterisk/stasis_message_router.h"
40 #include "asterisk/test.h"
41
42 static const char *test_category = "/stasis/core/";
43
44 static struct ast_json *fake_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
45 {
46         const char *text = stasis_message_data(message);
47
48         return ast_json_string_create(text);
49 }
50
51 static struct ast_manager_event_blob *fake_ami(struct stasis_message *message)
52 {
53         RAII_VAR(struct ast_manager_event_blob *, res, NULL, ao2_cleanup);
54         const char *text = stasis_message_data(message);
55
56         res = ast_manager_event_blob_create(EVENT_FLAG_TEST, "FakeMI",
57                 "Message: %s\r\n", text);
58
59         if (res == NULL) {
60                 return NULL;
61         }
62
63         ao2_ref(res, +1);
64         return res;
65 }
66
67 static struct stasis_message_vtable fake_vtable = {
68         .to_json = fake_json,
69         .to_ami = fake_ami
70 };
71
72 AST_TEST_DEFINE(message_type)
73 {
74         RAII_VAR(struct stasis_message_type *, uut, NULL, ao2_cleanup);
75
76         switch (cmd) {
77         case TEST_INIT:
78                 info->name = __func__;
79                 info->category = test_category;
80                 info->summary = "Test basic message_type functions";
81                 info->description = "Test basic message_type functions";
82                 return AST_TEST_NOT_RUN;
83         case TEST_EXECUTE:
84                 break;
85         }
86
87         ast_test_validate(test, NULL == stasis_message_type_create(NULL, NULL));
88         uut = stasis_message_type_create("SomeMessage", NULL);
89         ast_test_validate(test, 0 == strcmp(stasis_message_type_name(uut), "SomeMessage"));
90
91         return AST_TEST_PASS;
92 }
93
94 AST_TEST_DEFINE(message)
95 {
96         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
97         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
98         RAII_VAR(char *, data, NULL, ao2_cleanup);
99         char *expected = "SomeData";
100         struct timeval expected_timestamp;
101         struct timeval time_diff;
102
103         switch (cmd) {
104         case TEST_INIT:
105                 info->name = __func__;
106                 info->category = test_category;
107                 info->summary = "Test basic message functions";
108                 info->description = "Test basic message functions";
109                 return AST_TEST_NOT_RUN;
110         case TEST_EXECUTE:
111                 break;
112         }
113
114
115         type = stasis_message_type_create("SomeMessage", NULL);
116
117         ast_test_validate(test, NULL == stasis_message_create(NULL, NULL));
118         ast_test_validate(test, NULL == stasis_message_create(type, NULL));
119
120         data = ao2_alloc(strlen(expected) + 1, NULL);
121         strcpy(data, expected);
122         expected_timestamp = ast_tvnow();
123         uut = stasis_message_create(type, data);
124
125         ast_test_validate(test, NULL != uut);
126         ast_test_validate(test, type == stasis_message_type(uut));
127         ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut)));
128         ast_test_validate(test, 2 == ao2_ref(data, 0)); /* uut has ref to data */
129
130         time_diff = ast_tvsub(*stasis_message_timestamp(uut), expected_timestamp);
131         /* 10ms is certainly long enough for the two calls to complete */
132         ast_test_validate(test, time_diff.tv_sec == 0);
133         ast_test_validate(test, time_diff.tv_usec < 10000);
134
135         ao2_ref(uut, -1);
136         uut = NULL;
137         ast_test_validate(test, 1 == ao2_ref(data, 0)); /* uut unreffed data */
138
139         return AST_TEST_PASS;
140 }
141
142 struct consumer {
143         ast_mutex_t lock;
144         ast_cond_t out;
145         struct stasis_message **messages_rxed;
146         size_t messages_rxed_len;
147         int ignore_subscriptions;
148         int complete;
149 };
150
151 static void consumer_dtor(void *obj) {
152         struct consumer *consumer = obj;
153
154         ast_mutex_destroy(&consumer->lock);
155         ast_cond_destroy(&consumer->out);
156
157         while (consumer->messages_rxed_len > 0) {
158                 ao2_cleanup(consumer->messages_rxed[--consumer->messages_rxed_len]);
159         }
160         ast_free(consumer->messages_rxed);
161         consumer->messages_rxed = NULL;
162 }
163
164 static struct consumer *consumer_create(int ignore_subscriptions) {
165         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
166
167         consumer = ao2_alloc(sizeof(*consumer), consumer_dtor);
168
169         if (!consumer) {
170                 return NULL;
171         }
172
173         consumer->ignore_subscriptions = ignore_subscriptions;
174         consumer->messages_rxed = ast_malloc(0);
175         if (!consumer->messages_rxed) {
176                 return NULL;
177         }
178
179         ast_mutex_init(&consumer->lock);
180         ast_cond_init(&consumer->out, NULL);
181
182         ao2_ref(consumer, +1);
183         return consumer;
184 }
185
186 static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
187 {
188         struct consumer *consumer = data;
189         RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
190         SCOPED_MUTEX(lock, &consumer->lock);
191
192         if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
193
194                 ++consumer->messages_rxed_len;
195                 consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
196                 ast_assert(consumer->messages_rxed != NULL);
197                 consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
198                 ao2_ref(message, +1);
199         }
200
201         if (stasis_subscription_final_message(sub, message)) {
202                 consumer->complete = 1;
203                 consumer_needs_cleanup = consumer;
204         }
205
206         ast_cond_signal(&consumer->out);
207 }
208
209 static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
210 {
211         struct timeval start = ast_tvnow();
212         struct timespec end = {
213                 .tv_sec = start.tv_sec + 30,
214                 .tv_nsec = start.tv_usec * 1000
215         };
216
217         SCOPED_MUTEX(lock, &consumer->lock);
218
219         while (consumer->messages_rxed_len < expected_len) {
220                 int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
221                 if (r == ETIMEDOUT) {
222                         break;
223                 }
224                 ast_assert(r == 0); /* Not expecting any othet types of errors */
225         }
226         return consumer->messages_rxed_len;
227 }
228
229 static int consumer_wait_for_completion(struct consumer *consumer)
230 {
231         struct timeval start = ast_tvnow();
232         struct timespec end = {
233                 .tv_sec = start.tv_sec + 3,
234                 .tv_nsec = start.tv_usec * 1000
235         };
236
237         SCOPED_MUTEX(lock, &consumer->lock);
238
239         while (!consumer->complete) {
240                 int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
241                 if (r == ETIMEDOUT) {
242                         break;
243                 }
244                 ast_assert(r == 0); /* Not expecting any othet types of errors */
245         }
246         return consumer->complete;
247 }
248
249 static int consumer_should_stay(struct consumer *consumer, size_t expected_len)
250 {
251         struct timeval start = ast_tvnow();
252         struct timeval diff = {
253                 .tv_sec = 0,
254                 .tv_usec = 100000 /* wait for 100ms */
255         };
256         struct timeval end_tv = ast_tvadd(start, diff);
257         struct timespec end = {
258                 .tv_sec = end_tv.tv_sec,
259                 .tv_nsec = end_tv.tv_usec * 1000
260         };
261
262         SCOPED_MUTEX(lock, &consumer->lock);
263
264         while (consumer->messages_rxed_len == expected_len) {
265                 int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
266                 if (r == ETIMEDOUT) {
267                         break;
268                 }
269                 ast_assert(r == 0); /* Not expecting any othet types of errors */
270         }
271         return consumer->messages_rxed_len;
272 }
273
274 AST_TEST_DEFINE(subscription_messages)
275 {
276         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
277         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
278         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
279         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
280         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
281         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
282         RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
283         int complete;
284         struct stasis_subscription_change *change;
285
286         switch (cmd) {
287         case TEST_INIT:
288                 info->name = __func__;
289                 info->category = test_category;
290                 info->summary = "Test subscribe/unsubscribe messages";
291                 info->description = "Test subscribe/unsubscribe messages";
292                 return AST_TEST_NOT_RUN;
293         case TEST_EXECUTE:
294                 break;
295         }
296
297         topic = stasis_topic_create("TestTopic");
298         ast_test_validate(test, NULL != topic);
299
300         consumer = consumer_create(0);
301         ast_test_validate(test, NULL != consumer);
302
303         uut = stasis_subscribe(topic, consumer_exec, consumer);
304         ast_test_validate(test, NULL != uut);
305         ao2_ref(consumer, +1);
306         expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
307
308         uut = stasis_unsubscribe(uut);
309         complete = consumer_wait_for_completion(consumer);
310         ast_test_validate(test, 1 == complete);
311
312         ast_test_validate(test, 2 == consumer->messages_rxed_len);
313         ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0]));
314         ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1]));
315
316         change = stasis_message_data(consumer->messages_rxed[0]);
317         ast_test_validate(test, topic == change->topic);
318         ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
319         ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
320
321         change = stasis_message_data(consumer->messages_rxed[1]);
322         ast_test_validate(test, topic == change->topic);
323         ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
324         ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
325
326         return AST_TEST_PASS;
327 }
328
329 AST_TEST_DEFINE(publish)
330 {
331         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
332         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
333         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
334         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
335         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
336         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
337         int actual_len;
338         const char *actual;
339
340         switch (cmd) {
341         case TEST_INIT:
342                 info->name = __func__;
343                 info->category = test_category;
344                 info->summary = "Test simple subscriptions";
345                 info->description = "Test simple subscriptions";
346                 return AST_TEST_NOT_RUN;
347         case TEST_EXECUTE:
348                 break;
349         }
350
351         topic = stasis_topic_create("TestTopic");
352         ast_test_validate(test, NULL != topic);
353
354         consumer = consumer_create(1);
355         ast_test_validate(test, NULL != consumer);
356
357         uut = stasis_subscribe(topic, consumer_exec, consumer);
358         ast_test_validate(test, NULL != uut);
359         ao2_ref(consumer, +1);
360
361         test_data = ao2_alloc(1, NULL);
362         ast_test_validate(test, NULL != test_data);
363         test_message_type = stasis_message_type_create("TestMessage", NULL);
364         test_message = stasis_message_create(test_message_type, test_data);
365
366         stasis_publish(topic, test_message);
367
368         actual_len = consumer_wait_for(consumer, 1);
369         ast_test_validate(test, 1 == actual_len);
370         actual = stasis_message_data(consumer->messages_rxed[0]);
371         ast_test_validate(test, test_data == actual);
372
373         return AST_TEST_PASS;
374 }
375
376 AST_TEST_DEFINE(unsubscribe_stops_messages)
377 {
378         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
379         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
380         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
381         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
382         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
383         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
384         int actual_len;
385
386         switch (cmd) {
387         case TEST_INIT:
388                 info->name = __func__;
389                 info->category = test_category;
390                 info->summary = "Test simple subscriptions";
391                 info->description = "Test simple subscriptions";
392                 return AST_TEST_NOT_RUN;
393         case TEST_EXECUTE:
394                 break;
395         }
396
397         topic = stasis_topic_create("TestTopic");
398         ast_test_validate(test, NULL != topic);
399
400         consumer = consumer_create(1);
401         ast_test_validate(test, NULL != consumer);
402
403         uut = stasis_subscribe(topic, consumer_exec, consumer);
404         ast_test_validate(test, NULL != uut);
405         ao2_ref(consumer, +1);
406
407         uut = stasis_unsubscribe(uut);
408
409         test_data = ao2_alloc(1, NULL);
410         ast_test_validate(test, NULL != test_data);
411         test_message_type = stasis_message_type_create("TestMessage", NULL);
412         test_message = stasis_message_create(test_message_type, test_data);
413
414         stasis_publish(topic, test_message);
415
416         actual_len = consumer_should_stay(consumer, 0);
417         ast_test_validate(test, 0 == actual_len);
418
419         return AST_TEST_PASS;
420 }
421
422 AST_TEST_DEFINE(forward)
423 {
424         RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
425         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
426
427         RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup);
428         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
429
430         RAII_VAR(struct stasis_forward *, forward_sub, NULL, stasis_forward_cancel);
431         RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe);
432         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
433
434         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
435         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
436         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
437         int actual_len;
438
439         switch (cmd) {
440         case TEST_INIT:
441                 info->name = __func__;
442                 info->category = test_category;
443                 info->summary = "Test sending events to a parent topic";
444                 info->description = "Test sending events to a parent topic.\n"
445                         "This test creates three topics (one parent, two children)\n"
446                         "and publishes a message to one child, and verifies it's\n"
447                         "only seen by that child and the parent";
448                 return AST_TEST_NOT_RUN;
449         case TEST_EXECUTE:
450                 break;
451         }
452
453         parent_topic = stasis_topic_create("ParentTestTopic");
454         ast_test_validate(test, NULL != parent_topic);
455         topic = stasis_topic_create("TestTopic");
456         ast_test_validate(test, NULL != topic);
457
458         forward_sub = stasis_forward_all(topic, parent_topic);
459         ast_test_validate(test, NULL != forward_sub);
460
461         parent_consumer = consumer_create(1);
462         ast_test_validate(test, NULL != parent_consumer);
463         consumer = consumer_create(1);
464         ast_test_validate(test, NULL != consumer);
465
466         parent_sub = stasis_subscribe(parent_topic, consumer_exec, parent_consumer);
467         ast_test_validate(test, NULL != parent_sub);
468         ao2_ref(parent_consumer, +1);
469         sub = stasis_subscribe(topic, consumer_exec, consumer);
470         ast_test_validate(test, NULL != sub);
471         ao2_ref(consumer, +1);
472
473         test_data = ao2_alloc(1, NULL);
474         ast_test_validate(test, NULL != test_data);
475         test_message_type = stasis_message_type_create("TestMessage", NULL);
476         test_message = stasis_message_create(test_message_type, test_data);
477
478         stasis_publish(topic, test_message);
479
480         actual_len = consumer_wait_for(consumer, 1);
481         ast_test_validate(test, 1 == actual_len);
482         actual_len = consumer_wait_for(parent_consumer, 1);
483         ast_test_validate(test, 1 == actual_len);
484
485         return AST_TEST_PASS;
486 }
487
488 AST_TEST_DEFINE(interleaving)
489 {
490         RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
491         RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
492         RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
493
494         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
495
496         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
497
498         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
499         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
500         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
501
502         RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
503         RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
504         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
505
506         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
507
508         int actual_len;
509
510         switch (cmd) {
511         case TEST_INIT:
512                 info->name = __func__;
513                 info->category = test_category;
514                 info->summary = "Test sending interleaved events to a parent topic";
515                 info->description = "Test sending events to a parent topic.\n"
516                         "This test creates three topics (one parent, two children)\n"
517                         "and publishes messages alternately between the children.\n"
518                         "It verifies that the messages are received in the expected\n"
519                         "order.";
520                 return AST_TEST_NOT_RUN;
521         case TEST_EXECUTE:
522                 break;
523         }
524
525         test_message_type = stasis_message_type_create("test", NULL);
526         ast_test_validate(test, NULL != test_message_type);
527
528         test_data = ao2_alloc(1, NULL);
529         ast_test_validate(test, NULL != test_data);
530
531         test_message1 = stasis_message_create(test_message_type, test_data);
532         ast_test_validate(test, NULL != test_message1);
533         test_message2 = stasis_message_create(test_message_type, test_data);
534         ast_test_validate(test, NULL != test_message2);
535         test_message3 = stasis_message_create(test_message_type, test_data);
536         ast_test_validate(test, NULL != test_message3);
537
538         parent_topic = stasis_topic_create("ParentTestTopic");
539         ast_test_validate(test, NULL != parent_topic);
540         topic1 = stasis_topic_create("Topic1");
541         ast_test_validate(test, NULL != topic1);
542         topic2 = stasis_topic_create("Topic2");
543         ast_test_validate(test, NULL != topic2);
544
545         forward_sub1 = stasis_forward_all(topic1, parent_topic);
546         ast_test_validate(test, NULL != forward_sub1);
547         forward_sub2 = stasis_forward_all(topic2, parent_topic);
548         ast_test_validate(test, NULL != forward_sub2);
549
550         consumer = consumer_create(1);
551         ast_test_validate(test, NULL != consumer);
552
553         sub = stasis_subscribe(parent_topic, consumer_exec, consumer);
554         ast_test_validate(test, NULL != sub);
555         ao2_ref(consumer, +1);
556
557         stasis_publish(topic1, test_message1);
558         stasis_publish(topic2, test_message2);
559         stasis_publish(topic1, test_message3);
560
561         actual_len = consumer_wait_for(consumer, 3);
562         ast_test_validate(test, 3 == actual_len);
563
564         ast_test_validate(test, test_message1 == consumer->messages_rxed[0]);
565         ast_test_validate(test, test_message2 == consumer->messages_rxed[1]);
566         ast_test_validate(test, test_message3 == consumer->messages_rxed[2]);
567
568         return AST_TEST_PASS;
569 }
570
571 struct cache_test_data {
572         char *id;
573         char *value;
574 };
575
576 static void cache_test_data_dtor(void *obj)
577 {
578         struct cache_test_data *data = obj;
579         ast_free(data->id);
580         ast_free(data->value);
581 }
582
583 static struct stasis_message *cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value)
584 {
585         RAII_VAR(struct cache_test_data *, data, NULL, ao2_cleanup);
586
587         data = ao2_alloc(sizeof(*data), cache_test_data_dtor);
588         if (data == NULL) {
589                 return NULL;
590         }
591
592         ast_assert(name != NULL);
593         ast_assert(value != NULL);
594
595         data->id = ast_strdup(name);
596         data->value = ast_strdup(value);
597         if (!data->id || !data->value) {
598                 return NULL;
599         }
600
601         return stasis_message_create(type, data);
602 }
603
604 static const char *cache_test_data_id(struct stasis_message *message) {
605         struct cache_test_data *cachable = stasis_message_data(message);
606
607         if (0 != strcmp("Cacheable", stasis_message_type_name(stasis_message_type(message)))) {
608                 return NULL;
609         }
610         return cachable->id;
611 }
612
613 AST_TEST_DEFINE(cache_filter)
614 {
615         RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup);
616         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
617         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
618         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
619         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
620         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
621         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
622         int actual_len;
623
624         switch (cmd) {
625         case TEST_INIT:
626                 info->name = __func__;
627                 info->category = test_category;
628                 info->summary = "Test caching topics only forward cache_update messages.";
629                 info->description = "Test caching topics only forward cache_update messages.";
630                 return AST_TEST_NOT_RUN;
631         case TEST_EXECUTE:
632                 break;
633         }
634
635         non_cache_type = stasis_message_type_create("NonCacheable", NULL);
636         ast_test_validate(test, NULL != non_cache_type);
637         topic = stasis_topic_create("SomeTopic");
638         ast_test_validate(test, NULL != topic);
639         cache = stasis_cache_create(cache_test_data_id);
640         ast_test_validate(test, NULL != cache);
641         caching_topic = stasis_caching_topic_create(topic, cache);
642         ast_test_validate(test, NULL != caching_topic);
643         consumer = consumer_create(1);
644         ast_test_validate(test, NULL != consumer);
645         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
646         ast_test_validate(test, NULL != sub);
647         ao2_ref(consumer, +1);
648
649         test_message = cache_test_message_create(non_cache_type, "1", "1");
650         ast_test_validate(test, NULL != test_message);
651
652         stasis_publish(topic, test_message);
653
654         actual_len = consumer_should_stay(consumer, 0);
655         ast_test_validate(test, 0 == actual_len);
656
657         return AST_TEST_PASS;
658 }
659
660 AST_TEST_DEFINE(cache)
661 {
662         RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
663         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
664         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
665         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
666         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
667         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
668         RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
669         RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
670         RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
671         RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
672         int actual_len;
673         struct stasis_cache_update *actual_update;
674
675         switch (cmd) {
676         case TEST_INIT:
677                 info->name = __func__;
678                 info->category = test_category;
679                 info->summary = "Test passing messages through cache topic unscathed.";
680                 info->description = "Test passing messages through cache topic unscathed.";
681                 return AST_TEST_NOT_RUN;
682         case TEST_EXECUTE:
683                 break;
684         }
685
686         cache_type = stasis_message_type_create("Cacheable", NULL);
687         ast_test_validate(test, NULL != cache_type);
688         topic = stasis_topic_create("SomeTopic");
689         ast_test_validate(test, NULL != topic);
690         cache = stasis_cache_create(cache_test_data_id);
691         ast_test_validate(test, NULL != cache);
692         caching_topic = stasis_caching_topic_create(topic, cache);
693         ast_test_validate(test, NULL != caching_topic);
694         consumer = consumer_create(1);
695         ast_test_validate(test, NULL != consumer);
696         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
697         ast_test_validate(test, NULL != sub);
698         ao2_ref(consumer, +1);
699
700         test_message1_1 = cache_test_message_create(cache_type, "1", "1");
701         ast_test_validate(test, NULL != test_message1_1);
702         test_message2_1 = cache_test_message_create(cache_type, "2", "1");
703         ast_test_validate(test, NULL != test_message2_1);
704
705         /* Post a couple of snapshots */
706         stasis_publish(topic, test_message1_1);
707         stasis_publish(topic, test_message2_1);
708         actual_len = consumer_wait_for(consumer, 2);
709         ast_test_validate(test, 2 == actual_len);
710
711         /* Check for new snapshot messages */
712         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[0]));
713         actual_update = stasis_message_data(consumer->messages_rxed[0]);
714         ast_test_validate(test, NULL == actual_update->old_snapshot);
715         ast_test_validate(test, test_message1_1 == actual_update->new_snapshot);
716         ast_test_validate(test, test_message1_1 == stasis_cache_get(cache, cache_type, "1"));
717         /* stasis_cache_get returned a ref, so unref test_message1_1 */
718         ao2_ref(test_message1_1, -1);
719
720         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[1]));
721         actual_update = stasis_message_data(consumer->messages_rxed[1]);
722         ast_test_validate(test, NULL == actual_update->old_snapshot);
723         ast_test_validate(test, test_message2_1 == actual_update->new_snapshot);
724         ast_test_validate(test, test_message2_1 == stasis_cache_get(cache, cache_type, "2"));
725         /* stasis_cache_get returned a ref, so unref test_message2_1 */
726         ao2_ref(test_message2_1, -1);
727
728         /* Update snapshot 2 */
729         test_message2_2 = cache_test_message_create(cache_type, "2", "2");
730         ast_test_validate(test, NULL != test_message2_2);
731         stasis_publish(topic, test_message2_2);
732
733         actual_len = consumer_wait_for(consumer, 3);
734         ast_test_validate(test, 3 == actual_len);
735
736         actual_update = stasis_message_data(consumer->messages_rxed[2]);
737         ast_test_validate(test, test_message2_1 == actual_update->old_snapshot);
738         ast_test_validate(test, test_message2_2 == actual_update->new_snapshot);
739         ast_test_validate(test, test_message2_2 == stasis_cache_get(cache, cache_type, "2"));
740         /* stasis_cache_get returned a ref, so unref test_message2_2 */
741         ao2_ref(test_message2_2, -1);
742
743         /* Clear snapshot 1 */
744         test_message1_clear = stasis_cache_clear_create(test_message1_1);
745         ast_test_validate(test, NULL != test_message1_clear);
746         stasis_publish(topic, test_message1_clear);
747
748         actual_len = consumer_wait_for(consumer, 4);
749         ast_test_validate(test, 4 == actual_len);
750
751         actual_update = stasis_message_data(consumer->messages_rxed[3]);
752         ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
753         ast_test_validate(test, NULL == actual_update->new_snapshot);
754         ast_test_validate(test, NULL == stasis_cache_get(cache, cache_type, "1"));
755
756         return AST_TEST_PASS;
757 }
758
759 AST_TEST_DEFINE(cache_dump)
760 {
761         RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
762         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
763         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
764         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
765         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
766         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
767         RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
768         RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
769         RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
770         RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
771         RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
772         int actual_len;
773         struct ao2_iterator i;
774         void *obj;
775
776         switch (cmd) {
777         case TEST_INIT:
778                 info->name = __func__;
779                 info->category = test_category;
780                 info->summary = "Test passing messages through cache topic unscathed.";
781                 info->description = "Test passing messages through cache topic unscathed.";
782                 return AST_TEST_NOT_RUN;
783         case TEST_EXECUTE:
784                 break;
785         }
786
787         cache_type = stasis_message_type_create("Cacheable", NULL);
788         ast_test_validate(test, NULL != cache_type);
789         topic = stasis_topic_create("SomeTopic");
790         ast_test_validate(test, NULL != topic);
791         cache = stasis_cache_create(cache_test_data_id);
792         ast_test_validate(test, NULL != cache);
793         caching_topic = stasis_caching_topic_create(topic, cache);
794         ast_test_validate(test, NULL != caching_topic);
795         consumer = consumer_create(1);
796         ast_test_validate(test, NULL != consumer);
797         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
798         ast_test_validate(test, NULL != sub);
799         ao2_ref(consumer, +1);
800
801         test_message1_1 = cache_test_message_create(cache_type, "1", "1");
802         ast_test_validate(test, NULL != test_message1_1);
803         test_message2_1 = cache_test_message_create(cache_type, "2", "1");
804         ast_test_validate(test, NULL != test_message2_1);
805
806         /* Post a couple of snapshots */
807         stasis_publish(topic, test_message1_1);
808         stasis_publish(topic, test_message2_1);
809         actual_len = consumer_wait_for(consumer, 2);
810         ast_test_validate(test, 2 == actual_len);
811
812         /* Check the cache */
813         cache_dump = stasis_cache_dump(cache, NULL);
814         ast_test_validate(test, NULL != cache_dump);
815         ast_test_validate(test, 2 == ao2_container_count(cache_dump));
816         i = ao2_iterator_init(cache_dump, 0);
817         while ((obj = ao2_iterator_next(&i))) {
818                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
819                 ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_1);
820         }
821         ao2_iterator_destroy(&i);
822
823         /* Update snapshot 2 */
824         test_message2_2 = cache_test_message_create(cache_type, "2", "2");
825         ast_test_validate(test, NULL != test_message2_2);
826         stasis_publish(topic, test_message2_2);
827
828         actual_len = consumer_wait_for(consumer, 3);
829         ast_test_validate(test, 3 == actual_len);
830
831         /* Check the cache */
832         cache_dump = stasis_cache_dump(cache, NULL);
833         ast_test_validate(test, NULL != cache_dump);
834         ast_test_validate(test, 2 == ao2_container_count(cache_dump));
835         i = ao2_iterator_init(cache_dump, 0);
836         while ((obj = ao2_iterator_next(&i))) {
837                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
838                 ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_2);
839         }
840         ao2_iterator_destroy(&i);
841
842         /* Clear snapshot 1 */
843         test_message1_clear = stasis_cache_clear_create(test_message1_1);
844         ast_test_validate(test, NULL != test_message1_clear);
845         stasis_publish(topic, test_message1_clear);
846
847         actual_len = consumer_wait_for(consumer, 4);
848         ast_test_validate(test, 4 == actual_len);
849
850         /* Check the cache */
851         cache_dump = stasis_cache_dump(cache, NULL);
852         ast_test_validate(test, NULL != cache_dump);
853         ast_test_validate(test, 1 == ao2_container_count(cache_dump));
854         i = ao2_iterator_init(cache_dump, 0);
855         while ((obj = ao2_iterator_next(&i))) {
856                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
857                 ast_test_validate(test, actual_cache_entry == test_message2_2);
858         }
859         ao2_iterator_destroy(&i);
860
861         /* Dump the cache to ensure that it has no subscription change items in it since those aren't cached */
862         ao2_cleanup(cache_dump);
863         cache_dump = stasis_cache_dump(cache, stasis_subscription_change_type());
864         ast_test_validate(test, 0 == ao2_container_count(cache_dump));
865
866         return AST_TEST_PASS;
867 }
868
869 AST_TEST_DEFINE(router)
870 {
871         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
872         RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
873         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
874         RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
875         RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
876         RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
877         RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
878         RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
879         RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
880         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
881         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
882         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
883         int actual_len, ret;
884         struct stasis_message *actual;
885
886         switch (cmd) {
887         case TEST_INIT:
888                 info->name = __func__;
889                 info->category = test_category;
890                 info->summary = "Test simple message routing";
891                 info->description = "Test simple message routing";
892                 return AST_TEST_NOT_RUN;
893         case TEST_EXECUTE:
894                 break;
895         }
896
897         topic = stasis_topic_create("TestTopic");
898         ast_test_validate(test, NULL != topic);
899
900         consumer1 = consumer_create(1);
901         ast_test_validate(test, NULL != consumer1);
902         consumer2 = consumer_create(1);
903         ast_test_validate(test, NULL != consumer2);
904         consumer3 = consumer_create(1);
905         ast_test_validate(test, NULL != consumer3);
906
907         test_message_type1 = stasis_message_type_create("TestMessage1", NULL);
908         ast_test_validate(test, NULL != test_message_type1);
909         test_message_type2 = stasis_message_type_create("TestMessage2", NULL);
910         ast_test_validate(test, NULL != test_message_type2);
911         test_message_type3 = stasis_message_type_create("TestMessage3", NULL);
912         ast_test_validate(test, NULL != test_message_type3);
913
914         uut = stasis_message_router_create(topic);
915         ast_test_validate(test, NULL != uut);
916
917         ret = stasis_message_router_add(
918                 uut, test_message_type1, consumer_exec, consumer1);
919         ast_test_validate(test, 0 == ret);
920         ao2_ref(consumer1, +1);
921         ret = stasis_message_router_add(
922                 uut, test_message_type2, consumer_exec, consumer2);
923         ast_test_validate(test, 0 == ret);
924         ao2_ref(consumer2, +1);
925         ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
926         ast_test_validate(test, 0 == ret);
927         ao2_ref(consumer3, +1);
928
929         test_data = ao2_alloc(1, NULL);
930         ast_test_validate(test, NULL != test_data);
931         test_message1 = stasis_message_create(test_message_type1, test_data);
932         ast_test_validate(test, NULL != test_message1);
933         test_message2 = stasis_message_create(test_message_type2, test_data);
934         ast_test_validate(test, NULL != test_message2);
935         test_message3 = stasis_message_create(test_message_type3, test_data);
936         ast_test_validate(test, NULL != test_message3);
937
938         stasis_publish(topic, test_message1);
939         stasis_publish(topic, test_message2);
940         stasis_publish(topic, test_message3);
941
942         actual_len = consumer_wait_for(consumer1, 1);
943         ast_test_validate(test, 1 == actual_len);
944         actual_len = consumer_wait_for(consumer2, 1);
945         ast_test_validate(test, 1 == actual_len);
946         actual_len = consumer_wait_for(consumer3, 1);
947         ast_test_validate(test, 1 == actual_len);
948
949         actual = consumer1->messages_rxed[0];
950         ast_test_validate(test, test_message1 == actual);
951
952         actual = consumer2->messages_rxed[0];
953         ast_test_validate(test, test_message2 == actual);
954
955         actual = consumer3->messages_rxed[0];
956         ast_test_validate(test, test_message3 == actual);
957
958         /* consumer1 and consumer2 do not get the final message. */
959         ao2_cleanup(consumer1);
960         ao2_cleanup(consumer2);
961
962         return AST_TEST_PASS;
963 }
964
965 static const char *cache_simple(struct stasis_message *message) {
966         const char *type_name =
967                 stasis_message_type_name(stasis_message_type(message));
968         if (!ast_begins_with(type_name, "Cache")) {
969                 return NULL;
970         }
971
972         return "cached";
973 }
974
975 AST_TEST_DEFINE(router_cache_updates)
976 {
977         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
978         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
979         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe_and_join);
980         RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
981         RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
982         RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
983         RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
984         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
985         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
986         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
987         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
988         RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
989         RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
990         RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
991         RAII_VAR(struct stasis_message *, message1, NULL, ao2_cleanup);
992         RAII_VAR(struct stasis_message *, message2, NULL, ao2_cleanup);
993         struct stasis_cache_update *update;
994         int actual_len, ret;
995         struct stasis_message *actual;
996
997         switch (cmd) {
998         case TEST_INIT:
999                 info->name = __func__;
1000                 info->category = test_category;
1001                 info->summary = "Test special handling cache_update messages";
1002                 info->description = "Test special handling cache_update messages";
1003                 return AST_TEST_NOT_RUN;
1004         case TEST_EXECUTE:
1005                 break;
1006         }
1007
1008         topic = stasis_topic_create("TestTopic");
1009         ast_test_validate(test, NULL != topic);
1010
1011         cache = stasis_cache_create(cache_simple);
1012         ast_test_validate(test, NULL != cache);
1013         caching_topic = stasis_caching_topic_create(topic, cache);
1014         ast_test_validate(test, NULL != caching_topic);
1015
1016         consumer1 = consumer_create(1);
1017         ast_test_validate(test, NULL != consumer1);
1018         consumer2 = consumer_create(1);
1019         ast_test_validate(test, NULL != consumer2);
1020         consumer3 = consumer_create(1);
1021         ast_test_validate(test, NULL != consumer3);
1022
1023         test_message_type1 = stasis_message_type_create("Cache1", NULL);
1024         ast_test_validate(test, NULL != test_message_type1);
1025         test_message_type2 = stasis_message_type_create("Cache2", NULL);
1026         ast_test_validate(test, NULL != test_message_type2);
1027         test_message_type3 = stasis_message_type_create("NonCache", NULL);
1028         ast_test_validate(test, NULL != test_message_type3);
1029
1030         uut = stasis_message_router_create(
1031                 stasis_caching_get_topic(caching_topic));
1032         ast_test_validate(test, NULL != uut);
1033
1034         ret = stasis_message_router_add_cache_update(
1035                 uut, test_message_type1, consumer_exec, consumer1);
1036         ast_test_validate(test, 0 == ret);
1037         ao2_ref(consumer1, +1);
1038         ret = stasis_message_router_add(
1039                 uut, stasis_cache_update_type(), consumer_exec, consumer2);
1040         ast_test_validate(test, 0 == ret);
1041         ao2_ref(consumer2, +1);
1042         ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
1043         ast_test_validate(test, 0 == ret);
1044         ao2_ref(consumer3, +1);
1045
1046         test_data = ao2_alloc(1, NULL);
1047         ast_test_validate(test, NULL != test_data);
1048         test_message1 = stasis_message_create(test_message_type1, test_data);
1049         ast_test_validate(test, NULL != test_message1);
1050         test_message2 = stasis_message_create(test_message_type2, test_data);
1051         ast_test_validate(test, NULL != test_message2);
1052         test_message3 = stasis_message_create(test_message_type3, test_data);
1053         ast_test_validate(test, NULL != test_message3);
1054
1055         stasis_publish(topic, test_message1);
1056         stasis_publish(topic, test_message2);
1057         stasis_publish(topic, test_message3);
1058
1059         actual_len = consumer_wait_for(consumer1, 1);
1060         ast_test_validate(test, 1 == actual_len);
1061         actual_len = consumer_wait_for(consumer2, 1);
1062         ast_test_validate(test, 1 == actual_len);
1063         /* Uncacheable message should not be passed through */
1064         actual_len = consumer_should_stay(consumer3, 0);
1065         ast_test_validate(test, 0 == actual_len);
1066
1067         actual = consumer1->messages_rxed[0];
1068         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
1069         update = stasis_message_data(actual);
1070         ast_test_validate(test, test_message_type1 == update->type);
1071         ast_test_validate(test, test_message1 == update->new_snapshot);
1072
1073         actual = consumer2->messages_rxed[0];
1074         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
1075         update = stasis_message_data(actual);
1076         ast_test_validate(test, test_message_type2 == update->type);
1077         ast_test_validate(test, test_message2 == update->new_snapshot);
1078
1079         /* consumer1 and consumer2 do not get the final message. */
1080         ao2_cleanup(consumer1);
1081         ao2_cleanup(consumer2);
1082
1083         return AST_TEST_PASS;
1084 }
1085
1086 AST_TEST_DEFINE(no_to_json)
1087 {
1088         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1089         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1090         RAII_VAR(char *, data, NULL, ao2_cleanup);
1091         RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
1092         char *expected = "SomeData";
1093
1094         switch (cmd) {
1095         case TEST_INIT:
1096                 info->name = __func__;
1097                 info->category = test_category;
1098                 info->summary = "Test message to_json function";
1099                 info->description = "Test message to_json function";
1100                 return AST_TEST_NOT_RUN;
1101         case TEST_EXECUTE:
1102                 break;
1103         }
1104
1105         /* Test NULL */
1106         actual = stasis_message_to_json(NULL, NULL);
1107         ast_test_validate(test, NULL == actual);
1108
1109         /* Test message with NULL to_json function */
1110         type = stasis_message_type_create("SomeMessage", NULL);
1111
1112         data = ao2_alloc(strlen(expected) + 1, NULL);
1113         strcpy(data, expected);
1114         uut = stasis_message_create(type, data);
1115         ast_test_validate(test, NULL != uut);
1116
1117         actual = stasis_message_to_json(uut, NULL);
1118         ast_test_validate(test, NULL == actual);
1119
1120         return AST_TEST_PASS;
1121 }
1122
1123 AST_TEST_DEFINE(to_json)
1124 {
1125         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1126         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1127         RAII_VAR(char *, data, NULL, ao2_cleanup);
1128         RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
1129         const char *expected_text = "SomeData";
1130         RAII_VAR(struct ast_json *, expected, NULL, ast_json_unref);
1131
1132         switch (cmd) {
1133         case TEST_INIT:
1134                 info->name = __func__;
1135                 info->category = test_category;
1136                 info->summary = "Test message to_json function when NULL";
1137                 info->description = "Test message to_json function when NULL";
1138                 return AST_TEST_NOT_RUN;
1139         case TEST_EXECUTE:
1140                 break;
1141         }
1142
1143         type = stasis_message_type_create("SomeMessage", &fake_vtable);
1144
1145         data = ao2_alloc(strlen(expected_text) + 1, NULL);
1146         strcpy(data, expected_text);
1147         uut = stasis_message_create(type, data);
1148         ast_test_validate(test, NULL != uut);
1149
1150         expected = ast_json_string_create(expected_text);
1151         actual = stasis_message_to_json(uut, NULL);
1152         ast_test_validate(test, ast_json_equal(expected, actual));
1153
1154         return AST_TEST_PASS;
1155 }
1156
1157 AST_TEST_DEFINE(no_to_ami)
1158 {
1159         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1160         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1161         RAII_VAR(char *, data, NULL, ao2_cleanup);
1162         RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
1163         char *expected = "SomeData";
1164
1165         switch (cmd) {
1166         case TEST_INIT:
1167                 info->name = __func__;
1168                 info->category = test_category;
1169                 info->summary = "Test message to_ami function when NULL";
1170                 info->description = "Test message to_ami function when NULL";
1171                 return AST_TEST_NOT_RUN;
1172         case TEST_EXECUTE:
1173                 break;
1174         }
1175
1176         /* Test NULL */
1177         actual = stasis_message_to_ami(NULL);
1178         ast_test_validate(test, NULL == actual);
1179
1180         /* Test message with NULL to_ami function */
1181         type = stasis_message_type_create("SomeMessage", NULL);
1182
1183         data = ao2_alloc(strlen(expected) + 1, NULL);
1184         strcpy(data, expected);
1185         uut = stasis_message_create(type, data);
1186         ast_test_validate(test, NULL != uut);
1187
1188         actual = stasis_message_to_ami(uut);
1189         ast_test_validate(test, NULL == actual);
1190
1191         return AST_TEST_PASS;
1192 }
1193
1194 AST_TEST_DEFINE(to_ami)
1195 {
1196         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1197         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1198         RAII_VAR(char *, data, NULL, ao2_cleanup);
1199         RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
1200         const char *expected_text = "SomeData";
1201         const char *expected = "Message: SomeData\r\n";
1202
1203         switch (cmd) {
1204         case TEST_INIT:
1205                 info->name = __func__;
1206                 info->category = test_category;
1207                 info->summary = "Test message to_ami function";
1208                 info->description = "Test message to_ami function";
1209                 return AST_TEST_NOT_RUN;
1210         case TEST_EXECUTE:
1211                 break;
1212         }
1213
1214         type = stasis_message_type_create("SomeMessage", &fake_vtable);
1215
1216         data = ao2_alloc(strlen(expected_text) + 1, NULL);
1217         strcpy(data, expected_text);
1218         uut = stasis_message_create(type, data);
1219         ast_test_validate(test, NULL != uut);
1220
1221         actual = stasis_message_to_ami(uut);
1222         ast_test_validate(test, strcmp(expected, actual->extra_fields) == 0);
1223
1224         return AST_TEST_PASS;
1225 }
1226
1227 static void noop(void *data, struct stasis_subscription *sub,
1228         struct stasis_message *message)
1229 {
1230         /* no-op */
1231 }
1232
1233 AST_TEST_DEFINE(dtor_order)
1234 {
1235         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1236         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
1237
1238         switch (cmd) {
1239         case TEST_INIT:
1240                 info->name = __func__;
1241                 info->category = test_category;
1242                 info->summary = "Test that destruction order doesn't bomb stuff";
1243                 info->description = "Test that destruction order doesn't bomb stuff";
1244                 return AST_TEST_NOT_RUN;
1245         case TEST_EXECUTE:
1246                 break;
1247         }
1248
1249         topic = stasis_topic_create("test-topic");
1250         ast_test_validate(test, NULL != topic);
1251
1252         sub = stasis_subscribe(topic, noop, NULL);
1253         ast_test_validate(test, NULL != sub);
1254
1255         /* With any luck, this won't completely blow everything up */
1256         ao2_cleanup(topic);
1257         stasis_unsubscribe(sub);
1258
1259         /* These refs were cleaned up manually */
1260         topic = NULL;
1261         sub = NULL;
1262
1263         return AST_TEST_PASS;
1264 }
1265
1266 static const char *noop_get_id(struct stasis_message *message)
1267 {
1268         return NULL;
1269 }
1270
1271 AST_TEST_DEFINE(caching_dtor_order)
1272 {
1273         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1274         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
1275         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL,
1276                 stasis_caching_unsubscribe);
1277         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
1278
1279         switch (cmd) {
1280         case TEST_INIT:
1281                 info->name = __func__;
1282                 info->category = test_category;
1283                 info->summary = "Test that destruction order doesn't bomb stuff";
1284                 info->description = "Test that destruction order doesn't bomb stuff";
1285                 return AST_TEST_NOT_RUN;
1286         case TEST_EXECUTE:
1287                 break;
1288         }
1289
1290         cache = stasis_cache_create(noop_get_id);
1291         ast_test_validate(test, NULL != cache);
1292
1293         topic = stasis_topic_create("test-topic");
1294         ast_test_validate(test, NULL != topic);
1295
1296         caching_topic = stasis_caching_topic_create(topic, cache);
1297         ast_test_validate(test, NULL != caching_topic);
1298
1299         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), noop,
1300                 NULL);
1301         ast_test_validate(test, NULL != sub);
1302
1303         /* With any luck, this won't completely blow everything up */
1304         ao2_cleanup(cache);
1305         ao2_cleanup(topic);
1306         stasis_caching_unsubscribe(caching_topic);
1307         stasis_unsubscribe(sub);
1308
1309         /* These refs were cleaned up manually */
1310         cache = NULL;
1311         topic = NULL;
1312         caching_topic = NULL;
1313         sub = NULL;
1314
1315         return AST_TEST_PASS;
1316 }
1317
1318 static int unload_module(void)
1319 {
1320         AST_TEST_UNREGISTER(message_type);
1321         AST_TEST_UNREGISTER(message);
1322         AST_TEST_UNREGISTER(subscription_messages);
1323         AST_TEST_UNREGISTER(publish);
1324         AST_TEST_UNREGISTER(unsubscribe_stops_messages);
1325         AST_TEST_UNREGISTER(forward);
1326         AST_TEST_UNREGISTER(cache_filter);
1327         AST_TEST_UNREGISTER(cache);
1328         AST_TEST_UNREGISTER(cache_dump);
1329         AST_TEST_UNREGISTER(router);
1330         AST_TEST_UNREGISTER(router_cache_updates);
1331         AST_TEST_UNREGISTER(interleaving);
1332         AST_TEST_UNREGISTER(no_to_json);
1333         AST_TEST_UNREGISTER(to_json);
1334         AST_TEST_UNREGISTER(no_to_ami);
1335         AST_TEST_UNREGISTER(to_ami);
1336         AST_TEST_UNREGISTER(dtor_order);
1337         AST_TEST_UNREGISTER(caching_dtor_order);
1338         return 0;
1339 }
1340
1341 static int load_module(void)
1342 {
1343         AST_TEST_REGISTER(message_type);
1344         AST_TEST_REGISTER(message);
1345         AST_TEST_REGISTER(subscription_messages);
1346         AST_TEST_REGISTER(publish);
1347         AST_TEST_REGISTER(unsubscribe_stops_messages);
1348         AST_TEST_REGISTER(forward);
1349         AST_TEST_REGISTER(cache_filter);
1350         AST_TEST_REGISTER(cache);
1351         AST_TEST_REGISTER(cache_dump);
1352         AST_TEST_REGISTER(router);
1353         AST_TEST_REGISTER(router_cache_updates);
1354         AST_TEST_REGISTER(interleaving);
1355         AST_TEST_REGISTER(no_to_json);
1356         AST_TEST_REGISTER(to_json);
1357         AST_TEST_REGISTER(no_to_ami);
1358         AST_TEST_REGISTER(to_ami);
1359         AST_TEST_REGISTER(dtor_order);
1360         AST_TEST_REGISTER(caching_dtor_order);
1361         return AST_MODULE_LOAD_SUCCESS;
1362 }
1363
1364 AST_MODULE_INFO(ASTERISK_GPL_KEY, 0, "Stasis testing",
1365                 .load = load_module,
1366                 .unload = unload_module
1367         );