Merge "core: Tweak startup order."
[asterisk/asterisk.git] / tests / test_stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \brief Test Stasis message bus.
22  *
23  * \author\verbatim David M. Lee, II <dlee@digium.com> \endverbatim
24  *
25  * \ingroup tests
26  */
27
28 /*** MODULEINFO
29         <depend>TEST_FRAMEWORK</depend>
30         <support_level>core</support_level>
31  ***/
32
33 #include "asterisk.h"
34
35 #include "asterisk/astobj2.h"
36 #include "asterisk/module.h"
37 #include "asterisk/stasis.h"
38 #include "asterisk/stasis_message_router.h"
39 #include "asterisk/test.h"
40
41 static const char *test_category = "/stasis/core/";
42
43 static struct ast_json *fake_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
44 {
45         const char *text = stasis_message_data(message);
46
47         return ast_json_string_create(text);
48 }
49
50 static struct ast_manager_event_blob *fake_ami(struct stasis_message *message)
51 {
52         RAII_VAR(struct ast_manager_event_blob *, res, NULL, ao2_cleanup);
53         const char *text = stasis_message_data(message);
54
55         res = ast_manager_event_blob_create(EVENT_FLAG_TEST, "FakeMI",
56                 "Message: %s\r\n", text);
57
58         if (res == NULL) {
59                 return NULL;
60         }
61
62         ao2_ref(res, +1);
63         return res;
64 }
65
66 static struct stasis_message_vtable fake_vtable = {
67         .to_json = fake_json,
68         .to_ami = fake_ami
69 };
70
71 AST_TEST_DEFINE(message_type)
72 {
73         RAII_VAR(struct stasis_message_type *, uut, NULL, ao2_cleanup);
74
75         switch (cmd) {
76         case TEST_INIT:
77                 info->name = __func__;
78                 info->category = test_category;
79                 info->summary = "Test basic message_type functions";
80                 info->description = "Test basic message_type functions";
81                 return AST_TEST_NOT_RUN;
82         case TEST_EXECUTE:
83                 break;
84         }
85
86         ast_test_validate(test, stasis_message_type_create(NULL, NULL, NULL) == STASIS_MESSAGE_TYPE_ERROR);
87         ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &uut) == STASIS_MESSAGE_TYPE_SUCCESS);
88         ast_test_validate(test, 0 == strcmp(stasis_message_type_name(uut), "SomeMessage"));
89
90         return AST_TEST_PASS;
91 }
92
93 AST_TEST_DEFINE(message)
94 {
95         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
96         RAII_VAR(struct stasis_message *, uut1, NULL, ao2_cleanup);
97         RAII_VAR(struct stasis_message *, uut2, NULL, ao2_cleanup);
98         RAII_VAR(char *, data, NULL, ao2_cleanup);
99         char *expected = "SomeData";
100         struct timeval expected_timestamp;
101         struct timeval time_diff;
102         struct ast_eid foreign_eid;
103
104         switch (cmd) {
105         case TEST_INIT:
106                 info->name = __func__;
107                 info->category = test_category;
108                 info->summary = "Test basic message functions";
109                 info->description = "Test basic message functions";
110                 return AST_TEST_NOT_RUN;
111         case TEST_EXECUTE:
112                 break;
113         }
114
115
116         memset(&foreign_eid, 0xFF, sizeof(foreign_eid));
117
118         ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
119
120         ast_test_validate(test, NULL == stasis_message_create_full(NULL, NULL, NULL));
121         ast_test_validate(test, NULL == stasis_message_create_full(type, NULL, NULL));
122
123         data = ao2_alloc(strlen(expected) + 1, NULL);
124         strcpy(data, expected);/* Safe */
125         expected_timestamp = ast_tvnow();
126         uut1 = stasis_message_create_full(type, data, &foreign_eid);
127         uut2 = stasis_message_create_full(type, data, NULL);
128
129         ast_test_validate(test, NULL != uut1);
130         ast_test_validate(test, NULL != uut2);
131         ast_test_validate(test, type == stasis_message_type(uut1));
132         ast_test_validate(test, type == stasis_message_type(uut2));
133         ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut1)));
134         ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut2)));
135         ast_test_validate(test, NULL != stasis_message_eid(uut1));
136         ast_test_validate(test, NULL == stasis_message_eid(uut2));
137         ast_test_validate(test, !ast_eid_cmp(&foreign_eid, stasis_message_eid(uut1)));
138
139         ast_test_validate(test, 3 == ao2_ref(data, 0)); /* uut1 and uut2 have ref to data */
140
141         time_diff = ast_tvsub(*stasis_message_timestamp(uut1), expected_timestamp);
142         /* 10ms is certainly long enough for the two calls to complete */
143         ast_test_validate(test, time_diff.tv_sec == 0);
144         ast_test_validate(test, time_diff.tv_usec < 10000);
145
146         ao2_ref(uut1, -1);
147         uut1 = NULL;
148         ast_test_validate(test, 2 == ao2_ref(data, 0)); /* uut1 unreffed data */
149         ao2_ref(uut2, -1);
150         uut2 = NULL;
151         ast_test_validate(test, 1 == ao2_ref(data, 0)); /* uut2 unreffed data */
152
153         return AST_TEST_PASS;
154 }
155
156 struct consumer {
157         ast_cond_t out;
158         struct stasis_message **messages_rxed;
159         size_t messages_rxed_len;
160         int ignore_subscriptions;
161         int complete;
162 };
163
164 static void consumer_dtor(void *obj)
165 {
166         struct consumer *consumer = obj;
167
168         ast_cond_destroy(&consumer->out);
169
170         while (consumer->messages_rxed_len > 0) {
171                 ao2_cleanup(consumer->messages_rxed[--consumer->messages_rxed_len]);
172         }
173         ast_free(consumer->messages_rxed);
174         consumer->messages_rxed = NULL;
175 }
176
177 static struct consumer *consumer_create(int ignore_subscriptions)
178 {
179         struct consumer *consumer;
180
181         consumer = ao2_alloc(sizeof(*consumer), consumer_dtor);
182         if (!consumer) {
183                 return NULL;
184         }
185
186         consumer->ignore_subscriptions = ignore_subscriptions;
187         consumer->messages_rxed = ast_malloc(sizeof(*consumer->messages_rxed));
188         if (!consumer->messages_rxed) {
189                 ao2_cleanup(consumer);
190                 return NULL;
191         }
192
193         ast_cond_init(&consumer->out, NULL);
194
195         return consumer;
196 }
197
198 static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
199 {
200         struct consumer *consumer = data;
201         RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
202         SCOPED_AO2LOCK(lock, consumer);
203
204         if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
205                 ++consumer->messages_rxed_len;
206                 consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
207                 ast_assert(consumer->messages_rxed != NULL);
208                 consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
209                 ao2_ref(message, +1);
210         }
211
212         if (stasis_subscription_final_message(sub, message)) {
213                 consumer->complete = 1;
214                 consumer_needs_cleanup = consumer;
215         }
216
217         ast_cond_signal(&consumer->out);
218 }
219
220 static void consumer_exec_sync(void *data, struct stasis_subscription *sub, struct stasis_message *message)
221 {
222         struct consumer *consumer = data;
223         RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
224         SCOPED_AO2LOCK(lock, consumer);
225
226         if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
227                 ++consumer->messages_rxed_len;
228                 consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
229                 ast_assert(consumer->messages_rxed != NULL);
230                 consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
231                 ao2_ref(message, +1);
232         }
233
234         if (stasis_subscription_final_message(sub, message)) {
235                 consumer->complete = 1;
236                 consumer_needs_cleanup = consumer;
237         }
238 }
239
240 static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
241 {
242         struct timeval start = ast_tvnow();
243         struct timespec end = {
244                 .tv_sec = start.tv_sec + 30,
245                 .tv_nsec = start.tv_usec * 1000
246         };
247
248         SCOPED_AO2LOCK(lock, consumer);
249
250         while (consumer->messages_rxed_len < expected_len) {
251                 int r = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
252
253                 if (r == ETIMEDOUT) {
254                         break;
255                 }
256                 ast_assert(r == 0); /* Not expecting any othet types of errors */
257         }
258         return consumer->messages_rxed_len;
259 }
260
261 static int consumer_wait_for_completion(struct consumer *consumer)
262 {
263         struct timeval start = ast_tvnow();
264         struct timespec end = {
265                 .tv_sec = start.tv_sec + 3,
266                 .tv_nsec = start.tv_usec * 1000
267         };
268
269         SCOPED_AO2LOCK(lock, consumer);
270
271         while (!consumer->complete) {
272                 int r = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
273
274                 if (r == ETIMEDOUT) {
275                         break;
276                 }
277                 ast_assert(r == 0); /* Not expecting any othet types of errors */
278         }
279         return consumer->complete;
280 }
281
282 static int consumer_should_stay(struct consumer *consumer, size_t expected_len)
283 {
284         struct timeval start = ast_tvnow();
285         struct timeval diff = {
286                 .tv_sec = 0,
287                 .tv_usec = 100000 /* wait for 100ms */
288         };
289         struct timeval end_tv = ast_tvadd(start, diff);
290         struct timespec end = {
291                 .tv_sec = end_tv.tv_sec,
292                 .tv_nsec = end_tv.tv_usec * 1000
293         };
294
295         SCOPED_AO2LOCK(lock, consumer);
296
297         while (consumer->messages_rxed_len == expected_len) {
298                 int r = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
299
300                 if (r == ETIMEDOUT) {
301                         break;
302                 }
303                 ast_assert(r == 0); /* Not expecting any othet types of errors */
304         }
305         return consumer->messages_rxed_len;
306 }
307
308 AST_TEST_DEFINE(subscription_messages)
309 {
310         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
311         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
312         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
313         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
314         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
315         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
316         RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
317         int complete;
318         struct stasis_subscription_change *change;
319
320         switch (cmd) {
321         case TEST_INIT:
322                 info->name = __func__;
323                 info->category = test_category;
324                 info->summary = "Test subscribe/unsubscribe messages";
325                 info->description = "Test subscribe/unsubscribe messages";
326                 return AST_TEST_NOT_RUN;
327         case TEST_EXECUTE:
328                 break;
329         }
330
331         topic = stasis_topic_create("TestTopic");
332         ast_test_validate(test, NULL != topic);
333
334         consumer = consumer_create(0);
335         ast_test_validate(test, NULL != consumer);
336
337         uut = stasis_subscribe(topic, consumer_exec, consumer);
338         ast_test_validate(test, NULL != uut);
339         ao2_ref(consumer, +1);
340         expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
341
342         uut = stasis_unsubscribe(uut);
343         complete = consumer_wait_for_completion(consumer);
344         ast_test_validate(test, 1 == complete);
345
346         ast_test_validate(test, 2 == consumer->messages_rxed_len);
347         ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0]));
348         ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1]));
349
350         change = stasis_message_data(consumer->messages_rxed[0]);
351         ast_test_validate(test, topic == change->topic);
352         ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
353         ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
354
355         change = stasis_message_data(consumer->messages_rxed[1]);
356         ast_test_validate(test, topic == change->topic);
357         ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
358         ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
359
360         return AST_TEST_PASS;
361 }
362
363 AST_TEST_DEFINE(subscription_pool_messages)
364 {
365         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
366         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
367         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
368         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
369         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
370         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
371         RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
372         int complete;
373         struct stasis_subscription_change *change;
374
375         switch (cmd) {
376         case TEST_INIT:
377                 info->name = __func__;
378                 info->category = test_category;
379                 info->summary = "Test subscribe/unsubscribe messages using a threadpool subscription";
380                 info->description = "Test subscribe/unsubscribe messages using a threadpool subscription";
381                 return AST_TEST_NOT_RUN;
382         case TEST_EXECUTE:
383                 break;
384         }
385
386         topic = stasis_topic_create("TestTopic");
387         ast_test_validate(test, NULL != topic);
388
389         consumer = consumer_create(0);
390         ast_test_validate(test, NULL != consumer);
391
392         uut = stasis_subscribe_pool(topic, consumer_exec, consumer);
393         ast_test_validate(test, NULL != uut);
394         ao2_ref(consumer, +1);
395         expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
396
397         uut = stasis_unsubscribe(uut);
398         complete = consumer_wait_for_completion(consumer);
399         ast_test_validate(test, 1 == complete);
400
401         ast_test_validate(test, 2 == consumer->messages_rxed_len);
402         ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0]));
403         ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1]));
404
405         change = stasis_message_data(consumer->messages_rxed[0]);
406         ast_test_validate(test, topic == change->topic);
407         ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
408         ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
409
410         change = stasis_message_data(consumer->messages_rxed[1]);
411         ast_test_validate(test, topic == change->topic);
412         ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
413         ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
414
415         return AST_TEST_PASS;
416 }
417
418 AST_TEST_DEFINE(publish)
419 {
420         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
421         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
422         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
423         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
424         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
425         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
426         int actual_len;
427         const char *actual;
428
429         switch (cmd) {
430         case TEST_INIT:
431                 info->name = __func__;
432                 info->category = test_category;
433                 info->summary = "Test publishing";
434                 info->description = "Test publishing";
435                 return AST_TEST_NOT_RUN;
436         case TEST_EXECUTE:
437                 break;
438         }
439
440         topic = stasis_topic_create("TestTopic");
441         ast_test_validate(test, NULL != topic);
442
443         consumer = consumer_create(1);
444         ast_test_validate(test, NULL != consumer);
445
446         uut = stasis_subscribe(topic, consumer_exec, consumer);
447         ast_test_validate(test, NULL != uut);
448         ao2_ref(consumer, +1);
449
450         test_data = ao2_alloc(1, NULL);
451         ast_test_validate(test, NULL != test_data);
452         ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
453         test_message = stasis_message_create(test_message_type, test_data);
454
455         stasis_publish(topic, test_message);
456
457         actual_len = consumer_wait_for(consumer, 1);
458         ast_test_validate(test, 1 == actual_len);
459         actual = stasis_message_data(consumer->messages_rxed[0]);
460         ast_test_validate(test, test_data == actual);
461
462         return AST_TEST_PASS;
463 }
464
465 AST_TEST_DEFINE(publish_sync)
466 {
467         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
468         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
469         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
470         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
471         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
472         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
473         int actual_len;
474         const char *actual;
475
476         switch (cmd) {
477         case TEST_INIT:
478                 info->name = __func__;
479                 info->category = test_category;
480                 info->summary = "Test synchronous publishing";
481                 info->description = "Test synchronous publishing";
482                 return AST_TEST_NOT_RUN;
483         case TEST_EXECUTE:
484                 break;
485         }
486
487         topic = stasis_topic_create("TestTopic");
488         ast_test_validate(test, NULL != topic);
489
490         consumer = consumer_create(1);
491         ast_test_validate(test, NULL != consumer);
492
493         uut = stasis_subscribe(topic, consumer_exec_sync, consumer);
494         ast_test_validate(test, NULL != uut);
495         ao2_ref(consumer, +1);
496
497         test_data = ao2_alloc(1, NULL);
498         ast_test_validate(test, NULL != test_data);
499         ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
500         test_message = stasis_message_create(test_message_type, test_data);
501
502         stasis_publish_sync(uut, test_message);
503
504         actual_len = consumer->messages_rxed_len;
505         ast_test_validate(test, 1 == actual_len);
506         actual = stasis_message_data(consumer->messages_rxed[0]);
507         ast_test_validate(test, test_data == actual);
508
509         return AST_TEST_PASS;
510 }
511
512 AST_TEST_DEFINE(publish_pool)
513 {
514         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
515         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
516         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
517         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
518         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
519         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
520         int actual_len;
521         const char *actual;
522
523         switch (cmd) {
524         case TEST_INIT:
525                 info->name = __func__;
526                 info->category = test_category;
527                 info->summary = "Test publishing with a threadpool";
528                 info->description = "Test publishing to a subscriber whose\n"
529                         "subscription dictates messages are received through a\n"
530                         "threadpool.";
531                 return AST_TEST_NOT_RUN;
532         case TEST_EXECUTE:
533                 break;
534         }
535
536         topic = stasis_topic_create("TestTopic");
537         ast_test_validate(test, NULL != topic);
538
539         consumer = consumer_create(1);
540         ast_test_validate(test, NULL != consumer);
541
542         uut = stasis_subscribe_pool(topic, consumer_exec, consumer);
543         ast_test_validate(test, NULL != uut);
544         ao2_ref(consumer, +1);
545
546         test_data = ao2_alloc(1, NULL);
547         ast_test_validate(test, NULL != test_data);
548         ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
549         test_message = stasis_message_create(test_message_type, test_data);
550
551         stasis_publish(topic, test_message);
552
553         actual_len = consumer_wait_for(consumer, 1);
554         ast_test_validate(test, 1 == actual_len);
555         actual = stasis_message_data(consumer->messages_rxed[0]);
556         ast_test_validate(test, test_data == actual);
557
558         return AST_TEST_PASS;
559 }
560
561 AST_TEST_DEFINE(unsubscribe_stops_messages)
562 {
563         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
564         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
565         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
566         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
567         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
568         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
569         int actual_len;
570
571         switch (cmd) {
572         case TEST_INIT:
573                 info->name = __func__;
574                 info->category = test_category;
575                 info->summary = "Test simple subscriptions";
576                 info->description = "Test simple subscriptions";
577                 return AST_TEST_NOT_RUN;
578         case TEST_EXECUTE:
579                 break;
580         }
581
582         topic = stasis_topic_create("TestTopic");
583         ast_test_validate(test, NULL != topic);
584
585         consumer = consumer_create(1);
586         ast_test_validate(test, NULL != consumer);
587
588         uut = stasis_subscribe(topic, consumer_exec, consumer);
589         ast_test_validate(test, NULL != uut);
590         ao2_ref(consumer, +1);
591
592         uut = stasis_unsubscribe(uut);
593
594         test_data = ao2_alloc(1, NULL);
595         ast_test_validate(test, NULL != test_data);
596         ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
597         test_message = stasis_message_create(test_message_type, test_data);
598
599         stasis_publish(topic, test_message);
600
601         actual_len = consumer_should_stay(consumer, 0);
602         ast_test_validate(test, 0 == actual_len);
603
604         return AST_TEST_PASS;
605 }
606
607 AST_TEST_DEFINE(forward)
608 {
609         RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
610         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
611
612         RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup);
613         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
614
615         RAII_VAR(struct stasis_forward *, forward_sub, NULL, stasis_forward_cancel);
616         RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe);
617         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
618
619         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
620         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
621         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
622         int actual_len;
623
624         switch (cmd) {
625         case TEST_INIT:
626                 info->name = __func__;
627                 info->category = test_category;
628                 info->summary = "Test sending events to a parent topic";
629                 info->description = "Test sending events to a parent topic.\n"
630                         "This test creates three topics (one parent, two children)\n"
631                         "and publishes a message to one child, and verifies it's\n"
632                         "only seen by that child and the parent";
633                 return AST_TEST_NOT_RUN;
634         case TEST_EXECUTE:
635                 break;
636         }
637
638         parent_topic = stasis_topic_create("ParentTestTopic");
639         ast_test_validate(test, NULL != parent_topic);
640         topic = stasis_topic_create("TestTopic");
641         ast_test_validate(test, NULL != topic);
642
643         forward_sub = stasis_forward_all(topic, parent_topic);
644         ast_test_validate(test, NULL != forward_sub);
645
646         parent_consumer = consumer_create(1);
647         ast_test_validate(test, NULL != parent_consumer);
648         consumer = consumer_create(1);
649         ast_test_validate(test, NULL != consumer);
650
651         parent_sub = stasis_subscribe(parent_topic, consumer_exec, parent_consumer);
652         ast_test_validate(test, NULL != parent_sub);
653         ao2_ref(parent_consumer, +1);
654         sub = stasis_subscribe(topic, consumer_exec, consumer);
655         ast_test_validate(test, NULL != sub);
656         ao2_ref(consumer, +1);
657
658         test_data = ao2_alloc(1, NULL);
659         ast_test_validate(test, NULL != test_data);
660         ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
661         test_message = stasis_message_create(test_message_type, test_data);
662
663         stasis_publish(topic, test_message);
664
665         actual_len = consumer_wait_for(consumer, 1);
666         ast_test_validate(test, 1 == actual_len);
667         actual_len = consumer_wait_for(parent_consumer, 1);
668         ast_test_validate(test, 1 == actual_len);
669
670         return AST_TEST_PASS;
671 }
672
673 AST_TEST_DEFINE(interleaving)
674 {
675         RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
676         RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
677         RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
678
679         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
680
681         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
682
683         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
684         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
685         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
686
687         RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
688         RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
689         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
690
691         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
692
693         int actual_len;
694
695         switch (cmd) {
696         case TEST_INIT:
697                 info->name = __func__;
698                 info->category = test_category;
699                 info->summary = "Test sending interleaved events to a parent topic";
700                 info->description = "Test sending events to a parent topic.\n"
701                         "This test creates three topics (one parent, two children)\n"
702                         "and publishes messages alternately between the children.\n"
703                         "It verifies that the messages are received in the expected\n"
704                         "order.";
705                 return AST_TEST_NOT_RUN;
706         case TEST_EXECUTE:
707                 break;
708         }
709
710         ast_test_validate(test, stasis_message_type_create("test", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
711         ast_test_validate(test, NULL != test_message_type);
712
713         test_data = ao2_alloc(1, NULL);
714         ast_test_validate(test, NULL != test_data);
715
716         test_message1 = stasis_message_create(test_message_type, test_data);
717         ast_test_validate(test, NULL != test_message1);
718         test_message2 = stasis_message_create(test_message_type, test_data);
719         ast_test_validate(test, NULL != test_message2);
720         test_message3 = stasis_message_create(test_message_type, test_data);
721         ast_test_validate(test, NULL != test_message3);
722
723         parent_topic = stasis_topic_create("ParentTestTopic");
724         ast_test_validate(test, NULL != parent_topic);
725         topic1 = stasis_topic_create("Topic1");
726         ast_test_validate(test, NULL != topic1);
727         topic2 = stasis_topic_create("Topic2");
728         ast_test_validate(test, NULL != topic2);
729
730         forward_sub1 = stasis_forward_all(topic1, parent_topic);
731         ast_test_validate(test, NULL != forward_sub1);
732         forward_sub2 = stasis_forward_all(topic2, parent_topic);
733         ast_test_validate(test, NULL != forward_sub2);
734
735         consumer = consumer_create(1);
736         ast_test_validate(test, NULL != consumer);
737
738         sub = stasis_subscribe(parent_topic, consumer_exec, consumer);
739         ast_test_validate(test, NULL != sub);
740         ao2_ref(consumer, +1);
741
742         stasis_publish(topic1, test_message1);
743         stasis_publish(topic2, test_message2);
744         stasis_publish(topic1, test_message3);
745
746         actual_len = consumer_wait_for(consumer, 3);
747         ast_test_validate(test, 3 == actual_len);
748
749         ast_test_validate(test, test_message1 == consumer->messages_rxed[0]);
750         ast_test_validate(test, test_message2 == consumer->messages_rxed[1]);
751         ast_test_validate(test, test_message3 == consumer->messages_rxed[2]);
752
753         return AST_TEST_PASS;
754 }
755
756 AST_TEST_DEFINE(subscription_interleaving)
757 {
758         RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
759         RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
760         RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
761
762         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
763
764         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
765
766         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
767         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
768         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
769
770         RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
771         RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
772         RAII_VAR(struct stasis_subscription *, sub1, NULL, stasis_unsubscribe);
773         RAII_VAR(struct stasis_subscription *, sub2, NULL, stasis_unsubscribe);
774
775         RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
776         RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
777
778         int actual_len;
779
780         switch (cmd) {
781         case TEST_INIT:
782                 info->name = __func__;
783                 info->category = test_category;
784                 info->summary = "Test sending interleaved events to a parent topic with different subscribers";
785                 info->description = "Test sending events to a parent topic.\n"
786                         "This test creates three topics (one parent, two children)\n"
787                         "and publishes messages alternately between the children.\n"
788                         "It verifies that the messages are received in the expected\n"
789                         "order, for different subscription types: one with a dedicated\n"
790                         "thread, the other on the Stasis threadpool.";
791                 return AST_TEST_NOT_RUN;
792         case TEST_EXECUTE:
793                 break;
794         }
795
796         ast_test_validate(test, stasis_message_type_create("test", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
797         ast_test_validate(test, NULL != test_message_type);
798
799         test_data = ao2_alloc(1, NULL);
800         ast_test_validate(test, NULL != test_data);
801
802         test_message1 = stasis_message_create(test_message_type, test_data);
803         ast_test_validate(test, NULL != test_message1);
804         test_message2 = stasis_message_create(test_message_type, test_data);
805         ast_test_validate(test, NULL != test_message2);
806         test_message3 = stasis_message_create(test_message_type, test_data);
807         ast_test_validate(test, NULL != test_message3);
808
809         parent_topic = stasis_topic_create("ParentTestTopic");
810         ast_test_validate(test, NULL != parent_topic);
811         topic1 = stasis_topic_create("Topic1");
812         ast_test_validate(test, NULL != topic1);
813         topic2 = stasis_topic_create("Topic2");
814         ast_test_validate(test, NULL != topic2);
815
816         forward_sub1 = stasis_forward_all(topic1, parent_topic);
817         ast_test_validate(test, NULL != forward_sub1);
818         forward_sub2 = stasis_forward_all(topic2, parent_topic);
819         ast_test_validate(test, NULL != forward_sub2);
820
821         consumer1 = consumer_create(1);
822         ast_test_validate(test, NULL != consumer1);
823
824         consumer2 = consumer_create(1);
825         ast_test_validate(test, NULL != consumer2);
826
827         sub1 = stasis_subscribe(parent_topic, consumer_exec, consumer1);
828         ast_test_validate(test, NULL != sub1);
829         ao2_ref(consumer1, +1);
830
831         sub2 = stasis_subscribe_pool(parent_topic, consumer_exec, consumer2);
832         ast_test_validate(test, NULL != sub2);
833         ao2_ref(consumer2, +1);
834
835         stasis_publish(topic1, test_message1);
836         stasis_publish(topic2, test_message2);
837         stasis_publish(topic1, test_message3);
838
839         actual_len = consumer_wait_for(consumer1, 3);
840         ast_test_validate(test, 3 == actual_len);
841
842         actual_len = consumer_wait_for(consumer2, 3);
843         ast_test_validate(test, 3 == actual_len);
844
845         ast_test_validate(test, test_message1 == consumer1->messages_rxed[0]);
846         ast_test_validate(test, test_message2 == consumer1->messages_rxed[1]);
847         ast_test_validate(test, test_message3 == consumer1->messages_rxed[2]);
848
849         ast_test_validate(test, test_message1 == consumer2->messages_rxed[0]);
850         ast_test_validate(test, test_message2 == consumer2->messages_rxed[1]);
851         ast_test_validate(test, test_message3 == consumer2->messages_rxed[2]);
852
853         return AST_TEST_PASS;
854 }
855
856 struct cache_test_data {
857         char *id;
858         char *value;
859 };
860
861 static void cache_test_data_dtor(void *obj)
862 {
863         struct cache_test_data *data = obj;
864
865         ast_free(data->id);
866         ast_free(data->value);
867 }
868
869 static struct stasis_message *cache_test_message_create_full(struct stasis_message_type *type, const char *name, const char *value, struct ast_eid *eid)
870 {
871         RAII_VAR(struct cache_test_data *, data, NULL, ao2_cleanup);
872
873         data = ao2_alloc(sizeof(*data), cache_test_data_dtor);
874         if (data == NULL) {
875                 return NULL;
876         }
877
878         ast_assert(name != NULL);
879         ast_assert(value != NULL);
880
881         data->id = ast_strdup(name);
882         data->value = ast_strdup(value);
883         if (!data->id || !data->value) {
884                 return NULL;
885         }
886
887         return stasis_message_create_full(type, data, eid);
888 }
889
890 static struct stasis_message *cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value)
891 {
892         return cache_test_message_create_full(type, name, value, &ast_eid_default);
893 }
894
895 static const char *cache_test_data_id(struct stasis_message *message)
896 {
897         struct cache_test_data *cachable = stasis_message_data(message);
898
899         if (0 != strcmp("Cacheable", stasis_message_type_name(stasis_message_type(message)))) {
900                 return NULL;
901         }
902         return cachable->id;
903 }
904
905 static struct stasis_message *cache_test_aggregate_calc_fn(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
906 {
907         struct stasis_message *aggregate_snapshot;
908         struct stasis_message *snapshot;
909         struct stasis_message_type *type = NULL;
910         struct cache_test_data *test_data = NULL;
911         int idx;
912         int accumulated = 0;
913         char aggregate_str[30];
914
915         /* Accumulate the aggregate value. */
916         snapshot = stasis_cache_entry_get_local(entry);
917         if (snapshot) {
918                 type = stasis_message_type(snapshot);
919                 test_data = stasis_message_data(snapshot);
920                 accumulated += atoi(test_data->value);
921         }
922         for (idx = 0; ; ++idx) {
923                 snapshot = stasis_cache_entry_get_remote(entry, idx);
924                 if (!snapshot) {
925                         break;
926                 }
927
928                 type = stasis_message_type(snapshot);
929                 test_data = stasis_message_data(snapshot);
930                 accumulated += atoi(test_data->value);
931         }
932
933         if (!test_data) {
934                 /* There are no test entries cached.  Delete the aggregate. */
935                 return NULL;
936         }
937
938         snapshot = stasis_cache_entry_get_aggregate(entry);
939         if (snapshot) {
940                 type = stasis_message_type(snapshot);
941                 test_data = stasis_message_data(snapshot);
942                 if (accumulated == atoi(test_data->value)) {
943                         /* Aggregate test entry did not change. */
944                         return ao2_bump(snapshot);
945                 }
946         }
947
948         snprintf(aggregate_str, sizeof(aggregate_str), "%d", accumulated);
949         aggregate_snapshot = cache_test_message_create_full(type, test_data->id, aggregate_str, NULL);
950         if (!aggregate_snapshot) {
951                 /* Bummer.  We have to keep the old aggregate snapshot. */
952                 ast_log(LOG_ERROR, "Could not create aggregate snapshot.\n");
953                 return ao2_bump(snapshot);
954         }
955
956         return aggregate_snapshot;
957 }
958
959 static void cache_test_aggregate_publish_fn(struct stasis_topic *topic, struct stasis_message *aggregate)
960 {
961         stasis_publish(topic, aggregate);
962 }
963
964 static int check_cache_aggregate(struct stasis_cache *cache, struct stasis_message_type *cache_type, const char *id, const char *value)
965 {
966         RAII_VAR(struct stasis_message *, aggregate, NULL, ao2_cleanup);
967         struct cache_test_data *test_data;
968
969         aggregate = stasis_cache_get_by_eid(cache, cache_type, id, NULL);
970         if (!aggregate) {
971                 /* No aggregate, return true if given no value. */
972                 return !value;
973         }
974
975         /* Return true if the given value matches the aggregate value. */
976         test_data = stasis_message_data(aggregate);
977         return value && !strcmp(value, test_data->value);
978 }
979
980 AST_TEST_DEFINE(cache_filter)
981 {
982         RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup);
983         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
984         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
985         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
986         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
987         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
988         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
989         int actual_len;
990
991         switch (cmd) {
992         case TEST_INIT:
993                 info->name = __func__;
994                 info->category = test_category;
995                 info->summary = "Test caching topics only forward cache_update messages.";
996                 info->description = "Test caching topics only forward cache_update messages.";
997                 return AST_TEST_NOT_RUN;
998         case TEST_EXECUTE:
999                 break;
1000         }
1001
1002         ast_test_validate(test, stasis_message_type_create("NonCacheable", NULL, &non_cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
1003         ast_test_validate(test, NULL != non_cache_type);
1004         topic = stasis_topic_create("SomeTopic");
1005         ast_test_validate(test, NULL != topic);
1006         cache = stasis_cache_create(cache_test_data_id);
1007         ast_test_validate(test, NULL != cache);
1008         caching_topic = stasis_caching_topic_create(topic, cache);
1009         ast_test_validate(test, NULL != caching_topic);
1010         consumer = consumer_create(1);
1011         ast_test_validate(test, NULL != consumer);
1012         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
1013         ast_test_validate(test, NULL != sub);
1014         ao2_ref(consumer, +1);
1015
1016         test_message = cache_test_message_create(non_cache_type, "1", "1");
1017         ast_test_validate(test, NULL != test_message);
1018
1019         stasis_publish(topic, test_message);
1020
1021         actual_len = consumer_should_stay(consumer, 0);
1022         ast_test_validate(test, 0 == actual_len);
1023
1024         return AST_TEST_PASS;
1025 }
1026
1027 AST_TEST_DEFINE(cache)
1028 {
1029         RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
1030         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1031         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
1032         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
1033         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
1034         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
1035         RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
1036         RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
1037         RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
1038         RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
1039         int actual_len;
1040         struct stasis_cache_update *actual_update;
1041
1042         switch (cmd) {
1043         case TEST_INIT:
1044                 info->name = __func__;
1045                 info->category = test_category;
1046                 info->summary = "Test passing messages through cache topic unscathed.";
1047                 info->description = "Test passing messages through cache topic unscathed.";
1048                 return AST_TEST_NOT_RUN;
1049         case TEST_EXECUTE:
1050                 break;
1051         }
1052
1053         ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
1054         ast_test_validate(test, NULL != cache_type);
1055         topic = stasis_topic_create("SomeTopic");
1056         ast_test_validate(test, NULL != topic);
1057         cache = stasis_cache_create(cache_test_data_id);
1058         ast_test_validate(test, NULL != cache);
1059         caching_topic = stasis_caching_topic_create(topic, cache);
1060         ast_test_validate(test, NULL != caching_topic);
1061         consumer = consumer_create(1);
1062         ast_test_validate(test, NULL != consumer);
1063         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
1064         ast_test_validate(test, NULL != sub);
1065         ao2_ref(consumer, +1);
1066
1067         test_message1_1 = cache_test_message_create(cache_type, "1", "1");
1068         ast_test_validate(test, NULL != test_message1_1);
1069         test_message2_1 = cache_test_message_create(cache_type, "2", "1");
1070         ast_test_validate(test, NULL != test_message2_1);
1071
1072         /* Post a couple of snapshots */
1073         stasis_publish(topic, test_message1_1);
1074         stasis_publish(topic, test_message2_1);
1075         actual_len = consumer_wait_for(consumer, 2);
1076         ast_test_validate(test, 2 == actual_len);
1077
1078         /* Check for new snapshot messages */
1079         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[0]));
1080         actual_update = stasis_message_data(consumer->messages_rxed[0]);
1081         ast_test_validate(test, NULL == actual_update->old_snapshot);
1082         ast_test_validate(test, test_message1_1 == actual_update->new_snapshot);
1083         ast_test_validate(test, test_message1_1 == stasis_cache_get(cache, cache_type, "1"));
1084         /* stasis_cache_get returned a ref, so unref test_message1_1 */
1085         ao2_ref(test_message1_1, -1);
1086
1087         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[1]));
1088         actual_update = stasis_message_data(consumer->messages_rxed[1]);
1089         ast_test_validate(test, NULL == actual_update->old_snapshot);
1090         ast_test_validate(test, test_message2_1 == actual_update->new_snapshot);
1091         ast_test_validate(test, test_message2_1 == stasis_cache_get(cache, cache_type, "2"));
1092         /* stasis_cache_get returned a ref, so unref test_message2_1 */
1093         ao2_ref(test_message2_1, -1);
1094
1095         /* Update snapshot 2 */
1096         test_message2_2 = cache_test_message_create(cache_type, "2", "2");
1097         ast_test_validate(test, NULL != test_message2_2);
1098         stasis_publish(topic, test_message2_2);
1099
1100         actual_len = consumer_wait_for(consumer, 3);
1101         ast_test_validate(test, 3 == actual_len);
1102
1103         actual_update = stasis_message_data(consumer->messages_rxed[2]);
1104         ast_test_validate(test, test_message2_1 == actual_update->old_snapshot);
1105         ast_test_validate(test, test_message2_2 == actual_update->new_snapshot);
1106         ast_test_validate(test, test_message2_2 == stasis_cache_get(cache, cache_type, "2"));
1107         /* stasis_cache_get returned a ref, so unref test_message2_2 */
1108         ao2_ref(test_message2_2, -1);
1109
1110         /* Clear snapshot 1 */
1111         test_message1_clear = stasis_cache_clear_create(test_message1_1);
1112         ast_test_validate(test, NULL != test_message1_clear);
1113         stasis_publish(topic, test_message1_clear);
1114
1115         actual_len = consumer_wait_for(consumer, 4);
1116         ast_test_validate(test, 4 == actual_len);
1117
1118         actual_update = stasis_message_data(consumer->messages_rxed[3]);
1119         ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
1120         ast_test_validate(test, NULL == actual_update->new_snapshot);
1121         ast_test_validate(test, NULL == stasis_cache_get(cache, cache_type, "1"));
1122
1123         return AST_TEST_PASS;
1124 }
1125
1126 AST_TEST_DEFINE(cache_dump)
1127 {
1128         RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
1129         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1130         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
1131         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
1132         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
1133         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
1134         RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
1135         RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
1136         RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
1137         RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
1138         RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
1139         int actual_len;
1140         struct ao2_iterator i;
1141         void *obj;
1142
1143         switch (cmd) {
1144         case TEST_INIT:
1145                 info->name = __func__;
1146                 info->category = test_category;
1147                 info->summary = "Test cache dump routines.";
1148                 info->description = "Test cache dump routines.";
1149                 return AST_TEST_NOT_RUN;
1150         case TEST_EXECUTE:
1151                 break;
1152         }
1153
1154         ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
1155         ast_test_validate(test, NULL != cache_type);
1156         topic = stasis_topic_create("SomeTopic");
1157         ast_test_validate(test, NULL != topic);
1158         cache = stasis_cache_create(cache_test_data_id);
1159         ast_test_validate(test, NULL != cache);
1160         caching_topic = stasis_caching_topic_create(topic, cache);
1161         ast_test_validate(test, NULL != caching_topic);
1162         consumer = consumer_create(1);
1163         ast_test_validate(test, NULL != consumer);
1164         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
1165         ast_test_validate(test, NULL != sub);
1166         ao2_ref(consumer, +1);
1167
1168         test_message1_1 = cache_test_message_create(cache_type, "1", "1");
1169         ast_test_validate(test, NULL != test_message1_1);
1170         test_message2_1 = cache_test_message_create(cache_type, "2", "1");
1171         ast_test_validate(test, NULL != test_message2_1);
1172
1173         /* Post a couple of snapshots */
1174         stasis_publish(topic, test_message1_1);
1175         stasis_publish(topic, test_message2_1);
1176         actual_len = consumer_wait_for(consumer, 2);
1177         ast_test_validate(test, 2 == actual_len);
1178
1179         /* Check the cache */
1180         ao2_cleanup(cache_dump);
1181         cache_dump = stasis_cache_dump(cache, NULL);
1182         ast_test_validate(test, NULL != cache_dump);
1183         ast_test_validate(test, 2 == ao2_container_count(cache_dump));
1184         i = ao2_iterator_init(cache_dump, 0);
1185         while ((obj = ao2_iterator_next(&i))) {
1186                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1187                 ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_1);
1188         }
1189         ao2_iterator_destroy(&i);
1190
1191         /* Update snapshot 2 */
1192         test_message2_2 = cache_test_message_create(cache_type, "2", "2");
1193         ast_test_validate(test, NULL != test_message2_2);
1194         stasis_publish(topic, test_message2_2);
1195
1196         actual_len = consumer_wait_for(consumer, 3);
1197         ast_test_validate(test, 3 == actual_len);
1198
1199         /* Check the cache */
1200         ao2_cleanup(cache_dump);
1201         cache_dump = stasis_cache_dump(cache, NULL);
1202         ast_test_validate(test, NULL != cache_dump);
1203         ast_test_validate(test, 2 == ao2_container_count(cache_dump));
1204         i = ao2_iterator_init(cache_dump, 0);
1205         while ((obj = ao2_iterator_next(&i))) {
1206                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1207                 ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_2);
1208         }
1209         ao2_iterator_destroy(&i);
1210
1211         /* Clear snapshot 1 */
1212         test_message1_clear = stasis_cache_clear_create(test_message1_1);
1213         ast_test_validate(test, NULL != test_message1_clear);
1214         stasis_publish(topic, test_message1_clear);
1215
1216         actual_len = consumer_wait_for(consumer, 4);
1217         ast_test_validate(test, 4 == actual_len);
1218
1219         /* Check the cache */
1220         ao2_cleanup(cache_dump);
1221         cache_dump = stasis_cache_dump(cache, NULL);
1222         ast_test_validate(test, NULL != cache_dump);
1223         ast_test_validate(test, 1 == ao2_container_count(cache_dump));
1224         i = ao2_iterator_init(cache_dump, 0);
1225         while ((obj = ao2_iterator_next(&i))) {
1226                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1227                 ast_test_validate(test, actual_cache_entry == test_message2_2);
1228         }
1229         ao2_iterator_destroy(&i);
1230
1231         /* Dump the cache to ensure that it has no subscription change items in it since those aren't cached */
1232         ao2_cleanup(cache_dump);
1233         cache_dump = stasis_cache_dump(cache, stasis_subscription_change_type());
1234         ast_test_validate(test, 0 == ao2_container_count(cache_dump));
1235
1236         return AST_TEST_PASS;
1237 }
1238
1239 AST_TEST_DEFINE(cache_eid_aggregate)
1240 {
1241         RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
1242         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1243         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
1244         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
1245         RAII_VAR(struct consumer *, cache_consumer, NULL, ao2_cleanup);
1246         RAII_VAR(struct consumer *, topic_consumer, NULL, ao2_cleanup);
1247         RAII_VAR(struct stasis_subscription *, topic_sub, NULL, stasis_unsubscribe);
1248         RAII_VAR(struct stasis_subscription *, cache_sub, NULL, stasis_unsubscribe);
1249         RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
1250         RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
1251         RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
1252         RAII_VAR(struct stasis_message *, test_message2_3, NULL, ao2_cleanup);
1253         RAII_VAR(struct stasis_message *, test_message2_4, NULL, ao2_cleanup);
1254         RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
1255         RAII_VAR(struct stasis_message *, test_message2_clear, NULL, ao2_cleanup);
1256         RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
1257         int actual_len;
1258         struct ao2_iterator i;
1259         void *obj;
1260         struct ast_eid foreign_eid1;
1261         struct ast_eid foreign_eid2;
1262
1263         switch (cmd) {
1264         case TEST_INIT:
1265                 info->name = __func__;
1266                 info->category = test_category;
1267                 info->summary = "Test cache eid and aggregate support.";
1268                 info->description = "Test cache eid and aggregate support.";
1269                 return AST_TEST_NOT_RUN;
1270         case TEST_EXECUTE:
1271                 break;
1272         }
1273
1274         memset(&foreign_eid1, 0xAA, sizeof(foreign_eid1));
1275         memset(&foreign_eid2, 0xBB, sizeof(foreign_eid2));
1276
1277         ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
1278         ast_test_validate(test, NULL != cache_type);
1279
1280         topic = stasis_topic_create("SomeTopic");
1281         ast_test_validate(test, NULL != topic);
1282
1283         /* To consume events published to the topic. */
1284         topic_consumer = consumer_create(1);
1285         ast_test_validate(test, NULL != topic_consumer);
1286
1287         topic_sub = stasis_subscribe(topic, consumer_exec, topic_consumer);
1288         ast_test_validate(test, NULL != topic_sub);
1289         ao2_ref(topic_consumer, +1);
1290
1291         cache = stasis_cache_create_full(cache_test_data_id,
1292                 cache_test_aggregate_calc_fn, cache_test_aggregate_publish_fn);
1293         ast_test_validate(test, NULL != cache);
1294
1295         caching_topic = stasis_caching_topic_create(topic, cache);
1296         ast_test_validate(test, NULL != caching_topic);
1297
1298         /* To consume update events published to the caching_topic. */
1299         cache_consumer = consumer_create(1);
1300         ast_test_validate(test, NULL != cache_consumer);
1301
1302         cache_sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, cache_consumer);
1303         ast_test_validate(test, NULL != cache_sub);
1304         ao2_ref(cache_consumer, +1);
1305
1306         /* Create test messages. */
1307         test_message1_1 = cache_test_message_create_full(cache_type, "1", "1", &ast_eid_default);
1308         ast_test_validate(test, NULL != test_message1_1);
1309         test_message2_1 = cache_test_message_create_full(cache_type, "2", "1", &ast_eid_default);
1310         ast_test_validate(test, NULL != test_message2_1);
1311         test_message2_2 = cache_test_message_create_full(cache_type, "2", "2", &foreign_eid1);
1312         ast_test_validate(test, NULL != test_message2_2);
1313         test_message2_3 = cache_test_message_create_full(cache_type, "2", "3", &foreign_eid2);
1314         ast_test_validate(test, NULL != test_message2_3);
1315         test_message2_4 = cache_test_message_create_full(cache_type, "2", "4", &foreign_eid2);
1316         ast_test_validate(test, NULL != test_message2_4);
1317
1318         /* Post some snapshots */
1319         stasis_publish(topic, test_message1_1);
1320         ast_test_validate(test, check_cache_aggregate(cache, cache_type, "1", "1"));
1321         stasis_publish(topic, test_message2_1);
1322         ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "1"));
1323         stasis_publish(topic, test_message2_2);
1324         ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "3"));
1325
1326         actual_len = consumer_wait_for(cache_consumer, 6);
1327         ast_test_validate(test, 6 == actual_len);
1328         actual_len = consumer_wait_for(topic_consumer, 6);
1329         ast_test_validate(test, 6 == actual_len);
1330
1331         /* Check the cache */
1332         ao2_cleanup(cache_dump);
1333         cache_dump = stasis_cache_dump_all(cache, NULL);
1334         ast_test_validate(test, NULL != cache_dump);
1335         ast_test_validate(test, 3 == ao2_container_count(cache_dump));
1336         i = ao2_iterator_init(cache_dump, 0);
1337         while ((obj = ao2_iterator_next(&i))) {
1338                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1339
1340                 ast_test_validate(test,
1341                         actual_cache_entry == test_message1_1
1342                         || actual_cache_entry == test_message2_1
1343                         || actual_cache_entry == test_message2_2);
1344         }
1345         ao2_iterator_destroy(&i);
1346
1347         /* Check the local cached items */
1348         ao2_cleanup(cache_dump);
1349         cache_dump = stasis_cache_dump_by_eid(cache, NULL, &ast_eid_default);
1350         ast_test_validate(test, NULL != cache_dump);
1351         ast_test_validate(test, 2 == ao2_container_count(cache_dump));
1352         i = ao2_iterator_init(cache_dump, 0);
1353         while ((obj = ao2_iterator_next(&i))) {
1354                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1355
1356                 ast_test_validate(test,
1357                         actual_cache_entry == test_message1_1
1358                         || actual_cache_entry == test_message2_1);
1359         }
1360         ao2_iterator_destroy(&i);
1361
1362         /* Post snapshot 2 from another eid. */
1363         stasis_publish(topic, test_message2_3);
1364         ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "6"));
1365
1366         actual_len = consumer_wait_for(cache_consumer, 8);
1367         ast_test_validate(test, 8 == actual_len);
1368         actual_len = consumer_wait_for(topic_consumer, 8);
1369         ast_test_validate(test, 8 == actual_len);
1370
1371         /* Check the cache */
1372         ao2_cleanup(cache_dump);
1373         cache_dump = stasis_cache_dump_all(cache, NULL);
1374         ast_test_validate(test, NULL != cache_dump);
1375         ast_test_validate(test, 4 == ao2_container_count(cache_dump));
1376         i = ao2_iterator_init(cache_dump, 0);
1377         while ((obj = ao2_iterator_next(&i))) {
1378                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1379
1380                 ast_test_validate(test,
1381                         actual_cache_entry == test_message1_1
1382                         || actual_cache_entry == test_message2_1
1383                         || actual_cache_entry == test_message2_2
1384                         || actual_cache_entry == test_message2_3);
1385         }
1386         ao2_iterator_destroy(&i);
1387
1388         /* Check the remote cached items */
1389         ao2_cleanup(cache_dump);
1390         cache_dump = stasis_cache_dump_by_eid(cache, NULL, &foreign_eid1);
1391         ast_test_validate(test, NULL != cache_dump);
1392         ast_test_validate(test, 1 == ao2_container_count(cache_dump));
1393         i = ao2_iterator_init(cache_dump, 0);
1394         while ((obj = ao2_iterator_next(&i))) {
1395                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1396
1397                 ast_test_validate(test, actual_cache_entry == test_message2_2);
1398         }
1399         ao2_iterator_destroy(&i);
1400
1401         /* Post snapshot 2 from a repeated eid. */
1402         stasis_publish(topic, test_message2_4);
1403         ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "7"));
1404
1405         actual_len = consumer_wait_for(cache_consumer, 10);
1406         ast_test_validate(test, 10 == actual_len);
1407         actual_len = consumer_wait_for(topic_consumer, 10);
1408         ast_test_validate(test, 10 == actual_len);
1409
1410         /* Check the cache */
1411         ao2_cleanup(cache_dump);
1412         cache_dump = stasis_cache_dump_all(cache, NULL);
1413         ast_test_validate(test, NULL != cache_dump);
1414         ast_test_validate(test, 4 == ao2_container_count(cache_dump));
1415         i = ao2_iterator_init(cache_dump, 0);
1416         while ((obj = ao2_iterator_next(&i))) {
1417                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1418
1419                 ast_test_validate(test,
1420                         actual_cache_entry == test_message1_1
1421                         || actual_cache_entry == test_message2_1
1422                         || actual_cache_entry == test_message2_2
1423                         || actual_cache_entry == test_message2_4);
1424         }
1425         ao2_iterator_destroy(&i);
1426
1427         /* Check all snapshot 2 cache entries. */
1428         ao2_cleanup(cache_dump);
1429         cache_dump = stasis_cache_get_all(cache, cache_type, "2");
1430         ast_test_validate(test, NULL != cache_dump);
1431         ast_test_validate(test, 3 == ao2_container_count(cache_dump));
1432         i = ao2_iterator_init(cache_dump, 0);
1433         while ((obj = ao2_iterator_next(&i))) {
1434                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1435
1436                 ast_test_validate(test,
1437                         actual_cache_entry == test_message2_1
1438                         || actual_cache_entry == test_message2_2
1439                         || actual_cache_entry == test_message2_4);
1440         }
1441         ao2_iterator_destroy(&i);
1442
1443         /* Clear snapshot 1 */
1444         test_message1_clear = stasis_cache_clear_create(test_message1_1);
1445         ast_test_validate(test, NULL != test_message1_clear);
1446         stasis_publish(topic, test_message1_clear);
1447         ast_test_validate(test, check_cache_aggregate(cache, cache_type, "1", NULL));
1448
1449         actual_len = consumer_wait_for(cache_consumer, 12);
1450         ast_test_validate(test, 12 == actual_len);
1451         actual_len = consumer_wait_for(topic_consumer, 11);
1452         ast_test_validate(test, 11 == actual_len);
1453
1454         /* Check the cache */
1455         ao2_cleanup(cache_dump);
1456         cache_dump = stasis_cache_dump_all(cache, NULL);
1457         ast_test_validate(test, NULL != cache_dump);
1458         ast_test_validate(test, 3 == ao2_container_count(cache_dump));
1459         i = ao2_iterator_init(cache_dump, 0);
1460         while ((obj = ao2_iterator_next(&i))) {
1461                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1462
1463                 ast_test_validate(test,
1464                         actual_cache_entry == test_message2_1
1465                         || actual_cache_entry == test_message2_2
1466                         || actual_cache_entry == test_message2_4);
1467         }
1468         ao2_iterator_destroy(&i);
1469
1470         /* Clear snapshot 2 from a remote eid */
1471         test_message2_clear = stasis_cache_clear_create(test_message2_2);
1472         ast_test_validate(test, NULL != test_message2_clear);
1473         stasis_publish(topic, test_message2_clear);
1474         ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "5"));
1475
1476         actual_len = consumer_wait_for(cache_consumer, 14);
1477         ast_test_validate(test, 14 == actual_len);
1478         actual_len = consumer_wait_for(topic_consumer, 13);
1479         ast_test_validate(test, 13 == actual_len);
1480
1481         /* Check the cache */
1482         ao2_cleanup(cache_dump);
1483         cache_dump = stasis_cache_dump_all(cache, NULL);
1484         ast_test_validate(test, NULL != cache_dump);
1485         ast_test_validate(test, 2 == ao2_container_count(cache_dump));
1486         i = ao2_iterator_init(cache_dump, 0);
1487         while ((obj = ao2_iterator_next(&i))) {
1488                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1489
1490                 ast_test_validate(test,
1491                         actual_cache_entry == test_message2_1
1492                         || actual_cache_entry == test_message2_4);
1493         }
1494         ao2_iterator_destroy(&i);
1495
1496         return AST_TEST_PASS;
1497 }
1498
1499 AST_TEST_DEFINE(router)
1500 {
1501         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1502         RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
1503         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
1504         RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
1505         RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
1506         RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
1507         RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
1508         RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
1509         RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
1510         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
1511         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
1512         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
1513         int actual_len, ret;
1514         struct stasis_message *actual;
1515
1516         switch (cmd) {
1517         case TEST_INIT:
1518                 info->name = __func__;
1519                 info->category = test_category;
1520                 info->summary = "Test simple message routing";
1521                 info->description = "Test simple message routing";
1522                 return AST_TEST_NOT_RUN;
1523         case TEST_EXECUTE:
1524                 break;
1525         }
1526
1527         topic = stasis_topic_create("TestTopic");
1528         ast_test_validate(test, NULL != topic);
1529
1530         consumer1 = consumer_create(1);
1531         ast_test_validate(test, NULL != consumer1);
1532         consumer2 = consumer_create(1);
1533         ast_test_validate(test, NULL != consumer2);
1534         consumer3 = consumer_create(1);
1535         ast_test_validate(test, NULL != consumer3);
1536
1537         ast_test_validate(test, stasis_message_type_create("TestMessage1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
1538         ast_test_validate(test, NULL != test_message_type1);
1539         ast_test_validate(test, stasis_message_type_create("TestMessage2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
1540         ast_test_validate(test, NULL != test_message_type2);
1541         ast_test_validate(test, stasis_message_type_create("TestMessage3", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
1542         ast_test_validate(test, NULL != test_message_type3);
1543
1544         uut = stasis_message_router_create(topic);
1545         ast_test_validate(test, NULL != uut);
1546
1547         ret = stasis_message_router_add(
1548                 uut, test_message_type1, consumer_exec, consumer1);
1549         ast_test_validate(test, 0 == ret);
1550         ao2_ref(consumer1, +1);
1551         ret = stasis_message_router_add(
1552                 uut, test_message_type2, consumer_exec, consumer2);
1553         ast_test_validate(test, 0 == ret);
1554         ao2_ref(consumer2, +1);
1555         ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
1556         ast_test_validate(test, 0 == ret);
1557         ao2_ref(consumer3, +1);
1558
1559         test_data = ao2_alloc(1, NULL);
1560         ast_test_validate(test, NULL != test_data);
1561         test_message1 = stasis_message_create(test_message_type1, test_data);
1562         ast_test_validate(test, NULL != test_message1);
1563         test_message2 = stasis_message_create(test_message_type2, test_data);
1564         ast_test_validate(test, NULL != test_message2);
1565         test_message3 = stasis_message_create(test_message_type3, test_data);
1566         ast_test_validate(test, NULL != test_message3);
1567
1568         stasis_publish(topic, test_message1);
1569         stasis_publish(topic, test_message2);
1570         stasis_publish(topic, test_message3);
1571
1572         actual_len = consumer_wait_for(consumer1, 1);
1573         ast_test_validate(test, 1 == actual_len);
1574         actual_len = consumer_wait_for(consumer2, 1);
1575         ast_test_validate(test, 1 == actual_len);
1576         actual_len = consumer_wait_for(consumer3, 1);
1577         ast_test_validate(test, 1 == actual_len);
1578
1579         actual = consumer1->messages_rxed[0];
1580         ast_test_validate(test, test_message1 == actual);
1581
1582         actual = consumer2->messages_rxed[0];
1583         ast_test_validate(test, test_message2 == actual);
1584
1585         actual = consumer3->messages_rxed[0];
1586         ast_test_validate(test, test_message3 == actual);
1587
1588         /* consumer1 and consumer2 do not get the final message. */
1589         ao2_cleanup(consumer1);
1590         ao2_cleanup(consumer2);
1591
1592         return AST_TEST_PASS;
1593 }
1594
1595 AST_TEST_DEFINE(router_pool)
1596 {
1597         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1598         RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
1599         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
1600         RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
1601         RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
1602         RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
1603         RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
1604         RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
1605         RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
1606         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
1607         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
1608         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
1609         int actual_len, ret;
1610         struct stasis_message *actual;
1611
1612         switch (cmd) {
1613         case TEST_INIT:
1614                 info->name = __func__;
1615                 info->category = test_category;
1616                 info->summary = "Test message routing via threadpool";
1617                 info->description = "Test simple message routing when\n"
1618                         "the subscriptions dictate usage of the Stasis\n"
1619                         "threadpool.";
1620                 return AST_TEST_NOT_RUN;
1621         case TEST_EXECUTE:
1622                 break;
1623         }
1624
1625         topic = stasis_topic_create("TestTopic");
1626         ast_test_validate(test, NULL != topic);
1627
1628         consumer1 = consumer_create(1);
1629         ast_test_validate(test, NULL != consumer1);
1630         consumer2 = consumer_create(1);
1631         ast_test_validate(test, NULL != consumer2);
1632         consumer3 = consumer_create(1);
1633         ast_test_validate(test, NULL != consumer3);
1634
1635         ast_test_validate(test, stasis_message_type_create("TestMessage1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
1636         ast_test_validate(test, NULL != test_message_type1);
1637         ast_test_validate(test, stasis_message_type_create("TestMessage2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
1638         ast_test_validate(test, NULL != test_message_type2);
1639         ast_test_validate(test, stasis_message_type_create("TestMessage3", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
1640         ast_test_validate(test, NULL != test_message_type3);
1641
1642         uut = stasis_message_router_create_pool(topic);
1643         ast_test_validate(test, NULL != uut);
1644
1645         ret = stasis_message_router_add(
1646                 uut, test_message_type1, consumer_exec, consumer1);
1647         ast_test_validate(test, 0 == ret);
1648         ao2_ref(consumer1, +1);
1649         ret = stasis_message_router_add(
1650                 uut, test_message_type2, consumer_exec, consumer2);
1651         ast_test_validate(test, 0 == ret);
1652         ao2_ref(consumer2, +1);
1653         ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
1654         ast_test_validate(test, 0 == ret);
1655         ao2_ref(consumer3, +1);
1656
1657         test_data = ao2_alloc(1, NULL);
1658         ast_test_validate(test, NULL != test_data);
1659         test_message1 = stasis_message_create(test_message_type1, test_data);
1660         ast_test_validate(test, NULL != test_message1);
1661         test_message2 = stasis_message_create(test_message_type2, test_data);
1662         ast_test_validate(test, NULL != test_message2);
1663         test_message3 = stasis_message_create(test_message_type3, test_data);
1664         ast_test_validate(test, NULL != test_message3);
1665
1666         stasis_publish(topic, test_message1);
1667         stasis_publish(topic, test_message2);
1668         stasis_publish(topic, test_message3);
1669
1670         actual_len = consumer_wait_for(consumer1, 1);
1671         ast_test_validate(test, 1 == actual_len);
1672         actual_len = consumer_wait_for(consumer2, 1);
1673         ast_test_validate(test, 1 == actual_len);
1674         actual_len = consumer_wait_for(consumer3, 1);
1675         ast_test_validate(test, 1 == actual_len);
1676
1677         actual = consumer1->messages_rxed[0];
1678         ast_test_validate(test, test_message1 == actual);
1679
1680         actual = consumer2->messages_rxed[0];
1681         ast_test_validate(test, test_message2 == actual);
1682
1683         actual = consumer3->messages_rxed[0];
1684         ast_test_validate(test, test_message3 == actual);
1685
1686         /* consumer1 and consumer2 do not get the final message. */
1687         ao2_cleanup(consumer1);
1688         ao2_cleanup(consumer2);
1689
1690         return AST_TEST_PASS;
1691 }
1692
1693 static const char *cache_simple(struct stasis_message *message)
1694 {
1695         const char *type_name =
1696                 stasis_message_type_name(stasis_message_type(message));
1697         if (!ast_begins_with(type_name, "Cache")) {
1698                 return NULL;
1699         }
1700
1701         return "cached";
1702 }
1703
1704 AST_TEST_DEFINE(router_cache_updates)
1705 {
1706         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1707         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
1708         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe_and_join);
1709         RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
1710         RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
1711         RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
1712         RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
1713         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
1714         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
1715         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
1716         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
1717         RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
1718         RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
1719         RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
1720         RAII_VAR(struct stasis_message *, message1, NULL, ao2_cleanup);
1721         RAII_VAR(struct stasis_message *, message2, NULL, ao2_cleanup);
1722         struct stasis_cache_update *update;
1723         int actual_len, ret;
1724         struct stasis_message *actual;
1725
1726         switch (cmd) {
1727         case TEST_INIT:
1728                 info->name = __func__;
1729                 info->category = test_category;
1730                 info->summary = "Test special handling cache_update messages";
1731                 info->description = "Test special handling cache_update messages";
1732                 return AST_TEST_NOT_RUN;
1733         case TEST_EXECUTE:
1734                 break;
1735         }
1736
1737         topic = stasis_topic_create("TestTopic");
1738         ast_test_validate(test, NULL != topic);
1739
1740         cache = stasis_cache_create(cache_simple);
1741         ast_test_validate(test, NULL != cache);
1742         caching_topic = stasis_caching_topic_create(topic, cache);
1743         ast_test_validate(test, NULL != caching_topic);
1744
1745         consumer1 = consumer_create(1);
1746         ast_test_validate(test, NULL != consumer1);
1747         consumer2 = consumer_create(1);
1748         ast_test_validate(test, NULL != consumer2);
1749         consumer3 = consumer_create(1);
1750         ast_test_validate(test, NULL != consumer3);
1751
1752         ast_test_validate(test, stasis_message_type_create("Cache1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
1753         ast_test_validate(test, NULL != test_message_type1);
1754         ast_test_validate(test, stasis_message_type_create("Cache2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
1755         ast_test_validate(test, NULL != test_message_type2);
1756         ast_test_validate(test, stasis_message_type_create("NonCache", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
1757         ast_test_validate(test, NULL != test_message_type3);
1758
1759         uut = stasis_message_router_create(
1760                 stasis_caching_get_topic(caching_topic));
1761         ast_test_validate(test, NULL != uut);
1762
1763         ret = stasis_message_router_add_cache_update(
1764                 uut, test_message_type1, consumer_exec, consumer1);
1765         ast_test_validate(test, 0 == ret);
1766         ao2_ref(consumer1, +1);
1767         ret = stasis_message_router_add(
1768                 uut, stasis_cache_update_type(), consumer_exec, consumer2);
1769         ast_test_validate(test, 0 == ret);
1770         ao2_ref(consumer2, +1);
1771         ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
1772         ast_test_validate(test, 0 == ret);
1773         ao2_ref(consumer3, +1);
1774
1775         test_data = ao2_alloc(1, NULL);
1776         ast_test_validate(test, NULL != test_data);
1777         test_message1 = stasis_message_create(test_message_type1, test_data);
1778         ast_test_validate(test, NULL != test_message1);
1779         test_message2 = stasis_message_create(test_message_type2, test_data);
1780         ast_test_validate(test, NULL != test_message2);
1781         test_message3 = stasis_message_create(test_message_type3, test_data);
1782         ast_test_validate(test, NULL != test_message3);
1783
1784         stasis_publish(topic, test_message1);
1785         stasis_publish(topic, test_message2);
1786         stasis_publish(topic, test_message3);
1787
1788         actual_len = consumer_wait_for(consumer1, 1);
1789         ast_test_validate(test, 1 == actual_len);
1790         actual_len = consumer_wait_for(consumer2, 1);
1791         ast_test_validate(test, 1 == actual_len);
1792         /* Uncacheable message should not be passed through */
1793         actual_len = consumer_should_stay(consumer3, 0);
1794         ast_test_validate(test, 0 == actual_len);
1795
1796         actual = consumer1->messages_rxed[0];
1797         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
1798         update = stasis_message_data(actual);
1799         ast_test_validate(test, test_message_type1 == update->type);
1800         ast_test_validate(test, test_message1 == update->new_snapshot);
1801
1802         actual = consumer2->messages_rxed[0];
1803         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
1804         update = stasis_message_data(actual);
1805         ast_test_validate(test, test_message_type2 == update->type);
1806         ast_test_validate(test, test_message2 == update->new_snapshot);
1807
1808         /* consumer1 and consumer2 do not get the final message. */
1809         ao2_cleanup(consumer1);
1810         ao2_cleanup(consumer2);
1811
1812         return AST_TEST_PASS;
1813 }
1814
1815 AST_TEST_DEFINE(no_to_json)
1816 {
1817         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1818         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1819         RAII_VAR(char *, data, NULL, ao2_cleanup);
1820         RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
1821         char *expected = "SomeData";
1822
1823         switch (cmd) {
1824         case TEST_INIT:
1825                 info->name = __func__;
1826                 info->category = test_category;
1827                 info->summary = "Test message to_json function";
1828                 info->description = "Test message to_json function";
1829                 return AST_TEST_NOT_RUN;
1830         case TEST_EXECUTE:
1831                 break;
1832         }
1833
1834         /* Test NULL */
1835         actual = stasis_message_to_json(NULL, NULL);
1836         ast_test_validate(test, NULL == actual);
1837
1838         /* Test message with NULL to_json function */
1839         ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
1840
1841         data = ao2_alloc(strlen(expected) + 1, NULL);
1842         strcpy(data, expected);
1843         uut = stasis_message_create(type, data);
1844         ast_test_validate(test, NULL != uut);
1845
1846         actual = stasis_message_to_json(uut, NULL);
1847         ast_test_validate(test, NULL == actual);
1848
1849         return AST_TEST_PASS;
1850 }
1851
1852 AST_TEST_DEFINE(to_json)
1853 {
1854         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1855         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1856         RAII_VAR(char *, data, NULL, ao2_cleanup);
1857         RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
1858         const char *expected_text = "SomeData";
1859         RAII_VAR(struct ast_json *, expected, NULL, ast_json_unref);
1860
1861         switch (cmd) {
1862         case TEST_INIT:
1863                 info->name = __func__;
1864                 info->category = test_category;
1865                 info->summary = "Test message to_json function when NULL";
1866                 info->description = "Test message to_json function when NULL";
1867                 return AST_TEST_NOT_RUN;
1868         case TEST_EXECUTE:
1869                 break;
1870         }
1871
1872         ast_test_validate(test, stasis_message_type_create("SomeMessage", &fake_vtable, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
1873
1874         data = ao2_alloc(strlen(expected_text) + 1, NULL);
1875         strcpy(data, expected_text);
1876         uut = stasis_message_create(type, data);
1877         ast_test_validate(test, NULL != uut);
1878
1879         expected = ast_json_string_create(expected_text);
1880         actual = stasis_message_to_json(uut, NULL);
1881         ast_test_validate(test, ast_json_equal(expected, actual));
1882
1883         return AST_TEST_PASS;
1884 }
1885
1886 AST_TEST_DEFINE(no_to_ami)
1887 {
1888         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1889         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1890         RAII_VAR(char *, data, NULL, ao2_cleanup);
1891         RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
1892         char *expected = "SomeData";
1893
1894         switch (cmd) {
1895         case TEST_INIT:
1896                 info->name = __func__;
1897                 info->category = test_category;
1898                 info->summary = "Test message to_ami function when NULL";
1899                 info->description = "Test message to_ami function when NULL";
1900                 return AST_TEST_NOT_RUN;
1901         case TEST_EXECUTE:
1902                 break;
1903         }
1904
1905         /* Test NULL */
1906         actual = stasis_message_to_ami(NULL);
1907         ast_test_validate(test, NULL == actual);
1908
1909         /* Test message with NULL to_ami function */
1910         ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
1911
1912         data = ao2_alloc(strlen(expected) + 1, NULL);
1913         strcpy(data, expected);
1914         uut = stasis_message_create(type, data);
1915         ast_test_validate(test, NULL != uut);
1916
1917         actual = stasis_message_to_ami(uut);
1918         ast_test_validate(test, NULL == actual);
1919
1920         return AST_TEST_PASS;
1921 }
1922
1923 AST_TEST_DEFINE(to_ami)
1924 {
1925         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1926         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1927         RAII_VAR(char *, data, NULL, ao2_cleanup);
1928         RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
1929         const char *expected_text = "SomeData";
1930         const char *expected = "Message: SomeData\r\n";
1931
1932         switch (cmd) {
1933         case TEST_INIT:
1934                 info->name = __func__;
1935                 info->category = test_category;
1936                 info->summary = "Test message to_ami function";
1937                 info->description = "Test message to_ami function";
1938                 return AST_TEST_NOT_RUN;
1939         case TEST_EXECUTE:
1940                 break;
1941         }
1942
1943         ast_test_validate(test, stasis_message_type_create("SomeMessage", &fake_vtable, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
1944
1945         data = ao2_alloc(strlen(expected_text) + 1, NULL);
1946         strcpy(data, expected_text);
1947         uut = stasis_message_create(type, data);
1948         ast_test_validate(test, NULL != uut);
1949
1950         actual = stasis_message_to_ami(uut);
1951         ast_test_validate(test, strcmp(expected, actual->extra_fields) == 0);
1952
1953         return AST_TEST_PASS;
1954 }
1955
1956 static void noop(void *data, struct stasis_subscription *sub,
1957         struct stasis_message *message)
1958 {
1959         /* no-op */
1960 }
1961
1962 AST_TEST_DEFINE(dtor_order)
1963 {
1964         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1965         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
1966
1967         switch (cmd) {
1968         case TEST_INIT:
1969                 info->name = __func__;
1970                 info->category = test_category;
1971                 info->summary = "Test that destruction order doesn't bomb stuff";
1972                 info->description = "Test that destruction order doesn't bomb stuff";
1973                 return AST_TEST_NOT_RUN;
1974         case TEST_EXECUTE:
1975                 break;
1976         }
1977
1978         topic = stasis_topic_create("test-topic");
1979         ast_test_validate(test, NULL != topic);
1980
1981         sub = stasis_subscribe(topic, noop, NULL);
1982         ast_test_validate(test, NULL != sub);
1983
1984         /* With any luck, this won't completely blow everything up */
1985         ao2_cleanup(topic);
1986         stasis_unsubscribe(sub);
1987
1988         /* These refs were cleaned up manually */
1989         topic = NULL;
1990         sub = NULL;
1991
1992         return AST_TEST_PASS;
1993 }
1994
1995 static const char *noop_get_id(struct stasis_message *message)
1996 {
1997         return NULL;
1998 }
1999
2000 AST_TEST_DEFINE(caching_dtor_order)
2001 {
2002         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
2003         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
2004         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL,
2005                 stasis_caching_unsubscribe);
2006         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
2007
2008         switch (cmd) {
2009         case TEST_INIT:
2010                 info->name = __func__;
2011                 info->category = test_category;
2012                 info->summary = "Test that destruction order doesn't bomb stuff";
2013                 info->description = "Test that destruction order doesn't bomb stuff";
2014                 return AST_TEST_NOT_RUN;
2015         case TEST_EXECUTE:
2016                 break;
2017         }
2018
2019         cache = stasis_cache_create(noop_get_id);
2020         ast_test_validate(test, NULL != cache);
2021
2022         topic = stasis_topic_create("test-topic");
2023         ast_test_validate(test, NULL != topic);
2024
2025         caching_topic = stasis_caching_topic_create(topic, cache);
2026         ast_test_validate(test, NULL != caching_topic);
2027
2028         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), noop,
2029                 NULL);
2030         ast_test_validate(test, NULL != sub);
2031
2032         /* With any luck, this won't completely blow everything up */
2033         ao2_cleanup(cache);
2034         ao2_cleanup(topic);
2035         stasis_caching_unsubscribe(caching_topic);
2036         stasis_unsubscribe(sub);
2037
2038         /* These refs were cleaned up manually */
2039         cache = NULL;
2040         topic = NULL;
2041         caching_topic = NULL;
2042         sub = NULL;
2043
2044         return AST_TEST_PASS;
2045 }
2046
2047 static int unload_module(void)
2048 {
2049         AST_TEST_UNREGISTER(message_type);
2050         AST_TEST_UNREGISTER(message);
2051         AST_TEST_UNREGISTER(subscription_messages);
2052         AST_TEST_UNREGISTER(subscription_pool_messages);
2053         AST_TEST_UNREGISTER(publish);
2054         AST_TEST_UNREGISTER(publish_sync);
2055         AST_TEST_UNREGISTER(publish_pool);
2056         AST_TEST_UNREGISTER(unsubscribe_stops_messages);
2057         AST_TEST_UNREGISTER(forward);
2058         AST_TEST_UNREGISTER(cache_filter);
2059         AST_TEST_UNREGISTER(cache);
2060         AST_TEST_UNREGISTER(cache_dump);
2061         AST_TEST_UNREGISTER(cache_eid_aggregate);
2062         AST_TEST_UNREGISTER(router);
2063         AST_TEST_UNREGISTER(router_pool);
2064         AST_TEST_UNREGISTER(router_cache_updates);
2065         AST_TEST_UNREGISTER(interleaving);
2066         AST_TEST_UNREGISTER(subscription_interleaving);
2067         AST_TEST_UNREGISTER(no_to_json);
2068         AST_TEST_UNREGISTER(to_json);
2069         AST_TEST_UNREGISTER(no_to_ami);
2070         AST_TEST_UNREGISTER(to_ami);
2071         AST_TEST_UNREGISTER(dtor_order);
2072         AST_TEST_UNREGISTER(caching_dtor_order);
2073         return 0;
2074 }
2075
2076 static int load_module(void)
2077 {
2078         AST_TEST_REGISTER(message_type);
2079         AST_TEST_REGISTER(message);
2080         AST_TEST_REGISTER(subscription_messages);
2081         AST_TEST_REGISTER(subscription_pool_messages);
2082         AST_TEST_REGISTER(publish);
2083         AST_TEST_REGISTER(publish_sync);
2084         AST_TEST_REGISTER(publish_pool);
2085         AST_TEST_REGISTER(unsubscribe_stops_messages);
2086         AST_TEST_REGISTER(forward);
2087         AST_TEST_REGISTER(cache_filter);
2088         AST_TEST_REGISTER(cache);
2089         AST_TEST_REGISTER(cache_dump);
2090         AST_TEST_REGISTER(cache_eid_aggregate);
2091         AST_TEST_REGISTER(router);
2092         AST_TEST_REGISTER(router_pool);
2093         AST_TEST_REGISTER(router_cache_updates);
2094         AST_TEST_REGISTER(interleaving);
2095         AST_TEST_REGISTER(subscription_interleaving);
2096         AST_TEST_REGISTER(no_to_json);
2097         AST_TEST_REGISTER(to_json);
2098         AST_TEST_REGISTER(no_to_ami);
2099         AST_TEST_REGISTER(to_ami);
2100         AST_TEST_REGISTER(dtor_order);
2101         AST_TEST_REGISTER(caching_dtor_order);
2102         return AST_MODULE_LOAD_SUCCESS;
2103 }
2104
2105 AST_MODULE_INFO(ASTERISK_GPL_KEY, 0, "Stasis testing",
2106         .load = load_module,
2107         .unload = unload_module
2108 );