test_stasis.c: Misc cleanups.
[asterisk/asterisk.git] / tests / test_stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file \brief Test Stasis message bus.
21  *
22  * \author\verbatim David M. Lee, II <dlee@digium.com> \endverbatim
23  *
24  * \ingroup tests
25  */
26
27 /*** MODULEINFO
28         <depend>TEST_FRAMEWORK</depend>
29         <support_level>core</support_level>
30  ***/
31
32 #include "asterisk.h"
33
34 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
35
36 #include "asterisk/astobj2.h"
37 #include "asterisk/module.h"
38 #include "asterisk/stasis.h"
39 #include "asterisk/stasis_message_router.h"
40 #include "asterisk/test.h"
41
42 static const char *test_category = "/stasis/core/";
43
44 static struct ast_json *fake_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
45 {
46         const char *text = stasis_message_data(message);
47
48         return ast_json_string_create(text);
49 }
50
51 static struct ast_manager_event_blob *fake_ami(struct stasis_message *message)
52 {
53         RAII_VAR(struct ast_manager_event_blob *, res, NULL, ao2_cleanup);
54         const char *text = stasis_message_data(message);
55
56         res = ast_manager_event_blob_create(EVENT_FLAG_TEST, "FakeMI",
57                 "Message: %s\r\n", text);
58
59         if (res == NULL) {
60                 return NULL;
61         }
62
63         ao2_ref(res, +1);
64         return res;
65 }
66
67 static struct stasis_message_vtable fake_vtable = {
68         .to_json = fake_json,
69         .to_ami = fake_ami
70 };
71
72 AST_TEST_DEFINE(message_type)
73 {
74         RAII_VAR(struct stasis_message_type *, uut, NULL, ao2_cleanup);
75
76         switch (cmd) {
77         case TEST_INIT:
78                 info->name = __func__;
79                 info->category = test_category;
80                 info->summary = "Test basic message_type functions";
81                 info->description = "Test basic message_type functions";
82                 return AST_TEST_NOT_RUN;
83         case TEST_EXECUTE:
84                 break;
85         }
86
87         ast_test_validate(test, NULL == stasis_message_type_create(NULL, NULL));
88         uut = stasis_message_type_create("SomeMessage", NULL);
89         ast_test_validate(test, 0 == strcmp(stasis_message_type_name(uut), "SomeMessage"));
90
91         return AST_TEST_PASS;
92 }
93
94 AST_TEST_DEFINE(message)
95 {
96         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
97         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
98         RAII_VAR(char *, data, NULL, ao2_cleanup);
99         char *expected = "SomeData";
100         struct timeval expected_timestamp;
101         struct timeval time_diff;
102
103         switch (cmd) {
104         case TEST_INIT:
105                 info->name = __func__;
106                 info->category = test_category;
107                 info->summary = "Test basic message functions";
108                 info->description = "Test basic message functions";
109                 return AST_TEST_NOT_RUN;
110         case TEST_EXECUTE:
111                 break;
112         }
113
114
115         type = stasis_message_type_create("SomeMessage", NULL);
116
117         ast_test_validate(test, NULL == stasis_message_create(NULL, NULL));
118         ast_test_validate(test, NULL == stasis_message_create(type, NULL));
119
120         data = ao2_alloc(strlen(expected) + 1, NULL);
121         strcpy(data, expected);
122         expected_timestamp = ast_tvnow();
123         uut = stasis_message_create(type, data);
124
125         ast_test_validate(test, NULL != uut);
126         ast_test_validate(test, type == stasis_message_type(uut));
127         ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut)));
128         ast_test_validate(test, 2 == ao2_ref(data, 0)); /* uut has ref to data */
129
130         time_diff = ast_tvsub(*stasis_message_timestamp(uut), expected_timestamp);
131         /* 10ms is certainly long enough for the two calls to complete */
132         ast_test_validate(test, time_diff.tv_sec == 0);
133         ast_test_validate(test, time_diff.tv_usec < 10000);
134
135         ao2_ref(uut, -1);
136         uut = NULL;
137         ast_test_validate(test, 1 == ao2_ref(data, 0)); /* uut unreffed data */
138
139         return AST_TEST_PASS;
140 }
141
142 struct consumer {
143         ast_cond_t out;
144         struct stasis_message **messages_rxed;
145         size_t messages_rxed_len;
146         int ignore_subscriptions;
147         int complete;
148 };
149
150 static void consumer_dtor(void *obj)
151 {
152         struct consumer *consumer = obj;
153
154         ast_cond_destroy(&consumer->out);
155
156         while (consumer->messages_rxed_len > 0) {
157                 ao2_cleanup(consumer->messages_rxed[--consumer->messages_rxed_len]);
158         }
159         ast_free(consumer->messages_rxed);
160         consumer->messages_rxed = NULL;
161 }
162
163 static struct consumer *consumer_create(int ignore_subscriptions)
164 {
165         struct consumer *consumer;
166
167         consumer = ao2_alloc(sizeof(*consumer), consumer_dtor);
168         if (!consumer) {
169                 return NULL;
170         }
171
172         consumer->ignore_subscriptions = ignore_subscriptions;
173         consumer->messages_rxed = ast_malloc(sizeof(*consumer->messages_rxed));
174         if (!consumer->messages_rxed) {
175                 ao2_cleanup(consumer);
176                 return NULL;
177         }
178
179         ast_cond_init(&consumer->out, NULL);
180
181         return consumer;
182 }
183
184 static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
185 {
186         struct consumer *consumer = data;
187         RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
188         SCOPED_AO2LOCK(lock, consumer);
189
190         if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
191                 ++consumer->messages_rxed_len;
192                 consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
193                 ast_assert(consumer->messages_rxed != NULL);
194                 consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
195                 ao2_ref(message, +1);
196         }
197
198         if (stasis_subscription_final_message(sub, message)) {
199                 consumer->complete = 1;
200                 consumer_needs_cleanup = consumer;
201         }
202
203         ast_cond_signal(&consumer->out);
204 }
205
206 static void consumer_exec_sync(void *data, struct stasis_subscription *sub, struct stasis_message *message)
207 {
208         struct consumer *consumer = data;
209         RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
210         SCOPED_AO2LOCK(lock, consumer);
211
212         if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
213                 ++consumer->messages_rxed_len;
214                 consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
215                 ast_assert(consumer->messages_rxed != NULL);
216                 consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
217                 ao2_ref(message, +1);
218         }
219
220         if (stasis_subscription_final_message(sub, message)) {
221                 consumer->complete = 1;
222                 consumer_needs_cleanup = consumer;
223         }
224 }
225
226 static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
227 {
228         struct timeval start = ast_tvnow();
229         struct timespec end = {
230                 .tv_sec = start.tv_sec + 30,
231                 .tv_nsec = start.tv_usec * 1000
232         };
233
234         SCOPED_AO2LOCK(lock, consumer);
235
236         while (consumer->messages_rxed_len < expected_len) {
237                 int r = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
238
239                 if (r == ETIMEDOUT) {
240                         break;
241                 }
242                 ast_assert(r == 0); /* Not expecting any othet types of errors */
243         }
244         return consumer->messages_rxed_len;
245 }
246
247 static int consumer_wait_for_completion(struct consumer *consumer)
248 {
249         struct timeval start = ast_tvnow();
250         struct timespec end = {
251                 .tv_sec = start.tv_sec + 3,
252                 .tv_nsec = start.tv_usec * 1000
253         };
254
255         SCOPED_AO2LOCK(lock, consumer);
256
257         while (!consumer->complete) {
258                 int r = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
259
260                 if (r == ETIMEDOUT) {
261                         break;
262                 }
263                 ast_assert(r == 0); /* Not expecting any othet types of errors */
264         }
265         return consumer->complete;
266 }
267
268 static int consumer_should_stay(struct consumer *consumer, size_t expected_len)
269 {
270         struct timeval start = ast_tvnow();
271         struct timeval diff = {
272                 .tv_sec = 0,
273                 .tv_usec = 100000 /* wait for 100ms */
274         };
275         struct timeval end_tv = ast_tvadd(start, diff);
276         struct timespec end = {
277                 .tv_sec = end_tv.tv_sec,
278                 .tv_nsec = end_tv.tv_usec * 1000
279         };
280
281         SCOPED_AO2LOCK(lock, consumer);
282
283         while (consumer->messages_rxed_len == expected_len) {
284                 int r = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
285
286                 if (r == ETIMEDOUT) {
287                         break;
288                 }
289                 ast_assert(r == 0); /* Not expecting any othet types of errors */
290         }
291         return consumer->messages_rxed_len;
292 }
293
294 AST_TEST_DEFINE(subscription_messages)
295 {
296         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
297         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
298         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
299         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
300         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
301         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
302         RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
303         int complete;
304         struct stasis_subscription_change *change;
305
306         switch (cmd) {
307         case TEST_INIT:
308                 info->name = __func__;
309                 info->category = test_category;
310                 info->summary = "Test subscribe/unsubscribe messages";
311                 info->description = "Test subscribe/unsubscribe messages";
312                 return AST_TEST_NOT_RUN;
313         case TEST_EXECUTE:
314                 break;
315         }
316
317         topic = stasis_topic_create("TestTopic");
318         ast_test_validate(test, NULL != topic);
319
320         consumer = consumer_create(0);
321         ast_test_validate(test, NULL != consumer);
322
323         uut = stasis_subscribe(topic, consumer_exec, consumer);
324         ast_test_validate(test, NULL != uut);
325         ao2_ref(consumer, +1);
326         expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
327
328         uut = stasis_unsubscribe(uut);
329         complete = consumer_wait_for_completion(consumer);
330         ast_test_validate(test, 1 == complete);
331
332         ast_test_validate(test, 2 == consumer->messages_rxed_len);
333         ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0]));
334         ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1]));
335
336         change = stasis_message_data(consumer->messages_rxed[0]);
337         ast_test_validate(test, topic == change->topic);
338         ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
339         ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
340
341         change = stasis_message_data(consumer->messages_rxed[1]);
342         ast_test_validate(test, topic == change->topic);
343         ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
344         ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
345
346         return AST_TEST_PASS;
347 }
348
349 AST_TEST_DEFINE(publish)
350 {
351         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
352         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
353         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
354         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
355         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
356         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
357         int actual_len;
358         const char *actual;
359
360         switch (cmd) {
361         case TEST_INIT:
362                 info->name = __func__;
363                 info->category = test_category;
364                 info->summary = "Test publishing";
365                 info->description = "Test publishing";
366                 return AST_TEST_NOT_RUN;
367         case TEST_EXECUTE:
368                 break;
369         }
370
371         topic = stasis_topic_create("TestTopic");
372         ast_test_validate(test, NULL != topic);
373
374         consumer = consumer_create(1);
375         ast_test_validate(test, NULL != consumer);
376
377         uut = stasis_subscribe(topic, consumer_exec, consumer);
378         ast_test_validate(test, NULL != uut);
379         ao2_ref(consumer, +1);
380
381         test_data = ao2_alloc(1, NULL);
382         ast_test_validate(test, NULL != test_data);
383         test_message_type = stasis_message_type_create("TestMessage", NULL);
384         test_message = stasis_message_create(test_message_type, test_data);
385
386         stasis_publish(topic, test_message);
387
388         actual_len = consumer_wait_for(consumer, 1);
389         ast_test_validate(test, 1 == actual_len);
390         actual = stasis_message_data(consumer->messages_rxed[0]);
391         ast_test_validate(test, test_data == actual);
392
393         return AST_TEST_PASS;
394 }
395
396 AST_TEST_DEFINE(publish_sync)
397 {
398         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
399         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
400         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
401         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
402         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
403         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
404         int actual_len;
405         const char *actual;
406
407         switch (cmd) {
408         case TEST_INIT:
409                 info->name = __func__;
410                 info->category = test_category;
411                 info->summary = "Test synchronous publishing";
412                 info->description = "Test synchronous publishing";
413                 return AST_TEST_NOT_RUN;
414         case TEST_EXECUTE:
415                 break;
416         }
417
418         topic = stasis_topic_create("TestTopic");
419         ast_test_validate(test, NULL != topic);
420
421         consumer = consumer_create(1);
422         ast_test_validate(test, NULL != consumer);
423
424         uut = stasis_subscribe(topic, consumer_exec_sync, consumer);
425         ast_test_validate(test, NULL != uut);
426         ao2_ref(consumer, +1);
427
428         test_data = ao2_alloc(1, NULL);
429         ast_test_validate(test, NULL != test_data);
430         test_message_type = stasis_message_type_create("TestMessage", NULL);
431         test_message = stasis_message_create(test_message_type, test_data);
432
433         stasis_publish_sync(uut, test_message);
434
435         actual_len = consumer->messages_rxed_len;
436         ast_test_validate(test, 1 == actual_len);
437         actual = stasis_message_data(consumer->messages_rxed[0]);
438         ast_test_validate(test, test_data == actual);
439
440         return AST_TEST_PASS;
441 }
442
443 AST_TEST_DEFINE(unsubscribe_stops_messages)
444 {
445         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
446         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
447         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
448         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
449         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
450         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
451         int actual_len;
452
453         switch (cmd) {
454         case TEST_INIT:
455                 info->name = __func__;
456                 info->category = test_category;
457                 info->summary = "Test simple subscriptions";
458                 info->description = "Test simple subscriptions";
459                 return AST_TEST_NOT_RUN;
460         case TEST_EXECUTE:
461                 break;
462         }
463
464         topic = stasis_topic_create("TestTopic");
465         ast_test_validate(test, NULL != topic);
466
467         consumer = consumer_create(1);
468         ast_test_validate(test, NULL != consumer);
469
470         uut = stasis_subscribe(topic, consumer_exec, consumer);
471         ast_test_validate(test, NULL != uut);
472         ao2_ref(consumer, +1);
473
474         uut = stasis_unsubscribe(uut);
475
476         test_data = ao2_alloc(1, NULL);
477         ast_test_validate(test, NULL != test_data);
478         test_message_type = stasis_message_type_create("TestMessage", NULL);
479         test_message = stasis_message_create(test_message_type, test_data);
480
481         stasis_publish(topic, test_message);
482
483         actual_len = consumer_should_stay(consumer, 0);
484         ast_test_validate(test, 0 == actual_len);
485
486         return AST_TEST_PASS;
487 }
488
489 AST_TEST_DEFINE(forward)
490 {
491         RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
492         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
493
494         RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup);
495         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
496
497         RAII_VAR(struct stasis_forward *, forward_sub, NULL, stasis_forward_cancel);
498         RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe);
499         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
500
501         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
502         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
503         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
504         int actual_len;
505
506         switch (cmd) {
507         case TEST_INIT:
508                 info->name = __func__;
509                 info->category = test_category;
510                 info->summary = "Test sending events to a parent topic";
511                 info->description = "Test sending events to a parent topic.\n"
512                         "This test creates three topics (one parent, two children)\n"
513                         "and publishes a message to one child, and verifies it's\n"
514                         "only seen by that child and the parent";
515                 return AST_TEST_NOT_RUN;
516         case TEST_EXECUTE:
517                 break;
518         }
519
520         parent_topic = stasis_topic_create("ParentTestTopic");
521         ast_test_validate(test, NULL != parent_topic);
522         topic = stasis_topic_create("TestTopic");
523         ast_test_validate(test, NULL != topic);
524
525         forward_sub = stasis_forward_all(topic, parent_topic);
526         ast_test_validate(test, NULL != forward_sub);
527
528         parent_consumer = consumer_create(1);
529         ast_test_validate(test, NULL != parent_consumer);
530         consumer = consumer_create(1);
531         ast_test_validate(test, NULL != consumer);
532
533         parent_sub = stasis_subscribe(parent_topic, consumer_exec, parent_consumer);
534         ast_test_validate(test, NULL != parent_sub);
535         ao2_ref(parent_consumer, +1);
536         sub = stasis_subscribe(topic, consumer_exec, consumer);
537         ast_test_validate(test, NULL != sub);
538         ao2_ref(consumer, +1);
539
540         test_data = ao2_alloc(1, NULL);
541         ast_test_validate(test, NULL != test_data);
542         test_message_type = stasis_message_type_create("TestMessage", NULL);
543         test_message = stasis_message_create(test_message_type, test_data);
544
545         stasis_publish(topic, test_message);
546
547         actual_len = consumer_wait_for(consumer, 1);
548         ast_test_validate(test, 1 == actual_len);
549         actual_len = consumer_wait_for(parent_consumer, 1);
550         ast_test_validate(test, 1 == actual_len);
551
552         return AST_TEST_PASS;
553 }
554
555 AST_TEST_DEFINE(interleaving)
556 {
557         RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
558         RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
559         RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
560
561         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
562
563         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
564
565         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
566         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
567         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
568
569         RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
570         RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
571         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
572
573         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
574
575         int actual_len;
576
577         switch (cmd) {
578         case TEST_INIT:
579                 info->name = __func__;
580                 info->category = test_category;
581                 info->summary = "Test sending interleaved events to a parent topic";
582                 info->description = "Test sending events to a parent topic.\n"
583                         "This test creates three topics (one parent, two children)\n"
584                         "and publishes messages alternately between the children.\n"
585                         "It verifies that the messages are received in the expected\n"
586                         "order.";
587                 return AST_TEST_NOT_RUN;
588         case TEST_EXECUTE:
589                 break;
590         }
591
592         test_message_type = stasis_message_type_create("test", NULL);
593         ast_test_validate(test, NULL != test_message_type);
594
595         test_data = ao2_alloc(1, NULL);
596         ast_test_validate(test, NULL != test_data);
597
598         test_message1 = stasis_message_create(test_message_type, test_data);
599         ast_test_validate(test, NULL != test_message1);
600         test_message2 = stasis_message_create(test_message_type, test_data);
601         ast_test_validate(test, NULL != test_message2);
602         test_message3 = stasis_message_create(test_message_type, test_data);
603         ast_test_validate(test, NULL != test_message3);
604
605         parent_topic = stasis_topic_create("ParentTestTopic");
606         ast_test_validate(test, NULL != parent_topic);
607         topic1 = stasis_topic_create("Topic1");
608         ast_test_validate(test, NULL != topic1);
609         topic2 = stasis_topic_create("Topic2");
610         ast_test_validate(test, NULL != topic2);
611
612         forward_sub1 = stasis_forward_all(topic1, parent_topic);
613         ast_test_validate(test, NULL != forward_sub1);
614         forward_sub2 = stasis_forward_all(topic2, parent_topic);
615         ast_test_validate(test, NULL != forward_sub2);
616
617         consumer = consumer_create(1);
618         ast_test_validate(test, NULL != consumer);
619
620         sub = stasis_subscribe(parent_topic, consumer_exec, consumer);
621         ast_test_validate(test, NULL != sub);
622         ao2_ref(consumer, +1);
623
624         stasis_publish(topic1, test_message1);
625         stasis_publish(topic2, test_message2);
626         stasis_publish(topic1, test_message3);
627
628         actual_len = consumer_wait_for(consumer, 3);
629         ast_test_validate(test, 3 == actual_len);
630
631         ast_test_validate(test, test_message1 == consumer->messages_rxed[0]);
632         ast_test_validate(test, test_message2 == consumer->messages_rxed[1]);
633         ast_test_validate(test, test_message3 == consumer->messages_rxed[2]);
634
635         return AST_TEST_PASS;
636 }
637
638 struct cache_test_data {
639         char *id;
640         char *value;
641 };
642
643 static void cache_test_data_dtor(void *obj)
644 {
645         struct cache_test_data *data = obj;
646         ast_free(data->id);
647         ast_free(data->value);
648 }
649
650 static struct stasis_message *cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value)
651 {
652         RAII_VAR(struct cache_test_data *, data, NULL, ao2_cleanup);
653
654         data = ao2_alloc(sizeof(*data), cache_test_data_dtor);
655         if (data == NULL) {
656                 return NULL;
657         }
658
659         ast_assert(name != NULL);
660         ast_assert(value != NULL);
661
662         data->id = ast_strdup(name);
663         data->value = ast_strdup(value);
664         if (!data->id || !data->value) {
665                 return NULL;
666         }
667
668         return stasis_message_create(type, data);
669 }
670
671 static const char *cache_test_data_id(struct stasis_message *message)
672 {
673         struct cache_test_data *cachable = stasis_message_data(message);
674
675         if (0 != strcmp("Cacheable", stasis_message_type_name(stasis_message_type(message)))) {
676                 return NULL;
677         }
678         return cachable->id;
679 }
680
681 AST_TEST_DEFINE(cache_filter)
682 {
683         RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup);
684         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
685         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
686         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
687         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
688         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
689         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
690         int actual_len;
691
692         switch (cmd) {
693         case TEST_INIT:
694                 info->name = __func__;
695                 info->category = test_category;
696                 info->summary = "Test caching topics only forward cache_update messages.";
697                 info->description = "Test caching topics only forward cache_update messages.";
698                 return AST_TEST_NOT_RUN;
699         case TEST_EXECUTE:
700                 break;
701         }
702
703         non_cache_type = stasis_message_type_create("NonCacheable", NULL);
704         ast_test_validate(test, NULL != non_cache_type);
705         topic = stasis_topic_create("SomeTopic");
706         ast_test_validate(test, NULL != topic);
707         cache = stasis_cache_create(cache_test_data_id);
708         ast_test_validate(test, NULL != cache);
709         caching_topic = stasis_caching_topic_create(topic, cache);
710         ast_test_validate(test, NULL != caching_topic);
711         consumer = consumer_create(1);
712         ast_test_validate(test, NULL != consumer);
713         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
714         ast_test_validate(test, NULL != sub);
715         ao2_ref(consumer, +1);
716
717         test_message = cache_test_message_create(non_cache_type, "1", "1");
718         ast_test_validate(test, NULL != test_message);
719
720         stasis_publish(topic, test_message);
721
722         actual_len = consumer_should_stay(consumer, 0);
723         ast_test_validate(test, 0 == actual_len);
724
725         return AST_TEST_PASS;
726 }
727
728 AST_TEST_DEFINE(cache)
729 {
730         RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
731         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
732         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
733         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
734         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
735         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
736         RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
737         RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
738         RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
739         RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
740         int actual_len;
741         struct stasis_cache_update *actual_update;
742
743         switch (cmd) {
744         case TEST_INIT:
745                 info->name = __func__;
746                 info->category = test_category;
747                 info->summary = "Test passing messages through cache topic unscathed.";
748                 info->description = "Test passing messages through cache topic unscathed.";
749                 return AST_TEST_NOT_RUN;
750         case TEST_EXECUTE:
751                 break;
752         }
753
754         cache_type = stasis_message_type_create("Cacheable", NULL);
755         ast_test_validate(test, NULL != cache_type);
756         topic = stasis_topic_create("SomeTopic");
757         ast_test_validate(test, NULL != topic);
758         cache = stasis_cache_create(cache_test_data_id);
759         ast_test_validate(test, NULL != cache);
760         caching_topic = stasis_caching_topic_create(topic, cache);
761         ast_test_validate(test, NULL != caching_topic);
762         consumer = consumer_create(1);
763         ast_test_validate(test, NULL != consumer);
764         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
765         ast_test_validate(test, NULL != sub);
766         ao2_ref(consumer, +1);
767
768         test_message1_1 = cache_test_message_create(cache_type, "1", "1");
769         ast_test_validate(test, NULL != test_message1_1);
770         test_message2_1 = cache_test_message_create(cache_type, "2", "1");
771         ast_test_validate(test, NULL != test_message2_1);
772
773         /* Post a couple of snapshots */
774         stasis_publish(topic, test_message1_1);
775         stasis_publish(topic, test_message2_1);
776         actual_len = consumer_wait_for(consumer, 2);
777         ast_test_validate(test, 2 == actual_len);
778
779         /* Check for new snapshot messages */
780         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[0]));
781         actual_update = stasis_message_data(consumer->messages_rxed[0]);
782         ast_test_validate(test, NULL == actual_update->old_snapshot);
783         ast_test_validate(test, test_message1_1 == actual_update->new_snapshot);
784         ast_test_validate(test, test_message1_1 == stasis_cache_get(cache, cache_type, "1"));
785         /* stasis_cache_get returned a ref, so unref test_message1_1 */
786         ao2_ref(test_message1_1, -1);
787
788         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[1]));
789         actual_update = stasis_message_data(consumer->messages_rxed[1]);
790         ast_test_validate(test, NULL == actual_update->old_snapshot);
791         ast_test_validate(test, test_message2_1 == actual_update->new_snapshot);
792         ast_test_validate(test, test_message2_1 == stasis_cache_get(cache, cache_type, "2"));
793         /* stasis_cache_get returned a ref, so unref test_message2_1 */
794         ao2_ref(test_message2_1, -1);
795
796         /* Update snapshot 2 */
797         test_message2_2 = cache_test_message_create(cache_type, "2", "2");
798         ast_test_validate(test, NULL != test_message2_2);
799         stasis_publish(topic, test_message2_2);
800
801         actual_len = consumer_wait_for(consumer, 3);
802         ast_test_validate(test, 3 == actual_len);
803
804         actual_update = stasis_message_data(consumer->messages_rxed[2]);
805         ast_test_validate(test, test_message2_1 == actual_update->old_snapshot);
806         ast_test_validate(test, test_message2_2 == actual_update->new_snapshot);
807         ast_test_validate(test, test_message2_2 == stasis_cache_get(cache, cache_type, "2"));
808         /* stasis_cache_get returned a ref, so unref test_message2_2 */
809         ao2_ref(test_message2_2, -1);
810
811         /* Clear snapshot 1 */
812         test_message1_clear = stasis_cache_clear_create(test_message1_1);
813         ast_test_validate(test, NULL != test_message1_clear);
814         stasis_publish(topic, test_message1_clear);
815
816         actual_len = consumer_wait_for(consumer, 4);
817         ast_test_validate(test, 4 == actual_len);
818
819         actual_update = stasis_message_data(consumer->messages_rxed[3]);
820         ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
821         ast_test_validate(test, NULL == actual_update->new_snapshot);
822         ast_test_validate(test, NULL == stasis_cache_get(cache, cache_type, "1"));
823
824         return AST_TEST_PASS;
825 }
826
827 AST_TEST_DEFINE(cache_dump)
828 {
829         RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
830         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
831         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
832         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
833         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
834         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
835         RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
836         RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
837         RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
838         RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
839         RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
840         int actual_len;
841         struct ao2_iterator i;
842         void *obj;
843
844         switch (cmd) {
845         case TEST_INIT:
846                 info->name = __func__;
847                 info->category = test_category;
848                 info->summary = "Test passing messages through cache topic unscathed.";
849                 info->description = "Test passing messages through cache topic unscathed.";
850                 return AST_TEST_NOT_RUN;
851         case TEST_EXECUTE:
852                 break;
853         }
854
855         cache_type = stasis_message_type_create("Cacheable", NULL);
856         ast_test_validate(test, NULL != cache_type);
857         topic = stasis_topic_create("SomeTopic");
858         ast_test_validate(test, NULL != topic);
859         cache = stasis_cache_create(cache_test_data_id);
860         ast_test_validate(test, NULL != cache);
861         caching_topic = stasis_caching_topic_create(topic, cache);
862         ast_test_validate(test, NULL != caching_topic);
863         consumer = consumer_create(1);
864         ast_test_validate(test, NULL != consumer);
865         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
866         ast_test_validate(test, NULL != sub);
867         ao2_ref(consumer, +1);
868
869         test_message1_1 = cache_test_message_create(cache_type, "1", "1");
870         ast_test_validate(test, NULL != test_message1_1);
871         test_message2_1 = cache_test_message_create(cache_type, "2", "1");
872         ast_test_validate(test, NULL != test_message2_1);
873
874         /* Post a couple of snapshots */
875         stasis_publish(topic, test_message1_1);
876         stasis_publish(topic, test_message2_1);
877         actual_len = consumer_wait_for(consumer, 2);
878         ast_test_validate(test, 2 == actual_len);
879
880         /* Check the cache */
881         ao2_cleanup(cache_dump);
882         cache_dump = stasis_cache_dump(cache, NULL);
883         ast_test_validate(test, NULL != cache_dump);
884         ast_test_validate(test, 2 == ao2_container_count(cache_dump));
885         i = ao2_iterator_init(cache_dump, 0);
886         while ((obj = ao2_iterator_next(&i))) {
887                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
888                 ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_1);
889         }
890         ao2_iterator_destroy(&i);
891
892         /* Update snapshot 2 */
893         test_message2_2 = cache_test_message_create(cache_type, "2", "2");
894         ast_test_validate(test, NULL != test_message2_2);
895         stasis_publish(topic, test_message2_2);
896
897         actual_len = consumer_wait_for(consumer, 3);
898         ast_test_validate(test, 3 == actual_len);
899
900         /* Check the cache */
901         ao2_cleanup(cache_dump);
902         cache_dump = stasis_cache_dump(cache, NULL);
903         ast_test_validate(test, NULL != cache_dump);
904         ast_test_validate(test, 2 == ao2_container_count(cache_dump));
905         i = ao2_iterator_init(cache_dump, 0);
906         while ((obj = ao2_iterator_next(&i))) {
907                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
908                 ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_2);
909         }
910         ao2_iterator_destroy(&i);
911
912         /* Clear snapshot 1 */
913         test_message1_clear = stasis_cache_clear_create(test_message1_1);
914         ast_test_validate(test, NULL != test_message1_clear);
915         stasis_publish(topic, test_message1_clear);
916
917         actual_len = consumer_wait_for(consumer, 4);
918         ast_test_validate(test, 4 == actual_len);
919
920         /* Check the cache */
921         ao2_cleanup(cache_dump);
922         cache_dump = stasis_cache_dump(cache, NULL);
923         ast_test_validate(test, NULL != cache_dump);
924         ast_test_validate(test, 1 == ao2_container_count(cache_dump));
925         i = ao2_iterator_init(cache_dump, 0);
926         while ((obj = ao2_iterator_next(&i))) {
927                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
928                 ast_test_validate(test, actual_cache_entry == test_message2_2);
929         }
930         ao2_iterator_destroy(&i);
931
932         /* Dump the cache to ensure that it has no subscription change items in it since those aren't cached */
933         ao2_cleanup(cache_dump);
934         cache_dump = stasis_cache_dump(cache, stasis_subscription_change_type());
935         ast_test_validate(test, 0 == ao2_container_count(cache_dump));
936
937         return AST_TEST_PASS;
938 }
939
940 AST_TEST_DEFINE(router)
941 {
942         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
943         RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
944         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
945         RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
946         RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
947         RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
948         RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
949         RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
950         RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
951         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
952         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
953         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
954         int actual_len, ret;
955         struct stasis_message *actual;
956
957         switch (cmd) {
958         case TEST_INIT:
959                 info->name = __func__;
960                 info->category = test_category;
961                 info->summary = "Test simple message routing";
962                 info->description = "Test simple message routing";
963                 return AST_TEST_NOT_RUN;
964         case TEST_EXECUTE:
965                 break;
966         }
967
968         topic = stasis_topic_create("TestTopic");
969         ast_test_validate(test, NULL != topic);
970
971         consumer1 = consumer_create(1);
972         ast_test_validate(test, NULL != consumer1);
973         consumer2 = consumer_create(1);
974         ast_test_validate(test, NULL != consumer2);
975         consumer3 = consumer_create(1);
976         ast_test_validate(test, NULL != consumer3);
977
978         test_message_type1 = stasis_message_type_create("TestMessage1", NULL);
979         ast_test_validate(test, NULL != test_message_type1);
980         test_message_type2 = stasis_message_type_create("TestMessage2", NULL);
981         ast_test_validate(test, NULL != test_message_type2);
982         test_message_type3 = stasis_message_type_create("TestMessage3", NULL);
983         ast_test_validate(test, NULL != test_message_type3);
984
985         uut = stasis_message_router_create(topic);
986         ast_test_validate(test, NULL != uut);
987
988         ret = stasis_message_router_add(
989                 uut, test_message_type1, consumer_exec, consumer1);
990         ast_test_validate(test, 0 == ret);
991         ao2_ref(consumer1, +1);
992         ret = stasis_message_router_add(
993                 uut, test_message_type2, consumer_exec, consumer2);
994         ast_test_validate(test, 0 == ret);
995         ao2_ref(consumer2, +1);
996         ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
997         ast_test_validate(test, 0 == ret);
998         ao2_ref(consumer3, +1);
999
1000         test_data = ao2_alloc(1, NULL);
1001         ast_test_validate(test, NULL != test_data);
1002         test_message1 = stasis_message_create(test_message_type1, test_data);
1003         ast_test_validate(test, NULL != test_message1);
1004         test_message2 = stasis_message_create(test_message_type2, test_data);
1005         ast_test_validate(test, NULL != test_message2);
1006         test_message3 = stasis_message_create(test_message_type3, test_data);
1007         ast_test_validate(test, NULL != test_message3);
1008
1009         stasis_publish(topic, test_message1);
1010         stasis_publish(topic, test_message2);
1011         stasis_publish(topic, test_message3);
1012
1013         actual_len = consumer_wait_for(consumer1, 1);
1014         ast_test_validate(test, 1 == actual_len);
1015         actual_len = consumer_wait_for(consumer2, 1);
1016         ast_test_validate(test, 1 == actual_len);
1017         actual_len = consumer_wait_for(consumer3, 1);
1018         ast_test_validate(test, 1 == actual_len);
1019
1020         actual = consumer1->messages_rxed[0];
1021         ast_test_validate(test, test_message1 == actual);
1022
1023         actual = consumer2->messages_rxed[0];
1024         ast_test_validate(test, test_message2 == actual);
1025
1026         actual = consumer3->messages_rxed[0];
1027         ast_test_validate(test, test_message3 == actual);
1028
1029         /* consumer1 and consumer2 do not get the final message. */
1030         ao2_cleanup(consumer1);
1031         ao2_cleanup(consumer2);
1032
1033         return AST_TEST_PASS;
1034 }
1035
1036 static const char *cache_simple(struct stasis_message *message)
1037 {
1038         const char *type_name =
1039                 stasis_message_type_name(stasis_message_type(message));
1040         if (!ast_begins_with(type_name, "Cache")) {
1041                 return NULL;
1042         }
1043
1044         return "cached";
1045 }
1046
1047 AST_TEST_DEFINE(router_cache_updates)
1048 {
1049         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1050         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
1051         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe_and_join);
1052         RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
1053         RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
1054         RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
1055         RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
1056         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
1057         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
1058         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
1059         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
1060         RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
1061         RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
1062         RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
1063         RAII_VAR(struct stasis_message *, message1, NULL, ao2_cleanup);
1064         RAII_VAR(struct stasis_message *, message2, NULL, ao2_cleanup);
1065         struct stasis_cache_update *update;
1066         int actual_len, ret;
1067         struct stasis_message *actual;
1068
1069         switch (cmd) {
1070         case TEST_INIT:
1071                 info->name = __func__;
1072                 info->category = test_category;
1073                 info->summary = "Test special handling cache_update messages";
1074                 info->description = "Test special handling cache_update messages";
1075                 return AST_TEST_NOT_RUN;
1076         case TEST_EXECUTE:
1077                 break;
1078         }
1079
1080         topic = stasis_topic_create("TestTopic");
1081         ast_test_validate(test, NULL != topic);
1082
1083         cache = stasis_cache_create(cache_simple);
1084         ast_test_validate(test, NULL != cache);
1085         caching_topic = stasis_caching_topic_create(topic, cache);
1086         ast_test_validate(test, NULL != caching_topic);
1087
1088         consumer1 = consumer_create(1);
1089         ast_test_validate(test, NULL != consumer1);
1090         consumer2 = consumer_create(1);
1091         ast_test_validate(test, NULL != consumer2);
1092         consumer3 = consumer_create(1);
1093         ast_test_validate(test, NULL != consumer3);
1094
1095         test_message_type1 = stasis_message_type_create("Cache1", NULL);
1096         ast_test_validate(test, NULL != test_message_type1);
1097         test_message_type2 = stasis_message_type_create("Cache2", NULL);
1098         ast_test_validate(test, NULL != test_message_type2);
1099         test_message_type3 = stasis_message_type_create("NonCache", NULL);
1100         ast_test_validate(test, NULL != test_message_type3);
1101
1102         uut = stasis_message_router_create(
1103                 stasis_caching_get_topic(caching_topic));
1104         ast_test_validate(test, NULL != uut);
1105
1106         ret = stasis_message_router_add_cache_update(
1107                 uut, test_message_type1, consumer_exec, consumer1);
1108         ast_test_validate(test, 0 == ret);
1109         ao2_ref(consumer1, +1);
1110         ret = stasis_message_router_add(
1111                 uut, stasis_cache_update_type(), consumer_exec, consumer2);
1112         ast_test_validate(test, 0 == ret);
1113         ao2_ref(consumer2, +1);
1114         ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
1115         ast_test_validate(test, 0 == ret);
1116         ao2_ref(consumer3, +1);
1117
1118         test_data = ao2_alloc(1, NULL);
1119         ast_test_validate(test, NULL != test_data);
1120         test_message1 = stasis_message_create(test_message_type1, test_data);
1121         ast_test_validate(test, NULL != test_message1);
1122         test_message2 = stasis_message_create(test_message_type2, test_data);
1123         ast_test_validate(test, NULL != test_message2);
1124         test_message3 = stasis_message_create(test_message_type3, test_data);
1125         ast_test_validate(test, NULL != test_message3);
1126
1127         stasis_publish(topic, test_message1);
1128         stasis_publish(topic, test_message2);
1129         stasis_publish(topic, test_message3);
1130
1131         actual_len = consumer_wait_for(consumer1, 1);
1132         ast_test_validate(test, 1 == actual_len);
1133         actual_len = consumer_wait_for(consumer2, 1);
1134         ast_test_validate(test, 1 == actual_len);
1135         /* Uncacheable message should not be passed through */
1136         actual_len = consumer_should_stay(consumer3, 0);
1137         ast_test_validate(test, 0 == actual_len);
1138
1139         actual = consumer1->messages_rxed[0];
1140         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
1141         update = stasis_message_data(actual);
1142         ast_test_validate(test, test_message_type1 == update->type);
1143         ast_test_validate(test, test_message1 == update->new_snapshot);
1144
1145         actual = consumer2->messages_rxed[0];
1146         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
1147         update = stasis_message_data(actual);
1148         ast_test_validate(test, test_message_type2 == update->type);
1149         ast_test_validate(test, test_message2 == update->new_snapshot);
1150
1151         /* consumer1 and consumer2 do not get the final message. */
1152         ao2_cleanup(consumer1);
1153         ao2_cleanup(consumer2);
1154
1155         return AST_TEST_PASS;
1156 }
1157
1158 AST_TEST_DEFINE(no_to_json)
1159 {
1160         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1161         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1162         RAII_VAR(char *, data, NULL, ao2_cleanup);
1163         RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
1164         char *expected = "SomeData";
1165
1166         switch (cmd) {
1167         case TEST_INIT:
1168                 info->name = __func__;
1169                 info->category = test_category;
1170                 info->summary = "Test message to_json function";
1171                 info->description = "Test message to_json function";
1172                 return AST_TEST_NOT_RUN;
1173         case TEST_EXECUTE:
1174                 break;
1175         }
1176
1177         /* Test NULL */
1178         actual = stasis_message_to_json(NULL, NULL);
1179         ast_test_validate(test, NULL == actual);
1180
1181         /* Test message with NULL to_json function */
1182         type = stasis_message_type_create("SomeMessage", NULL);
1183
1184         data = ao2_alloc(strlen(expected) + 1, NULL);
1185         strcpy(data, expected);
1186         uut = stasis_message_create(type, data);
1187         ast_test_validate(test, NULL != uut);
1188
1189         actual = stasis_message_to_json(uut, NULL);
1190         ast_test_validate(test, NULL == actual);
1191
1192         return AST_TEST_PASS;
1193 }
1194
1195 AST_TEST_DEFINE(to_json)
1196 {
1197         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1198         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1199         RAII_VAR(char *, data, NULL, ao2_cleanup);
1200         RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
1201         const char *expected_text = "SomeData";
1202         RAII_VAR(struct ast_json *, expected, NULL, ast_json_unref);
1203
1204         switch (cmd) {
1205         case TEST_INIT:
1206                 info->name = __func__;
1207                 info->category = test_category;
1208                 info->summary = "Test message to_json function when NULL";
1209                 info->description = "Test message to_json function when NULL";
1210                 return AST_TEST_NOT_RUN;
1211         case TEST_EXECUTE:
1212                 break;
1213         }
1214
1215         type = stasis_message_type_create("SomeMessage", &fake_vtable);
1216
1217         data = ao2_alloc(strlen(expected_text) + 1, NULL);
1218         strcpy(data, expected_text);
1219         uut = stasis_message_create(type, data);
1220         ast_test_validate(test, NULL != uut);
1221
1222         expected = ast_json_string_create(expected_text);
1223         actual = stasis_message_to_json(uut, NULL);
1224         ast_test_validate(test, ast_json_equal(expected, actual));
1225
1226         return AST_TEST_PASS;
1227 }
1228
1229 AST_TEST_DEFINE(no_to_ami)
1230 {
1231         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1232         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1233         RAII_VAR(char *, data, NULL, ao2_cleanup);
1234         RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
1235         char *expected = "SomeData";
1236
1237         switch (cmd) {
1238         case TEST_INIT:
1239                 info->name = __func__;
1240                 info->category = test_category;
1241                 info->summary = "Test message to_ami function when NULL";
1242                 info->description = "Test message to_ami function when NULL";
1243                 return AST_TEST_NOT_RUN;
1244         case TEST_EXECUTE:
1245                 break;
1246         }
1247
1248         /* Test NULL */
1249         actual = stasis_message_to_ami(NULL);
1250         ast_test_validate(test, NULL == actual);
1251
1252         /* Test message with NULL to_ami function */
1253         type = stasis_message_type_create("SomeMessage", NULL);
1254
1255         data = ao2_alloc(strlen(expected) + 1, NULL);
1256         strcpy(data, expected);
1257         uut = stasis_message_create(type, data);
1258         ast_test_validate(test, NULL != uut);
1259
1260         actual = stasis_message_to_ami(uut);
1261         ast_test_validate(test, NULL == actual);
1262
1263         return AST_TEST_PASS;
1264 }
1265
1266 AST_TEST_DEFINE(to_ami)
1267 {
1268         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1269         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1270         RAII_VAR(char *, data, NULL, ao2_cleanup);
1271         RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
1272         const char *expected_text = "SomeData";
1273         const char *expected = "Message: SomeData\r\n";
1274
1275         switch (cmd) {
1276         case TEST_INIT:
1277                 info->name = __func__;
1278                 info->category = test_category;
1279                 info->summary = "Test message to_ami function";
1280                 info->description = "Test message to_ami function";
1281                 return AST_TEST_NOT_RUN;
1282         case TEST_EXECUTE:
1283                 break;
1284         }
1285
1286         type = stasis_message_type_create("SomeMessage", &fake_vtable);
1287
1288         data = ao2_alloc(strlen(expected_text) + 1, NULL);
1289         strcpy(data, expected_text);
1290         uut = stasis_message_create(type, data);
1291         ast_test_validate(test, NULL != uut);
1292
1293         actual = stasis_message_to_ami(uut);
1294         ast_test_validate(test, strcmp(expected, actual->extra_fields) == 0);
1295
1296         return AST_TEST_PASS;
1297 }
1298
1299 static void noop(void *data, struct stasis_subscription *sub,
1300         struct stasis_message *message)
1301 {
1302         /* no-op */
1303 }
1304
1305 AST_TEST_DEFINE(dtor_order)
1306 {
1307         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1308         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
1309
1310         switch (cmd) {
1311         case TEST_INIT:
1312                 info->name = __func__;
1313                 info->category = test_category;
1314                 info->summary = "Test that destruction order doesn't bomb stuff";
1315                 info->description = "Test that destruction order doesn't bomb stuff";
1316                 return AST_TEST_NOT_RUN;
1317         case TEST_EXECUTE:
1318                 break;
1319         }
1320
1321         topic = stasis_topic_create("test-topic");
1322         ast_test_validate(test, NULL != topic);
1323
1324         sub = stasis_subscribe(topic, noop, NULL);
1325         ast_test_validate(test, NULL != sub);
1326
1327         /* With any luck, this won't completely blow everything up */
1328         ao2_cleanup(topic);
1329         stasis_unsubscribe(sub);
1330
1331         /* These refs were cleaned up manually */
1332         topic = NULL;
1333         sub = NULL;
1334
1335         return AST_TEST_PASS;
1336 }
1337
1338 static const char *noop_get_id(struct stasis_message *message)
1339 {
1340         return NULL;
1341 }
1342
1343 AST_TEST_DEFINE(caching_dtor_order)
1344 {
1345         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1346         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
1347         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL,
1348                 stasis_caching_unsubscribe);
1349         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
1350
1351         switch (cmd) {
1352         case TEST_INIT:
1353                 info->name = __func__;
1354                 info->category = test_category;
1355                 info->summary = "Test that destruction order doesn't bomb stuff";
1356                 info->description = "Test that destruction order doesn't bomb stuff";
1357                 return AST_TEST_NOT_RUN;
1358         case TEST_EXECUTE:
1359                 break;
1360         }
1361
1362         cache = stasis_cache_create(noop_get_id);
1363         ast_test_validate(test, NULL != cache);
1364
1365         topic = stasis_topic_create("test-topic");
1366         ast_test_validate(test, NULL != topic);
1367
1368         caching_topic = stasis_caching_topic_create(topic, cache);
1369         ast_test_validate(test, NULL != caching_topic);
1370
1371         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), noop,
1372                 NULL);
1373         ast_test_validate(test, NULL != sub);
1374
1375         /* With any luck, this won't completely blow everything up */
1376         ao2_cleanup(cache);
1377         ao2_cleanup(topic);
1378         stasis_caching_unsubscribe(caching_topic);
1379         stasis_unsubscribe(sub);
1380
1381         /* These refs were cleaned up manually */
1382         cache = NULL;
1383         topic = NULL;
1384         caching_topic = NULL;
1385         sub = NULL;
1386
1387         return AST_TEST_PASS;
1388 }
1389
1390 static int unload_module(void)
1391 {
1392         AST_TEST_UNREGISTER(message_type);
1393         AST_TEST_UNREGISTER(message);
1394         AST_TEST_UNREGISTER(subscription_messages);
1395         AST_TEST_UNREGISTER(publish);
1396         AST_TEST_UNREGISTER(publish_sync);
1397         AST_TEST_UNREGISTER(unsubscribe_stops_messages);
1398         AST_TEST_UNREGISTER(forward);
1399         AST_TEST_UNREGISTER(cache_filter);
1400         AST_TEST_UNREGISTER(cache);
1401         AST_TEST_UNREGISTER(cache_dump);
1402         AST_TEST_UNREGISTER(router);
1403         AST_TEST_UNREGISTER(router_cache_updates);
1404         AST_TEST_UNREGISTER(interleaving);
1405         AST_TEST_UNREGISTER(no_to_json);
1406         AST_TEST_UNREGISTER(to_json);
1407         AST_TEST_UNREGISTER(no_to_ami);
1408         AST_TEST_UNREGISTER(to_ami);
1409         AST_TEST_UNREGISTER(dtor_order);
1410         AST_TEST_UNREGISTER(caching_dtor_order);
1411         return 0;
1412 }
1413
1414 static int load_module(void)
1415 {
1416         AST_TEST_REGISTER(message_type);
1417         AST_TEST_REGISTER(message);
1418         AST_TEST_REGISTER(subscription_messages);
1419         AST_TEST_REGISTER(publish);
1420         AST_TEST_REGISTER(publish_sync);
1421         AST_TEST_REGISTER(unsubscribe_stops_messages);
1422         AST_TEST_REGISTER(forward);
1423         AST_TEST_REGISTER(cache_filter);
1424         AST_TEST_REGISTER(cache);
1425         AST_TEST_REGISTER(cache_dump);
1426         AST_TEST_REGISTER(router);
1427         AST_TEST_REGISTER(router_cache_updates);
1428         AST_TEST_REGISTER(interleaving);
1429         AST_TEST_REGISTER(no_to_json);
1430         AST_TEST_REGISTER(to_json);
1431         AST_TEST_REGISTER(no_to_ami);
1432         AST_TEST_REGISTER(to_ami);
1433         AST_TEST_REGISTER(dtor_order);
1434         AST_TEST_REGISTER(caching_dtor_order);
1435         return AST_MODULE_LOAD_SUCCESS;
1436 }
1437
1438 AST_MODULE_INFO(ASTERISK_GPL_KEY, 0, "Stasis testing",
1439                 .load = load_module,
1440                 .unload = unload_module
1441         );