0cfce2c3f99f64c26976e48d1133b626a79afd35
[asterisk/asterisk.git] / tests / test_stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file \brief Test Stasis message bus.
21  *
22  * \author\verbatim David M. Lee, II <dlee@digium.com> \endverbatim
23  *
24  * \ingroup tests
25  */
26
27 /*** MODULEINFO
28         <depend>TEST_FRAMEWORK</depend>
29         <support_level>core</support_level>
30  ***/
31
32 #include "asterisk.h"
33
34 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
35
36 #include "asterisk/astobj2.h"
37 #include "asterisk/module.h"
38 #include "asterisk/stasis.h"
39 #include "asterisk/stasis_message_router.h"
40 #include "asterisk/test.h"
41
42 static const char *test_category = "/stasis/core/";
43
44 static struct ast_json *fake_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
45 {
46         const char *text = stasis_message_data(message);
47
48         return ast_json_string_create(text);
49 }
50
51 static struct ast_manager_event_blob *fake_ami(struct stasis_message *message)
52 {
53         RAII_VAR(struct ast_manager_event_blob *, res, NULL, ao2_cleanup);
54         const char *text = stasis_message_data(message);
55
56         res = ast_manager_event_blob_create(EVENT_FLAG_TEST, "FakeMI",
57                 "Message: %s\r\n", text);
58
59         if (res == NULL) {
60                 return NULL;
61         }
62
63         ao2_ref(res, +1);
64         return res;
65 }
66
67 static struct stasis_message_vtable fake_vtable = {
68         .to_json = fake_json,
69         .to_ami = fake_ami
70 };
71
72 AST_TEST_DEFINE(message_type)
73 {
74         RAII_VAR(struct stasis_message_type *, uut, NULL, ao2_cleanup);
75
76         switch (cmd) {
77         case TEST_INIT:
78                 info->name = __func__;
79                 info->category = test_category;
80                 info->summary = "Test basic message_type functions";
81                 info->description = "Test basic message_type functions";
82                 return AST_TEST_NOT_RUN;
83         case TEST_EXECUTE:
84                 break;
85         }
86
87         ast_test_validate(test, NULL == stasis_message_type_create(NULL, NULL));
88         uut = stasis_message_type_create("SomeMessage", NULL);
89         ast_test_validate(test, 0 == strcmp(stasis_message_type_name(uut), "SomeMessage"));
90
91         return AST_TEST_PASS;
92 }
93
94 AST_TEST_DEFINE(message)
95 {
96         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
97         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
98         RAII_VAR(char *, data, NULL, ao2_cleanup);
99         char *expected = "SomeData";
100         struct timeval expected_timestamp;
101         struct timeval time_diff;
102
103         switch (cmd) {
104         case TEST_INIT:
105                 info->name = __func__;
106                 info->category = test_category;
107                 info->summary = "Test basic message functions";
108                 info->description = "Test basic message functions";
109                 return AST_TEST_NOT_RUN;
110         case TEST_EXECUTE:
111                 break;
112         }
113
114
115         type = stasis_message_type_create("SomeMessage", NULL);
116
117         ast_test_validate(test, NULL == stasis_message_create(NULL, NULL));
118         ast_test_validate(test, NULL == stasis_message_create(type, NULL));
119
120         data = ao2_alloc(strlen(expected) + 1, NULL);
121         strcpy(data, expected);
122         expected_timestamp = ast_tvnow();
123         uut = stasis_message_create(type, data);
124
125         ast_test_validate(test, NULL != uut);
126         ast_test_validate(test, type == stasis_message_type(uut));
127         ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut)));
128         ast_test_validate(test, 2 == ao2_ref(data, 0)); /* uut has ref to data */
129
130         time_diff = ast_tvsub(*stasis_message_timestamp(uut), expected_timestamp);
131         /* 10ms is certainly long enough for the two calls to complete */
132         ast_test_validate(test, time_diff.tv_sec == 0);
133         ast_test_validate(test, time_diff.tv_usec < 10000);
134
135         ao2_ref(uut, -1);
136         uut = NULL;
137         ast_test_validate(test, 1 == ao2_ref(data, 0)); /* uut unreffed data */
138
139         return AST_TEST_PASS;
140 }
141
142 struct consumer {
143         ast_mutex_t lock;
144         ast_cond_t out;
145         struct stasis_message **messages_rxed;
146         size_t messages_rxed_len;
147         int ignore_subscriptions;
148         int complete;
149 };
150
151 static void consumer_dtor(void *obj) {
152         struct consumer *consumer = obj;
153
154         ast_mutex_destroy(&consumer->lock);
155         ast_cond_destroy(&consumer->out);
156
157         while (consumer->messages_rxed_len > 0) {
158                 ao2_cleanup(consumer->messages_rxed[--consumer->messages_rxed_len]);
159         }
160         ast_free(consumer->messages_rxed);
161         consumer->messages_rxed = NULL;
162 }
163
164 static struct consumer *consumer_create(int ignore_subscriptions) {
165         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
166
167         consumer = ao2_alloc(sizeof(*consumer), consumer_dtor);
168
169         if (!consumer) {
170                 return NULL;
171         }
172
173         consumer->ignore_subscriptions = ignore_subscriptions;
174         consumer->messages_rxed = ast_malloc(0);
175         if (!consumer->messages_rxed) {
176                 return NULL;
177         }
178
179         ast_mutex_init(&consumer->lock);
180         ast_cond_init(&consumer->out, NULL);
181
182         ao2_ref(consumer, +1);
183         return consumer;
184 }
185
186 static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
187 {
188         struct consumer *consumer = data;
189         RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
190         SCOPED_MUTEX(lock, &consumer->lock);
191
192         if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
193
194                 ++consumer->messages_rxed_len;
195                 consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
196                 ast_assert(consumer->messages_rxed != NULL);
197                 consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
198                 ao2_ref(message, +1);
199         }
200
201         if (stasis_subscription_final_message(sub, message)) {
202                 consumer->complete = 1;
203                 consumer_needs_cleanup = consumer;
204         }
205
206         ast_cond_signal(&consumer->out);
207 }
208
209 static void consumer_exec_sync(void *data, struct stasis_subscription *sub, struct stasis_message *message)
210 {
211         struct consumer *consumer = data;
212         RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
213         SCOPED_MUTEX(lock, &consumer->lock);
214
215         if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
216
217                 ++consumer->messages_rxed_len;
218                 consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
219                 ast_assert(consumer->messages_rxed != NULL);
220                 consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
221                 ao2_ref(message, +1);
222         }
223
224         if (stasis_subscription_final_message(sub, message)) {
225                 consumer->complete = 1;
226                 consumer_needs_cleanup = consumer;
227         }
228 }
229
230 static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
231 {
232         struct timeval start = ast_tvnow();
233         struct timespec end = {
234                 .tv_sec = start.tv_sec + 30,
235                 .tv_nsec = start.tv_usec * 1000
236         };
237
238         SCOPED_MUTEX(lock, &consumer->lock);
239
240         while (consumer->messages_rxed_len < expected_len) {
241                 int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
242                 if (r == ETIMEDOUT) {
243                         break;
244                 }
245                 ast_assert(r == 0); /* Not expecting any othet types of errors */
246         }
247         return consumer->messages_rxed_len;
248 }
249
250 static int consumer_wait_for_completion(struct consumer *consumer)
251 {
252         struct timeval start = ast_tvnow();
253         struct timespec end = {
254                 .tv_sec = start.tv_sec + 3,
255                 .tv_nsec = start.tv_usec * 1000
256         };
257
258         SCOPED_MUTEX(lock, &consumer->lock);
259
260         while (!consumer->complete) {
261                 int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
262                 if (r == ETIMEDOUT) {
263                         break;
264                 }
265                 ast_assert(r == 0); /* Not expecting any othet types of errors */
266         }
267         return consumer->complete;
268 }
269
270 static int consumer_should_stay(struct consumer *consumer, size_t expected_len)
271 {
272         struct timeval start = ast_tvnow();
273         struct timeval diff = {
274                 .tv_sec = 0,
275                 .tv_usec = 100000 /* wait for 100ms */
276         };
277         struct timeval end_tv = ast_tvadd(start, diff);
278         struct timespec end = {
279                 .tv_sec = end_tv.tv_sec,
280                 .tv_nsec = end_tv.tv_usec * 1000
281         };
282
283         SCOPED_MUTEX(lock, &consumer->lock);
284
285         while (consumer->messages_rxed_len == expected_len) {
286                 int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
287                 if (r == ETIMEDOUT) {
288                         break;
289                 }
290                 ast_assert(r == 0); /* Not expecting any othet types of errors */
291         }
292         return consumer->messages_rxed_len;
293 }
294
295 AST_TEST_DEFINE(subscription_messages)
296 {
297         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
298         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
299         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
300         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
301         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
302         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
303         RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
304         int complete;
305         struct stasis_subscription_change *change;
306
307         switch (cmd) {
308         case TEST_INIT:
309                 info->name = __func__;
310                 info->category = test_category;
311                 info->summary = "Test subscribe/unsubscribe messages";
312                 info->description = "Test subscribe/unsubscribe messages";
313                 return AST_TEST_NOT_RUN;
314         case TEST_EXECUTE:
315                 break;
316         }
317
318         topic = stasis_topic_create("TestTopic");
319         ast_test_validate(test, NULL != topic);
320
321         consumer = consumer_create(0);
322         ast_test_validate(test, NULL != consumer);
323
324         uut = stasis_subscribe(topic, consumer_exec, consumer);
325         ast_test_validate(test, NULL != uut);
326         ao2_ref(consumer, +1);
327         expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
328
329         uut = stasis_unsubscribe(uut);
330         complete = consumer_wait_for_completion(consumer);
331         ast_test_validate(test, 1 == complete);
332
333         ast_test_validate(test, 2 == consumer->messages_rxed_len);
334         ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0]));
335         ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1]));
336
337         change = stasis_message_data(consumer->messages_rxed[0]);
338         ast_test_validate(test, topic == change->topic);
339         ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
340         ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
341
342         change = stasis_message_data(consumer->messages_rxed[1]);
343         ast_test_validate(test, topic == change->topic);
344         ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
345         ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
346
347         return AST_TEST_PASS;
348 }
349
350 AST_TEST_DEFINE(publish)
351 {
352         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
353         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
354         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
355         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
356         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
357         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
358         int actual_len;
359         const char *actual;
360
361         switch (cmd) {
362         case TEST_INIT:
363                 info->name = __func__;
364                 info->category = test_category;
365                 info->summary = "Test publishing";
366                 info->description = "Test publishing";
367                 return AST_TEST_NOT_RUN;
368         case TEST_EXECUTE:
369                 break;
370         }
371
372         topic = stasis_topic_create("TestTopic");
373         ast_test_validate(test, NULL != topic);
374
375         consumer = consumer_create(1);
376         ast_test_validate(test, NULL != consumer);
377
378         uut = stasis_subscribe(topic, consumer_exec, consumer);
379         ast_test_validate(test, NULL != uut);
380         ao2_ref(consumer, +1);
381
382         test_data = ao2_alloc(1, NULL);
383         ast_test_validate(test, NULL != test_data);
384         test_message_type = stasis_message_type_create("TestMessage", NULL);
385         test_message = stasis_message_create(test_message_type, test_data);
386
387         stasis_publish(topic, test_message);
388
389         actual_len = consumer_wait_for(consumer, 1);
390         ast_test_validate(test, 1 == actual_len);
391         actual = stasis_message_data(consumer->messages_rxed[0]);
392         ast_test_validate(test, test_data == actual);
393
394         return AST_TEST_PASS;
395 }
396
397 AST_TEST_DEFINE(publish_sync)
398 {
399         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
400         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
401         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
402         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
403         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
404         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
405         int actual_len;
406         const char *actual;
407
408         switch (cmd) {
409         case TEST_INIT:
410                 info->name = __func__;
411                 info->category = test_category;
412                 info->summary = "Test synchronous publishing";
413                 info->description = "Test synchronous publishing";
414                 return AST_TEST_NOT_RUN;
415         case TEST_EXECUTE:
416                 break;
417         }
418
419         topic = stasis_topic_create("TestTopic");
420         ast_test_validate(test, NULL != topic);
421
422         consumer = consumer_create(1);
423         ast_test_validate(test, NULL != consumer);
424
425         uut = stasis_subscribe(topic, consumer_exec_sync, consumer);
426         ast_test_validate(test, NULL != uut);
427         ao2_ref(consumer, +1);
428
429         test_data = ao2_alloc(1, NULL);
430         ast_test_validate(test, NULL != test_data);
431         test_message_type = stasis_message_type_create("TestMessage", NULL);
432         test_message = stasis_message_create(test_message_type, test_data);
433
434         stasis_publish_sync(uut, test_message);
435
436         actual_len = consumer->messages_rxed_len;
437         ast_test_validate(test, 1 == actual_len);
438         actual = stasis_message_data(consumer->messages_rxed[0]);
439         ast_test_validate(test, test_data == actual);
440
441         return AST_TEST_PASS;
442 }
443
444 AST_TEST_DEFINE(unsubscribe_stops_messages)
445 {
446         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
447         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
448         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
449         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
450         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
451         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
452         int actual_len;
453
454         switch (cmd) {
455         case TEST_INIT:
456                 info->name = __func__;
457                 info->category = test_category;
458                 info->summary = "Test simple subscriptions";
459                 info->description = "Test simple subscriptions";
460                 return AST_TEST_NOT_RUN;
461         case TEST_EXECUTE:
462                 break;
463         }
464
465         topic = stasis_topic_create("TestTopic");
466         ast_test_validate(test, NULL != topic);
467
468         consumer = consumer_create(1);
469         ast_test_validate(test, NULL != consumer);
470
471         uut = stasis_subscribe(topic, consumer_exec, consumer);
472         ast_test_validate(test, NULL != uut);
473         ao2_ref(consumer, +1);
474
475         uut = stasis_unsubscribe(uut);
476
477         test_data = ao2_alloc(1, NULL);
478         ast_test_validate(test, NULL != test_data);
479         test_message_type = stasis_message_type_create("TestMessage", NULL);
480         test_message = stasis_message_create(test_message_type, test_data);
481
482         stasis_publish(topic, test_message);
483
484         actual_len = consumer_should_stay(consumer, 0);
485         ast_test_validate(test, 0 == actual_len);
486
487         return AST_TEST_PASS;
488 }
489
490 AST_TEST_DEFINE(forward)
491 {
492         RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
493         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
494
495         RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup);
496         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
497
498         RAII_VAR(struct stasis_forward *, forward_sub, NULL, stasis_forward_cancel);
499         RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe);
500         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
501
502         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
503         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
504         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
505         int actual_len;
506
507         switch (cmd) {
508         case TEST_INIT:
509                 info->name = __func__;
510                 info->category = test_category;
511                 info->summary = "Test sending events to a parent topic";
512                 info->description = "Test sending events to a parent topic.\n"
513                         "This test creates three topics (one parent, two children)\n"
514                         "and publishes a message to one child, and verifies it's\n"
515                         "only seen by that child and the parent";
516                 return AST_TEST_NOT_RUN;
517         case TEST_EXECUTE:
518                 break;
519         }
520
521         parent_topic = stasis_topic_create("ParentTestTopic");
522         ast_test_validate(test, NULL != parent_topic);
523         topic = stasis_topic_create("TestTopic");
524         ast_test_validate(test, NULL != topic);
525
526         forward_sub = stasis_forward_all(topic, parent_topic);
527         ast_test_validate(test, NULL != forward_sub);
528
529         parent_consumer = consumer_create(1);
530         ast_test_validate(test, NULL != parent_consumer);
531         consumer = consumer_create(1);
532         ast_test_validate(test, NULL != consumer);
533
534         parent_sub = stasis_subscribe(parent_topic, consumer_exec, parent_consumer);
535         ast_test_validate(test, NULL != parent_sub);
536         ao2_ref(parent_consumer, +1);
537         sub = stasis_subscribe(topic, consumer_exec, consumer);
538         ast_test_validate(test, NULL != sub);
539         ao2_ref(consumer, +1);
540
541         test_data = ao2_alloc(1, NULL);
542         ast_test_validate(test, NULL != test_data);
543         test_message_type = stasis_message_type_create("TestMessage", NULL);
544         test_message = stasis_message_create(test_message_type, test_data);
545
546         stasis_publish(topic, test_message);
547
548         actual_len = consumer_wait_for(consumer, 1);
549         ast_test_validate(test, 1 == actual_len);
550         actual_len = consumer_wait_for(parent_consumer, 1);
551         ast_test_validate(test, 1 == actual_len);
552
553         return AST_TEST_PASS;
554 }
555
556 AST_TEST_DEFINE(interleaving)
557 {
558         RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
559         RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
560         RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
561
562         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
563
564         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
565
566         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
567         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
568         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
569
570         RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
571         RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
572         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
573
574         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
575
576         int actual_len;
577
578         switch (cmd) {
579         case TEST_INIT:
580                 info->name = __func__;
581                 info->category = test_category;
582                 info->summary = "Test sending interleaved events to a parent topic";
583                 info->description = "Test sending events to a parent topic.\n"
584                         "This test creates three topics (one parent, two children)\n"
585                         "and publishes messages alternately between the children.\n"
586                         "It verifies that the messages are received in the expected\n"
587                         "order.";
588                 return AST_TEST_NOT_RUN;
589         case TEST_EXECUTE:
590                 break;
591         }
592
593         test_message_type = stasis_message_type_create("test", NULL);
594         ast_test_validate(test, NULL != test_message_type);
595
596         test_data = ao2_alloc(1, NULL);
597         ast_test_validate(test, NULL != test_data);
598
599         test_message1 = stasis_message_create(test_message_type, test_data);
600         ast_test_validate(test, NULL != test_message1);
601         test_message2 = stasis_message_create(test_message_type, test_data);
602         ast_test_validate(test, NULL != test_message2);
603         test_message3 = stasis_message_create(test_message_type, test_data);
604         ast_test_validate(test, NULL != test_message3);
605
606         parent_topic = stasis_topic_create("ParentTestTopic");
607         ast_test_validate(test, NULL != parent_topic);
608         topic1 = stasis_topic_create("Topic1");
609         ast_test_validate(test, NULL != topic1);
610         topic2 = stasis_topic_create("Topic2");
611         ast_test_validate(test, NULL != topic2);
612
613         forward_sub1 = stasis_forward_all(topic1, parent_topic);
614         ast_test_validate(test, NULL != forward_sub1);
615         forward_sub2 = stasis_forward_all(topic2, parent_topic);
616         ast_test_validate(test, NULL != forward_sub2);
617
618         consumer = consumer_create(1);
619         ast_test_validate(test, NULL != consumer);
620
621         sub = stasis_subscribe(parent_topic, consumer_exec, consumer);
622         ast_test_validate(test, NULL != sub);
623         ao2_ref(consumer, +1);
624
625         stasis_publish(topic1, test_message1);
626         stasis_publish(topic2, test_message2);
627         stasis_publish(topic1, test_message3);
628
629         actual_len = consumer_wait_for(consumer, 3);
630         ast_test_validate(test, 3 == actual_len);
631
632         ast_test_validate(test, test_message1 == consumer->messages_rxed[0]);
633         ast_test_validate(test, test_message2 == consumer->messages_rxed[1]);
634         ast_test_validate(test, test_message3 == consumer->messages_rxed[2]);
635
636         return AST_TEST_PASS;
637 }
638
639 struct cache_test_data {
640         char *id;
641         char *value;
642 };
643
644 static void cache_test_data_dtor(void *obj)
645 {
646         struct cache_test_data *data = obj;
647         ast_free(data->id);
648         ast_free(data->value);
649 }
650
651 static struct stasis_message *cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value)
652 {
653         RAII_VAR(struct cache_test_data *, data, NULL, ao2_cleanup);
654
655         data = ao2_alloc(sizeof(*data), cache_test_data_dtor);
656         if (data == NULL) {
657                 return NULL;
658         }
659
660         ast_assert(name != NULL);
661         ast_assert(value != NULL);
662
663         data->id = ast_strdup(name);
664         data->value = ast_strdup(value);
665         if (!data->id || !data->value) {
666                 return NULL;
667         }
668
669         return stasis_message_create(type, data);
670 }
671
672 static const char *cache_test_data_id(struct stasis_message *message) {
673         struct cache_test_data *cachable = stasis_message_data(message);
674
675         if (0 != strcmp("Cacheable", stasis_message_type_name(stasis_message_type(message)))) {
676                 return NULL;
677         }
678         return cachable->id;
679 }
680
681 AST_TEST_DEFINE(cache_filter)
682 {
683         RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup);
684         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
685         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
686         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
687         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
688         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
689         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
690         int actual_len;
691
692         switch (cmd) {
693         case TEST_INIT:
694                 info->name = __func__;
695                 info->category = test_category;
696                 info->summary = "Test caching topics only forward cache_update messages.";
697                 info->description = "Test caching topics only forward cache_update messages.";
698                 return AST_TEST_NOT_RUN;
699         case TEST_EXECUTE:
700                 break;
701         }
702
703         non_cache_type = stasis_message_type_create("NonCacheable", NULL);
704         ast_test_validate(test, NULL != non_cache_type);
705         topic = stasis_topic_create("SomeTopic");
706         ast_test_validate(test, NULL != topic);
707         cache = stasis_cache_create(cache_test_data_id);
708         ast_test_validate(test, NULL != cache);
709         caching_topic = stasis_caching_topic_create(topic, cache);
710         ast_test_validate(test, NULL != caching_topic);
711         consumer = consumer_create(1);
712         ast_test_validate(test, NULL != consumer);
713         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
714         ast_test_validate(test, NULL != sub);
715         ao2_ref(consumer, +1);
716
717         test_message = cache_test_message_create(non_cache_type, "1", "1");
718         ast_test_validate(test, NULL != test_message);
719
720         stasis_publish(topic, test_message);
721
722         actual_len = consumer_should_stay(consumer, 0);
723         ast_test_validate(test, 0 == actual_len);
724
725         return AST_TEST_PASS;
726 }
727
728 AST_TEST_DEFINE(cache)
729 {
730         RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
731         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
732         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
733         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
734         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
735         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
736         RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
737         RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
738         RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
739         RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
740         int actual_len;
741         struct stasis_cache_update *actual_update;
742
743         switch (cmd) {
744         case TEST_INIT:
745                 info->name = __func__;
746                 info->category = test_category;
747                 info->summary = "Test passing messages through cache topic unscathed.";
748                 info->description = "Test passing messages through cache topic unscathed.";
749                 return AST_TEST_NOT_RUN;
750         case TEST_EXECUTE:
751                 break;
752         }
753
754         cache_type = stasis_message_type_create("Cacheable", NULL);
755         ast_test_validate(test, NULL != cache_type);
756         topic = stasis_topic_create("SomeTopic");
757         ast_test_validate(test, NULL != topic);
758         cache = stasis_cache_create(cache_test_data_id);
759         ast_test_validate(test, NULL != cache);
760         caching_topic = stasis_caching_topic_create(topic, cache);
761         ast_test_validate(test, NULL != caching_topic);
762         consumer = consumer_create(1);
763         ast_test_validate(test, NULL != consumer);
764         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
765         ast_test_validate(test, NULL != sub);
766         ao2_ref(consumer, +1);
767
768         test_message1_1 = cache_test_message_create(cache_type, "1", "1");
769         ast_test_validate(test, NULL != test_message1_1);
770         test_message2_1 = cache_test_message_create(cache_type, "2", "1");
771         ast_test_validate(test, NULL != test_message2_1);
772
773         /* Post a couple of snapshots */
774         stasis_publish(topic, test_message1_1);
775         stasis_publish(topic, test_message2_1);
776         actual_len = consumer_wait_for(consumer, 2);
777         ast_test_validate(test, 2 == actual_len);
778
779         /* Check for new snapshot messages */
780         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[0]));
781         actual_update = stasis_message_data(consumer->messages_rxed[0]);
782         ast_test_validate(test, NULL == actual_update->old_snapshot);
783         ast_test_validate(test, test_message1_1 == actual_update->new_snapshot);
784         ast_test_validate(test, test_message1_1 == stasis_cache_get(cache, cache_type, "1"));
785         /* stasis_cache_get returned a ref, so unref test_message1_1 */
786         ao2_ref(test_message1_1, -1);
787
788         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[1]));
789         actual_update = stasis_message_data(consumer->messages_rxed[1]);
790         ast_test_validate(test, NULL == actual_update->old_snapshot);
791         ast_test_validate(test, test_message2_1 == actual_update->new_snapshot);
792         ast_test_validate(test, test_message2_1 == stasis_cache_get(cache, cache_type, "2"));
793         /* stasis_cache_get returned a ref, so unref test_message2_1 */
794         ao2_ref(test_message2_1, -1);
795
796         /* Update snapshot 2 */
797         test_message2_2 = cache_test_message_create(cache_type, "2", "2");
798         ast_test_validate(test, NULL != test_message2_2);
799         stasis_publish(topic, test_message2_2);
800
801         actual_len = consumer_wait_for(consumer, 3);
802         ast_test_validate(test, 3 == actual_len);
803
804         actual_update = stasis_message_data(consumer->messages_rxed[2]);
805         ast_test_validate(test, test_message2_1 == actual_update->old_snapshot);
806         ast_test_validate(test, test_message2_2 == actual_update->new_snapshot);
807         ast_test_validate(test, test_message2_2 == stasis_cache_get(cache, cache_type, "2"));
808         /* stasis_cache_get returned a ref, so unref test_message2_2 */
809         ao2_ref(test_message2_2, -1);
810
811         /* Clear snapshot 1 */
812         test_message1_clear = stasis_cache_clear_create(test_message1_1);
813         ast_test_validate(test, NULL != test_message1_clear);
814         stasis_publish(topic, test_message1_clear);
815
816         actual_len = consumer_wait_for(consumer, 4);
817         ast_test_validate(test, 4 == actual_len);
818
819         actual_update = stasis_message_data(consumer->messages_rxed[3]);
820         ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
821         ast_test_validate(test, NULL == actual_update->new_snapshot);
822         ast_test_validate(test, NULL == stasis_cache_get(cache, cache_type, "1"));
823
824         return AST_TEST_PASS;
825 }
826
827 AST_TEST_DEFINE(cache_dump)
828 {
829         RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
830         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
831         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
832         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
833         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
834         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
835         RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
836         RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
837         RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
838         RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
839         RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
840         int actual_len;
841         struct ao2_iterator i;
842         void *obj;
843
844         switch (cmd) {
845         case TEST_INIT:
846                 info->name = __func__;
847                 info->category = test_category;
848                 info->summary = "Test passing messages through cache topic unscathed.";
849                 info->description = "Test passing messages through cache topic unscathed.";
850                 return AST_TEST_NOT_RUN;
851         case TEST_EXECUTE:
852                 break;
853         }
854
855         cache_type = stasis_message_type_create("Cacheable", NULL);
856         ast_test_validate(test, NULL != cache_type);
857         topic = stasis_topic_create("SomeTopic");
858         ast_test_validate(test, NULL != topic);
859         cache = stasis_cache_create(cache_test_data_id);
860         ast_test_validate(test, NULL != cache);
861         caching_topic = stasis_caching_topic_create(topic, cache);
862         ast_test_validate(test, NULL != caching_topic);
863         consumer = consumer_create(1);
864         ast_test_validate(test, NULL != consumer);
865         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
866         ast_test_validate(test, NULL != sub);
867         ao2_ref(consumer, +1);
868
869         test_message1_1 = cache_test_message_create(cache_type, "1", "1");
870         ast_test_validate(test, NULL != test_message1_1);
871         test_message2_1 = cache_test_message_create(cache_type, "2", "1");
872         ast_test_validate(test, NULL != test_message2_1);
873
874         /* Post a couple of snapshots */
875         stasis_publish(topic, test_message1_1);
876         stasis_publish(topic, test_message2_1);
877         actual_len = consumer_wait_for(consumer, 2);
878         ast_test_validate(test, 2 == actual_len);
879
880         /* Check the cache */
881         ao2_cleanup(cache_dump);
882         cache_dump = stasis_cache_dump(cache, NULL);
883         ast_test_validate(test, NULL != cache_dump);
884         ast_test_validate(test, 2 == ao2_container_count(cache_dump));
885         i = ao2_iterator_init(cache_dump, 0);
886         while ((obj = ao2_iterator_next(&i))) {
887                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
888                 ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_1);
889         }
890         ao2_iterator_destroy(&i);
891
892         /* Update snapshot 2 */
893         test_message2_2 = cache_test_message_create(cache_type, "2", "2");
894         ast_test_validate(test, NULL != test_message2_2);
895         stasis_publish(topic, test_message2_2);
896
897         actual_len = consumer_wait_for(consumer, 3);
898         ast_test_validate(test, 3 == actual_len);
899
900         /* Check the cache */
901         ao2_cleanup(cache_dump);
902         cache_dump = stasis_cache_dump(cache, NULL);
903         ast_test_validate(test, NULL != cache_dump);
904         ast_test_validate(test, 2 == ao2_container_count(cache_dump));
905         i = ao2_iterator_init(cache_dump, 0);
906         while ((obj = ao2_iterator_next(&i))) {
907                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
908                 ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_2);
909         }
910         ao2_iterator_destroy(&i);
911
912         /* Clear snapshot 1 */
913         test_message1_clear = stasis_cache_clear_create(test_message1_1);
914         ast_test_validate(test, NULL != test_message1_clear);
915         stasis_publish(topic, test_message1_clear);
916
917         actual_len = consumer_wait_for(consumer, 4);
918         ast_test_validate(test, 4 == actual_len);
919
920         /* Check the cache */
921         ao2_cleanup(cache_dump);
922         cache_dump = stasis_cache_dump(cache, NULL);
923         ast_test_validate(test, NULL != cache_dump);
924         ast_test_validate(test, 1 == ao2_container_count(cache_dump));
925         i = ao2_iterator_init(cache_dump, 0);
926         while ((obj = ao2_iterator_next(&i))) {
927                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
928                 ast_test_validate(test, actual_cache_entry == test_message2_2);
929         }
930         ao2_iterator_destroy(&i);
931
932         /* Dump the cache to ensure that it has no subscription change items in it since those aren't cached */
933         ao2_cleanup(cache_dump);
934         cache_dump = stasis_cache_dump(cache, stasis_subscription_change_type());
935         ast_test_validate(test, 0 == ao2_container_count(cache_dump));
936
937         return AST_TEST_PASS;
938 }
939
940 AST_TEST_DEFINE(router)
941 {
942         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
943         RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
944         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
945         RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
946         RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
947         RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
948         RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
949         RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
950         RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
951         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
952         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
953         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
954         int actual_len, ret;
955         struct stasis_message *actual;
956
957         switch (cmd) {
958         case TEST_INIT:
959                 info->name = __func__;
960                 info->category = test_category;
961                 info->summary = "Test simple message routing";
962                 info->description = "Test simple message routing";
963                 return AST_TEST_NOT_RUN;
964         case TEST_EXECUTE:
965                 break;
966         }
967
968         topic = stasis_topic_create("TestTopic");
969         ast_test_validate(test, NULL != topic);
970
971         consumer1 = consumer_create(1);
972         ast_test_validate(test, NULL != consumer1);
973         consumer2 = consumer_create(1);
974         ast_test_validate(test, NULL != consumer2);
975         consumer3 = consumer_create(1);
976         ast_test_validate(test, NULL != consumer3);
977
978         test_message_type1 = stasis_message_type_create("TestMessage1", NULL);
979         ast_test_validate(test, NULL != test_message_type1);
980         test_message_type2 = stasis_message_type_create("TestMessage2", NULL);
981         ast_test_validate(test, NULL != test_message_type2);
982         test_message_type3 = stasis_message_type_create("TestMessage3", NULL);
983         ast_test_validate(test, NULL != test_message_type3);
984
985         uut = stasis_message_router_create(topic);
986         ast_test_validate(test, NULL != uut);
987
988         ret = stasis_message_router_add(
989                 uut, test_message_type1, consumer_exec, consumer1);
990         ast_test_validate(test, 0 == ret);
991         ao2_ref(consumer1, +1);
992         ret = stasis_message_router_add(
993                 uut, test_message_type2, consumer_exec, consumer2);
994         ast_test_validate(test, 0 == ret);
995         ao2_ref(consumer2, +1);
996         ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
997         ast_test_validate(test, 0 == ret);
998         ao2_ref(consumer3, +1);
999
1000         test_data = ao2_alloc(1, NULL);
1001         ast_test_validate(test, NULL != test_data);
1002         test_message1 = stasis_message_create(test_message_type1, test_data);
1003         ast_test_validate(test, NULL != test_message1);
1004         test_message2 = stasis_message_create(test_message_type2, test_data);
1005         ast_test_validate(test, NULL != test_message2);
1006         test_message3 = stasis_message_create(test_message_type3, test_data);
1007         ast_test_validate(test, NULL != test_message3);
1008
1009         stasis_publish(topic, test_message1);
1010         stasis_publish(topic, test_message2);
1011         stasis_publish(topic, test_message3);
1012
1013         actual_len = consumer_wait_for(consumer1, 1);
1014         ast_test_validate(test, 1 == actual_len);
1015         actual_len = consumer_wait_for(consumer2, 1);
1016         ast_test_validate(test, 1 == actual_len);
1017         actual_len = consumer_wait_for(consumer3, 1);
1018         ast_test_validate(test, 1 == actual_len);
1019
1020         actual = consumer1->messages_rxed[0];
1021         ast_test_validate(test, test_message1 == actual);
1022
1023         actual = consumer2->messages_rxed[0];
1024         ast_test_validate(test, test_message2 == actual);
1025
1026         actual = consumer3->messages_rxed[0];
1027         ast_test_validate(test, test_message3 == actual);
1028
1029         /* consumer1 and consumer2 do not get the final message. */
1030         ao2_cleanup(consumer1);
1031         ao2_cleanup(consumer2);
1032
1033         return AST_TEST_PASS;
1034 }
1035
1036 static const char *cache_simple(struct stasis_message *message) {
1037         const char *type_name =
1038                 stasis_message_type_name(stasis_message_type(message));
1039         if (!ast_begins_with(type_name, "Cache")) {
1040                 return NULL;
1041         }
1042
1043         return "cached";
1044 }
1045
1046 AST_TEST_DEFINE(router_cache_updates)
1047 {
1048         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1049         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
1050         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe_and_join);
1051         RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
1052         RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
1053         RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
1054         RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
1055         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
1056         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
1057         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
1058         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
1059         RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
1060         RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
1061         RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
1062         RAII_VAR(struct stasis_message *, message1, NULL, ao2_cleanup);
1063         RAII_VAR(struct stasis_message *, message2, NULL, ao2_cleanup);
1064         struct stasis_cache_update *update;
1065         int actual_len, ret;
1066         struct stasis_message *actual;
1067
1068         switch (cmd) {
1069         case TEST_INIT:
1070                 info->name = __func__;
1071                 info->category = test_category;
1072                 info->summary = "Test special handling cache_update messages";
1073                 info->description = "Test special handling cache_update messages";
1074                 return AST_TEST_NOT_RUN;
1075         case TEST_EXECUTE:
1076                 break;
1077         }
1078
1079         topic = stasis_topic_create("TestTopic");
1080         ast_test_validate(test, NULL != topic);
1081
1082         cache = stasis_cache_create(cache_simple);
1083         ast_test_validate(test, NULL != cache);
1084         caching_topic = stasis_caching_topic_create(topic, cache);
1085         ast_test_validate(test, NULL != caching_topic);
1086
1087         consumer1 = consumer_create(1);
1088         ast_test_validate(test, NULL != consumer1);
1089         consumer2 = consumer_create(1);
1090         ast_test_validate(test, NULL != consumer2);
1091         consumer3 = consumer_create(1);
1092         ast_test_validate(test, NULL != consumer3);
1093
1094         test_message_type1 = stasis_message_type_create("Cache1", NULL);
1095         ast_test_validate(test, NULL != test_message_type1);
1096         test_message_type2 = stasis_message_type_create("Cache2", NULL);
1097         ast_test_validate(test, NULL != test_message_type2);
1098         test_message_type3 = stasis_message_type_create("NonCache", NULL);
1099         ast_test_validate(test, NULL != test_message_type3);
1100
1101         uut = stasis_message_router_create(
1102                 stasis_caching_get_topic(caching_topic));
1103         ast_test_validate(test, NULL != uut);
1104
1105         ret = stasis_message_router_add_cache_update(
1106                 uut, test_message_type1, consumer_exec, consumer1);
1107         ast_test_validate(test, 0 == ret);
1108         ao2_ref(consumer1, +1);
1109         ret = stasis_message_router_add(
1110                 uut, stasis_cache_update_type(), consumer_exec, consumer2);
1111         ast_test_validate(test, 0 == ret);
1112         ao2_ref(consumer2, +1);
1113         ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
1114         ast_test_validate(test, 0 == ret);
1115         ao2_ref(consumer3, +1);
1116
1117         test_data = ao2_alloc(1, NULL);
1118         ast_test_validate(test, NULL != test_data);
1119         test_message1 = stasis_message_create(test_message_type1, test_data);
1120         ast_test_validate(test, NULL != test_message1);
1121         test_message2 = stasis_message_create(test_message_type2, test_data);
1122         ast_test_validate(test, NULL != test_message2);
1123         test_message3 = stasis_message_create(test_message_type3, test_data);
1124         ast_test_validate(test, NULL != test_message3);
1125
1126         stasis_publish(topic, test_message1);
1127         stasis_publish(topic, test_message2);
1128         stasis_publish(topic, test_message3);
1129
1130         actual_len = consumer_wait_for(consumer1, 1);
1131         ast_test_validate(test, 1 == actual_len);
1132         actual_len = consumer_wait_for(consumer2, 1);
1133         ast_test_validate(test, 1 == actual_len);
1134         /* Uncacheable message should not be passed through */
1135         actual_len = consumer_should_stay(consumer3, 0);
1136         ast_test_validate(test, 0 == actual_len);
1137
1138         actual = consumer1->messages_rxed[0];
1139         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
1140         update = stasis_message_data(actual);
1141         ast_test_validate(test, test_message_type1 == update->type);
1142         ast_test_validate(test, test_message1 == update->new_snapshot);
1143
1144         actual = consumer2->messages_rxed[0];
1145         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
1146         update = stasis_message_data(actual);
1147         ast_test_validate(test, test_message_type2 == update->type);
1148         ast_test_validate(test, test_message2 == update->new_snapshot);
1149
1150         /* consumer1 and consumer2 do not get the final message. */
1151         ao2_cleanup(consumer1);
1152         ao2_cleanup(consumer2);
1153
1154         return AST_TEST_PASS;
1155 }
1156
1157 AST_TEST_DEFINE(no_to_json)
1158 {
1159         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1160         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1161         RAII_VAR(char *, data, NULL, ao2_cleanup);
1162         RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
1163         char *expected = "SomeData";
1164
1165         switch (cmd) {
1166         case TEST_INIT:
1167                 info->name = __func__;
1168                 info->category = test_category;
1169                 info->summary = "Test message to_json function";
1170                 info->description = "Test message to_json function";
1171                 return AST_TEST_NOT_RUN;
1172         case TEST_EXECUTE:
1173                 break;
1174         }
1175
1176         /* Test NULL */
1177         actual = stasis_message_to_json(NULL, NULL);
1178         ast_test_validate(test, NULL == actual);
1179
1180         /* Test message with NULL to_json function */
1181         type = stasis_message_type_create("SomeMessage", NULL);
1182
1183         data = ao2_alloc(strlen(expected) + 1, NULL);
1184         strcpy(data, expected);
1185         uut = stasis_message_create(type, data);
1186         ast_test_validate(test, NULL != uut);
1187
1188         actual = stasis_message_to_json(uut, NULL);
1189         ast_test_validate(test, NULL == actual);
1190
1191         return AST_TEST_PASS;
1192 }
1193
1194 AST_TEST_DEFINE(to_json)
1195 {
1196         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1197         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1198         RAII_VAR(char *, data, NULL, ao2_cleanup);
1199         RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
1200         const char *expected_text = "SomeData";
1201         RAII_VAR(struct ast_json *, expected, NULL, ast_json_unref);
1202
1203         switch (cmd) {
1204         case TEST_INIT:
1205                 info->name = __func__;
1206                 info->category = test_category;
1207                 info->summary = "Test message to_json function when NULL";
1208                 info->description = "Test message to_json function when NULL";
1209                 return AST_TEST_NOT_RUN;
1210         case TEST_EXECUTE:
1211                 break;
1212         }
1213
1214         type = stasis_message_type_create("SomeMessage", &fake_vtable);
1215
1216         data = ao2_alloc(strlen(expected_text) + 1, NULL);
1217         strcpy(data, expected_text);
1218         uut = stasis_message_create(type, data);
1219         ast_test_validate(test, NULL != uut);
1220
1221         expected = ast_json_string_create(expected_text);
1222         actual = stasis_message_to_json(uut, NULL);
1223         ast_test_validate(test, ast_json_equal(expected, actual));
1224
1225         return AST_TEST_PASS;
1226 }
1227
1228 AST_TEST_DEFINE(no_to_ami)
1229 {
1230         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1231         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1232         RAII_VAR(char *, data, NULL, ao2_cleanup);
1233         RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
1234         char *expected = "SomeData";
1235
1236         switch (cmd) {
1237         case TEST_INIT:
1238                 info->name = __func__;
1239                 info->category = test_category;
1240                 info->summary = "Test message to_ami function when NULL";
1241                 info->description = "Test message to_ami function when NULL";
1242                 return AST_TEST_NOT_RUN;
1243         case TEST_EXECUTE:
1244                 break;
1245         }
1246
1247         /* Test NULL */
1248         actual = stasis_message_to_ami(NULL);
1249         ast_test_validate(test, NULL == actual);
1250
1251         /* Test message with NULL to_ami function */
1252         type = stasis_message_type_create("SomeMessage", NULL);
1253
1254         data = ao2_alloc(strlen(expected) + 1, NULL);
1255         strcpy(data, expected);
1256         uut = stasis_message_create(type, data);
1257         ast_test_validate(test, NULL != uut);
1258
1259         actual = stasis_message_to_ami(uut);
1260         ast_test_validate(test, NULL == actual);
1261
1262         return AST_TEST_PASS;
1263 }
1264
1265 AST_TEST_DEFINE(to_ami)
1266 {
1267         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1268         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1269         RAII_VAR(char *, data, NULL, ao2_cleanup);
1270         RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
1271         const char *expected_text = "SomeData";
1272         const char *expected = "Message: SomeData\r\n";
1273
1274         switch (cmd) {
1275         case TEST_INIT:
1276                 info->name = __func__;
1277                 info->category = test_category;
1278                 info->summary = "Test message to_ami function";
1279                 info->description = "Test message to_ami function";
1280                 return AST_TEST_NOT_RUN;
1281         case TEST_EXECUTE:
1282                 break;
1283         }
1284
1285         type = stasis_message_type_create("SomeMessage", &fake_vtable);
1286
1287         data = ao2_alloc(strlen(expected_text) + 1, NULL);
1288         strcpy(data, expected_text);
1289         uut = stasis_message_create(type, data);
1290         ast_test_validate(test, NULL != uut);
1291
1292         actual = stasis_message_to_ami(uut);
1293         ast_test_validate(test, strcmp(expected, actual->extra_fields) == 0);
1294
1295         return AST_TEST_PASS;
1296 }
1297
1298 static void noop(void *data, struct stasis_subscription *sub,
1299         struct stasis_message *message)
1300 {
1301         /* no-op */
1302 }
1303
1304 AST_TEST_DEFINE(dtor_order)
1305 {
1306         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1307         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
1308
1309         switch (cmd) {
1310         case TEST_INIT:
1311                 info->name = __func__;
1312                 info->category = test_category;
1313                 info->summary = "Test that destruction order doesn't bomb stuff";
1314                 info->description = "Test that destruction order doesn't bomb stuff";
1315                 return AST_TEST_NOT_RUN;
1316         case TEST_EXECUTE:
1317                 break;
1318         }
1319
1320         topic = stasis_topic_create("test-topic");
1321         ast_test_validate(test, NULL != topic);
1322
1323         sub = stasis_subscribe(topic, noop, NULL);
1324         ast_test_validate(test, NULL != sub);
1325
1326         /* With any luck, this won't completely blow everything up */
1327         ao2_cleanup(topic);
1328         stasis_unsubscribe(sub);
1329
1330         /* These refs were cleaned up manually */
1331         topic = NULL;
1332         sub = NULL;
1333
1334         return AST_TEST_PASS;
1335 }
1336
1337 static const char *noop_get_id(struct stasis_message *message)
1338 {
1339         return NULL;
1340 }
1341
1342 AST_TEST_DEFINE(caching_dtor_order)
1343 {
1344         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1345         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
1346         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL,
1347                 stasis_caching_unsubscribe);
1348         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
1349
1350         switch (cmd) {
1351         case TEST_INIT:
1352                 info->name = __func__;
1353                 info->category = test_category;
1354                 info->summary = "Test that destruction order doesn't bomb stuff";
1355                 info->description = "Test that destruction order doesn't bomb stuff";
1356                 return AST_TEST_NOT_RUN;
1357         case TEST_EXECUTE:
1358                 break;
1359         }
1360
1361         cache = stasis_cache_create(noop_get_id);
1362         ast_test_validate(test, NULL != cache);
1363
1364         topic = stasis_topic_create("test-topic");
1365         ast_test_validate(test, NULL != topic);
1366
1367         caching_topic = stasis_caching_topic_create(topic, cache);
1368         ast_test_validate(test, NULL != caching_topic);
1369
1370         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), noop,
1371                 NULL);
1372         ast_test_validate(test, NULL != sub);
1373
1374         /* With any luck, this won't completely blow everything up */
1375         ao2_cleanup(cache);
1376         ao2_cleanup(topic);
1377         stasis_caching_unsubscribe(caching_topic);
1378         stasis_unsubscribe(sub);
1379
1380         /* These refs were cleaned up manually */
1381         cache = NULL;
1382         topic = NULL;
1383         caching_topic = NULL;
1384         sub = NULL;
1385
1386         return AST_TEST_PASS;
1387 }
1388
1389 static int unload_module(void)
1390 {
1391         AST_TEST_UNREGISTER(message_type);
1392         AST_TEST_UNREGISTER(message);
1393         AST_TEST_UNREGISTER(subscription_messages);
1394         AST_TEST_UNREGISTER(publish);
1395         AST_TEST_UNREGISTER(publish_sync);
1396         AST_TEST_UNREGISTER(unsubscribe_stops_messages);
1397         AST_TEST_UNREGISTER(forward);
1398         AST_TEST_UNREGISTER(cache_filter);
1399         AST_TEST_UNREGISTER(cache);
1400         AST_TEST_UNREGISTER(cache_dump);
1401         AST_TEST_UNREGISTER(router);
1402         AST_TEST_UNREGISTER(router_cache_updates);
1403         AST_TEST_UNREGISTER(interleaving);
1404         AST_TEST_UNREGISTER(no_to_json);
1405         AST_TEST_UNREGISTER(to_json);
1406         AST_TEST_UNREGISTER(no_to_ami);
1407         AST_TEST_UNREGISTER(to_ami);
1408         AST_TEST_UNREGISTER(dtor_order);
1409         AST_TEST_UNREGISTER(caching_dtor_order);
1410         return 0;
1411 }
1412
1413 static int load_module(void)
1414 {
1415         AST_TEST_REGISTER(message_type);
1416         AST_TEST_REGISTER(message);
1417         AST_TEST_REGISTER(subscription_messages);
1418         AST_TEST_REGISTER(publish);
1419         AST_TEST_REGISTER(publish_sync);
1420         AST_TEST_REGISTER(unsubscribe_stops_messages);
1421         AST_TEST_REGISTER(forward);
1422         AST_TEST_REGISTER(cache_filter);
1423         AST_TEST_REGISTER(cache);
1424         AST_TEST_REGISTER(cache_dump);
1425         AST_TEST_REGISTER(router);
1426         AST_TEST_REGISTER(router_cache_updates);
1427         AST_TEST_REGISTER(interleaving);
1428         AST_TEST_REGISTER(no_to_json);
1429         AST_TEST_REGISTER(to_json);
1430         AST_TEST_REGISTER(no_to_ami);
1431         AST_TEST_REGISTER(to_ami);
1432         AST_TEST_REGISTER(dtor_order);
1433         AST_TEST_REGISTER(caching_dtor_order);
1434         return AST_MODULE_LOAD_SUCCESS;
1435 }
1436
1437 AST_MODULE_INFO(ASTERISK_GPL_KEY, 0, "Stasis testing",
1438                 .load = load_module,
1439                 .unload = unload_module
1440         );