Add missing CR/LF to FakeMI stasis test AMI event.
[asterisk/asterisk.git] / tests / test_stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file \brief Test Stasis message bus.
21  *
22  * \author\verbatim David M. Lee, II <dlee@digium.com> \endverbatim
23  *
24  * \ingroup tests
25  */
26
27 /*** MODULEINFO
28         <depend>TEST_FRAMEWORK</depend>
29         <support_level>core</support_level>
30  ***/
31
32 #include "asterisk.h"
33
34 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
35
36 #include "asterisk/astobj2.h"
37 #include "asterisk/module.h"
38 #include "asterisk/stasis.h"
39 #include "asterisk/stasis_message_router.h"
40 #include "asterisk/test.h"
41
42 static const char *test_category = "/stasis/core/";
43
44 static struct ast_json *fake_json(struct stasis_message *message)
45 {
46         const char *text = stasis_message_data(message);
47
48         return ast_json_string_create(text);
49 }
50
51 static struct ast_manager_event_blob *fake_ami(struct stasis_message *message)
52 {
53         RAII_VAR(struct ast_manager_event_blob *, res, NULL, ao2_cleanup);
54         const char *text = stasis_message_data(message);
55
56         res = ast_manager_event_blob_create(EVENT_FLAG_TEST, "FakeMI",
57                 "Message: %s\r\n", text);
58
59         if (res == NULL) {
60                 return NULL;
61         }
62
63         ao2_ref(res, +1);
64         return res;
65 }
66
67 static struct stasis_message_vtable fake_vtable = {
68         .to_json = fake_json,
69         .to_ami = fake_ami
70 };
71
72 AST_TEST_DEFINE(message_type)
73 {
74         RAII_VAR(struct stasis_message_type *, uut, NULL, ao2_cleanup);
75
76         switch (cmd) {
77         case TEST_INIT:
78                 info->name = __func__;
79                 info->category = test_category;
80                 info->summary = "Test basic message_type functions";
81                 info->description = "Test basic message_type functions";
82                 return AST_TEST_NOT_RUN;
83         case TEST_EXECUTE:
84                 break;
85         }
86
87         ast_test_validate(test, NULL == stasis_message_type_create(NULL, NULL));
88         uut = stasis_message_type_create("SomeMessage", NULL);
89         ast_test_validate(test, 0 == strcmp(stasis_message_type_name(uut), "SomeMessage"));
90
91         return AST_TEST_PASS;
92 }
93
94 AST_TEST_DEFINE(message)
95 {
96         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
97         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
98         RAII_VAR(char *, data, NULL, ao2_cleanup);
99         char *expected = "SomeData";
100         struct timeval expected_timestamp;
101         struct timeval time_diff;
102
103         switch (cmd) {
104         case TEST_INIT:
105                 info->name = __func__;
106                 info->category = test_category;
107                 info->summary = "Test basic message functions";
108                 info->description = "Test basic message functions";
109                 return AST_TEST_NOT_RUN;
110         case TEST_EXECUTE:
111                 break;
112         }
113
114
115         type = stasis_message_type_create("SomeMessage", NULL);
116
117         ast_test_validate(test, NULL == stasis_message_create(NULL, NULL));
118         ast_test_validate(test, NULL == stasis_message_create(type, NULL));
119
120         data = ao2_alloc(strlen(expected) + 1, NULL);
121         strcpy(data, expected);
122         expected_timestamp = ast_tvnow();
123         uut = stasis_message_create(type, data);
124
125         ast_test_validate(test, NULL != uut);
126         ast_test_validate(test, type == stasis_message_type(uut));
127         ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut)));
128         ast_test_validate(test, 2 == ao2_ref(data, 0)); /* uut has ref to data */
129
130         time_diff = ast_tvsub(*stasis_message_timestamp(uut), expected_timestamp);
131         /* 10ms is certainly long enough for the two calls to complete */
132         ast_test_validate(test, time_diff.tv_sec == 0);
133         ast_test_validate(test, time_diff.tv_usec < 10000);
134
135         ao2_ref(uut, -1);
136         uut = NULL;
137         ast_test_validate(test, 1 == ao2_ref(data, 0)); /* uut unreffed data */
138
139         return AST_TEST_PASS;
140 }
141
142 struct consumer {
143         ast_mutex_t lock;
144         ast_cond_t out;
145         struct stasis_message **messages_rxed;
146         size_t messages_rxed_len;
147         int ignore_subscriptions;
148         int complete;
149 };
150
151 static void consumer_dtor(void *obj) {
152         struct consumer *consumer = obj;
153
154         ast_mutex_destroy(&consumer->lock);
155         ast_cond_destroy(&consumer->out);
156
157         while (consumer->messages_rxed_len > 0) {
158                 ao2_cleanup(consumer->messages_rxed[--consumer->messages_rxed_len]);
159         }
160         ast_free(consumer->messages_rxed);
161         consumer->messages_rxed = NULL;
162 }
163
164 static struct consumer *consumer_create(int ignore_subscriptions) {
165         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
166
167         consumer = ao2_alloc(sizeof(*consumer), consumer_dtor);
168
169         if (!consumer) {
170                 return NULL;
171         }
172
173         consumer->ignore_subscriptions = ignore_subscriptions;
174         consumer->messages_rxed = ast_malloc(0);
175         if (!consumer->messages_rxed) {
176                 return NULL;
177         }
178
179         ast_mutex_init(&consumer->lock);
180         ast_cond_init(&consumer->out, NULL);
181
182         ao2_ref(consumer, +1);
183         return consumer;
184 }
185
186 static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
187 {
188         struct consumer *consumer = data;
189         RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
190         SCOPED_MUTEX(lock, &consumer->lock);
191
192         if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
193
194                 ++consumer->messages_rxed_len;
195                 consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
196                 ast_assert(consumer->messages_rxed != NULL);
197                 consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
198                 ao2_ref(message, +1);
199         }
200
201         if (stasis_subscription_final_message(sub, message)) {
202                 consumer->complete = 1;
203                 consumer_needs_cleanup = consumer;
204         }
205
206         ast_cond_signal(&consumer->out);
207 }
208
209 static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
210 {
211         struct timeval start = ast_tvnow();
212         struct timespec end = {
213                 .tv_sec = start.tv_sec + 30,
214                 .tv_nsec = start.tv_usec * 1000
215         };
216
217         SCOPED_MUTEX(lock, &consumer->lock);
218
219         while (consumer->messages_rxed_len < expected_len) {
220                 int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
221                 if (r == ETIMEDOUT) {
222                         break;
223                 }
224                 ast_assert(r == 0); /* Not expecting any othet types of errors */
225         }
226         return consumer->messages_rxed_len;
227 }
228
229 static int consumer_wait_for_completion(struct consumer *consumer)
230 {
231         struct timeval start = ast_tvnow();
232         struct timespec end = {
233                 .tv_sec = start.tv_sec + 3,
234                 .tv_nsec = start.tv_usec * 1000
235         };
236
237         SCOPED_MUTEX(lock, &consumer->lock);
238
239         while (!consumer->complete) {
240                 int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
241                 if (r == ETIMEDOUT) {
242                         break;
243                 }
244                 ast_assert(r == 0); /* Not expecting any othet types of errors */
245         }
246         return consumer->complete;
247 }
248
249 static int consumer_should_stay(struct consumer *consumer, size_t expected_len)
250 {
251         struct timeval start = ast_tvnow();
252         struct timeval diff = {
253                 .tv_sec = 0,
254                 .tv_usec = 100000 /* wait for 100ms */
255         };
256         struct timeval end_tv = ast_tvadd(start, diff);
257         struct timespec end = {
258                 .tv_sec = end_tv.tv_sec,
259                 .tv_nsec = end_tv.tv_usec * 1000
260         };
261
262         SCOPED_MUTEX(lock, &consumer->lock);
263
264         while (consumer->messages_rxed_len == expected_len) {
265                 int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
266                 if (r == ETIMEDOUT) {
267                         break;
268                 }
269                 ast_assert(r == 0); /* Not expecting any othet types of errors */
270         }
271         return consumer->messages_rxed_len;
272 }
273
274 AST_TEST_DEFINE(subscription_messages)
275 {
276         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
277         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
278         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
279         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
280         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
281         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
282         RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
283         int complete;
284         struct stasis_subscription_change *change;
285
286         switch (cmd) {
287         case TEST_INIT:
288                 info->name = __func__;
289                 info->category = test_category;
290                 info->summary = "Test subscribe/unsubscribe messages";
291                 info->description = "Test subscribe/unsubscribe messages";
292                 return AST_TEST_NOT_RUN;
293         case TEST_EXECUTE:
294                 break;
295         }
296
297         topic = stasis_topic_create("TestTopic");
298         ast_test_validate(test, NULL != topic);
299
300         consumer = consumer_create(0);
301         ast_test_validate(test, NULL != consumer);
302
303         uut = stasis_subscribe(topic, consumer_exec, consumer);
304         ast_test_validate(test, NULL != uut);
305         ao2_ref(consumer, +1);
306         expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
307
308         uut = stasis_unsubscribe(uut);
309         complete = consumer_wait_for_completion(consumer);
310         ast_test_validate(test, 1 == complete);
311
312         ast_test_validate(test, 2 == consumer->messages_rxed_len);
313         ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0]));
314         ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1]));
315
316         change = stasis_message_data(consumer->messages_rxed[0]);
317         ast_test_validate(test, topic == change->topic);
318         ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
319         ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
320
321         change = stasis_message_data(consumer->messages_rxed[1]);
322         ast_test_validate(test, topic == change->topic);
323         ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
324         ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
325
326         return AST_TEST_PASS;
327 }
328
329 AST_TEST_DEFINE(publish)
330 {
331         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
332         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
333         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
334         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
335         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
336         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
337         int actual_len;
338         const char *actual;
339
340         switch (cmd) {
341         case TEST_INIT:
342                 info->name = __func__;
343                 info->category = test_category;
344                 info->summary = "Test simple subscriptions";
345                 info->description = "Test simple subscriptions";
346                 return AST_TEST_NOT_RUN;
347         case TEST_EXECUTE:
348                 break;
349         }
350
351         topic = stasis_topic_create("TestTopic");
352         ast_test_validate(test, NULL != topic);
353
354         consumer = consumer_create(1);
355         ast_test_validate(test, NULL != consumer);
356
357         uut = stasis_subscribe(topic, consumer_exec, consumer);
358         ast_test_validate(test, NULL != uut);
359         ao2_ref(consumer, +1);
360
361         test_data = ao2_alloc(1, NULL);
362         ast_test_validate(test, NULL != test_data);
363         test_message_type = stasis_message_type_create("TestMessage", NULL);
364         test_message = stasis_message_create(test_message_type, test_data);
365
366         stasis_publish(topic, test_message);
367
368         actual_len = consumer_wait_for(consumer, 1);
369         ast_test_validate(test, 1 == actual_len);
370         actual = stasis_message_data(consumer->messages_rxed[0]);
371         ast_test_validate(test, test_data == actual);
372
373         return AST_TEST_PASS;
374 }
375
376 AST_TEST_DEFINE(unsubscribe_stops_messages)
377 {
378         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
379         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
380         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
381         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
382         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
383         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
384         int actual_len;
385
386         switch (cmd) {
387         case TEST_INIT:
388                 info->name = __func__;
389                 info->category = test_category;
390                 info->summary = "Test simple subscriptions";
391                 info->description = "Test simple subscriptions";
392                 return AST_TEST_NOT_RUN;
393         case TEST_EXECUTE:
394                 break;
395         }
396
397         topic = stasis_topic_create("TestTopic");
398         ast_test_validate(test, NULL != topic);
399
400         consumer = consumer_create(1);
401         ast_test_validate(test, NULL != consumer);
402
403         uut = stasis_subscribe(topic, consumer_exec, consumer);
404         ast_test_validate(test, NULL != uut);
405         ao2_ref(consumer, +1);
406
407         uut = stasis_unsubscribe(uut);
408
409         test_data = ao2_alloc(1, NULL);
410         ast_test_validate(test, NULL != test_data);
411         test_message_type = stasis_message_type_create("TestMessage", NULL);
412         test_message = stasis_message_create(test_message_type, test_data);
413
414         stasis_publish(topic, test_message);
415
416         actual_len = consumer_should_stay(consumer, 0);
417         ast_test_validate(test, 0 == actual_len);
418
419         return AST_TEST_PASS;
420 }
421
422 AST_TEST_DEFINE(forward)
423 {
424         RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
425         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
426
427         RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup);
428         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
429
430         RAII_VAR(struct stasis_subscription *, forward_sub, NULL, stasis_unsubscribe);
431         RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe);
432         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
433
434         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
435         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
436         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
437         int actual_len;
438
439         switch (cmd) {
440         case TEST_INIT:
441                 info->name = __func__;
442                 info->category = test_category;
443                 info->summary = "Test sending events to a parent topic";
444                 info->description = "Test sending events to a parent topic.\n"
445                         "This test creates three topics (one parent, two children)\n"
446                         "and publishes a message to one child, and verifies it's\n"
447                         "only seen by that child and the parent";
448                 return AST_TEST_NOT_RUN;
449         case TEST_EXECUTE:
450                 break;
451         }
452
453         parent_topic = stasis_topic_create("ParentTestTopic");
454         ast_test_validate(test, NULL != parent_topic);
455         topic = stasis_topic_create("TestTopic");
456         ast_test_validate(test, NULL != topic);
457
458         forward_sub = stasis_forward_all(topic, parent_topic);
459         ast_test_validate(test, NULL != forward_sub);
460
461         parent_consumer = consumer_create(1);
462         ast_test_validate(test, NULL != parent_consumer);
463         consumer = consumer_create(1);
464         ast_test_validate(test, NULL != consumer);
465
466         parent_sub = stasis_subscribe(parent_topic, consumer_exec, parent_consumer);
467         ast_test_validate(test, NULL != parent_sub);
468         ao2_ref(parent_consumer, +1);
469         sub = stasis_subscribe(topic, consumer_exec, consumer);
470         ast_test_validate(test, NULL != sub);
471         ao2_ref(consumer, +1);
472
473         test_data = ao2_alloc(1, NULL);
474         ast_test_validate(test, NULL != test_data);
475         test_message_type = stasis_message_type_create("TestMessage", NULL);
476         test_message = stasis_message_create(test_message_type, test_data);
477
478         stasis_publish(topic, test_message);
479
480         actual_len = consumer_wait_for(consumer, 1);
481         ast_test_validate(test, 1 == actual_len);
482         actual_len = consumer_wait_for(parent_consumer, 1);
483         ast_test_validate(test, 1 == actual_len);
484
485         return AST_TEST_PASS;
486 }
487
488 AST_TEST_DEFINE(interleaving)
489 {
490         RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
491         RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
492         RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
493
494         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
495
496         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
497
498         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
499         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
500         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
501
502         RAII_VAR(struct stasis_subscription *, forward_sub1, NULL, stasis_unsubscribe);
503         RAII_VAR(struct stasis_subscription *, forward_sub2, NULL, stasis_unsubscribe);
504         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
505
506         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
507
508         int actual_len;
509
510         switch (cmd) {
511         case TEST_INIT:
512                 info->name = __func__;
513                 info->category = test_category;
514                 info->summary = "Test sending interleaved events to a parent topic";
515                 info->description = "Test sending events to a parent topic.\n"
516                         "This test creates three topics (one parent, two children)\n"
517                         "and publishes messages alternately between the children.\n"
518                         "It verifies that the messages are received in the expected\n"
519                         "order.";
520                 return AST_TEST_NOT_RUN;
521         case TEST_EXECUTE:
522                 break;
523         }
524
525         test_message_type = stasis_message_type_create("test", NULL);
526         ast_test_validate(test, NULL != test_message_type);
527
528         test_data = ao2_alloc(1, NULL);
529         ast_test_validate(test, NULL != test_data);
530
531         test_message1 = stasis_message_create(test_message_type, test_data);
532         ast_test_validate(test, NULL != test_message1);
533         test_message2 = stasis_message_create(test_message_type, test_data);
534         ast_test_validate(test, NULL != test_message2);
535         test_message3 = stasis_message_create(test_message_type, test_data);
536         ast_test_validate(test, NULL != test_message3);
537
538         parent_topic = stasis_topic_create("ParentTestTopic");
539         ast_test_validate(test, NULL != parent_topic);
540         topic1 = stasis_topic_create("Topic1");
541         ast_test_validate(test, NULL != topic1);
542         topic2 = stasis_topic_create("Topic2");
543         ast_test_validate(test, NULL != topic2);
544
545         forward_sub1 = stasis_forward_all(topic1, parent_topic);
546         ast_test_validate(test, NULL != forward_sub1);
547         forward_sub2 = stasis_forward_all(topic2, parent_topic);
548         ast_test_validate(test, NULL != forward_sub2);
549
550         consumer = consumer_create(1);
551         ast_test_validate(test, NULL != consumer);
552
553         sub = stasis_subscribe(parent_topic, consumer_exec, consumer);
554         ast_test_validate(test, NULL != sub);
555         ao2_ref(consumer, +1);
556
557         stasis_publish(topic1, test_message1);
558         stasis_publish(topic2, test_message2);
559         stasis_publish(topic1, test_message3);
560
561         actual_len = consumer_wait_for(consumer, 3);
562         ast_test_validate(test, 3 == actual_len);
563
564         ast_test_validate(test, test_message1 == consumer->messages_rxed[0]);
565         ast_test_validate(test, test_message2 == consumer->messages_rxed[1]);
566         ast_test_validate(test, test_message3 == consumer->messages_rxed[2]);
567
568         return AST_TEST_PASS;
569 }
570
571 struct cache_test_data {
572         char *id;
573         char *value;
574 };
575
576 static void cache_test_data_dtor(void *obj)
577 {
578         struct cache_test_data *data = obj;
579         ast_free(data->id);
580         ast_free(data->value);
581 }
582
583 static struct stasis_message *cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value)
584 {
585         RAII_VAR(struct cache_test_data *, data, NULL, ao2_cleanup);
586
587         data = ao2_alloc(sizeof(*data), cache_test_data_dtor);
588         if (data == NULL) {
589                 return NULL;
590         }
591
592         ast_assert(name != NULL);
593         ast_assert(value != NULL);
594
595         data->id = ast_strdup(name);
596         data->value = ast_strdup(value);
597         if (!data->id || !data->value) {
598                 return NULL;
599         }
600
601         return stasis_message_create(type, data);
602 }
603
604 static const char *cache_test_data_id(struct stasis_message *message) {
605         struct cache_test_data *cachable = stasis_message_data(message);
606
607         if (0 != strcmp("Cacheable", stasis_message_type_name(stasis_message_type(message)))) {
608                 return NULL;
609         }
610         return cachable->id;
611 }
612
613 AST_TEST_DEFINE(cache_filter)
614 {
615         RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup);
616         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
617         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
618         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
619         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
620         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
621         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
622         int actual_len;
623
624         switch (cmd) {
625         case TEST_INIT:
626                 info->name = __func__;
627                 info->category = test_category;
628                 info->summary = "Test caching topics only forward cache_update messages.";
629                 info->description = "Test caching topics only forward cache_update messages.";
630                 return AST_TEST_NOT_RUN;
631         case TEST_EXECUTE:
632                 break;
633         }
634
635         non_cache_type = stasis_message_type_create("NonCacheable", NULL);
636         ast_test_validate(test, NULL != non_cache_type);
637         topic = stasis_topic_create("SomeTopic");
638         ast_test_validate(test, NULL != topic);
639         cache = stasis_cache_create(cache_test_data_id);
640         ast_test_validate(test, NULL != cache);
641         caching_topic = stasis_caching_topic_create(topic, cache);
642         ast_test_validate(test, NULL != caching_topic);
643         consumer = consumer_create(1);
644         ast_test_validate(test, NULL != consumer);
645         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
646         ast_test_validate(test, NULL != sub);
647         ao2_ref(consumer, +1);
648
649         test_message = cache_test_message_create(non_cache_type, "1", "1");
650         ast_test_validate(test, NULL != test_message);
651
652         stasis_publish(topic, test_message);
653
654         actual_len = consumer_should_stay(consumer, 0);
655         ast_test_validate(test, 0 == actual_len);
656
657         return AST_TEST_PASS;
658 }
659
660 AST_TEST_DEFINE(cache)
661 {
662         RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
663         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
664         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
665         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
666         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
667         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
668         RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
669         RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
670         RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
671         RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
672         int actual_len;
673         struct stasis_cache_update *actual_update;
674
675         switch (cmd) {
676         case TEST_INIT:
677                 info->name = __func__;
678                 info->category = test_category;
679                 info->summary = "Test passing messages through cache topic unscathed.";
680                 info->description = "Test passing messages through cache topic unscathed.";
681                 return AST_TEST_NOT_RUN;
682         case TEST_EXECUTE:
683                 break;
684         }
685
686         cache_type = stasis_message_type_create("Cacheable", NULL);
687         ast_test_validate(test, NULL != cache_type);
688         topic = stasis_topic_create("SomeTopic");
689         ast_test_validate(test, NULL != topic);
690         cache = stasis_cache_create(cache_test_data_id);
691         ast_test_validate(test, NULL != cache);
692         caching_topic = stasis_caching_topic_create(topic, cache);
693         ast_test_validate(test, NULL != caching_topic);
694         consumer = consumer_create(1);
695         ast_test_validate(test, NULL != consumer);
696         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
697         ast_test_validate(test, NULL != sub);
698         ao2_ref(consumer, +1);
699
700         test_message1_1 = cache_test_message_create(cache_type, "1", "1");
701         ast_test_validate(test, NULL != test_message1_1);
702         test_message2_1 = cache_test_message_create(cache_type, "2", "1");
703         ast_test_validate(test, NULL != test_message2_1);
704
705         /* Post a couple of snapshots */
706         stasis_publish(topic, test_message1_1);
707         stasis_publish(topic, test_message2_1);
708         actual_len = consumer_wait_for(consumer, 2);
709         ast_test_validate(test, 2 == actual_len);
710
711         /* Check for new snapshot messages */
712         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[0]));
713         actual_update = stasis_message_data(consumer->messages_rxed[0]);
714         ast_test_validate(test, topic == actual_update->topic);
715         ast_test_validate(test, NULL == actual_update->old_snapshot);
716         ast_test_validate(test, test_message1_1 == actual_update->new_snapshot);
717         ast_test_validate(test, test_message1_1 == stasis_cache_get(cache, cache_type, "1"));
718         /* stasis_cache_get returned a ref, so unref test_message1_1 */
719         ao2_ref(test_message1_1, -1);
720
721         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[1]));
722         actual_update = stasis_message_data(consumer->messages_rxed[1]);
723         ast_test_validate(test, topic == actual_update->topic);
724         ast_test_validate(test, NULL == actual_update->old_snapshot);
725         ast_test_validate(test, test_message2_1 == actual_update->new_snapshot);
726         ast_test_validate(test, test_message2_1 == stasis_cache_get(cache, cache_type, "2"));
727         /* stasis_cache_get returned a ref, so unref test_message2_1 */
728         ao2_ref(test_message2_1, -1);
729
730         /* Update snapshot 2 */
731         test_message2_2 = cache_test_message_create(cache_type, "2", "2");
732         ast_test_validate(test, NULL != test_message2_2);
733         stasis_publish(topic, test_message2_2);
734
735         actual_len = consumer_wait_for(consumer, 3);
736         ast_test_validate(test, 3 == actual_len);
737
738         actual_update = stasis_message_data(consumer->messages_rxed[2]);
739         ast_test_validate(test, topic == actual_update->topic);
740         ast_test_validate(test, test_message2_1 == actual_update->old_snapshot);
741         ast_test_validate(test, test_message2_2 == actual_update->new_snapshot);
742         ast_test_validate(test, test_message2_2 == stasis_cache_get(cache, cache_type, "2"));
743         /* stasis_cache_get returned a ref, so unref test_message2_2 */
744         ao2_ref(test_message2_2, -1);
745
746         /* Clear snapshot 1 */
747         test_message1_clear = stasis_cache_clear_create(test_message1_1);
748         ast_test_validate(test, NULL != test_message1_clear);
749         stasis_publish(topic, test_message1_clear);
750
751         actual_len = consumer_wait_for(consumer, 4);
752         ast_test_validate(test, 4 == actual_len);
753
754         actual_update = stasis_message_data(consumer->messages_rxed[3]);
755         ast_test_validate(test, topic == actual_update->topic);
756         ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
757         ast_test_validate(test, NULL == actual_update->new_snapshot);
758         ast_test_validate(test, NULL == stasis_cache_get(cache, cache_type, "1"));
759
760         return AST_TEST_PASS;
761 }
762
763 AST_TEST_DEFINE(cache_dump)
764 {
765         RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
766         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
767         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
768         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
769         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
770         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
771         RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
772         RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
773         RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
774         RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
775         RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
776         int actual_len;
777         struct ao2_iterator i;
778         void *obj;
779
780         switch (cmd) {
781         case TEST_INIT:
782                 info->name = __func__;
783                 info->category = test_category;
784                 info->summary = "Test passing messages through cache topic unscathed.";
785                 info->description = "Test passing messages through cache topic unscathed.";
786                 return AST_TEST_NOT_RUN;
787         case TEST_EXECUTE:
788                 break;
789         }
790
791         cache_type = stasis_message_type_create("Cacheable", NULL);
792         ast_test_validate(test, NULL != cache_type);
793         topic = stasis_topic_create("SomeTopic");
794         ast_test_validate(test, NULL != topic);
795         cache = stasis_cache_create(cache_test_data_id);
796         ast_test_validate(test, NULL != cache);
797         caching_topic = stasis_caching_topic_create(topic, cache);
798         ast_test_validate(test, NULL != caching_topic);
799         consumer = consumer_create(1);
800         ast_test_validate(test, NULL != consumer);
801         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
802         ast_test_validate(test, NULL != sub);
803         ao2_ref(consumer, +1);
804
805         test_message1_1 = cache_test_message_create(cache_type, "1", "1");
806         ast_test_validate(test, NULL != test_message1_1);
807         test_message2_1 = cache_test_message_create(cache_type, "2", "1");
808         ast_test_validate(test, NULL != test_message2_1);
809
810         /* Post a couple of snapshots */
811         stasis_publish(topic, test_message1_1);
812         stasis_publish(topic, test_message2_1);
813         actual_len = consumer_wait_for(consumer, 2);
814         ast_test_validate(test, 2 == actual_len);
815
816         /* Check the cache */
817         cache_dump = stasis_cache_dump(cache, NULL);
818         ast_test_validate(test, NULL != cache_dump);
819         ast_test_validate(test, 2 == ao2_container_count(cache_dump));
820         i = ao2_iterator_init(cache_dump, 0);
821         while ((obj = ao2_iterator_next(&i))) {
822                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
823                 ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_1);
824         }
825
826         /* Update snapshot 2 */
827         test_message2_2 = cache_test_message_create(cache_type, "2", "2");
828         ast_test_validate(test, NULL != test_message2_2);
829         stasis_publish(topic, test_message2_2);
830
831         actual_len = consumer_wait_for(consumer, 3);
832         ast_test_validate(test, 3 == actual_len);
833
834         /* Check the cache */
835         cache_dump = stasis_cache_dump(cache, NULL);
836         ast_test_validate(test, NULL != cache_dump);
837         ast_test_validate(test, 2 == ao2_container_count(cache_dump));
838         i = ao2_iterator_init(cache_dump, 0);
839         while ((obj = ao2_iterator_next(&i))) {
840                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
841                 ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_2);
842         }
843
844         /* Clear snapshot 1 */
845         test_message1_clear = stasis_cache_clear_create(test_message1_1);
846         ast_test_validate(test, NULL != test_message1_clear);
847         stasis_publish(topic, test_message1_clear);
848
849         actual_len = consumer_wait_for(consumer, 4);
850         ast_test_validate(test, 4 == actual_len);
851
852         /* Check the cache */
853         cache_dump = stasis_cache_dump(cache, NULL);
854         ast_test_validate(test, NULL != cache_dump);
855         ast_test_validate(test, 1 == ao2_container_count(cache_dump));
856         i = ao2_iterator_init(cache_dump, 0);
857         while ((obj = ao2_iterator_next(&i))) {
858                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
859                 ast_test_validate(test, actual_cache_entry == test_message2_2);
860         }
861
862         /* Dump the cache to ensure that it has no subscription change items in it since those aren't cached */
863         ao2_cleanup(cache_dump);
864         cache_dump = stasis_cache_dump(cache, stasis_subscription_change_type());
865         ast_test_validate(test, 0 == ao2_container_count(cache_dump));
866
867         return AST_TEST_PASS;
868 }
869
870 AST_TEST_DEFINE(route_conflicts)
871 {
872         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
873         RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
874         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
875         RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
876         RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
877         int ret;
878
879         switch (cmd) {
880         case TEST_INIT:
881                 info->name = __func__;
882                 info->category = test_category;
883                 info->summary =
884                         "Multiple routes to the same message_type should fail";
885                 info->description =
886                         "Multiple routes to the same message_type should fail";
887                 return AST_TEST_NOT_RUN;
888         case TEST_EXECUTE:
889                 break;
890         }
891
892         topic = stasis_topic_create("TestTopic");
893         ast_test_validate(test, NULL != topic);
894
895         consumer1 = consumer_create(1);
896         ast_test_validate(test, NULL != consumer1);
897         consumer2 = consumer_create(1);
898         ast_test_validate(test, NULL != consumer2);
899
900         test_message_type = stasis_message_type_create("TestMessage", NULL);
901         ast_test_validate(test, NULL != test_message_type);
902
903         uut = stasis_message_router_create(topic);
904         ast_test_validate(test, NULL != uut);
905
906         ret = stasis_message_router_add(
907                 uut, test_message_type, consumer_exec, consumer1);
908         ast_test_validate(test, 0 == ret);
909         ret = stasis_message_router_add(
910                 uut, test_message_type, consumer_exec, consumer2);
911         ast_test_validate(test, 0 != ret);
912
913         return AST_TEST_PASS;
914 }
915
916 AST_TEST_DEFINE(router)
917 {
918         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
919         RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
920         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
921         RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
922         RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
923         RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
924         RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
925         RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
926         RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
927         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
928         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
929         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
930         int actual_len, ret;
931         struct stasis_message *actual;
932
933         switch (cmd) {
934         case TEST_INIT:
935                 info->name = __func__;
936                 info->category = test_category;
937                 info->summary = "Test simple message routing";
938                 info->description = "Test simple message routing";
939                 return AST_TEST_NOT_RUN;
940         case TEST_EXECUTE:
941                 break;
942         }
943
944         topic = stasis_topic_create("TestTopic");
945         ast_test_validate(test, NULL != topic);
946
947         consumer1 = consumer_create(1);
948         ast_test_validate(test, NULL != consumer1);
949         consumer2 = consumer_create(1);
950         ast_test_validate(test, NULL != consumer2);
951         consumer3 = consumer_create(1);
952         ast_test_validate(test, NULL != consumer3);
953
954         test_message_type1 = stasis_message_type_create("TestMessage1", NULL);
955         ast_test_validate(test, NULL != test_message_type1);
956         test_message_type2 = stasis_message_type_create("TestMessage2", NULL);
957         ast_test_validate(test, NULL != test_message_type2);
958         test_message_type3 = stasis_message_type_create("TestMessage3", NULL);
959         ast_test_validate(test, NULL != test_message_type3);
960
961         uut = stasis_message_router_create(topic);
962         ast_test_validate(test, NULL != uut);
963
964         ret = stasis_message_router_add(
965                 uut, test_message_type1, consumer_exec, consumer1);
966         ast_test_validate(test, 0 == ret);
967         ao2_ref(consumer1, +1);
968         ret = stasis_message_router_add(
969                 uut, test_message_type2, consumer_exec, consumer2);
970         ast_test_validate(test, 0 == ret);
971         ao2_ref(consumer2, +1);
972         ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
973         ast_test_validate(test, 0 == ret);
974         ao2_ref(consumer3, +1);
975
976         test_data = ao2_alloc(1, NULL);
977         ast_test_validate(test, NULL != test_data);
978         test_message1 = stasis_message_create(test_message_type1, test_data);
979         ast_test_validate(test, NULL != test_message1);
980         test_message2 = stasis_message_create(test_message_type2, test_data);
981         ast_test_validate(test, NULL != test_message2);
982         test_message3 = stasis_message_create(test_message_type3, test_data);
983         ast_test_validate(test, NULL != test_message3);
984
985         stasis_publish(topic, test_message1);
986         stasis_publish(topic, test_message2);
987         stasis_publish(topic, test_message3);
988
989         actual_len = consumer_wait_for(consumer1, 1);
990         ast_test_validate(test, 1 == actual_len);
991         actual_len = consumer_wait_for(consumer2, 1);
992         ast_test_validate(test, 1 == actual_len);
993         actual_len = consumer_wait_for(consumer3, 1);
994         ast_test_validate(test, 1 == actual_len);
995
996         actual = consumer1->messages_rxed[0];
997         ast_test_validate(test, test_message1 == actual);
998
999         actual = consumer2->messages_rxed[0];
1000         ast_test_validate(test, test_message2 == actual);
1001
1002         actual = consumer3->messages_rxed[0];
1003         ast_test_validate(test, test_message3 == actual);
1004
1005         /* consumer1 and consumer2 do not get the final message. */
1006         ao2_cleanup(consumer1);
1007         ao2_cleanup(consumer2);
1008
1009         return AST_TEST_PASS;
1010 }
1011
1012 static const char *cache_simple(struct stasis_message *message) {
1013         const char *type_name =
1014                 stasis_message_type_name(stasis_message_type(message));
1015         if (!ast_begins_with(type_name, "Cache")) {
1016                 return NULL;
1017         }
1018
1019         return "cached";
1020 }
1021
1022 AST_TEST_DEFINE(router_cache_updates)
1023 {
1024         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1025         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
1026         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe_and_join);
1027         RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
1028         RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
1029         RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
1030         RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
1031         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
1032         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
1033         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
1034         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
1035         RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
1036         RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
1037         RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
1038         RAII_VAR(struct stasis_message *, message1, NULL, ao2_cleanup);
1039         RAII_VAR(struct stasis_message *, message2, NULL, ao2_cleanup);
1040         struct stasis_cache_update *update;
1041         int actual_len, ret;
1042         struct stasis_message *actual;
1043
1044         switch (cmd) {
1045         case TEST_INIT:
1046                 info->name = __func__;
1047                 info->category = test_category;
1048                 info->summary = "Test special handling cache_update messages";
1049                 info->description = "Test special handling cache_update messages";
1050                 return AST_TEST_NOT_RUN;
1051         case TEST_EXECUTE:
1052                 break;
1053         }
1054
1055         topic = stasis_topic_create("TestTopic");
1056         ast_test_validate(test, NULL != topic);
1057
1058         cache = stasis_cache_create(cache_simple);
1059         ast_test_validate(test, NULL != cache);
1060         caching_topic = stasis_caching_topic_create(topic, cache);
1061         ast_test_validate(test, NULL != caching_topic);
1062
1063         consumer1 = consumer_create(1);
1064         ast_test_validate(test, NULL != consumer1);
1065         consumer2 = consumer_create(1);
1066         ast_test_validate(test, NULL != consumer2);
1067         consumer3 = consumer_create(1);
1068         ast_test_validate(test, NULL != consumer3);
1069
1070         test_message_type1 = stasis_message_type_create("Cache1", NULL);
1071         ast_test_validate(test, NULL != test_message_type1);
1072         test_message_type2 = stasis_message_type_create("Cache2", NULL);
1073         ast_test_validate(test, NULL != test_message_type2);
1074         test_message_type3 = stasis_message_type_create("NonCache", NULL);
1075         ast_test_validate(test, NULL != test_message_type3);
1076
1077         uut = stasis_message_router_create(
1078                 stasis_caching_get_topic(caching_topic));
1079         ast_test_validate(test, NULL != uut);
1080
1081         ret = stasis_message_router_add_cache_update(
1082                 uut, test_message_type1, consumer_exec, consumer1);
1083         ast_test_validate(test, 0 == ret);
1084         ao2_ref(consumer1, +1);
1085         ret = stasis_message_router_add(
1086                 uut, stasis_cache_update_type(), consumer_exec, consumer2);
1087         ast_test_validate(test, 0 == ret);
1088         ao2_ref(consumer2, +1);
1089         ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
1090         ast_test_validate(test, 0 == ret);
1091         ao2_ref(consumer3, +1);
1092
1093         test_data = ao2_alloc(1, NULL);
1094         ast_test_validate(test, NULL != test_data);
1095         test_message1 = stasis_message_create(test_message_type1, test_data);
1096         ast_test_validate(test, NULL != test_message1);
1097         test_message2 = stasis_message_create(test_message_type2, test_data);
1098         ast_test_validate(test, NULL != test_message2);
1099         test_message3 = stasis_message_create(test_message_type3, test_data);
1100         ast_test_validate(test, NULL != test_message3);
1101
1102         stasis_publish(topic, test_message1);
1103         stasis_publish(topic, test_message2);
1104         stasis_publish(topic, test_message3);
1105
1106         actual_len = consumer_wait_for(consumer1, 1);
1107         ast_test_validate(test, 1 == actual_len);
1108         actual_len = consumer_wait_for(consumer2, 1);
1109         ast_test_validate(test, 1 == actual_len);
1110         /* Uncacheable message should not be passed through */
1111         actual_len = consumer_should_stay(consumer3, 0);
1112         ast_test_validate(test, 0 == actual_len);
1113
1114         actual = consumer1->messages_rxed[0];
1115         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
1116         update = stasis_message_data(actual);
1117         ast_test_validate(test, test_message_type1 == update->type);
1118         ast_test_validate(test, test_message1 == update->new_snapshot);
1119
1120         actual = consumer2->messages_rxed[0];
1121         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
1122         update = stasis_message_data(actual);
1123         ast_test_validate(test, test_message_type2 == update->type);
1124         ast_test_validate(test, test_message2 == update->new_snapshot);
1125
1126         /* consumer1 and consumer2 do not get the final message. */
1127         ao2_cleanup(consumer1);
1128         ao2_cleanup(consumer2);
1129
1130         return AST_TEST_PASS;
1131 }
1132
1133 AST_TEST_DEFINE(no_to_json)
1134 {
1135         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1136         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1137         RAII_VAR(char *, data, NULL, ao2_cleanup);
1138         RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
1139         char *expected = "SomeData";
1140
1141         switch (cmd) {
1142         case TEST_INIT:
1143                 info->name = __func__;
1144                 info->category = test_category;
1145                 info->summary = "Test message to_json function";
1146                 info->description = "Test message to_json function";
1147                 return AST_TEST_NOT_RUN;
1148         case TEST_EXECUTE:
1149                 break;
1150         }
1151
1152         /* Test NULL */
1153         actual = stasis_message_to_json(NULL);
1154         ast_test_validate(test, NULL == actual);
1155
1156         /* Test message with NULL to_json function */
1157         type = stasis_message_type_create("SomeMessage", NULL);
1158
1159         data = ao2_alloc(strlen(expected) + 1, NULL);
1160         strcpy(data, expected);
1161         uut = stasis_message_create(type, data);
1162         ast_test_validate(test, NULL != uut);
1163
1164         actual = stasis_message_to_json(uut);
1165         ast_test_validate(test, NULL == actual);
1166
1167         return AST_TEST_PASS;
1168 }
1169
1170 AST_TEST_DEFINE(to_json)
1171 {
1172         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1173         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1174         RAII_VAR(char *, data, NULL, ao2_cleanup);
1175         RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
1176         const char *expected_text = "SomeData";
1177         RAII_VAR(struct ast_json *, expected, NULL, ast_json_unref);
1178
1179         switch (cmd) {
1180         case TEST_INIT:
1181                 info->name = __func__;
1182                 info->category = test_category;
1183                 info->summary = "Test message to_json function when NULL";
1184                 info->description = "Test message to_json function when NULL";
1185                 return AST_TEST_NOT_RUN;
1186         case TEST_EXECUTE:
1187                 break;
1188         }
1189
1190         type = stasis_message_type_create("SomeMessage", &fake_vtable);
1191
1192         data = ao2_alloc(strlen(expected_text) + 1, NULL);
1193         strcpy(data, expected_text);
1194         uut = stasis_message_create(type, data);
1195         ast_test_validate(test, NULL != uut);
1196
1197         expected = ast_json_string_create(expected_text);
1198         actual = stasis_message_to_json(uut);
1199         ast_test_validate(test, ast_json_equal(expected, actual));
1200
1201         return AST_TEST_PASS;
1202 }
1203
1204 AST_TEST_DEFINE(no_to_ami)
1205 {
1206         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1207         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1208         RAII_VAR(char *, data, NULL, ao2_cleanup);
1209         RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
1210         char *expected = "SomeData";
1211
1212         switch (cmd) {
1213         case TEST_INIT:
1214                 info->name = __func__;
1215                 info->category = test_category;
1216                 info->summary = "Test message to_ami function when NULL";
1217                 info->description = "Test message to_ami function when NULL";
1218                 return AST_TEST_NOT_RUN;
1219         case TEST_EXECUTE:
1220                 break;
1221         }
1222
1223         /* Test NULL */
1224         actual = stasis_message_to_ami(NULL);
1225         ast_test_validate(test, NULL == actual);
1226
1227         /* Test message with NULL to_ami function */
1228         type = stasis_message_type_create("SomeMessage", NULL);
1229
1230         data = ao2_alloc(strlen(expected) + 1, NULL);
1231         strcpy(data, expected);
1232         uut = stasis_message_create(type, data);
1233         ast_test_validate(test, NULL != uut);
1234
1235         actual = stasis_message_to_ami(uut);
1236         ast_test_validate(test, NULL == actual);
1237
1238         return AST_TEST_PASS;
1239 }
1240
1241 AST_TEST_DEFINE(to_ami)
1242 {
1243         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1244         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1245         RAII_VAR(char *, data, NULL, ao2_cleanup);
1246         RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
1247         const char *expected_text = "SomeData";
1248         const char *expected = "Message: SomeData";
1249
1250         switch (cmd) {
1251         case TEST_INIT:
1252                 info->name = __func__;
1253                 info->category = test_category;
1254                 info->summary = "Test message to_ami function";
1255                 info->description = "Test message to_ami function";
1256                 return AST_TEST_NOT_RUN;
1257         case TEST_EXECUTE:
1258                 break;
1259         }
1260
1261         type = stasis_message_type_create("SomeMessage", &fake_vtable);
1262
1263         data = ao2_alloc(strlen(expected_text) + 1, NULL);
1264         strcpy(data, expected_text);
1265         uut = stasis_message_create(type, data);
1266         ast_test_validate(test, NULL != uut);
1267
1268         actual = stasis_message_to_ami(uut);
1269         ast_test_validate(test, strcmp(expected, actual->extra_fields) == 0);
1270
1271         return AST_TEST_PASS;
1272 }
1273
1274 static int unload_module(void)
1275 {
1276         AST_TEST_UNREGISTER(message_type);
1277         AST_TEST_UNREGISTER(message);
1278         AST_TEST_UNREGISTER(subscription_messages);
1279         AST_TEST_UNREGISTER(publish);
1280         AST_TEST_UNREGISTER(unsubscribe_stops_messages);
1281         AST_TEST_UNREGISTER(forward);
1282         AST_TEST_UNREGISTER(cache_filter);
1283         AST_TEST_UNREGISTER(cache);
1284         AST_TEST_UNREGISTER(cache_dump);
1285         AST_TEST_UNREGISTER(route_conflicts);
1286         AST_TEST_UNREGISTER(router);
1287         AST_TEST_UNREGISTER(router_cache_updates);
1288         AST_TEST_UNREGISTER(interleaving);
1289         AST_TEST_UNREGISTER(no_to_json);
1290         AST_TEST_UNREGISTER(to_json);
1291         AST_TEST_UNREGISTER(no_to_ami);
1292         AST_TEST_UNREGISTER(to_ami);
1293         return 0;
1294 }
1295
1296 static int load_module(void)
1297 {
1298         AST_TEST_REGISTER(message_type);
1299         AST_TEST_REGISTER(message);
1300         AST_TEST_REGISTER(subscription_messages);
1301         AST_TEST_REGISTER(publish);
1302         AST_TEST_REGISTER(unsubscribe_stops_messages);
1303         AST_TEST_REGISTER(forward);
1304         AST_TEST_REGISTER(cache_filter);
1305         AST_TEST_REGISTER(cache);
1306         AST_TEST_REGISTER(cache_dump);
1307         AST_TEST_REGISTER(route_conflicts);
1308         AST_TEST_REGISTER(router);
1309         AST_TEST_REGISTER(router_cache_updates);
1310         AST_TEST_REGISTER(interleaving);
1311         AST_TEST_REGISTER(no_to_json);
1312         AST_TEST_REGISTER(to_json);
1313         AST_TEST_REGISTER(no_to_ami);
1314         AST_TEST_REGISTER(to_ami);
1315         return AST_MODULE_LOAD_SUCCESS;
1316 }
1317
1318 AST_MODULE_INFO(ASTERISK_GPL_KEY, 0, "Stasis testing",
1319                 .load = load_module,
1320                 .unload = unload_module
1321         );