Stasis: Allow message types to be blocked
[asterisk/asterisk.git] / tests / test_stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file \brief Test Stasis message bus.
21  *
22  * \author\verbatim David M. Lee, II <dlee@digium.com> \endverbatim
23  *
24  * \ingroup tests
25  */
26
27 /*** MODULEINFO
28         <depend>TEST_FRAMEWORK</depend>
29         <support_level>core</support_level>
30  ***/
31
32 #include "asterisk.h"
33
34 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
35
36 #include "asterisk/astobj2.h"
37 #include "asterisk/module.h"
38 #include "asterisk/stasis.h"
39 #include "asterisk/stasis_message_router.h"
40 #include "asterisk/test.h"
41
42 static const char *test_category = "/stasis/core/";
43
44 static struct ast_json *fake_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
45 {
46         const char *text = stasis_message_data(message);
47
48         return ast_json_string_create(text);
49 }
50
51 static struct ast_manager_event_blob *fake_ami(struct stasis_message *message)
52 {
53         RAII_VAR(struct ast_manager_event_blob *, res, NULL, ao2_cleanup);
54         const char *text = stasis_message_data(message);
55
56         res = ast_manager_event_blob_create(EVENT_FLAG_TEST, "FakeMI",
57                 "Message: %s\r\n", text);
58
59         if (res == NULL) {
60                 return NULL;
61         }
62
63         ao2_ref(res, +1);
64         return res;
65 }
66
67 static struct stasis_message_vtable fake_vtable = {
68         .to_json = fake_json,
69         .to_ami = fake_ami
70 };
71
72 AST_TEST_DEFINE(message_type)
73 {
74         RAII_VAR(struct stasis_message_type *, uut, NULL, ao2_cleanup);
75
76         switch (cmd) {
77         case TEST_INIT:
78                 info->name = __func__;
79                 info->category = test_category;
80                 info->summary = "Test basic message_type functions";
81                 info->description = "Test basic message_type functions";
82                 return AST_TEST_NOT_RUN;
83         case TEST_EXECUTE:
84                 break;
85         }
86
87         ast_test_validate(test, stasis_message_type_create(NULL, NULL, NULL) == STASIS_MESSAGE_TYPE_ERROR);
88         ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &uut) == STASIS_MESSAGE_TYPE_SUCCESS);
89         ast_test_validate(test, 0 == strcmp(stasis_message_type_name(uut), "SomeMessage"));
90
91         return AST_TEST_PASS;
92 }
93
94 AST_TEST_DEFINE(message)
95 {
96         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
97         RAII_VAR(struct stasis_message *, uut1, NULL, ao2_cleanup);
98         RAII_VAR(struct stasis_message *, uut2, NULL, ao2_cleanup);
99         RAII_VAR(char *, data, NULL, ao2_cleanup);
100         char *expected = "SomeData";
101         struct timeval expected_timestamp;
102         struct timeval time_diff;
103         struct ast_eid foreign_eid;
104
105         switch (cmd) {
106         case TEST_INIT:
107                 info->name = __func__;
108                 info->category = test_category;
109                 info->summary = "Test basic message functions";
110                 info->description = "Test basic message functions";
111                 return AST_TEST_NOT_RUN;
112         case TEST_EXECUTE:
113                 break;
114         }
115
116
117         memset(&foreign_eid, 0xFF, sizeof(foreign_eid));
118
119         ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
120
121         ast_test_validate(test, NULL == stasis_message_create_full(NULL, NULL, NULL));
122         ast_test_validate(test, NULL == stasis_message_create_full(type, NULL, NULL));
123
124         data = ao2_alloc(strlen(expected) + 1, NULL);
125         strcpy(data, expected);/* Safe */
126         expected_timestamp = ast_tvnow();
127         uut1 = stasis_message_create_full(type, data, &foreign_eid);
128         uut2 = stasis_message_create_full(type, data, NULL);
129
130         ast_test_validate(test, NULL != uut1);
131         ast_test_validate(test, NULL != uut2);
132         ast_test_validate(test, type == stasis_message_type(uut1));
133         ast_test_validate(test, type == stasis_message_type(uut2));
134         ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut1)));
135         ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut2)));
136         ast_test_validate(test, NULL != stasis_message_eid(uut1));
137         ast_test_validate(test, NULL == stasis_message_eid(uut2));
138         ast_test_validate(test, !ast_eid_cmp(&foreign_eid, stasis_message_eid(uut1)));
139
140         ast_test_validate(test, 3 == ao2_ref(data, 0)); /* uut1 and uut2 have ref to data */
141
142         time_diff = ast_tvsub(*stasis_message_timestamp(uut1), expected_timestamp);
143         /* 10ms is certainly long enough for the two calls to complete */
144         ast_test_validate(test, time_diff.tv_sec == 0);
145         ast_test_validate(test, time_diff.tv_usec < 10000);
146
147         ao2_ref(uut1, -1);
148         uut1 = NULL;
149         ast_test_validate(test, 2 == ao2_ref(data, 0)); /* uut1 unreffed data */
150         ao2_ref(uut2, -1);
151         uut2 = NULL;
152         ast_test_validate(test, 1 == ao2_ref(data, 0)); /* uut2 unreffed data */
153
154         return AST_TEST_PASS;
155 }
156
157 struct consumer {
158         ast_cond_t out;
159         struct stasis_message **messages_rxed;
160         size_t messages_rxed_len;
161         int ignore_subscriptions;
162         int complete;
163 };
164
165 static void consumer_dtor(void *obj)
166 {
167         struct consumer *consumer = obj;
168
169         ast_cond_destroy(&consumer->out);
170
171         while (consumer->messages_rxed_len > 0) {
172                 ao2_cleanup(consumer->messages_rxed[--consumer->messages_rxed_len]);
173         }
174         ast_free(consumer->messages_rxed);
175         consumer->messages_rxed = NULL;
176 }
177
178 static struct consumer *consumer_create(int ignore_subscriptions)
179 {
180         struct consumer *consumer;
181
182         consumer = ao2_alloc(sizeof(*consumer), consumer_dtor);
183         if (!consumer) {
184                 return NULL;
185         }
186
187         consumer->ignore_subscriptions = ignore_subscriptions;
188         consumer->messages_rxed = ast_malloc(sizeof(*consumer->messages_rxed));
189         if (!consumer->messages_rxed) {
190                 ao2_cleanup(consumer);
191                 return NULL;
192         }
193
194         ast_cond_init(&consumer->out, NULL);
195
196         return consumer;
197 }
198
199 static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
200 {
201         struct consumer *consumer = data;
202         RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
203         SCOPED_AO2LOCK(lock, consumer);
204
205         if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
206                 ++consumer->messages_rxed_len;
207                 consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
208                 ast_assert(consumer->messages_rxed != NULL);
209                 consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
210                 ao2_ref(message, +1);
211         }
212
213         if (stasis_subscription_final_message(sub, message)) {
214                 consumer->complete = 1;
215                 consumer_needs_cleanup = consumer;
216         }
217
218         ast_cond_signal(&consumer->out);
219 }
220
221 static void consumer_exec_sync(void *data, struct stasis_subscription *sub, struct stasis_message *message)
222 {
223         struct consumer *consumer = data;
224         RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
225         SCOPED_AO2LOCK(lock, consumer);
226
227         if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
228                 ++consumer->messages_rxed_len;
229                 consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
230                 ast_assert(consumer->messages_rxed != NULL);
231                 consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
232                 ao2_ref(message, +1);
233         }
234
235         if (stasis_subscription_final_message(sub, message)) {
236                 consumer->complete = 1;
237                 consumer_needs_cleanup = consumer;
238         }
239 }
240
241 static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
242 {
243         struct timeval start = ast_tvnow();
244         struct timespec end = {
245                 .tv_sec = start.tv_sec + 30,
246                 .tv_nsec = start.tv_usec * 1000
247         };
248
249         SCOPED_AO2LOCK(lock, consumer);
250
251         while (consumer->messages_rxed_len < expected_len) {
252                 int r = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
253
254                 if (r == ETIMEDOUT) {
255                         break;
256                 }
257                 ast_assert(r == 0); /* Not expecting any othet types of errors */
258         }
259         return consumer->messages_rxed_len;
260 }
261
262 static int consumer_wait_for_completion(struct consumer *consumer)
263 {
264         struct timeval start = ast_tvnow();
265         struct timespec end = {
266                 .tv_sec = start.tv_sec + 3,
267                 .tv_nsec = start.tv_usec * 1000
268         };
269
270         SCOPED_AO2LOCK(lock, consumer);
271
272         while (!consumer->complete) {
273                 int r = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
274
275                 if (r == ETIMEDOUT) {
276                         break;
277                 }
278                 ast_assert(r == 0); /* Not expecting any othet types of errors */
279         }
280         return consumer->complete;
281 }
282
283 static int consumer_should_stay(struct consumer *consumer, size_t expected_len)
284 {
285         struct timeval start = ast_tvnow();
286         struct timeval diff = {
287                 .tv_sec = 0,
288                 .tv_usec = 100000 /* wait for 100ms */
289         };
290         struct timeval end_tv = ast_tvadd(start, diff);
291         struct timespec end = {
292                 .tv_sec = end_tv.tv_sec,
293                 .tv_nsec = end_tv.tv_usec * 1000
294         };
295
296         SCOPED_AO2LOCK(lock, consumer);
297
298         while (consumer->messages_rxed_len == expected_len) {
299                 int r = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
300
301                 if (r == ETIMEDOUT) {
302                         break;
303                 }
304                 ast_assert(r == 0); /* Not expecting any othet types of errors */
305         }
306         return consumer->messages_rxed_len;
307 }
308
309 AST_TEST_DEFINE(subscription_messages)
310 {
311         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
312         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
313         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
314         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
315         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
316         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
317         RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
318         int complete;
319         struct stasis_subscription_change *change;
320
321         switch (cmd) {
322         case TEST_INIT:
323                 info->name = __func__;
324                 info->category = test_category;
325                 info->summary = "Test subscribe/unsubscribe messages";
326                 info->description = "Test subscribe/unsubscribe messages";
327                 return AST_TEST_NOT_RUN;
328         case TEST_EXECUTE:
329                 break;
330         }
331
332         topic = stasis_topic_create("TestTopic");
333         ast_test_validate(test, NULL != topic);
334
335         consumer = consumer_create(0);
336         ast_test_validate(test, NULL != consumer);
337
338         uut = stasis_subscribe(topic, consumer_exec, consumer);
339         ast_test_validate(test, NULL != uut);
340         ao2_ref(consumer, +1);
341         expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
342
343         uut = stasis_unsubscribe(uut);
344         complete = consumer_wait_for_completion(consumer);
345         ast_test_validate(test, 1 == complete);
346
347         ast_test_validate(test, 2 == consumer->messages_rxed_len);
348         ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0]));
349         ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1]));
350
351         change = stasis_message_data(consumer->messages_rxed[0]);
352         ast_test_validate(test, topic == change->topic);
353         ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
354         ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
355
356         change = stasis_message_data(consumer->messages_rxed[1]);
357         ast_test_validate(test, topic == change->topic);
358         ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
359         ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
360
361         return AST_TEST_PASS;
362 }
363
364 AST_TEST_DEFINE(publish)
365 {
366         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
367         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
368         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
369         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
370         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
371         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
372         int actual_len;
373         const char *actual;
374
375         switch (cmd) {
376         case TEST_INIT:
377                 info->name = __func__;
378                 info->category = test_category;
379                 info->summary = "Test publishing";
380                 info->description = "Test publishing";
381                 return AST_TEST_NOT_RUN;
382         case TEST_EXECUTE:
383                 break;
384         }
385
386         topic = stasis_topic_create("TestTopic");
387         ast_test_validate(test, NULL != topic);
388
389         consumer = consumer_create(1);
390         ast_test_validate(test, NULL != consumer);
391
392         uut = stasis_subscribe(topic, consumer_exec, consumer);
393         ast_test_validate(test, NULL != uut);
394         ao2_ref(consumer, +1);
395
396         test_data = ao2_alloc(1, NULL);
397         ast_test_validate(test, NULL != test_data);
398         ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
399         test_message = stasis_message_create(test_message_type, test_data);
400
401         stasis_publish(topic, test_message);
402
403         actual_len = consumer_wait_for(consumer, 1);
404         ast_test_validate(test, 1 == actual_len);
405         actual = stasis_message_data(consumer->messages_rxed[0]);
406         ast_test_validate(test, test_data == actual);
407
408         return AST_TEST_PASS;
409 }
410
411 AST_TEST_DEFINE(publish_sync)
412 {
413         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
414         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
415         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
416         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
417         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
418         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
419         int actual_len;
420         const char *actual;
421
422         switch (cmd) {
423         case TEST_INIT:
424                 info->name = __func__;
425                 info->category = test_category;
426                 info->summary = "Test synchronous publishing";
427                 info->description = "Test synchronous publishing";
428                 return AST_TEST_NOT_RUN;
429         case TEST_EXECUTE:
430                 break;
431         }
432
433         topic = stasis_topic_create("TestTopic");
434         ast_test_validate(test, NULL != topic);
435
436         consumer = consumer_create(1);
437         ast_test_validate(test, NULL != consumer);
438
439         uut = stasis_subscribe(topic, consumer_exec_sync, consumer);
440         ast_test_validate(test, NULL != uut);
441         ao2_ref(consumer, +1);
442
443         test_data = ao2_alloc(1, NULL);
444         ast_test_validate(test, NULL != test_data);
445         ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
446         test_message = stasis_message_create(test_message_type, test_data);
447
448         stasis_publish_sync(uut, test_message);
449
450         actual_len = consumer->messages_rxed_len;
451         ast_test_validate(test, 1 == actual_len);
452         actual = stasis_message_data(consumer->messages_rxed[0]);
453         ast_test_validate(test, test_data == actual);
454
455         return AST_TEST_PASS;
456 }
457
458 AST_TEST_DEFINE(unsubscribe_stops_messages)
459 {
460         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
461         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
462         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
463         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
464         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
465         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
466         int actual_len;
467
468         switch (cmd) {
469         case TEST_INIT:
470                 info->name = __func__;
471                 info->category = test_category;
472                 info->summary = "Test simple subscriptions";
473                 info->description = "Test simple subscriptions";
474                 return AST_TEST_NOT_RUN;
475         case TEST_EXECUTE:
476                 break;
477         }
478
479         topic = stasis_topic_create("TestTopic");
480         ast_test_validate(test, NULL != topic);
481
482         consumer = consumer_create(1);
483         ast_test_validate(test, NULL != consumer);
484
485         uut = stasis_subscribe(topic, consumer_exec, consumer);
486         ast_test_validate(test, NULL != uut);
487         ao2_ref(consumer, +1);
488
489         uut = stasis_unsubscribe(uut);
490
491         test_data = ao2_alloc(1, NULL);
492         ast_test_validate(test, NULL != test_data);
493         ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
494         test_message = stasis_message_create(test_message_type, test_data);
495
496         stasis_publish(topic, test_message);
497
498         actual_len = consumer_should_stay(consumer, 0);
499         ast_test_validate(test, 0 == actual_len);
500
501         return AST_TEST_PASS;
502 }
503
504 AST_TEST_DEFINE(forward)
505 {
506         RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
507         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
508
509         RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup);
510         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
511
512         RAII_VAR(struct stasis_forward *, forward_sub, NULL, stasis_forward_cancel);
513         RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe);
514         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
515
516         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
517         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
518         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
519         int actual_len;
520
521         switch (cmd) {
522         case TEST_INIT:
523                 info->name = __func__;
524                 info->category = test_category;
525                 info->summary = "Test sending events to a parent topic";
526                 info->description = "Test sending events to a parent topic.\n"
527                         "This test creates three topics (one parent, two children)\n"
528                         "and publishes a message to one child, and verifies it's\n"
529                         "only seen by that child and the parent";
530                 return AST_TEST_NOT_RUN;
531         case TEST_EXECUTE:
532                 break;
533         }
534
535         parent_topic = stasis_topic_create("ParentTestTopic");
536         ast_test_validate(test, NULL != parent_topic);
537         topic = stasis_topic_create("TestTopic");
538         ast_test_validate(test, NULL != topic);
539
540         forward_sub = stasis_forward_all(topic, parent_topic);
541         ast_test_validate(test, NULL != forward_sub);
542
543         parent_consumer = consumer_create(1);
544         ast_test_validate(test, NULL != parent_consumer);
545         consumer = consumer_create(1);
546         ast_test_validate(test, NULL != consumer);
547
548         parent_sub = stasis_subscribe(parent_topic, consumer_exec, parent_consumer);
549         ast_test_validate(test, NULL != parent_sub);
550         ao2_ref(parent_consumer, +1);
551         sub = stasis_subscribe(topic, consumer_exec, consumer);
552         ast_test_validate(test, NULL != sub);
553         ao2_ref(consumer, +1);
554
555         test_data = ao2_alloc(1, NULL);
556         ast_test_validate(test, NULL != test_data);
557         ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
558         test_message = stasis_message_create(test_message_type, test_data);
559
560         stasis_publish(topic, test_message);
561
562         actual_len = consumer_wait_for(consumer, 1);
563         ast_test_validate(test, 1 == actual_len);
564         actual_len = consumer_wait_for(parent_consumer, 1);
565         ast_test_validate(test, 1 == actual_len);
566
567         return AST_TEST_PASS;
568 }
569
570 AST_TEST_DEFINE(interleaving)
571 {
572         RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
573         RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
574         RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
575
576         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
577
578         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
579
580         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
581         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
582         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
583
584         RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
585         RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
586         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
587
588         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
589
590         int actual_len;
591
592         switch (cmd) {
593         case TEST_INIT:
594                 info->name = __func__;
595                 info->category = test_category;
596                 info->summary = "Test sending interleaved events to a parent topic";
597                 info->description = "Test sending events to a parent topic.\n"
598                         "This test creates three topics (one parent, two children)\n"
599                         "and publishes messages alternately between the children.\n"
600                         "It verifies that the messages are received in the expected\n"
601                         "order.";
602                 return AST_TEST_NOT_RUN;
603         case TEST_EXECUTE:
604                 break;
605         }
606
607         ast_test_validate(test, stasis_message_type_create("test", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
608         ast_test_validate(test, NULL != test_message_type);
609
610         test_data = ao2_alloc(1, NULL);
611         ast_test_validate(test, NULL != test_data);
612
613         test_message1 = stasis_message_create(test_message_type, test_data);
614         ast_test_validate(test, NULL != test_message1);
615         test_message2 = stasis_message_create(test_message_type, test_data);
616         ast_test_validate(test, NULL != test_message2);
617         test_message3 = stasis_message_create(test_message_type, test_data);
618         ast_test_validate(test, NULL != test_message3);
619
620         parent_topic = stasis_topic_create("ParentTestTopic");
621         ast_test_validate(test, NULL != parent_topic);
622         topic1 = stasis_topic_create("Topic1");
623         ast_test_validate(test, NULL != topic1);
624         topic2 = stasis_topic_create("Topic2");
625         ast_test_validate(test, NULL != topic2);
626
627         forward_sub1 = stasis_forward_all(topic1, parent_topic);
628         ast_test_validate(test, NULL != forward_sub1);
629         forward_sub2 = stasis_forward_all(topic2, parent_topic);
630         ast_test_validate(test, NULL != forward_sub2);
631
632         consumer = consumer_create(1);
633         ast_test_validate(test, NULL != consumer);
634
635         sub = stasis_subscribe(parent_topic, consumer_exec, consumer);
636         ast_test_validate(test, NULL != sub);
637         ao2_ref(consumer, +1);
638
639         stasis_publish(topic1, test_message1);
640         stasis_publish(topic2, test_message2);
641         stasis_publish(topic1, test_message3);
642
643         actual_len = consumer_wait_for(consumer, 3);
644         ast_test_validate(test, 3 == actual_len);
645
646         ast_test_validate(test, test_message1 == consumer->messages_rxed[0]);
647         ast_test_validate(test, test_message2 == consumer->messages_rxed[1]);
648         ast_test_validate(test, test_message3 == consumer->messages_rxed[2]);
649
650         return AST_TEST_PASS;
651 }
652
653 struct cache_test_data {
654         char *id;
655         char *value;
656 };
657
658 static void cache_test_data_dtor(void *obj)
659 {
660         struct cache_test_data *data = obj;
661
662         ast_free(data->id);
663         ast_free(data->value);
664 }
665
666 static struct stasis_message *cache_test_message_create_full(struct stasis_message_type *type, const char *name, const char *value, struct ast_eid *eid)
667 {
668         RAII_VAR(struct cache_test_data *, data, NULL, ao2_cleanup);
669
670         data = ao2_alloc(sizeof(*data), cache_test_data_dtor);
671         if (data == NULL) {
672                 return NULL;
673         }
674
675         ast_assert(name != NULL);
676         ast_assert(value != NULL);
677
678         data->id = ast_strdup(name);
679         data->value = ast_strdup(value);
680         if (!data->id || !data->value) {
681                 return NULL;
682         }
683
684         return stasis_message_create_full(type, data, eid);
685 }
686
687 static struct stasis_message *cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value)
688 {
689         return cache_test_message_create_full(type, name, value, &ast_eid_default);
690 }
691
692 static const char *cache_test_data_id(struct stasis_message *message)
693 {
694         struct cache_test_data *cachable = stasis_message_data(message);
695
696         if (0 != strcmp("Cacheable", stasis_message_type_name(stasis_message_type(message)))) {
697                 return NULL;
698         }
699         return cachable->id;
700 }
701
702 static struct stasis_message *cache_test_aggregate_calc_fn(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
703 {
704         struct stasis_message *aggregate_snapshot;
705         struct stasis_message *snapshot;
706         struct stasis_message_type *type = NULL;
707         struct cache_test_data *test_data = NULL;
708         int idx;
709         int accumulated = 0;
710         char aggregate_str[30];
711
712         /* Accumulate the aggregate value. */
713         snapshot = stasis_cache_entry_get_local(entry);
714         if (snapshot) {
715                 type = stasis_message_type(snapshot);
716                 test_data = stasis_message_data(snapshot);
717                 accumulated += atoi(test_data->value);
718         }
719         for (idx = 0; ; ++idx) {
720                 snapshot = stasis_cache_entry_get_remote(entry, idx);
721                 if (!snapshot) {
722                         break;
723                 }
724
725                 type = stasis_message_type(snapshot);
726                 test_data = stasis_message_data(snapshot);
727                 accumulated += atoi(test_data->value);
728         }
729
730         if (!test_data) {
731                 /* There are no test entries cached.  Delete the aggregate. */
732                 return NULL;
733         }
734
735         snapshot = stasis_cache_entry_get_aggregate(entry);
736         if (snapshot) {
737                 type = stasis_message_type(snapshot);
738                 test_data = stasis_message_data(snapshot);
739                 if (accumulated == atoi(test_data->value)) {
740                         /* Aggregate test entry did not change. */
741                         return ao2_bump(snapshot);
742                 }
743         }
744
745         snprintf(aggregate_str, sizeof(aggregate_str), "%d", accumulated);
746         aggregate_snapshot = cache_test_message_create_full(type, test_data->id, aggregate_str, NULL);
747         if (!aggregate_snapshot) {
748                 /* Bummer.  We have to keep the old aggregate snapshot. */
749                 ast_log(LOG_ERROR, "Could not create aggregate snapshot.\n");
750                 return ao2_bump(snapshot);
751         }
752
753         return aggregate_snapshot;
754 }
755
756 static void cache_test_aggregate_publish_fn(struct stasis_topic *topic, struct stasis_message *aggregate)
757 {
758         stasis_publish(topic, aggregate);
759 }
760
761 static int check_cache_aggregate(struct stasis_cache *cache, struct stasis_message_type *cache_type, const char *id, const char *value)
762 {
763         RAII_VAR(struct stasis_message *, aggregate, NULL, ao2_cleanup);
764         struct cache_test_data *test_data;
765
766         aggregate = stasis_cache_get_by_eid(cache, cache_type, id, NULL);
767         if (!aggregate) {
768                 /* No aggregate, return true if given no value. */
769                 return !value;
770         }
771
772         /* Return true if the given value matches the aggregate value. */
773         test_data = stasis_message_data(aggregate);
774         return value && !strcmp(value, test_data->value);
775 }
776
777 AST_TEST_DEFINE(cache_filter)
778 {
779         RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup);
780         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
781         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
782         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
783         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
784         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
785         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
786         int actual_len;
787
788         switch (cmd) {
789         case TEST_INIT:
790                 info->name = __func__;
791                 info->category = test_category;
792                 info->summary = "Test caching topics only forward cache_update messages.";
793                 info->description = "Test caching topics only forward cache_update messages.";
794                 return AST_TEST_NOT_RUN;
795         case TEST_EXECUTE:
796                 break;
797         }
798
799         ast_test_validate(test, stasis_message_type_create("NonCacheable", NULL, &non_cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
800         ast_test_validate(test, NULL != non_cache_type);
801         topic = stasis_topic_create("SomeTopic");
802         ast_test_validate(test, NULL != topic);
803         cache = stasis_cache_create(cache_test_data_id);
804         ast_test_validate(test, NULL != cache);
805         caching_topic = stasis_caching_topic_create(topic, cache);
806         ast_test_validate(test, NULL != caching_topic);
807         consumer = consumer_create(1);
808         ast_test_validate(test, NULL != consumer);
809         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
810         ast_test_validate(test, NULL != sub);
811         ao2_ref(consumer, +1);
812
813         test_message = cache_test_message_create(non_cache_type, "1", "1");
814         ast_test_validate(test, NULL != test_message);
815
816         stasis_publish(topic, test_message);
817
818         actual_len = consumer_should_stay(consumer, 0);
819         ast_test_validate(test, 0 == actual_len);
820
821         return AST_TEST_PASS;
822 }
823
824 AST_TEST_DEFINE(cache)
825 {
826         RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
827         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
828         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
829         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
830         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
831         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
832         RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
833         RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
834         RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
835         RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
836         int actual_len;
837         struct stasis_cache_update *actual_update;
838
839         switch (cmd) {
840         case TEST_INIT:
841                 info->name = __func__;
842                 info->category = test_category;
843                 info->summary = "Test passing messages through cache topic unscathed.";
844                 info->description = "Test passing messages through cache topic unscathed.";
845                 return AST_TEST_NOT_RUN;
846         case TEST_EXECUTE:
847                 break;
848         }
849
850         ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
851         ast_test_validate(test, NULL != cache_type);
852         topic = stasis_topic_create("SomeTopic");
853         ast_test_validate(test, NULL != topic);
854         cache = stasis_cache_create(cache_test_data_id);
855         ast_test_validate(test, NULL != cache);
856         caching_topic = stasis_caching_topic_create(topic, cache);
857         ast_test_validate(test, NULL != caching_topic);
858         consumer = consumer_create(1);
859         ast_test_validate(test, NULL != consumer);
860         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
861         ast_test_validate(test, NULL != sub);
862         ao2_ref(consumer, +1);
863
864         test_message1_1 = cache_test_message_create(cache_type, "1", "1");
865         ast_test_validate(test, NULL != test_message1_1);
866         test_message2_1 = cache_test_message_create(cache_type, "2", "1");
867         ast_test_validate(test, NULL != test_message2_1);
868
869         /* Post a couple of snapshots */
870         stasis_publish(topic, test_message1_1);
871         stasis_publish(topic, test_message2_1);
872         actual_len = consumer_wait_for(consumer, 2);
873         ast_test_validate(test, 2 == actual_len);
874
875         /* Check for new snapshot messages */
876         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[0]));
877         actual_update = stasis_message_data(consumer->messages_rxed[0]);
878         ast_test_validate(test, NULL == actual_update->old_snapshot);
879         ast_test_validate(test, test_message1_1 == actual_update->new_snapshot);
880         ast_test_validate(test, test_message1_1 == stasis_cache_get(cache, cache_type, "1"));
881         /* stasis_cache_get returned a ref, so unref test_message1_1 */
882         ao2_ref(test_message1_1, -1);
883
884         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[1]));
885         actual_update = stasis_message_data(consumer->messages_rxed[1]);
886         ast_test_validate(test, NULL == actual_update->old_snapshot);
887         ast_test_validate(test, test_message2_1 == actual_update->new_snapshot);
888         ast_test_validate(test, test_message2_1 == stasis_cache_get(cache, cache_type, "2"));
889         /* stasis_cache_get returned a ref, so unref test_message2_1 */
890         ao2_ref(test_message2_1, -1);
891
892         /* Update snapshot 2 */
893         test_message2_2 = cache_test_message_create(cache_type, "2", "2");
894         ast_test_validate(test, NULL != test_message2_2);
895         stasis_publish(topic, test_message2_2);
896
897         actual_len = consumer_wait_for(consumer, 3);
898         ast_test_validate(test, 3 == actual_len);
899
900         actual_update = stasis_message_data(consumer->messages_rxed[2]);
901         ast_test_validate(test, test_message2_1 == actual_update->old_snapshot);
902         ast_test_validate(test, test_message2_2 == actual_update->new_snapshot);
903         ast_test_validate(test, test_message2_2 == stasis_cache_get(cache, cache_type, "2"));
904         /* stasis_cache_get returned a ref, so unref test_message2_2 */
905         ao2_ref(test_message2_2, -1);
906
907         /* Clear snapshot 1 */
908         test_message1_clear = stasis_cache_clear_create(test_message1_1);
909         ast_test_validate(test, NULL != test_message1_clear);
910         stasis_publish(topic, test_message1_clear);
911
912         actual_len = consumer_wait_for(consumer, 4);
913         ast_test_validate(test, 4 == actual_len);
914
915         actual_update = stasis_message_data(consumer->messages_rxed[3]);
916         ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
917         ast_test_validate(test, NULL == actual_update->new_snapshot);
918         ast_test_validate(test, NULL == stasis_cache_get(cache, cache_type, "1"));
919
920         return AST_TEST_PASS;
921 }
922
923 AST_TEST_DEFINE(cache_dump)
924 {
925         RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
926         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
927         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
928         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
929         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
930         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
931         RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
932         RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
933         RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
934         RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
935         RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
936         int actual_len;
937         struct ao2_iterator i;
938         void *obj;
939
940         switch (cmd) {
941         case TEST_INIT:
942                 info->name = __func__;
943                 info->category = test_category;
944                 info->summary = "Test cache dump routines.";
945                 info->description = "Test cache dump routines.";
946                 return AST_TEST_NOT_RUN;
947         case TEST_EXECUTE:
948                 break;
949         }
950
951         ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
952         ast_test_validate(test, NULL != cache_type);
953         topic = stasis_topic_create("SomeTopic");
954         ast_test_validate(test, NULL != topic);
955         cache = stasis_cache_create(cache_test_data_id);
956         ast_test_validate(test, NULL != cache);
957         caching_topic = stasis_caching_topic_create(topic, cache);
958         ast_test_validate(test, NULL != caching_topic);
959         consumer = consumer_create(1);
960         ast_test_validate(test, NULL != consumer);
961         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
962         ast_test_validate(test, NULL != sub);
963         ao2_ref(consumer, +1);
964
965         test_message1_1 = cache_test_message_create(cache_type, "1", "1");
966         ast_test_validate(test, NULL != test_message1_1);
967         test_message2_1 = cache_test_message_create(cache_type, "2", "1");
968         ast_test_validate(test, NULL != test_message2_1);
969
970         /* Post a couple of snapshots */
971         stasis_publish(topic, test_message1_1);
972         stasis_publish(topic, test_message2_1);
973         actual_len = consumer_wait_for(consumer, 2);
974         ast_test_validate(test, 2 == actual_len);
975
976         /* Check the cache */
977         ao2_cleanup(cache_dump);
978         cache_dump = stasis_cache_dump(cache, NULL);
979         ast_test_validate(test, NULL != cache_dump);
980         ast_test_validate(test, 2 == ao2_container_count(cache_dump));
981         i = ao2_iterator_init(cache_dump, 0);
982         while ((obj = ao2_iterator_next(&i))) {
983                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
984                 ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_1);
985         }
986         ao2_iterator_destroy(&i);
987
988         /* Update snapshot 2 */
989         test_message2_2 = cache_test_message_create(cache_type, "2", "2");
990         ast_test_validate(test, NULL != test_message2_2);
991         stasis_publish(topic, test_message2_2);
992
993         actual_len = consumer_wait_for(consumer, 3);
994         ast_test_validate(test, 3 == actual_len);
995
996         /* Check the cache */
997         ao2_cleanup(cache_dump);
998         cache_dump = stasis_cache_dump(cache, NULL);
999         ast_test_validate(test, NULL != cache_dump);
1000         ast_test_validate(test, 2 == ao2_container_count(cache_dump));
1001         i = ao2_iterator_init(cache_dump, 0);
1002         while ((obj = ao2_iterator_next(&i))) {
1003                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1004                 ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_2);
1005         }
1006         ao2_iterator_destroy(&i);
1007
1008         /* Clear snapshot 1 */
1009         test_message1_clear = stasis_cache_clear_create(test_message1_1);
1010         ast_test_validate(test, NULL != test_message1_clear);
1011         stasis_publish(topic, test_message1_clear);
1012
1013         actual_len = consumer_wait_for(consumer, 4);
1014         ast_test_validate(test, 4 == actual_len);
1015
1016         /* Check the cache */
1017         ao2_cleanup(cache_dump);
1018         cache_dump = stasis_cache_dump(cache, NULL);
1019         ast_test_validate(test, NULL != cache_dump);
1020         ast_test_validate(test, 1 == ao2_container_count(cache_dump));
1021         i = ao2_iterator_init(cache_dump, 0);
1022         while ((obj = ao2_iterator_next(&i))) {
1023                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1024                 ast_test_validate(test, actual_cache_entry == test_message2_2);
1025         }
1026         ao2_iterator_destroy(&i);
1027
1028         /* Dump the cache to ensure that it has no subscription change items in it since those aren't cached */
1029         ao2_cleanup(cache_dump);
1030         cache_dump = stasis_cache_dump(cache, stasis_subscription_change_type());
1031         ast_test_validate(test, 0 == ao2_container_count(cache_dump));
1032
1033         return AST_TEST_PASS;
1034 }
1035
1036 AST_TEST_DEFINE(cache_eid_aggregate)
1037 {
1038         RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
1039         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1040         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
1041         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
1042         RAII_VAR(struct consumer *, cache_consumer, NULL, ao2_cleanup);
1043         RAII_VAR(struct consumer *, topic_consumer, NULL, ao2_cleanup);
1044         RAII_VAR(struct stasis_subscription *, topic_sub, NULL, stasis_unsubscribe);
1045         RAII_VAR(struct stasis_subscription *, cache_sub, NULL, stasis_unsubscribe);
1046         RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
1047         RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
1048         RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
1049         RAII_VAR(struct stasis_message *, test_message2_3, NULL, ao2_cleanup);
1050         RAII_VAR(struct stasis_message *, test_message2_4, NULL, ao2_cleanup);
1051         RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
1052         RAII_VAR(struct stasis_message *, test_message2_clear, NULL, ao2_cleanup);
1053         RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
1054         int actual_len;
1055         struct ao2_iterator i;
1056         void *obj;
1057         struct ast_eid foreign_eid1;
1058         struct ast_eid foreign_eid2;
1059
1060         switch (cmd) {
1061         case TEST_INIT:
1062                 info->name = __func__;
1063                 info->category = test_category;
1064                 info->summary = "Test cache eid and aggregate support.";
1065                 info->description = "Test cache eid and aggregate support.";
1066                 return AST_TEST_NOT_RUN;
1067         case TEST_EXECUTE:
1068                 break;
1069         }
1070
1071         memset(&foreign_eid1, 0xAA, sizeof(foreign_eid1));
1072         memset(&foreign_eid2, 0xBB, sizeof(foreign_eid2));
1073
1074         ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
1075         ast_test_validate(test, NULL != cache_type);
1076
1077         topic = stasis_topic_create("SomeTopic");
1078         ast_test_validate(test, NULL != topic);
1079
1080         /* To consume events published to the topic. */
1081         topic_consumer = consumer_create(1);
1082         ast_test_validate(test, NULL != topic_consumer);
1083
1084         topic_sub = stasis_subscribe(topic, consumer_exec, topic_consumer);
1085         ast_test_validate(test, NULL != topic_sub);
1086         ao2_ref(topic_consumer, +1);
1087
1088         cache = stasis_cache_create_full(cache_test_data_id,
1089                 cache_test_aggregate_calc_fn, cache_test_aggregate_publish_fn);
1090         ast_test_validate(test, NULL != cache);
1091
1092         caching_topic = stasis_caching_topic_create(topic, cache);
1093         ast_test_validate(test, NULL != caching_topic);
1094
1095         /* To consume update events published to the caching_topic. */
1096         cache_consumer = consumer_create(1);
1097         ast_test_validate(test, NULL != cache_consumer);
1098
1099         cache_sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, cache_consumer);
1100         ast_test_validate(test, NULL != cache_sub);
1101         ao2_ref(cache_consumer, +1);
1102
1103         /* Create test messages. */
1104         test_message1_1 = cache_test_message_create_full(cache_type, "1", "1", &ast_eid_default);
1105         ast_test_validate(test, NULL != test_message1_1);
1106         test_message2_1 = cache_test_message_create_full(cache_type, "2", "1", &ast_eid_default);
1107         ast_test_validate(test, NULL != test_message2_1);
1108         test_message2_2 = cache_test_message_create_full(cache_type, "2", "2", &foreign_eid1);
1109         ast_test_validate(test, NULL != test_message2_2);
1110         test_message2_3 = cache_test_message_create_full(cache_type, "2", "3", &foreign_eid2);
1111         ast_test_validate(test, NULL != test_message2_3);
1112         test_message2_4 = cache_test_message_create_full(cache_type, "2", "4", &foreign_eid2);
1113         ast_test_validate(test, NULL != test_message2_4);
1114
1115         /* Post some snapshots */
1116         stasis_publish(topic, test_message1_1);
1117         ast_test_validate(test, check_cache_aggregate(cache, cache_type, "1", "1"));
1118         stasis_publish(topic, test_message2_1);
1119         ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "1"));
1120         stasis_publish(topic, test_message2_2);
1121         ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "3"));
1122
1123         actual_len = consumer_wait_for(cache_consumer, 6);
1124         ast_test_validate(test, 6 == actual_len);
1125         actual_len = consumer_wait_for(topic_consumer, 6);
1126         ast_test_validate(test, 6 == actual_len);
1127
1128         /* Check the cache */
1129         ao2_cleanup(cache_dump);
1130         cache_dump = stasis_cache_dump_all(cache, NULL);
1131         ast_test_validate(test, NULL != cache_dump);
1132         ast_test_validate(test, 3 == ao2_container_count(cache_dump));
1133         i = ao2_iterator_init(cache_dump, 0);
1134         while ((obj = ao2_iterator_next(&i))) {
1135                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1136
1137                 ast_test_validate(test,
1138                         actual_cache_entry == test_message1_1
1139                         || actual_cache_entry == test_message2_1
1140                         || actual_cache_entry == test_message2_2);
1141         }
1142         ao2_iterator_destroy(&i);
1143
1144         /* Check the local cached items */
1145         ao2_cleanup(cache_dump);
1146         cache_dump = stasis_cache_dump_by_eid(cache, NULL, &ast_eid_default);
1147         ast_test_validate(test, NULL != cache_dump);
1148         ast_test_validate(test, 2 == ao2_container_count(cache_dump));
1149         i = ao2_iterator_init(cache_dump, 0);
1150         while ((obj = ao2_iterator_next(&i))) {
1151                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1152
1153                 ast_test_validate(test,
1154                         actual_cache_entry == test_message1_1
1155                         || actual_cache_entry == test_message2_1);
1156         }
1157         ao2_iterator_destroy(&i);
1158
1159         /* Post snapshot 2 from another eid. */
1160         stasis_publish(topic, test_message2_3);
1161         ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "6"));
1162
1163         actual_len = consumer_wait_for(cache_consumer, 8);
1164         ast_test_validate(test, 8 == actual_len);
1165         actual_len = consumer_wait_for(topic_consumer, 8);
1166         ast_test_validate(test, 8 == actual_len);
1167
1168         /* Check the cache */
1169         ao2_cleanup(cache_dump);
1170         cache_dump = stasis_cache_dump_all(cache, NULL);
1171         ast_test_validate(test, NULL != cache_dump);
1172         ast_test_validate(test, 4 == ao2_container_count(cache_dump));
1173         i = ao2_iterator_init(cache_dump, 0);
1174         while ((obj = ao2_iterator_next(&i))) {
1175                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1176
1177                 ast_test_validate(test,
1178                         actual_cache_entry == test_message1_1
1179                         || actual_cache_entry == test_message2_1
1180                         || actual_cache_entry == test_message2_2
1181                         || actual_cache_entry == test_message2_3);
1182         }
1183         ao2_iterator_destroy(&i);
1184
1185         /* Check the remote cached items */
1186         ao2_cleanup(cache_dump);
1187         cache_dump = stasis_cache_dump_by_eid(cache, NULL, &foreign_eid1);
1188         ast_test_validate(test, NULL != cache_dump);
1189         ast_test_validate(test, 1 == ao2_container_count(cache_dump));
1190         i = ao2_iterator_init(cache_dump, 0);
1191         while ((obj = ao2_iterator_next(&i))) {
1192                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1193
1194                 ast_test_validate(test, actual_cache_entry == test_message2_2);
1195         }
1196         ao2_iterator_destroy(&i);
1197
1198         /* Post snapshot 2 from a repeated eid. */
1199         stasis_publish(topic, test_message2_4);
1200         ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "7"));
1201
1202         actual_len = consumer_wait_for(cache_consumer, 10);
1203         ast_test_validate(test, 10 == actual_len);
1204         actual_len = consumer_wait_for(topic_consumer, 10);
1205         ast_test_validate(test, 10 == actual_len);
1206
1207         /* Check the cache */
1208         ao2_cleanup(cache_dump);
1209         cache_dump = stasis_cache_dump_all(cache, NULL);
1210         ast_test_validate(test, NULL != cache_dump);
1211         ast_test_validate(test, 4 == ao2_container_count(cache_dump));
1212         i = ao2_iterator_init(cache_dump, 0);
1213         while ((obj = ao2_iterator_next(&i))) {
1214                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1215
1216                 ast_test_validate(test,
1217                         actual_cache_entry == test_message1_1
1218                         || actual_cache_entry == test_message2_1
1219                         || actual_cache_entry == test_message2_2
1220                         || actual_cache_entry == test_message2_4);
1221         }
1222         ao2_iterator_destroy(&i);
1223
1224         /* Check all snapshot 2 cache entries. */
1225         ao2_cleanup(cache_dump);
1226         cache_dump = stasis_cache_get_all(cache, cache_type, "2");
1227         ast_test_validate(test, NULL != cache_dump);
1228         ast_test_validate(test, 3 == ao2_container_count(cache_dump));
1229         i = ao2_iterator_init(cache_dump, 0);
1230         while ((obj = ao2_iterator_next(&i))) {
1231                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1232
1233                 ast_test_validate(test,
1234                         actual_cache_entry == test_message2_1
1235                         || actual_cache_entry == test_message2_2
1236                         || actual_cache_entry == test_message2_4);
1237         }
1238         ao2_iterator_destroy(&i);
1239
1240         /* Clear snapshot 1 */
1241         test_message1_clear = stasis_cache_clear_create(test_message1_1);
1242         ast_test_validate(test, NULL != test_message1_clear);
1243         stasis_publish(topic, test_message1_clear);
1244         ast_test_validate(test, check_cache_aggregate(cache, cache_type, "1", NULL));
1245
1246         actual_len = consumer_wait_for(cache_consumer, 12);
1247         ast_test_validate(test, 12 == actual_len);
1248         actual_len = consumer_wait_for(topic_consumer, 11);
1249         ast_test_validate(test, 11 == actual_len);
1250
1251         /* Check the cache */
1252         ao2_cleanup(cache_dump);
1253         cache_dump = stasis_cache_dump_all(cache, NULL);
1254         ast_test_validate(test, NULL != cache_dump);
1255         ast_test_validate(test, 3 == ao2_container_count(cache_dump));
1256         i = ao2_iterator_init(cache_dump, 0);
1257         while ((obj = ao2_iterator_next(&i))) {
1258                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1259
1260                 ast_test_validate(test,
1261                         actual_cache_entry == test_message2_1
1262                         || actual_cache_entry == test_message2_2
1263                         || actual_cache_entry == test_message2_4);
1264         }
1265         ao2_iterator_destroy(&i);
1266
1267         /* Clear snapshot 2 from a remote eid */
1268         test_message2_clear = stasis_cache_clear_create(test_message2_2);
1269         ast_test_validate(test, NULL != test_message2_clear);
1270         stasis_publish(topic, test_message2_clear);
1271         ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "5"));
1272
1273         actual_len = consumer_wait_for(cache_consumer, 14);
1274         ast_test_validate(test, 14 == actual_len);
1275         actual_len = consumer_wait_for(topic_consumer, 13);
1276         ast_test_validate(test, 13 == actual_len);
1277
1278         /* Check the cache */
1279         ao2_cleanup(cache_dump);
1280         cache_dump = stasis_cache_dump_all(cache, NULL);
1281         ast_test_validate(test, NULL != cache_dump);
1282         ast_test_validate(test, 2 == ao2_container_count(cache_dump));
1283         i = ao2_iterator_init(cache_dump, 0);
1284         while ((obj = ao2_iterator_next(&i))) {
1285                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1286
1287                 ast_test_validate(test,
1288                         actual_cache_entry == test_message2_1
1289                         || actual_cache_entry == test_message2_4);
1290         }
1291         ao2_iterator_destroy(&i);
1292
1293         return AST_TEST_PASS;
1294 }
1295
1296 AST_TEST_DEFINE(router)
1297 {
1298         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1299         RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
1300         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
1301         RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
1302         RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
1303         RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
1304         RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
1305         RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
1306         RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
1307         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
1308         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
1309         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
1310         int actual_len, ret;
1311         struct stasis_message *actual;
1312
1313         switch (cmd) {
1314         case TEST_INIT:
1315                 info->name = __func__;
1316                 info->category = test_category;
1317                 info->summary = "Test simple message routing";
1318                 info->description = "Test simple message routing";
1319                 return AST_TEST_NOT_RUN;
1320         case TEST_EXECUTE:
1321                 break;
1322         }
1323
1324         topic = stasis_topic_create("TestTopic");
1325         ast_test_validate(test, NULL != topic);
1326
1327         consumer1 = consumer_create(1);
1328         ast_test_validate(test, NULL != consumer1);
1329         consumer2 = consumer_create(1);
1330         ast_test_validate(test, NULL != consumer2);
1331         consumer3 = consumer_create(1);
1332         ast_test_validate(test, NULL != consumer3);
1333
1334         ast_test_validate(test, stasis_message_type_create("TestMessage1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
1335         ast_test_validate(test, NULL != test_message_type1);
1336         ast_test_validate(test, stasis_message_type_create("TestMessage2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
1337         ast_test_validate(test, NULL != test_message_type2);
1338         ast_test_validate(test, stasis_message_type_create("TestMessage3", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
1339         ast_test_validate(test, NULL != test_message_type3);
1340
1341         uut = stasis_message_router_create(topic);
1342         ast_test_validate(test, NULL != uut);
1343
1344         ret = stasis_message_router_add(
1345                 uut, test_message_type1, consumer_exec, consumer1);
1346         ast_test_validate(test, 0 == ret);
1347         ao2_ref(consumer1, +1);
1348         ret = stasis_message_router_add(
1349                 uut, test_message_type2, consumer_exec, consumer2);
1350         ast_test_validate(test, 0 == ret);
1351         ao2_ref(consumer2, +1);
1352         ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
1353         ast_test_validate(test, 0 == ret);
1354         ao2_ref(consumer3, +1);
1355
1356         test_data = ao2_alloc(1, NULL);
1357         ast_test_validate(test, NULL != test_data);
1358         test_message1 = stasis_message_create(test_message_type1, test_data);
1359         ast_test_validate(test, NULL != test_message1);
1360         test_message2 = stasis_message_create(test_message_type2, test_data);
1361         ast_test_validate(test, NULL != test_message2);
1362         test_message3 = stasis_message_create(test_message_type3, test_data);
1363         ast_test_validate(test, NULL != test_message3);
1364
1365         stasis_publish(topic, test_message1);
1366         stasis_publish(topic, test_message2);
1367         stasis_publish(topic, test_message3);
1368
1369         actual_len = consumer_wait_for(consumer1, 1);
1370         ast_test_validate(test, 1 == actual_len);
1371         actual_len = consumer_wait_for(consumer2, 1);
1372         ast_test_validate(test, 1 == actual_len);
1373         actual_len = consumer_wait_for(consumer3, 1);
1374         ast_test_validate(test, 1 == actual_len);
1375
1376         actual = consumer1->messages_rxed[0];
1377         ast_test_validate(test, test_message1 == actual);
1378
1379         actual = consumer2->messages_rxed[0];
1380         ast_test_validate(test, test_message2 == actual);
1381
1382         actual = consumer3->messages_rxed[0];
1383         ast_test_validate(test, test_message3 == actual);
1384
1385         /* consumer1 and consumer2 do not get the final message. */
1386         ao2_cleanup(consumer1);
1387         ao2_cleanup(consumer2);
1388
1389         return AST_TEST_PASS;
1390 }
1391
1392 static const char *cache_simple(struct stasis_message *message)
1393 {
1394         const char *type_name =
1395                 stasis_message_type_name(stasis_message_type(message));
1396         if (!ast_begins_with(type_name, "Cache")) {
1397                 return NULL;
1398         }
1399
1400         return "cached";
1401 }
1402
1403 AST_TEST_DEFINE(router_cache_updates)
1404 {
1405         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1406         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
1407         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe_and_join);
1408         RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
1409         RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
1410         RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
1411         RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
1412         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
1413         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
1414         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
1415         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
1416         RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
1417         RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
1418         RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
1419         RAII_VAR(struct stasis_message *, message1, NULL, ao2_cleanup);
1420         RAII_VAR(struct stasis_message *, message2, NULL, ao2_cleanup);
1421         struct stasis_cache_update *update;
1422         int actual_len, ret;
1423         struct stasis_message *actual;
1424
1425         switch (cmd) {
1426         case TEST_INIT:
1427                 info->name = __func__;
1428                 info->category = test_category;
1429                 info->summary = "Test special handling cache_update messages";
1430                 info->description = "Test special handling cache_update messages";
1431                 return AST_TEST_NOT_RUN;
1432         case TEST_EXECUTE:
1433                 break;
1434         }
1435
1436         topic = stasis_topic_create("TestTopic");
1437         ast_test_validate(test, NULL != topic);
1438
1439         cache = stasis_cache_create(cache_simple);
1440         ast_test_validate(test, NULL != cache);
1441         caching_topic = stasis_caching_topic_create(topic, cache);
1442         ast_test_validate(test, NULL != caching_topic);
1443
1444         consumer1 = consumer_create(1);
1445         ast_test_validate(test, NULL != consumer1);
1446         consumer2 = consumer_create(1);
1447         ast_test_validate(test, NULL != consumer2);
1448         consumer3 = consumer_create(1);
1449         ast_test_validate(test, NULL != consumer3);
1450
1451         ast_test_validate(test, stasis_message_type_create("Cache1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
1452         ast_test_validate(test, NULL != test_message_type1);
1453         ast_test_validate(test, stasis_message_type_create("Cache2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
1454         ast_test_validate(test, NULL != test_message_type2);
1455         ast_test_validate(test, stasis_message_type_create("NonCache", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
1456         ast_test_validate(test, NULL != test_message_type3);
1457
1458         uut = stasis_message_router_create(
1459                 stasis_caching_get_topic(caching_topic));
1460         ast_test_validate(test, NULL != uut);
1461
1462         ret = stasis_message_router_add_cache_update(
1463                 uut, test_message_type1, consumer_exec, consumer1);
1464         ast_test_validate(test, 0 == ret);
1465         ao2_ref(consumer1, +1);
1466         ret = stasis_message_router_add(
1467                 uut, stasis_cache_update_type(), consumer_exec, consumer2);
1468         ast_test_validate(test, 0 == ret);
1469         ao2_ref(consumer2, +1);
1470         ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
1471         ast_test_validate(test, 0 == ret);
1472         ao2_ref(consumer3, +1);
1473
1474         test_data = ao2_alloc(1, NULL);
1475         ast_test_validate(test, NULL != test_data);
1476         test_message1 = stasis_message_create(test_message_type1, test_data);
1477         ast_test_validate(test, NULL != test_message1);
1478         test_message2 = stasis_message_create(test_message_type2, test_data);
1479         ast_test_validate(test, NULL != test_message2);
1480         test_message3 = stasis_message_create(test_message_type3, test_data);
1481         ast_test_validate(test, NULL != test_message3);
1482
1483         stasis_publish(topic, test_message1);
1484         stasis_publish(topic, test_message2);
1485         stasis_publish(topic, test_message3);
1486
1487         actual_len = consumer_wait_for(consumer1, 1);
1488         ast_test_validate(test, 1 == actual_len);
1489         actual_len = consumer_wait_for(consumer2, 1);
1490         ast_test_validate(test, 1 == actual_len);
1491         /* Uncacheable message should not be passed through */
1492         actual_len = consumer_should_stay(consumer3, 0);
1493         ast_test_validate(test, 0 == actual_len);
1494
1495         actual = consumer1->messages_rxed[0];
1496         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
1497         update = stasis_message_data(actual);
1498         ast_test_validate(test, test_message_type1 == update->type);
1499         ast_test_validate(test, test_message1 == update->new_snapshot);
1500
1501         actual = consumer2->messages_rxed[0];
1502         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
1503         update = stasis_message_data(actual);
1504         ast_test_validate(test, test_message_type2 == update->type);
1505         ast_test_validate(test, test_message2 == update->new_snapshot);
1506
1507         /* consumer1 and consumer2 do not get the final message. */
1508         ao2_cleanup(consumer1);
1509         ao2_cleanup(consumer2);
1510
1511         return AST_TEST_PASS;
1512 }
1513
1514 AST_TEST_DEFINE(no_to_json)
1515 {
1516         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1517         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1518         RAII_VAR(char *, data, NULL, ao2_cleanup);
1519         RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
1520         char *expected = "SomeData";
1521
1522         switch (cmd) {
1523         case TEST_INIT:
1524                 info->name = __func__;
1525                 info->category = test_category;
1526                 info->summary = "Test message to_json function";
1527                 info->description = "Test message to_json function";
1528                 return AST_TEST_NOT_RUN;
1529         case TEST_EXECUTE:
1530                 break;
1531         }
1532
1533         /* Test NULL */
1534         actual = stasis_message_to_json(NULL, NULL);
1535         ast_test_validate(test, NULL == actual);
1536
1537         /* Test message with NULL to_json function */
1538         ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
1539
1540         data = ao2_alloc(strlen(expected) + 1, NULL);
1541         strcpy(data, expected);
1542         uut = stasis_message_create(type, data);
1543         ast_test_validate(test, NULL != uut);
1544
1545         actual = stasis_message_to_json(uut, NULL);
1546         ast_test_validate(test, NULL == actual);
1547
1548         return AST_TEST_PASS;
1549 }
1550
1551 AST_TEST_DEFINE(to_json)
1552 {
1553         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1554         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1555         RAII_VAR(char *, data, NULL, ao2_cleanup);
1556         RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
1557         const char *expected_text = "SomeData";
1558         RAII_VAR(struct ast_json *, expected, NULL, ast_json_unref);
1559
1560         switch (cmd) {
1561         case TEST_INIT:
1562                 info->name = __func__;
1563                 info->category = test_category;
1564                 info->summary = "Test message to_json function when NULL";
1565                 info->description = "Test message to_json function when NULL";
1566                 return AST_TEST_NOT_RUN;
1567         case TEST_EXECUTE:
1568                 break;
1569         }
1570
1571         ast_test_validate(test, stasis_message_type_create("SomeMessage", &fake_vtable, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
1572
1573         data = ao2_alloc(strlen(expected_text) + 1, NULL);
1574         strcpy(data, expected_text);
1575         uut = stasis_message_create(type, data);
1576         ast_test_validate(test, NULL != uut);
1577
1578         expected = ast_json_string_create(expected_text);
1579         actual = stasis_message_to_json(uut, NULL);
1580         ast_test_validate(test, ast_json_equal(expected, actual));
1581
1582         return AST_TEST_PASS;
1583 }
1584
1585 AST_TEST_DEFINE(no_to_ami)
1586 {
1587         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1588         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1589         RAII_VAR(char *, data, NULL, ao2_cleanup);
1590         RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
1591         char *expected = "SomeData";
1592
1593         switch (cmd) {
1594         case TEST_INIT:
1595                 info->name = __func__;
1596                 info->category = test_category;
1597                 info->summary = "Test message to_ami function when NULL";
1598                 info->description = "Test message to_ami function when NULL";
1599                 return AST_TEST_NOT_RUN;
1600         case TEST_EXECUTE:
1601                 break;
1602         }
1603
1604         /* Test NULL */
1605         actual = stasis_message_to_ami(NULL);
1606         ast_test_validate(test, NULL == actual);
1607
1608         /* Test message with NULL to_ami function */
1609         ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
1610
1611         data = ao2_alloc(strlen(expected) + 1, NULL);
1612         strcpy(data, expected);
1613         uut = stasis_message_create(type, data);
1614         ast_test_validate(test, NULL != uut);
1615
1616         actual = stasis_message_to_ami(uut);
1617         ast_test_validate(test, NULL == actual);
1618
1619         return AST_TEST_PASS;
1620 }
1621
1622 AST_TEST_DEFINE(to_ami)
1623 {
1624         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1625         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1626         RAII_VAR(char *, data, NULL, ao2_cleanup);
1627         RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
1628         const char *expected_text = "SomeData";
1629         const char *expected = "Message: SomeData\r\n";
1630
1631         switch (cmd) {
1632         case TEST_INIT:
1633                 info->name = __func__;
1634                 info->category = test_category;
1635                 info->summary = "Test message to_ami function";
1636                 info->description = "Test message to_ami function";
1637                 return AST_TEST_NOT_RUN;
1638         case TEST_EXECUTE:
1639                 break;
1640         }
1641
1642         ast_test_validate(test, stasis_message_type_create("SomeMessage", &fake_vtable, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
1643
1644         data = ao2_alloc(strlen(expected_text) + 1, NULL);
1645         strcpy(data, expected_text);
1646         uut = stasis_message_create(type, data);
1647         ast_test_validate(test, NULL != uut);
1648
1649         actual = stasis_message_to_ami(uut);
1650         ast_test_validate(test, strcmp(expected, actual->extra_fields) == 0);
1651
1652         return AST_TEST_PASS;
1653 }
1654
1655 static void noop(void *data, struct stasis_subscription *sub,
1656         struct stasis_message *message)
1657 {
1658         /* no-op */
1659 }
1660
1661 AST_TEST_DEFINE(dtor_order)
1662 {
1663         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1664         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
1665
1666         switch (cmd) {
1667         case TEST_INIT:
1668                 info->name = __func__;
1669                 info->category = test_category;
1670                 info->summary = "Test that destruction order doesn't bomb stuff";
1671                 info->description = "Test that destruction order doesn't bomb stuff";
1672                 return AST_TEST_NOT_RUN;
1673         case TEST_EXECUTE:
1674                 break;
1675         }
1676
1677         topic = stasis_topic_create("test-topic");
1678         ast_test_validate(test, NULL != topic);
1679
1680         sub = stasis_subscribe(topic, noop, NULL);
1681         ast_test_validate(test, NULL != sub);
1682
1683         /* With any luck, this won't completely blow everything up */
1684         ao2_cleanup(topic);
1685         stasis_unsubscribe(sub);
1686
1687         /* These refs were cleaned up manually */
1688         topic = NULL;
1689         sub = NULL;
1690
1691         return AST_TEST_PASS;
1692 }
1693
1694 static const char *noop_get_id(struct stasis_message *message)
1695 {
1696         return NULL;
1697 }
1698
1699 AST_TEST_DEFINE(caching_dtor_order)
1700 {
1701         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1702         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
1703         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL,
1704                 stasis_caching_unsubscribe);
1705         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
1706
1707         switch (cmd) {
1708         case TEST_INIT:
1709                 info->name = __func__;
1710                 info->category = test_category;
1711                 info->summary = "Test that destruction order doesn't bomb stuff";
1712                 info->description = "Test that destruction order doesn't bomb stuff";
1713                 return AST_TEST_NOT_RUN;
1714         case TEST_EXECUTE:
1715                 break;
1716         }
1717
1718         cache = stasis_cache_create(noop_get_id);
1719         ast_test_validate(test, NULL != cache);
1720
1721         topic = stasis_topic_create("test-topic");
1722         ast_test_validate(test, NULL != topic);
1723
1724         caching_topic = stasis_caching_topic_create(topic, cache);
1725         ast_test_validate(test, NULL != caching_topic);
1726
1727         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), noop,
1728                 NULL);
1729         ast_test_validate(test, NULL != sub);
1730
1731         /* With any luck, this won't completely blow everything up */
1732         ao2_cleanup(cache);
1733         ao2_cleanup(topic);
1734         stasis_caching_unsubscribe(caching_topic);
1735         stasis_unsubscribe(sub);
1736
1737         /* These refs were cleaned up manually */
1738         cache = NULL;
1739         topic = NULL;
1740         caching_topic = NULL;
1741         sub = NULL;
1742
1743         return AST_TEST_PASS;
1744 }
1745
1746 static int unload_module(void)
1747 {
1748         AST_TEST_UNREGISTER(message_type);
1749         AST_TEST_UNREGISTER(message);
1750         AST_TEST_UNREGISTER(subscription_messages);
1751         AST_TEST_UNREGISTER(publish);
1752         AST_TEST_UNREGISTER(publish_sync);
1753         AST_TEST_UNREGISTER(unsubscribe_stops_messages);
1754         AST_TEST_UNREGISTER(forward);
1755         AST_TEST_UNREGISTER(cache_filter);
1756         AST_TEST_UNREGISTER(cache);
1757         AST_TEST_UNREGISTER(cache_dump);
1758         AST_TEST_UNREGISTER(cache_eid_aggregate);
1759         AST_TEST_UNREGISTER(router);
1760         AST_TEST_UNREGISTER(router_cache_updates);
1761         AST_TEST_UNREGISTER(interleaving);
1762         AST_TEST_UNREGISTER(no_to_json);
1763         AST_TEST_UNREGISTER(to_json);
1764         AST_TEST_UNREGISTER(no_to_ami);
1765         AST_TEST_UNREGISTER(to_ami);
1766         AST_TEST_UNREGISTER(dtor_order);
1767         AST_TEST_UNREGISTER(caching_dtor_order);
1768         return 0;
1769 }
1770
1771 static int load_module(void)
1772 {
1773         AST_TEST_REGISTER(message_type);
1774         AST_TEST_REGISTER(message);
1775         AST_TEST_REGISTER(subscription_messages);
1776         AST_TEST_REGISTER(publish);
1777         AST_TEST_REGISTER(publish_sync);
1778         AST_TEST_REGISTER(unsubscribe_stops_messages);
1779         AST_TEST_REGISTER(forward);
1780         AST_TEST_REGISTER(cache_filter);
1781         AST_TEST_REGISTER(cache);
1782         AST_TEST_REGISTER(cache_dump);
1783         AST_TEST_REGISTER(cache_eid_aggregate);
1784         AST_TEST_REGISTER(router);
1785         AST_TEST_REGISTER(router_cache_updates);
1786         AST_TEST_REGISTER(interleaving);
1787         AST_TEST_REGISTER(no_to_json);
1788         AST_TEST_REGISTER(to_json);
1789         AST_TEST_REGISTER(no_to_ami);
1790         AST_TEST_REGISTER(to_ami);
1791         AST_TEST_REGISTER(dtor_order);
1792         AST_TEST_REGISTER(caching_dtor_order);
1793         return AST_MODULE_LOAD_SUCCESS;
1794 }
1795
1796 AST_MODULE_INFO(ASTERISK_GPL_KEY, 0, "Stasis testing",
1797                 .load = load_module,
1798                 .unload = unload_module
1799         );