tests/test_stasis: Resolve compilation issues from Asterisk 12 merge
[asterisk/asterisk.git] / tests / test_stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file \brief Test Stasis message bus.
21  *
22  * \author\verbatim David M. Lee, II <dlee@digium.com> \endverbatim
23  *
24  * \ingroup tests
25  */
26
27 /*** MODULEINFO
28         <depend>TEST_FRAMEWORK</depend>
29         <support_level>core</support_level>
30  ***/
31
32 #include "asterisk.h"
33
34 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
35
36 #include "asterisk/astobj2.h"
37 #include "asterisk/module.h"
38 #include "asterisk/stasis.h"
39 #include "asterisk/stasis_message_router.h"
40 #include "asterisk/test.h"
41
42 static const char *test_category = "/stasis/core/";
43
44 static struct ast_json *fake_json(struct stasis_message *message, const struct stasis_message_sanitizer *sanitize)
45 {
46         const char *text = stasis_message_data(message);
47
48         return ast_json_string_create(text);
49 }
50
51 static struct ast_manager_event_blob *fake_ami(struct stasis_message *message)
52 {
53         RAII_VAR(struct ast_manager_event_blob *, res, NULL, ao2_cleanup);
54         const char *text = stasis_message_data(message);
55
56         res = ast_manager_event_blob_create(EVENT_FLAG_TEST, "FakeMI",
57                 "Message: %s\r\n", text);
58
59         if (res == NULL) {
60                 return NULL;
61         }
62
63         ao2_ref(res, +1);
64         return res;
65 }
66
67 static struct stasis_message_vtable fake_vtable = {
68         .to_json = fake_json,
69         .to_ami = fake_ami
70 };
71
72 AST_TEST_DEFINE(message_type)
73 {
74         RAII_VAR(struct stasis_message_type *, uut, NULL, ao2_cleanup);
75
76         switch (cmd) {
77         case TEST_INIT:
78                 info->name = __func__;
79                 info->category = test_category;
80                 info->summary = "Test basic message_type functions";
81                 info->description = "Test basic message_type functions";
82                 return AST_TEST_NOT_RUN;
83         case TEST_EXECUTE:
84                 break;
85         }
86
87         ast_test_validate(test, stasis_message_type_create(NULL, NULL, NULL) == STASIS_MESSAGE_TYPE_ERROR);
88         ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &uut) == STASIS_MESSAGE_TYPE_SUCCESS);
89         ast_test_validate(test, 0 == strcmp(stasis_message_type_name(uut), "SomeMessage"));
90
91         return AST_TEST_PASS;
92 }
93
94 AST_TEST_DEFINE(message)
95 {
96         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
97         RAII_VAR(struct stasis_message *, uut1, NULL, ao2_cleanup);
98         RAII_VAR(struct stasis_message *, uut2, NULL, ao2_cleanup);
99         RAII_VAR(char *, data, NULL, ao2_cleanup);
100         char *expected = "SomeData";
101         struct timeval expected_timestamp;
102         struct timeval time_diff;
103         struct ast_eid foreign_eid;
104
105         switch (cmd) {
106         case TEST_INIT:
107                 info->name = __func__;
108                 info->category = test_category;
109                 info->summary = "Test basic message functions";
110                 info->description = "Test basic message functions";
111                 return AST_TEST_NOT_RUN;
112         case TEST_EXECUTE:
113                 break;
114         }
115
116
117         memset(&foreign_eid, 0xFF, sizeof(foreign_eid));
118
119         ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
120
121         ast_test_validate(test, NULL == stasis_message_create_full(NULL, NULL, NULL));
122         ast_test_validate(test, NULL == stasis_message_create_full(type, NULL, NULL));
123
124         data = ao2_alloc(strlen(expected) + 1, NULL);
125         strcpy(data, expected);/* Safe */
126         expected_timestamp = ast_tvnow();
127         uut1 = stasis_message_create_full(type, data, &foreign_eid);
128         uut2 = stasis_message_create_full(type, data, NULL);
129
130         ast_test_validate(test, NULL != uut1);
131         ast_test_validate(test, NULL != uut2);
132         ast_test_validate(test, type == stasis_message_type(uut1));
133         ast_test_validate(test, type == stasis_message_type(uut2));
134         ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut1)));
135         ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut2)));
136         ast_test_validate(test, NULL != stasis_message_eid(uut1));
137         ast_test_validate(test, NULL == stasis_message_eid(uut2));
138         ast_test_validate(test, !ast_eid_cmp(&foreign_eid, stasis_message_eid(uut1)));
139
140         ast_test_validate(test, 3 == ao2_ref(data, 0)); /* uut1 and uut2 have ref to data */
141
142         time_diff = ast_tvsub(*stasis_message_timestamp(uut1), expected_timestamp);
143         /* 10ms is certainly long enough for the two calls to complete */
144         ast_test_validate(test, time_diff.tv_sec == 0);
145         ast_test_validate(test, time_diff.tv_usec < 10000);
146
147         ao2_ref(uut1, -1);
148         uut1 = NULL;
149         ast_test_validate(test, 2 == ao2_ref(data, 0)); /* uut1 unreffed data */
150         ao2_ref(uut2, -1);
151         uut2 = NULL;
152         ast_test_validate(test, 1 == ao2_ref(data, 0)); /* uut2 unreffed data */
153
154         return AST_TEST_PASS;
155 }
156
157 struct consumer {
158         ast_cond_t out;
159         struct stasis_message **messages_rxed;
160         size_t messages_rxed_len;
161         int ignore_subscriptions;
162         int complete;
163 };
164
165 static void consumer_dtor(void *obj)
166 {
167         struct consumer *consumer = obj;
168
169         ast_cond_destroy(&consumer->out);
170
171         while (consumer->messages_rxed_len > 0) {
172                 ao2_cleanup(consumer->messages_rxed[--consumer->messages_rxed_len]);
173         }
174         ast_free(consumer->messages_rxed);
175         consumer->messages_rxed = NULL;
176 }
177
178 static struct consumer *consumer_create(int ignore_subscriptions)
179 {
180         struct consumer *consumer;
181
182         consumer = ao2_alloc(sizeof(*consumer), consumer_dtor);
183         if (!consumer) {
184                 return NULL;
185         }
186
187         consumer->ignore_subscriptions = ignore_subscriptions;
188         consumer->messages_rxed = ast_malloc(sizeof(*consumer->messages_rxed));
189         if (!consumer->messages_rxed) {
190                 ao2_cleanup(consumer);
191                 return NULL;
192         }
193
194         ast_cond_init(&consumer->out, NULL);
195
196         return consumer;
197 }
198
199 static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
200 {
201         struct consumer *consumer = data;
202         RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
203         SCOPED_AO2LOCK(lock, consumer);
204
205         if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
206                 ++consumer->messages_rxed_len;
207                 consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
208                 ast_assert(consumer->messages_rxed != NULL);
209                 consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
210                 ao2_ref(message, +1);
211         }
212
213         if (stasis_subscription_final_message(sub, message)) {
214                 consumer->complete = 1;
215                 consumer_needs_cleanup = consumer;
216         }
217
218         ast_cond_signal(&consumer->out);
219 }
220
221 static void consumer_exec_sync(void *data, struct stasis_subscription *sub, struct stasis_message *message)
222 {
223         struct consumer *consumer = data;
224         RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
225         SCOPED_AO2LOCK(lock, consumer);
226
227         if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
228                 ++consumer->messages_rxed_len;
229                 consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
230                 ast_assert(consumer->messages_rxed != NULL);
231                 consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
232                 ao2_ref(message, +1);
233         }
234
235         if (stasis_subscription_final_message(sub, message)) {
236                 consumer->complete = 1;
237                 consumer_needs_cleanup = consumer;
238         }
239 }
240
241 static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
242 {
243         struct timeval start = ast_tvnow();
244         struct timespec end = {
245                 .tv_sec = start.tv_sec + 30,
246                 .tv_nsec = start.tv_usec * 1000
247         };
248
249         SCOPED_AO2LOCK(lock, consumer);
250
251         while (consumer->messages_rxed_len < expected_len) {
252                 int r = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
253
254                 if (r == ETIMEDOUT) {
255                         break;
256                 }
257                 ast_assert(r == 0); /* Not expecting any othet types of errors */
258         }
259         return consumer->messages_rxed_len;
260 }
261
262 static int consumer_wait_for_completion(struct consumer *consumer)
263 {
264         struct timeval start = ast_tvnow();
265         struct timespec end = {
266                 .tv_sec = start.tv_sec + 3,
267                 .tv_nsec = start.tv_usec * 1000
268         };
269
270         SCOPED_AO2LOCK(lock, consumer);
271
272         while (!consumer->complete) {
273                 int r = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
274
275                 if (r == ETIMEDOUT) {
276                         break;
277                 }
278                 ast_assert(r == 0); /* Not expecting any othet types of errors */
279         }
280         return consumer->complete;
281 }
282
283 static int consumer_should_stay(struct consumer *consumer, size_t expected_len)
284 {
285         struct timeval start = ast_tvnow();
286         struct timeval diff = {
287                 .tv_sec = 0,
288                 .tv_usec = 100000 /* wait for 100ms */
289         };
290         struct timeval end_tv = ast_tvadd(start, diff);
291         struct timespec end = {
292                 .tv_sec = end_tv.tv_sec,
293                 .tv_nsec = end_tv.tv_usec * 1000
294         };
295
296         SCOPED_AO2LOCK(lock, consumer);
297
298         while (consumer->messages_rxed_len == expected_len) {
299                 int r = ast_cond_timedwait(&consumer->out, ao2_object_get_lockaddr(consumer), &end);
300
301                 if (r == ETIMEDOUT) {
302                         break;
303                 }
304                 ast_assert(r == 0); /* Not expecting any othet types of errors */
305         }
306         return consumer->messages_rxed_len;
307 }
308
309 AST_TEST_DEFINE(subscription_messages)
310 {
311         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
312         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
313         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
314         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
315         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
316         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
317         RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
318         int complete;
319         struct stasis_subscription_change *change;
320
321         switch (cmd) {
322         case TEST_INIT:
323                 info->name = __func__;
324                 info->category = test_category;
325                 info->summary = "Test subscribe/unsubscribe messages";
326                 info->description = "Test subscribe/unsubscribe messages";
327                 return AST_TEST_NOT_RUN;
328         case TEST_EXECUTE:
329                 break;
330         }
331
332         topic = stasis_topic_create("TestTopic");
333         ast_test_validate(test, NULL != topic);
334
335         consumer = consumer_create(0);
336         ast_test_validate(test, NULL != consumer);
337
338         uut = stasis_subscribe(topic, consumer_exec, consumer);
339         ast_test_validate(test, NULL != uut);
340         ao2_ref(consumer, +1);
341         expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
342
343         uut = stasis_unsubscribe(uut);
344         complete = consumer_wait_for_completion(consumer);
345         ast_test_validate(test, 1 == complete);
346
347         ast_test_validate(test, 2 == consumer->messages_rxed_len);
348         ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0]));
349         ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1]));
350
351         change = stasis_message_data(consumer->messages_rxed[0]);
352         ast_test_validate(test, topic == change->topic);
353         ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
354         ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
355
356         change = stasis_message_data(consumer->messages_rxed[1]);
357         ast_test_validate(test, topic == change->topic);
358         ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
359         ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
360
361         return AST_TEST_PASS;
362 }
363
364 AST_TEST_DEFINE(subscription_pool_messages)
365 {
366         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
367         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
368         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
369         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
370         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
371         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
372         RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
373         int complete;
374         struct stasis_subscription_change *change;
375
376         switch (cmd) {
377         case TEST_INIT:
378                 info->name = __func__;
379                 info->category = test_category;
380                 info->summary = "Test subscribe/unsubscribe messages using a threadpool subscription";
381                 info->description = "Test subscribe/unsubscribe messages using a threadpool subscription";
382                 return AST_TEST_NOT_RUN;
383         case TEST_EXECUTE:
384                 break;
385         }
386
387         topic = stasis_topic_create("TestTopic");
388         ast_test_validate(test, NULL != topic);
389
390         consumer = consumer_create(0);
391         ast_test_validate(test, NULL != consumer);
392
393         uut = stasis_subscribe_pool(topic, consumer_exec, consumer);
394         ast_test_validate(test, NULL != uut);
395         ao2_ref(consumer, +1);
396         expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
397
398         uut = stasis_unsubscribe(uut);
399         complete = consumer_wait_for_completion(consumer);
400         ast_test_validate(test, 1 == complete);
401
402         ast_test_validate(test, 2 == consumer->messages_rxed_len);
403         ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0]));
404         ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1]));
405
406         change = stasis_message_data(consumer->messages_rxed[0]);
407         ast_test_validate(test, topic == change->topic);
408         ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
409         ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
410
411         change = stasis_message_data(consumer->messages_rxed[1]);
412         ast_test_validate(test, topic == change->topic);
413         ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
414         ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
415
416         return AST_TEST_PASS;
417 }
418
419 AST_TEST_DEFINE(publish)
420 {
421         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
422         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
423         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
424         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
425         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
426         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
427         int actual_len;
428         const char *actual;
429
430         switch (cmd) {
431         case TEST_INIT:
432                 info->name = __func__;
433                 info->category = test_category;
434                 info->summary = "Test publishing";
435                 info->description = "Test publishing";
436                 return AST_TEST_NOT_RUN;
437         case TEST_EXECUTE:
438                 break;
439         }
440
441         topic = stasis_topic_create("TestTopic");
442         ast_test_validate(test, NULL != topic);
443
444         consumer = consumer_create(1);
445         ast_test_validate(test, NULL != consumer);
446
447         uut = stasis_subscribe(topic, consumer_exec, consumer);
448         ast_test_validate(test, NULL != uut);
449         ao2_ref(consumer, +1);
450
451         test_data = ao2_alloc(1, NULL);
452         ast_test_validate(test, NULL != test_data);
453         ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
454         test_message = stasis_message_create(test_message_type, test_data);
455
456         stasis_publish(topic, test_message);
457
458         actual_len = consumer_wait_for(consumer, 1);
459         ast_test_validate(test, 1 == actual_len);
460         actual = stasis_message_data(consumer->messages_rxed[0]);
461         ast_test_validate(test, test_data == actual);
462
463         return AST_TEST_PASS;
464 }
465
466 AST_TEST_DEFINE(publish_sync)
467 {
468         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
469         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
470         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
471         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
472         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
473         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
474         int actual_len;
475         const char *actual;
476
477         switch (cmd) {
478         case TEST_INIT:
479                 info->name = __func__;
480                 info->category = test_category;
481                 info->summary = "Test synchronous publishing";
482                 info->description = "Test synchronous publishing";
483                 return AST_TEST_NOT_RUN;
484         case TEST_EXECUTE:
485                 break;
486         }
487
488         topic = stasis_topic_create("TestTopic");
489         ast_test_validate(test, NULL != topic);
490
491         consumer = consumer_create(1);
492         ast_test_validate(test, NULL != consumer);
493
494         uut = stasis_subscribe(topic, consumer_exec_sync, consumer);
495         ast_test_validate(test, NULL != uut);
496         ao2_ref(consumer, +1);
497
498         test_data = ao2_alloc(1, NULL);
499         ast_test_validate(test, NULL != test_data);
500         ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
501         test_message = stasis_message_create(test_message_type, test_data);
502
503         stasis_publish_sync(uut, test_message);
504
505         actual_len = consumer->messages_rxed_len;
506         ast_test_validate(test, 1 == actual_len);
507         actual = stasis_message_data(consumer->messages_rxed[0]);
508         ast_test_validate(test, test_data == actual);
509
510         return AST_TEST_PASS;
511 }
512
513 AST_TEST_DEFINE(publish_pool)
514 {
515         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
516         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
517         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
518         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
519         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
520         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
521         int actual_len;
522         const char *actual;
523
524         switch (cmd) {
525         case TEST_INIT:
526                 info->name = __func__;
527                 info->category = test_category;
528                 info->summary = "Test publishing with a threadpool";
529                 info->description = "Test publishing to a subscriber whose\n"
530                         "subscription dictates messages are received through a\n"
531                         "threadpool.";
532                 return AST_TEST_NOT_RUN;
533         case TEST_EXECUTE:
534                 break;
535         }
536
537         topic = stasis_topic_create("TestTopic");
538         ast_test_validate(test, NULL != topic);
539
540         consumer = consumer_create(1);
541         ast_test_validate(test, NULL != consumer);
542
543         uut = stasis_subscribe_pool(topic, consumer_exec, consumer);
544         ast_test_validate(test, NULL != uut);
545         ao2_ref(consumer, +1);
546
547         test_data = ao2_alloc(1, NULL);
548         ast_test_validate(test, NULL != test_data);
549         ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
550         test_message = stasis_message_create(test_message_type, test_data);
551
552         stasis_publish(topic, test_message);
553
554         actual_len = consumer_wait_for(consumer, 1);
555         ast_test_validate(test, 1 == actual_len);
556         actual = stasis_message_data(consumer->messages_rxed[0]);
557         ast_test_validate(test, test_data == actual);
558
559         return AST_TEST_PASS;
560 }
561
562 AST_TEST_DEFINE(unsubscribe_stops_messages)
563 {
564         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
565         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
566         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
567         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
568         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
569         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
570         int actual_len;
571
572         switch (cmd) {
573         case TEST_INIT:
574                 info->name = __func__;
575                 info->category = test_category;
576                 info->summary = "Test simple subscriptions";
577                 info->description = "Test simple subscriptions";
578                 return AST_TEST_NOT_RUN;
579         case TEST_EXECUTE:
580                 break;
581         }
582
583         topic = stasis_topic_create("TestTopic");
584         ast_test_validate(test, NULL != topic);
585
586         consumer = consumer_create(1);
587         ast_test_validate(test, NULL != consumer);
588
589         uut = stasis_subscribe(topic, consumer_exec, consumer);
590         ast_test_validate(test, NULL != uut);
591         ao2_ref(consumer, +1);
592
593         uut = stasis_unsubscribe(uut);
594
595         test_data = ao2_alloc(1, NULL);
596         ast_test_validate(test, NULL != test_data);
597         ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
598         test_message = stasis_message_create(test_message_type, test_data);
599
600         stasis_publish(topic, test_message);
601
602         actual_len = consumer_should_stay(consumer, 0);
603         ast_test_validate(test, 0 == actual_len);
604
605         return AST_TEST_PASS;
606 }
607
608 AST_TEST_DEFINE(forward)
609 {
610         RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
611         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
612
613         RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup);
614         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
615
616         RAII_VAR(struct stasis_forward *, forward_sub, NULL, stasis_forward_cancel);
617         RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe);
618         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
619
620         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
621         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
622         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
623         int actual_len;
624
625         switch (cmd) {
626         case TEST_INIT:
627                 info->name = __func__;
628                 info->category = test_category;
629                 info->summary = "Test sending events to a parent topic";
630                 info->description = "Test sending events to a parent topic.\n"
631                         "This test creates three topics (one parent, two children)\n"
632                         "and publishes a message to one child, and verifies it's\n"
633                         "only seen by that child and the parent";
634                 return AST_TEST_NOT_RUN;
635         case TEST_EXECUTE:
636                 break;
637         }
638
639         parent_topic = stasis_topic_create("ParentTestTopic");
640         ast_test_validate(test, NULL != parent_topic);
641         topic = stasis_topic_create("TestTopic");
642         ast_test_validate(test, NULL != topic);
643
644         forward_sub = stasis_forward_all(topic, parent_topic);
645         ast_test_validate(test, NULL != forward_sub);
646
647         parent_consumer = consumer_create(1);
648         ast_test_validate(test, NULL != parent_consumer);
649         consumer = consumer_create(1);
650         ast_test_validate(test, NULL != consumer);
651
652         parent_sub = stasis_subscribe(parent_topic, consumer_exec, parent_consumer);
653         ast_test_validate(test, NULL != parent_sub);
654         ao2_ref(parent_consumer, +1);
655         sub = stasis_subscribe(topic, consumer_exec, consumer);
656         ast_test_validate(test, NULL != sub);
657         ao2_ref(consumer, +1);
658
659         test_data = ao2_alloc(1, NULL);
660         ast_test_validate(test, NULL != test_data);
661         ast_test_validate(test, stasis_message_type_create("TestMessage", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
662         test_message = stasis_message_create(test_message_type, test_data);
663
664         stasis_publish(topic, test_message);
665
666         actual_len = consumer_wait_for(consumer, 1);
667         ast_test_validate(test, 1 == actual_len);
668         actual_len = consumer_wait_for(parent_consumer, 1);
669         ast_test_validate(test, 1 == actual_len);
670
671         return AST_TEST_PASS;
672 }
673
674 AST_TEST_DEFINE(interleaving)
675 {
676         RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
677         RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
678         RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
679
680         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
681
682         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
683
684         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
685         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
686         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
687
688         RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
689         RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
690         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
691
692         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
693
694         int actual_len;
695
696         switch (cmd) {
697         case TEST_INIT:
698                 info->name = __func__;
699                 info->category = test_category;
700                 info->summary = "Test sending interleaved events to a parent topic";
701                 info->description = "Test sending events to a parent topic.\n"
702                         "This test creates three topics (one parent, two children)\n"
703                         "and publishes messages alternately between the children.\n"
704                         "It verifies that the messages are received in the expected\n"
705                         "order.";
706                 return AST_TEST_NOT_RUN;
707         case TEST_EXECUTE:
708                 break;
709         }
710
711         ast_test_validate(test, stasis_message_type_create("test", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
712         ast_test_validate(test, NULL != test_message_type);
713
714         test_data = ao2_alloc(1, NULL);
715         ast_test_validate(test, NULL != test_data);
716
717         test_message1 = stasis_message_create(test_message_type, test_data);
718         ast_test_validate(test, NULL != test_message1);
719         test_message2 = stasis_message_create(test_message_type, test_data);
720         ast_test_validate(test, NULL != test_message2);
721         test_message3 = stasis_message_create(test_message_type, test_data);
722         ast_test_validate(test, NULL != test_message3);
723
724         parent_topic = stasis_topic_create("ParentTestTopic");
725         ast_test_validate(test, NULL != parent_topic);
726         topic1 = stasis_topic_create("Topic1");
727         ast_test_validate(test, NULL != topic1);
728         topic2 = stasis_topic_create("Topic2");
729         ast_test_validate(test, NULL != topic2);
730
731         forward_sub1 = stasis_forward_all(topic1, parent_topic);
732         ast_test_validate(test, NULL != forward_sub1);
733         forward_sub2 = stasis_forward_all(topic2, parent_topic);
734         ast_test_validate(test, NULL != forward_sub2);
735
736         consumer = consumer_create(1);
737         ast_test_validate(test, NULL != consumer);
738
739         sub = stasis_subscribe(parent_topic, consumer_exec, consumer);
740         ast_test_validate(test, NULL != sub);
741         ao2_ref(consumer, +1);
742
743         stasis_publish(topic1, test_message1);
744         stasis_publish(topic2, test_message2);
745         stasis_publish(topic1, test_message3);
746
747         actual_len = consumer_wait_for(consumer, 3);
748         ast_test_validate(test, 3 == actual_len);
749
750         ast_test_validate(test, test_message1 == consumer->messages_rxed[0]);
751         ast_test_validate(test, test_message2 == consumer->messages_rxed[1]);
752         ast_test_validate(test, test_message3 == consumer->messages_rxed[2]);
753
754         return AST_TEST_PASS;
755 }
756
757 AST_TEST_DEFINE(subscription_interleaving)
758 {
759         RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
760         RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
761         RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
762
763         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
764
765         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
766
767         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
768         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
769         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
770
771         RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
772         RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
773         RAII_VAR(struct stasis_subscription *, sub1, NULL, stasis_unsubscribe);
774         RAII_VAR(struct stasis_subscription *, sub2, NULL, stasis_unsubscribe);
775
776         RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
777         RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
778
779         int actual_len;
780
781         switch (cmd) {
782         case TEST_INIT:
783                 info->name = __func__;
784                 info->category = test_category;
785                 info->summary = "Test sending interleaved events to a parent topic with different subscribers";
786                 info->description = "Test sending events to a parent topic.\n"
787                         "This test creates three topics (one parent, two children)\n"
788                         "and publishes messages alternately between the children.\n"
789                         "It verifies that the messages are received in the expected\n"
790                         "order, for different subscription types: one with a dedicated\n"
791                         "thread, the other on the Stasis threadpool.\n";
792                 return AST_TEST_NOT_RUN;
793         case TEST_EXECUTE:
794                 break;
795         }
796
797         ast_test_validate(test, stasis_message_type_create("test", NULL, &test_message_type) == STASIS_MESSAGE_TYPE_SUCCESS);
798         ast_test_validate(test, NULL != test_message_type);
799
800         test_data = ao2_alloc(1, NULL);
801         ast_test_validate(test, NULL != test_data);
802
803         test_message1 = stasis_message_create(test_message_type, test_data);
804         ast_test_validate(test, NULL != test_message1);
805         test_message2 = stasis_message_create(test_message_type, test_data);
806         ast_test_validate(test, NULL != test_message2);
807         test_message3 = stasis_message_create(test_message_type, test_data);
808         ast_test_validate(test, NULL != test_message3);
809
810         parent_topic = stasis_topic_create("ParentTestTopic");
811         ast_test_validate(test, NULL != parent_topic);
812         topic1 = stasis_topic_create("Topic1");
813         ast_test_validate(test, NULL != topic1);
814         topic2 = stasis_topic_create("Topic2");
815         ast_test_validate(test, NULL != topic2);
816
817         forward_sub1 = stasis_forward_all(topic1, parent_topic);
818         ast_test_validate(test, NULL != forward_sub1);
819         forward_sub2 = stasis_forward_all(topic2, parent_topic);
820         ast_test_validate(test, NULL != forward_sub2);
821
822         consumer1 = consumer_create(1);
823         ast_test_validate(test, NULL != consumer1);
824
825         consumer2 = consumer_create(1);
826         ast_test_validate(test, NULL != consumer2);
827
828         sub1 = stasis_subscribe(parent_topic, consumer_exec, consumer1);
829         ast_test_validate(test, NULL != sub1);
830         ao2_ref(consumer1, +1);
831
832         sub2 = stasis_subscribe_pool(parent_topic, consumer_exec, consumer2);
833         ast_test_validate(test, NULL != sub2);
834         ao2_ref(consumer2, +1);
835
836         stasis_publish(topic1, test_message1);
837         stasis_publish(topic2, test_message2);
838         stasis_publish(topic1, test_message3);
839
840         actual_len = consumer_wait_for(consumer1, 3);
841         ast_test_validate(test, 3 == actual_len);
842
843         actual_len = consumer_wait_for(consumer2, 3);
844         ast_test_validate(test, 3 == actual_len);
845
846         ast_test_validate(test, test_message1 == consumer1->messages_rxed[0]);
847         ast_test_validate(test, test_message2 == consumer1->messages_rxed[1]);
848         ast_test_validate(test, test_message3 == consumer1->messages_rxed[2]);
849
850         ast_test_validate(test, test_message1 == consumer2->messages_rxed[0]);
851         ast_test_validate(test, test_message2 == consumer2->messages_rxed[1]);
852         ast_test_validate(test, test_message3 == consumer2->messages_rxed[2]);
853
854         return AST_TEST_PASS;
855 }
856
857 struct cache_test_data {
858         char *id;
859         char *value;
860 };
861
862 static void cache_test_data_dtor(void *obj)
863 {
864         struct cache_test_data *data = obj;
865
866         ast_free(data->id);
867         ast_free(data->value);
868 }
869
870 static struct stasis_message *cache_test_message_create_full(struct stasis_message_type *type, const char *name, const char *value, struct ast_eid *eid)
871 {
872         RAII_VAR(struct cache_test_data *, data, NULL, ao2_cleanup);
873
874         data = ao2_alloc(sizeof(*data), cache_test_data_dtor);
875         if (data == NULL) {
876                 return NULL;
877         }
878
879         ast_assert(name != NULL);
880         ast_assert(value != NULL);
881
882         data->id = ast_strdup(name);
883         data->value = ast_strdup(value);
884         if (!data->id || !data->value) {
885                 return NULL;
886         }
887
888         return stasis_message_create_full(type, data, eid);
889 }
890
891 static struct stasis_message *cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value)
892 {
893         return cache_test_message_create_full(type, name, value, &ast_eid_default);
894 }
895
896 static const char *cache_test_data_id(struct stasis_message *message)
897 {
898         struct cache_test_data *cachable = stasis_message_data(message);
899
900         if (0 != strcmp("Cacheable", stasis_message_type_name(stasis_message_type(message)))) {
901                 return NULL;
902         }
903         return cachable->id;
904 }
905
906 static struct stasis_message *cache_test_aggregate_calc_fn(struct stasis_cache_entry *entry, struct stasis_message *new_snapshot)
907 {
908         struct stasis_message *aggregate_snapshot;
909         struct stasis_message *snapshot;
910         struct stasis_message_type *type = NULL;
911         struct cache_test_data *test_data = NULL;
912         int idx;
913         int accumulated = 0;
914         char aggregate_str[30];
915
916         /* Accumulate the aggregate value. */
917         snapshot = stasis_cache_entry_get_local(entry);
918         if (snapshot) {
919                 type = stasis_message_type(snapshot);
920                 test_data = stasis_message_data(snapshot);
921                 accumulated += atoi(test_data->value);
922         }
923         for (idx = 0; ; ++idx) {
924                 snapshot = stasis_cache_entry_get_remote(entry, idx);
925                 if (!snapshot) {
926                         break;
927                 }
928
929                 type = stasis_message_type(snapshot);
930                 test_data = stasis_message_data(snapshot);
931                 accumulated += atoi(test_data->value);
932         }
933
934         if (!test_data) {
935                 /* There are no test entries cached.  Delete the aggregate. */
936                 return NULL;
937         }
938
939         snapshot = stasis_cache_entry_get_aggregate(entry);
940         if (snapshot) {
941                 type = stasis_message_type(snapshot);
942                 test_data = stasis_message_data(snapshot);
943                 if (accumulated == atoi(test_data->value)) {
944                         /* Aggregate test entry did not change. */
945                         return ao2_bump(snapshot);
946                 }
947         }
948
949         snprintf(aggregate_str, sizeof(aggregate_str), "%d", accumulated);
950         aggregate_snapshot = cache_test_message_create_full(type, test_data->id, aggregate_str, NULL);
951         if (!aggregate_snapshot) {
952                 /* Bummer.  We have to keep the old aggregate snapshot. */
953                 ast_log(LOG_ERROR, "Could not create aggregate snapshot.\n");
954                 return ao2_bump(snapshot);
955         }
956
957         return aggregate_snapshot;
958 }
959
960 static void cache_test_aggregate_publish_fn(struct stasis_topic *topic, struct stasis_message *aggregate)
961 {
962         stasis_publish(topic, aggregate);
963 }
964
965 static int check_cache_aggregate(struct stasis_cache *cache, struct stasis_message_type *cache_type, const char *id, const char *value)
966 {
967         RAII_VAR(struct stasis_message *, aggregate, NULL, ao2_cleanup);
968         struct cache_test_data *test_data;
969
970         aggregate = stasis_cache_get_by_eid(cache, cache_type, id, NULL);
971         if (!aggregate) {
972                 /* No aggregate, return true if given no value. */
973                 return !value;
974         }
975
976         /* Return true if the given value matches the aggregate value. */
977         test_data = stasis_message_data(aggregate);
978         return value && !strcmp(value, test_data->value);
979 }
980
981 AST_TEST_DEFINE(cache_filter)
982 {
983         RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup);
984         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
985         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
986         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
987         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
988         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
989         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
990         int actual_len;
991
992         switch (cmd) {
993         case TEST_INIT:
994                 info->name = __func__;
995                 info->category = test_category;
996                 info->summary = "Test caching topics only forward cache_update messages.";
997                 info->description = "Test caching topics only forward cache_update messages.";
998                 return AST_TEST_NOT_RUN;
999         case TEST_EXECUTE:
1000                 break;
1001         }
1002
1003         ast_test_validate(test, stasis_message_type_create("NonCacheable", NULL, &non_cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
1004         ast_test_validate(test, NULL != non_cache_type);
1005         topic = stasis_topic_create("SomeTopic");
1006         ast_test_validate(test, NULL != topic);
1007         cache = stasis_cache_create(cache_test_data_id);
1008         ast_test_validate(test, NULL != cache);
1009         caching_topic = stasis_caching_topic_create(topic, cache);
1010         ast_test_validate(test, NULL != caching_topic);
1011         consumer = consumer_create(1);
1012         ast_test_validate(test, NULL != consumer);
1013         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
1014         ast_test_validate(test, NULL != sub);
1015         ao2_ref(consumer, +1);
1016
1017         test_message = cache_test_message_create(non_cache_type, "1", "1");
1018         ast_test_validate(test, NULL != test_message);
1019
1020         stasis_publish(topic, test_message);
1021
1022         actual_len = consumer_should_stay(consumer, 0);
1023         ast_test_validate(test, 0 == actual_len);
1024
1025         return AST_TEST_PASS;
1026 }
1027
1028 AST_TEST_DEFINE(cache)
1029 {
1030         RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
1031         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1032         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
1033         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
1034         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
1035         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
1036         RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
1037         RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
1038         RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
1039         RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
1040         int actual_len;
1041         struct stasis_cache_update *actual_update;
1042
1043         switch (cmd) {
1044         case TEST_INIT:
1045                 info->name = __func__;
1046                 info->category = test_category;
1047                 info->summary = "Test passing messages through cache topic unscathed.";
1048                 info->description = "Test passing messages through cache topic unscathed.";
1049                 return AST_TEST_NOT_RUN;
1050         case TEST_EXECUTE:
1051                 break;
1052         }
1053
1054         ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
1055         ast_test_validate(test, NULL != cache_type);
1056         topic = stasis_topic_create("SomeTopic");
1057         ast_test_validate(test, NULL != topic);
1058         cache = stasis_cache_create(cache_test_data_id);
1059         ast_test_validate(test, NULL != cache);
1060         caching_topic = stasis_caching_topic_create(topic, cache);
1061         ast_test_validate(test, NULL != caching_topic);
1062         consumer = consumer_create(1);
1063         ast_test_validate(test, NULL != consumer);
1064         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
1065         ast_test_validate(test, NULL != sub);
1066         ao2_ref(consumer, +1);
1067
1068         test_message1_1 = cache_test_message_create(cache_type, "1", "1");
1069         ast_test_validate(test, NULL != test_message1_1);
1070         test_message2_1 = cache_test_message_create(cache_type, "2", "1");
1071         ast_test_validate(test, NULL != test_message2_1);
1072
1073         /* Post a couple of snapshots */
1074         stasis_publish(topic, test_message1_1);
1075         stasis_publish(topic, test_message2_1);
1076         actual_len = consumer_wait_for(consumer, 2);
1077         ast_test_validate(test, 2 == actual_len);
1078
1079         /* Check for new snapshot messages */
1080         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[0]));
1081         actual_update = stasis_message_data(consumer->messages_rxed[0]);
1082         ast_test_validate(test, NULL == actual_update->old_snapshot);
1083         ast_test_validate(test, test_message1_1 == actual_update->new_snapshot);
1084         ast_test_validate(test, test_message1_1 == stasis_cache_get(cache, cache_type, "1"));
1085         /* stasis_cache_get returned a ref, so unref test_message1_1 */
1086         ao2_ref(test_message1_1, -1);
1087
1088         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[1]));
1089         actual_update = stasis_message_data(consumer->messages_rxed[1]);
1090         ast_test_validate(test, NULL == actual_update->old_snapshot);
1091         ast_test_validate(test, test_message2_1 == actual_update->new_snapshot);
1092         ast_test_validate(test, test_message2_1 == stasis_cache_get(cache, cache_type, "2"));
1093         /* stasis_cache_get returned a ref, so unref test_message2_1 */
1094         ao2_ref(test_message2_1, -1);
1095
1096         /* Update snapshot 2 */
1097         test_message2_2 = cache_test_message_create(cache_type, "2", "2");
1098         ast_test_validate(test, NULL != test_message2_2);
1099         stasis_publish(topic, test_message2_2);
1100
1101         actual_len = consumer_wait_for(consumer, 3);
1102         ast_test_validate(test, 3 == actual_len);
1103
1104         actual_update = stasis_message_data(consumer->messages_rxed[2]);
1105         ast_test_validate(test, test_message2_1 == actual_update->old_snapshot);
1106         ast_test_validate(test, test_message2_2 == actual_update->new_snapshot);
1107         ast_test_validate(test, test_message2_2 == stasis_cache_get(cache, cache_type, "2"));
1108         /* stasis_cache_get returned a ref, so unref test_message2_2 */
1109         ao2_ref(test_message2_2, -1);
1110
1111         /* Clear snapshot 1 */
1112         test_message1_clear = stasis_cache_clear_create(test_message1_1);
1113         ast_test_validate(test, NULL != test_message1_clear);
1114         stasis_publish(topic, test_message1_clear);
1115
1116         actual_len = consumer_wait_for(consumer, 4);
1117         ast_test_validate(test, 4 == actual_len);
1118
1119         actual_update = stasis_message_data(consumer->messages_rxed[3]);
1120         ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
1121         ast_test_validate(test, NULL == actual_update->new_snapshot);
1122         ast_test_validate(test, NULL == stasis_cache_get(cache, cache_type, "1"));
1123
1124         return AST_TEST_PASS;
1125 }
1126
1127 AST_TEST_DEFINE(cache_dump)
1128 {
1129         RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
1130         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1131         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
1132         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
1133         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
1134         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
1135         RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
1136         RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
1137         RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
1138         RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
1139         RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
1140         int actual_len;
1141         struct ao2_iterator i;
1142         void *obj;
1143
1144         switch (cmd) {
1145         case TEST_INIT:
1146                 info->name = __func__;
1147                 info->category = test_category;
1148                 info->summary = "Test cache dump routines.";
1149                 info->description = "Test cache dump routines.";
1150                 return AST_TEST_NOT_RUN;
1151         case TEST_EXECUTE:
1152                 break;
1153         }
1154
1155         ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
1156         ast_test_validate(test, NULL != cache_type);
1157         topic = stasis_topic_create("SomeTopic");
1158         ast_test_validate(test, NULL != topic);
1159         cache = stasis_cache_create(cache_test_data_id);
1160         ast_test_validate(test, NULL != cache);
1161         caching_topic = stasis_caching_topic_create(topic, cache);
1162         ast_test_validate(test, NULL != caching_topic);
1163         consumer = consumer_create(1);
1164         ast_test_validate(test, NULL != consumer);
1165         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
1166         ast_test_validate(test, NULL != sub);
1167         ao2_ref(consumer, +1);
1168
1169         test_message1_1 = cache_test_message_create(cache_type, "1", "1");
1170         ast_test_validate(test, NULL != test_message1_1);
1171         test_message2_1 = cache_test_message_create(cache_type, "2", "1");
1172         ast_test_validate(test, NULL != test_message2_1);
1173
1174         /* Post a couple of snapshots */
1175         stasis_publish(topic, test_message1_1);
1176         stasis_publish(topic, test_message2_1);
1177         actual_len = consumer_wait_for(consumer, 2);
1178         ast_test_validate(test, 2 == actual_len);
1179
1180         /* Check the cache */
1181         ao2_cleanup(cache_dump);
1182         cache_dump = stasis_cache_dump(cache, NULL);
1183         ast_test_validate(test, NULL != cache_dump);
1184         ast_test_validate(test, 2 == ao2_container_count(cache_dump));
1185         i = ao2_iterator_init(cache_dump, 0);
1186         while ((obj = ao2_iterator_next(&i))) {
1187                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1188                 ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_1);
1189         }
1190         ao2_iterator_destroy(&i);
1191
1192         /* Update snapshot 2 */
1193         test_message2_2 = cache_test_message_create(cache_type, "2", "2");
1194         ast_test_validate(test, NULL != test_message2_2);
1195         stasis_publish(topic, test_message2_2);
1196
1197         actual_len = consumer_wait_for(consumer, 3);
1198         ast_test_validate(test, 3 == actual_len);
1199
1200         /* Check the cache */
1201         ao2_cleanup(cache_dump);
1202         cache_dump = stasis_cache_dump(cache, NULL);
1203         ast_test_validate(test, NULL != cache_dump);
1204         ast_test_validate(test, 2 == ao2_container_count(cache_dump));
1205         i = ao2_iterator_init(cache_dump, 0);
1206         while ((obj = ao2_iterator_next(&i))) {
1207                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1208                 ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_2);
1209         }
1210         ao2_iterator_destroy(&i);
1211
1212         /* Clear snapshot 1 */
1213         test_message1_clear = stasis_cache_clear_create(test_message1_1);
1214         ast_test_validate(test, NULL != test_message1_clear);
1215         stasis_publish(topic, test_message1_clear);
1216
1217         actual_len = consumer_wait_for(consumer, 4);
1218         ast_test_validate(test, 4 == actual_len);
1219
1220         /* Check the cache */
1221         ao2_cleanup(cache_dump);
1222         cache_dump = stasis_cache_dump(cache, NULL);
1223         ast_test_validate(test, NULL != cache_dump);
1224         ast_test_validate(test, 1 == ao2_container_count(cache_dump));
1225         i = ao2_iterator_init(cache_dump, 0);
1226         while ((obj = ao2_iterator_next(&i))) {
1227                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1228                 ast_test_validate(test, actual_cache_entry == test_message2_2);
1229         }
1230         ao2_iterator_destroy(&i);
1231
1232         /* Dump the cache to ensure that it has no subscription change items in it since those aren't cached */
1233         ao2_cleanup(cache_dump);
1234         cache_dump = stasis_cache_dump(cache, stasis_subscription_change_type());
1235         ast_test_validate(test, 0 == ao2_container_count(cache_dump));
1236
1237         return AST_TEST_PASS;
1238 }
1239
1240 AST_TEST_DEFINE(cache_eid_aggregate)
1241 {
1242         RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
1243         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1244         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
1245         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
1246         RAII_VAR(struct consumer *, cache_consumer, NULL, ao2_cleanup);
1247         RAII_VAR(struct consumer *, topic_consumer, NULL, ao2_cleanup);
1248         RAII_VAR(struct stasis_subscription *, topic_sub, NULL, stasis_unsubscribe);
1249         RAII_VAR(struct stasis_subscription *, cache_sub, NULL, stasis_unsubscribe);
1250         RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
1251         RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
1252         RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
1253         RAII_VAR(struct stasis_message *, test_message2_3, NULL, ao2_cleanup);
1254         RAII_VAR(struct stasis_message *, test_message2_4, NULL, ao2_cleanup);
1255         RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
1256         RAII_VAR(struct stasis_message *, test_message2_clear, NULL, ao2_cleanup);
1257         RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
1258         int actual_len;
1259         struct ao2_iterator i;
1260         void *obj;
1261         struct ast_eid foreign_eid1;
1262         struct ast_eid foreign_eid2;
1263
1264         switch (cmd) {
1265         case TEST_INIT:
1266                 info->name = __func__;
1267                 info->category = test_category;
1268                 info->summary = "Test cache eid and aggregate support.";
1269                 info->description = "Test cache eid and aggregate support.";
1270                 return AST_TEST_NOT_RUN;
1271         case TEST_EXECUTE:
1272                 break;
1273         }
1274
1275         memset(&foreign_eid1, 0xAA, sizeof(foreign_eid1));
1276         memset(&foreign_eid2, 0xBB, sizeof(foreign_eid2));
1277
1278         ast_test_validate(test, stasis_message_type_create("Cacheable", NULL, &cache_type) == STASIS_MESSAGE_TYPE_SUCCESS);
1279         ast_test_validate(test, NULL != cache_type);
1280
1281         topic = stasis_topic_create("SomeTopic");
1282         ast_test_validate(test, NULL != topic);
1283
1284         /* To consume events published to the topic. */
1285         topic_consumer = consumer_create(1);
1286         ast_test_validate(test, NULL != topic_consumer);
1287
1288         topic_sub = stasis_subscribe(topic, consumer_exec, topic_consumer);
1289         ast_test_validate(test, NULL != topic_sub);
1290         ao2_ref(topic_consumer, +1);
1291
1292         cache = stasis_cache_create_full(cache_test_data_id,
1293                 cache_test_aggregate_calc_fn, cache_test_aggregate_publish_fn);
1294         ast_test_validate(test, NULL != cache);
1295
1296         caching_topic = stasis_caching_topic_create(topic, cache);
1297         ast_test_validate(test, NULL != caching_topic);
1298
1299         /* To consume update events published to the caching_topic. */
1300         cache_consumer = consumer_create(1);
1301         ast_test_validate(test, NULL != cache_consumer);
1302
1303         cache_sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, cache_consumer);
1304         ast_test_validate(test, NULL != cache_sub);
1305         ao2_ref(cache_consumer, +1);
1306
1307         /* Create test messages. */
1308         test_message1_1 = cache_test_message_create_full(cache_type, "1", "1", &ast_eid_default);
1309         ast_test_validate(test, NULL != test_message1_1);
1310         test_message2_1 = cache_test_message_create_full(cache_type, "2", "1", &ast_eid_default);
1311         ast_test_validate(test, NULL != test_message2_1);
1312         test_message2_2 = cache_test_message_create_full(cache_type, "2", "2", &foreign_eid1);
1313         ast_test_validate(test, NULL != test_message2_2);
1314         test_message2_3 = cache_test_message_create_full(cache_type, "2", "3", &foreign_eid2);
1315         ast_test_validate(test, NULL != test_message2_3);
1316         test_message2_4 = cache_test_message_create_full(cache_type, "2", "4", &foreign_eid2);
1317         ast_test_validate(test, NULL != test_message2_4);
1318
1319         /* Post some snapshots */
1320         stasis_publish(topic, test_message1_1);
1321         ast_test_validate(test, check_cache_aggregate(cache, cache_type, "1", "1"));
1322         stasis_publish(topic, test_message2_1);
1323         ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "1"));
1324         stasis_publish(topic, test_message2_2);
1325         ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "3"));
1326
1327         actual_len = consumer_wait_for(cache_consumer, 6);
1328         ast_test_validate(test, 6 == actual_len);
1329         actual_len = consumer_wait_for(topic_consumer, 6);
1330         ast_test_validate(test, 6 == actual_len);
1331
1332         /* Check the cache */
1333         ao2_cleanup(cache_dump);
1334         cache_dump = stasis_cache_dump_all(cache, NULL);
1335         ast_test_validate(test, NULL != cache_dump);
1336         ast_test_validate(test, 3 == ao2_container_count(cache_dump));
1337         i = ao2_iterator_init(cache_dump, 0);
1338         while ((obj = ao2_iterator_next(&i))) {
1339                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1340
1341                 ast_test_validate(test,
1342                         actual_cache_entry == test_message1_1
1343                         || actual_cache_entry == test_message2_1
1344                         || actual_cache_entry == test_message2_2);
1345         }
1346         ao2_iterator_destroy(&i);
1347
1348         /* Check the local cached items */
1349         ao2_cleanup(cache_dump);
1350         cache_dump = stasis_cache_dump_by_eid(cache, NULL, &ast_eid_default);
1351         ast_test_validate(test, NULL != cache_dump);
1352         ast_test_validate(test, 2 == ao2_container_count(cache_dump));
1353         i = ao2_iterator_init(cache_dump, 0);
1354         while ((obj = ao2_iterator_next(&i))) {
1355                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1356
1357                 ast_test_validate(test,
1358                         actual_cache_entry == test_message1_1
1359                         || actual_cache_entry == test_message2_1);
1360         }
1361         ao2_iterator_destroy(&i);
1362
1363         /* Post snapshot 2 from another eid. */
1364         stasis_publish(topic, test_message2_3);
1365         ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "6"));
1366
1367         actual_len = consumer_wait_for(cache_consumer, 8);
1368         ast_test_validate(test, 8 == actual_len);
1369         actual_len = consumer_wait_for(topic_consumer, 8);
1370         ast_test_validate(test, 8 == actual_len);
1371
1372         /* Check the cache */
1373         ao2_cleanup(cache_dump);
1374         cache_dump = stasis_cache_dump_all(cache, NULL);
1375         ast_test_validate(test, NULL != cache_dump);
1376         ast_test_validate(test, 4 == ao2_container_count(cache_dump));
1377         i = ao2_iterator_init(cache_dump, 0);
1378         while ((obj = ao2_iterator_next(&i))) {
1379                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1380
1381                 ast_test_validate(test,
1382                         actual_cache_entry == test_message1_1
1383                         || actual_cache_entry == test_message2_1
1384                         || actual_cache_entry == test_message2_2
1385                         || actual_cache_entry == test_message2_3);
1386         }
1387         ao2_iterator_destroy(&i);
1388
1389         /* Check the remote cached items */
1390         ao2_cleanup(cache_dump);
1391         cache_dump = stasis_cache_dump_by_eid(cache, NULL, &foreign_eid1);
1392         ast_test_validate(test, NULL != cache_dump);
1393         ast_test_validate(test, 1 == ao2_container_count(cache_dump));
1394         i = ao2_iterator_init(cache_dump, 0);
1395         while ((obj = ao2_iterator_next(&i))) {
1396                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1397
1398                 ast_test_validate(test, actual_cache_entry == test_message2_2);
1399         }
1400         ao2_iterator_destroy(&i);
1401
1402         /* Post snapshot 2 from a repeated eid. */
1403         stasis_publish(topic, test_message2_4);
1404         ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "7"));
1405
1406         actual_len = consumer_wait_for(cache_consumer, 10);
1407         ast_test_validate(test, 10 == actual_len);
1408         actual_len = consumer_wait_for(topic_consumer, 10);
1409         ast_test_validate(test, 10 == actual_len);
1410
1411         /* Check the cache */
1412         ao2_cleanup(cache_dump);
1413         cache_dump = stasis_cache_dump_all(cache, NULL);
1414         ast_test_validate(test, NULL != cache_dump);
1415         ast_test_validate(test, 4 == ao2_container_count(cache_dump));
1416         i = ao2_iterator_init(cache_dump, 0);
1417         while ((obj = ao2_iterator_next(&i))) {
1418                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1419
1420                 ast_test_validate(test,
1421                         actual_cache_entry == test_message1_1
1422                         || actual_cache_entry == test_message2_1
1423                         || actual_cache_entry == test_message2_2
1424                         || actual_cache_entry == test_message2_4);
1425         }
1426         ao2_iterator_destroy(&i);
1427
1428         /* Check all snapshot 2 cache entries. */
1429         ao2_cleanup(cache_dump);
1430         cache_dump = stasis_cache_get_all(cache, cache_type, "2");
1431         ast_test_validate(test, NULL != cache_dump);
1432         ast_test_validate(test, 3 == ao2_container_count(cache_dump));
1433         i = ao2_iterator_init(cache_dump, 0);
1434         while ((obj = ao2_iterator_next(&i))) {
1435                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1436
1437                 ast_test_validate(test,
1438                         actual_cache_entry == test_message2_1
1439                         || actual_cache_entry == test_message2_2
1440                         || actual_cache_entry == test_message2_4);
1441         }
1442         ao2_iterator_destroy(&i);
1443
1444         /* Clear snapshot 1 */
1445         test_message1_clear = stasis_cache_clear_create(test_message1_1);
1446         ast_test_validate(test, NULL != test_message1_clear);
1447         stasis_publish(topic, test_message1_clear);
1448         ast_test_validate(test, check_cache_aggregate(cache, cache_type, "1", NULL));
1449
1450         actual_len = consumer_wait_for(cache_consumer, 12);
1451         ast_test_validate(test, 12 == actual_len);
1452         actual_len = consumer_wait_for(topic_consumer, 11);
1453         ast_test_validate(test, 11 == actual_len);
1454
1455         /* Check the cache */
1456         ao2_cleanup(cache_dump);
1457         cache_dump = stasis_cache_dump_all(cache, NULL);
1458         ast_test_validate(test, NULL != cache_dump);
1459         ast_test_validate(test, 3 == ao2_container_count(cache_dump));
1460         i = ao2_iterator_init(cache_dump, 0);
1461         while ((obj = ao2_iterator_next(&i))) {
1462                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1463
1464                 ast_test_validate(test,
1465                         actual_cache_entry == test_message2_1
1466                         || actual_cache_entry == test_message2_2
1467                         || actual_cache_entry == test_message2_4);
1468         }
1469         ao2_iterator_destroy(&i);
1470
1471         /* Clear snapshot 2 from a remote eid */
1472         test_message2_clear = stasis_cache_clear_create(test_message2_2);
1473         ast_test_validate(test, NULL != test_message2_clear);
1474         stasis_publish(topic, test_message2_clear);
1475         ast_test_validate(test, check_cache_aggregate(cache, cache_type, "2", "5"));
1476
1477         actual_len = consumer_wait_for(cache_consumer, 14);
1478         ast_test_validate(test, 14 == actual_len);
1479         actual_len = consumer_wait_for(topic_consumer, 13);
1480         ast_test_validate(test, 13 == actual_len);
1481
1482         /* Check the cache */
1483         ao2_cleanup(cache_dump);
1484         cache_dump = stasis_cache_dump_all(cache, NULL);
1485         ast_test_validate(test, NULL != cache_dump);
1486         ast_test_validate(test, 2 == ao2_container_count(cache_dump));
1487         i = ao2_iterator_init(cache_dump, 0);
1488         while ((obj = ao2_iterator_next(&i))) {
1489                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
1490
1491                 ast_test_validate(test,
1492                         actual_cache_entry == test_message2_1
1493                         || actual_cache_entry == test_message2_4);
1494         }
1495         ao2_iterator_destroy(&i);
1496
1497         return AST_TEST_PASS;
1498 }
1499
1500 AST_TEST_DEFINE(router)
1501 {
1502         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1503         RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
1504         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
1505         RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
1506         RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
1507         RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
1508         RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
1509         RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
1510         RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
1511         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
1512         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
1513         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
1514         int actual_len, ret;
1515         struct stasis_message *actual;
1516
1517         switch (cmd) {
1518         case TEST_INIT:
1519                 info->name = __func__;
1520                 info->category = test_category;
1521                 info->summary = "Test simple message routing";
1522                 info->description = "Test simple message routing";
1523                 return AST_TEST_NOT_RUN;
1524         case TEST_EXECUTE:
1525                 break;
1526         }
1527
1528         topic = stasis_topic_create("TestTopic");
1529         ast_test_validate(test, NULL != topic);
1530
1531         consumer1 = consumer_create(1);
1532         ast_test_validate(test, NULL != consumer1);
1533         consumer2 = consumer_create(1);
1534         ast_test_validate(test, NULL != consumer2);
1535         consumer3 = consumer_create(1);
1536         ast_test_validate(test, NULL != consumer3);
1537
1538         ast_test_validate(test, stasis_message_type_create("TestMessage1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
1539         ast_test_validate(test, NULL != test_message_type1);
1540         ast_test_validate(test, stasis_message_type_create("TestMessage2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
1541         ast_test_validate(test, NULL != test_message_type2);
1542         ast_test_validate(test, stasis_message_type_create("TestMessage3", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
1543         ast_test_validate(test, NULL != test_message_type3);
1544
1545         uut = stasis_message_router_create(topic);
1546         ast_test_validate(test, NULL != uut);
1547
1548         ret = stasis_message_router_add(
1549                 uut, test_message_type1, consumer_exec, consumer1);
1550         ast_test_validate(test, 0 == ret);
1551         ao2_ref(consumer1, +1);
1552         ret = stasis_message_router_add(
1553                 uut, test_message_type2, consumer_exec, consumer2);
1554         ast_test_validate(test, 0 == ret);
1555         ao2_ref(consumer2, +1);
1556         ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
1557         ast_test_validate(test, 0 == ret);
1558         ao2_ref(consumer3, +1);
1559
1560         test_data = ao2_alloc(1, NULL);
1561         ast_test_validate(test, NULL != test_data);
1562         test_message1 = stasis_message_create(test_message_type1, test_data);
1563         ast_test_validate(test, NULL != test_message1);
1564         test_message2 = stasis_message_create(test_message_type2, test_data);
1565         ast_test_validate(test, NULL != test_message2);
1566         test_message3 = stasis_message_create(test_message_type3, test_data);
1567         ast_test_validate(test, NULL != test_message3);
1568
1569         stasis_publish(topic, test_message1);
1570         stasis_publish(topic, test_message2);
1571         stasis_publish(topic, test_message3);
1572
1573         actual_len = consumer_wait_for(consumer1, 1);
1574         ast_test_validate(test, 1 == actual_len);
1575         actual_len = consumer_wait_for(consumer2, 1);
1576         ast_test_validate(test, 1 == actual_len);
1577         actual_len = consumer_wait_for(consumer3, 1);
1578         ast_test_validate(test, 1 == actual_len);
1579
1580         actual = consumer1->messages_rxed[0];
1581         ast_test_validate(test, test_message1 == actual);
1582
1583         actual = consumer2->messages_rxed[0];
1584         ast_test_validate(test, test_message2 == actual);
1585
1586         actual = consumer3->messages_rxed[0];
1587         ast_test_validate(test, test_message3 == actual);
1588
1589         /* consumer1 and consumer2 do not get the final message. */
1590         ao2_cleanup(consumer1);
1591         ao2_cleanup(consumer2);
1592
1593         return AST_TEST_PASS;
1594 }
1595
1596 AST_TEST_DEFINE(router_pool)
1597 {
1598         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1599         RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
1600         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
1601         RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
1602         RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
1603         RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
1604         RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
1605         RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
1606         RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
1607         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
1608         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
1609         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
1610         int actual_len, ret;
1611         struct stasis_message *actual;
1612
1613         switch (cmd) {
1614         case TEST_INIT:
1615                 info->name = __func__;
1616                 info->category = test_category;
1617                 info->summary = "Test message routing via threadpool";
1618                 info->description = "Test simple message routing when\n"
1619                         "the subscriptions dictate usage of the Stasis\n"
1620                         "threadpool.\n";
1621                 return AST_TEST_NOT_RUN;
1622         case TEST_EXECUTE:
1623                 break;
1624         }
1625
1626         topic = stasis_topic_create("TestTopic");
1627         ast_test_validate(test, NULL != topic);
1628
1629         consumer1 = consumer_create(1);
1630         ast_test_validate(test, NULL != consumer1);
1631         consumer2 = consumer_create(1);
1632         ast_test_validate(test, NULL != consumer2);
1633         consumer3 = consumer_create(1);
1634         ast_test_validate(test, NULL != consumer3);
1635
1636         ast_test_validate(test, stasis_message_type_create("TestMessage1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
1637         ast_test_validate(test, NULL != test_message_type1);
1638         ast_test_validate(test, stasis_message_type_create("TestMessage2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
1639         ast_test_validate(test, NULL != test_message_type2);
1640         ast_test_validate(test, stasis_message_type_create("TestMessage3", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
1641         ast_test_validate(test, NULL != test_message_type3);
1642
1643         uut = stasis_message_router_create_pool(topic);
1644         ast_test_validate(test, NULL != uut);
1645
1646         ret = stasis_message_router_add(
1647                 uut, test_message_type1, consumer_exec, consumer1);
1648         ast_test_validate(test, 0 == ret);
1649         ao2_ref(consumer1, +1);
1650         ret = stasis_message_router_add(
1651                 uut, test_message_type2, consumer_exec, consumer2);
1652         ast_test_validate(test, 0 == ret);
1653         ao2_ref(consumer2, +1);
1654         ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
1655         ast_test_validate(test, 0 == ret);
1656         ao2_ref(consumer3, +1);
1657
1658         test_data = ao2_alloc(1, NULL);
1659         ast_test_validate(test, NULL != test_data);
1660         test_message1 = stasis_message_create(test_message_type1, test_data);
1661         ast_test_validate(test, NULL != test_message1);
1662         test_message2 = stasis_message_create(test_message_type2, test_data);
1663         ast_test_validate(test, NULL != test_message2);
1664         test_message3 = stasis_message_create(test_message_type3, test_data);
1665         ast_test_validate(test, NULL != test_message3);
1666
1667         stasis_publish(topic, test_message1);
1668         stasis_publish(topic, test_message2);
1669         stasis_publish(topic, test_message3);
1670
1671         actual_len = consumer_wait_for(consumer1, 1);
1672         ast_test_validate(test, 1 == actual_len);
1673         actual_len = consumer_wait_for(consumer2, 1);
1674         ast_test_validate(test, 1 == actual_len);
1675         actual_len = consumer_wait_for(consumer3, 1);
1676         ast_test_validate(test, 1 == actual_len);
1677
1678         actual = consumer1->messages_rxed[0];
1679         ast_test_validate(test, test_message1 == actual);
1680
1681         actual = consumer2->messages_rxed[0];
1682         ast_test_validate(test, test_message2 == actual);
1683
1684         actual = consumer3->messages_rxed[0];
1685         ast_test_validate(test, test_message3 == actual);
1686
1687         /* consumer1 and consumer2 do not get the final message. */
1688         ao2_cleanup(consumer1);
1689         ao2_cleanup(consumer2);
1690
1691         return AST_TEST_PASS;
1692 }
1693
1694 static const char *cache_simple(struct stasis_message *message)
1695 {
1696         const char *type_name =
1697                 stasis_message_type_name(stasis_message_type(message));
1698         if (!ast_begins_with(type_name, "Cache")) {
1699                 return NULL;
1700         }
1701
1702         return "cached";
1703 }
1704
1705 AST_TEST_DEFINE(router_cache_updates)
1706 {
1707         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1708         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
1709         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe_and_join);
1710         RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
1711         RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
1712         RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
1713         RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
1714         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
1715         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
1716         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
1717         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
1718         RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
1719         RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
1720         RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
1721         RAII_VAR(struct stasis_message *, message1, NULL, ao2_cleanup);
1722         RAII_VAR(struct stasis_message *, message2, NULL, ao2_cleanup);
1723         struct stasis_cache_update *update;
1724         int actual_len, ret;
1725         struct stasis_message *actual;
1726
1727         switch (cmd) {
1728         case TEST_INIT:
1729                 info->name = __func__;
1730                 info->category = test_category;
1731                 info->summary = "Test special handling cache_update messages";
1732                 info->description = "Test special handling cache_update messages";
1733                 return AST_TEST_NOT_RUN;
1734         case TEST_EXECUTE:
1735                 break;
1736         }
1737
1738         topic = stasis_topic_create("TestTopic");
1739         ast_test_validate(test, NULL != topic);
1740
1741         cache = stasis_cache_create(cache_simple);
1742         ast_test_validate(test, NULL != cache);
1743         caching_topic = stasis_caching_topic_create(topic, cache);
1744         ast_test_validate(test, NULL != caching_topic);
1745
1746         consumer1 = consumer_create(1);
1747         ast_test_validate(test, NULL != consumer1);
1748         consumer2 = consumer_create(1);
1749         ast_test_validate(test, NULL != consumer2);
1750         consumer3 = consumer_create(1);
1751         ast_test_validate(test, NULL != consumer3);
1752
1753         ast_test_validate(test, stasis_message_type_create("Cache1", NULL, &test_message_type1) == STASIS_MESSAGE_TYPE_SUCCESS);
1754         ast_test_validate(test, NULL != test_message_type1);
1755         ast_test_validate(test, stasis_message_type_create("Cache2", NULL, &test_message_type2) == STASIS_MESSAGE_TYPE_SUCCESS);
1756         ast_test_validate(test, NULL != test_message_type2);
1757         ast_test_validate(test, stasis_message_type_create("NonCache", NULL, &test_message_type3) == STASIS_MESSAGE_TYPE_SUCCESS);
1758         ast_test_validate(test, NULL != test_message_type3);
1759
1760         uut = stasis_message_router_create(
1761                 stasis_caching_get_topic(caching_topic));
1762         ast_test_validate(test, NULL != uut);
1763
1764         ret = stasis_message_router_add_cache_update(
1765                 uut, test_message_type1, consumer_exec, consumer1);
1766         ast_test_validate(test, 0 == ret);
1767         ao2_ref(consumer1, +1);
1768         ret = stasis_message_router_add(
1769                 uut, stasis_cache_update_type(), consumer_exec, consumer2);
1770         ast_test_validate(test, 0 == ret);
1771         ao2_ref(consumer2, +1);
1772         ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
1773         ast_test_validate(test, 0 == ret);
1774         ao2_ref(consumer3, +1);
1775
1776         test_data = ao2_alloc(1, NULL);
1777         ast_test_validate(test, NULL != test_data);
1778         test_message1 = stasis_message_create(test_message_type1, test_data);
1779         ast_test_validate(test, NULL != test_message1);
1780         test_message2 = stasis_message_create(test_message_type2, test_data);
1781         ast_test_validate(test, NULL != test_message2);
1782         test_message3 = stasis_message_create(test_message_type3, test_data);
1783         ast_test_validate(test, NULL != test_message3);
1784
1785         stasis_publish(topic, test_message1);
1786         stasis_publish(topic, test_message2);
1787         stasis_publish(topic, test_message3);
1788
1789         actual_len = consumer_wait_for(consumer1, 1);
1790         ast_test_validate(test, 1 == actual_len);
1791         actual_len = consumer_wait_for(consumer2, 1);
1792         ast_test_validate(test, 1 == actual_len);
1793         /* Uncacheable message should not be passed through */
1794         actual_len = consumer_should_stay(consumer3, 0);
1795         ast_test_validate(test, 0 == actual_len);
1796
1797         actual = consumer1->messages_rxed[0];
1798         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
1799         update = stasis_message_data(actual);
1800         ast_test_validate(test, test_message_type1 == update->type);
1801         ast_test_validate(test, test_message1 == update->new_snapshot);
1802
1803         actual = consumer2->messages_rxed[0];
1804         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
1805         update = stasis_message_data(actual);
1806         ast_test_validate(test, test_message_type2 == update->type);
1807         ast_test_validate(test, test_message2 == update->new_snapshot);
1808
1809         /* consumer1 and consumer2 do not get the final message. */
1810         ao2_cleanup(consumer1);
1811         ao2_cleanup(consumer2);
1812
1813         return AST_TEST_PASS;
1814 }
1815
1816 AST_TEST_DEFINE(no_to_json)
1817 {
1818         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1819         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1820         RAII_VAR(char *, data, NULL, ao2_cleanup);
1821         RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
1822         char *expected = "SomeData";
1823
1824         switch (cmd) {
1825         case TEST_INIT:
1826                 info->name = __func__;
1827                 info->category = test_category;
1828                 info->summary = "Test message to_json function";
1829                 info->description = "Test message to_json function";
1830                 return AST_TEST_NOT_RUN;
1831         case TEST_EXECUTE:
1832                 break;
1833         }
1834
1835         /* Test NULL */
1836         actual = stasis_message_to_json(NULL, NULL);
1837         ast_test_validate(test, NULL == actual);
1838
1839         /* Test message with NULL to_json function */
1840         ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
1841
1842         data = ao2_alloc(strlen(expected) + 1, NULL);
1843         strcpy(data, expected);
1844         uut = stasis_message_create(type, data);
1845         ast_test_validate(test, NULL != uut);
1846
1847         actual = stasis_message_to_json(uut, NULL);
1848         ast_test_validate(test, NULL == actual);
1849
1850         return AST_TEST_PASS;
1851 }
1852
1853 AST_TEST_DEFINE(to_json)
1854 {
1855         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1856         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1857         RAII_VAR(char *, data, NULL, ao2_cleanup);
1858         RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
1859         const char *expected_text = "SomeData";
1860         RAII_VAR(struct ast_json *, expected, NULL, ast_json_unref);
1861
1862         switch (cmd) {
1863         case TEST_INIT:
1864                 info->name = __func__;
1865                 info->category = test_category;
1866                 info->summary = "Test message to_json function when NULL";
1867                 info->description = "Test message to_json function when NULL";
1868                 return AST_TEST_NOT_RUN;
1869         case TEST_EXECUTE:
1870                 break;
1871         }
1872
1873         ast_test_validate(test, stasis_message_type_create("SomeMessage", &fake_vtable, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
1874
1875         data = ao2_alloc(strlen(expected_text) + 1, NULL);
1876         strcpy(data, expected_text);
1877         uut = stasis_message_create(type, data);
1878         ast_test_validate(test, NULL != uut);
1879
1880         expected = ast_json_string_create(expected_text);
1881         actual = stasis_message_to_json(uut, NULL);
1882         ast_test_validate(test, ast_json_equal(expected, actual));
1883
1884         return AST_TEST_PASS;
1885 }
1886
1887 AST_TEST_DEFINE(no_to_ami)
1888 {
1889         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1890         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1891         RAII_VAR(char *, data, NULL, ao2_cleanup);
1892         RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
1893         char *expected = "SomeData";
1894
1895         switch (cmd) {
1896         case TEST_INIT:
1897                 info->name = __func__;
1898                 info->category = test_category;
1899                 info->summary = "Test message to_ami function when NULL";
1900                 info->description = "Test message to_ami function when NULL";
1901                 return AST_TEST_NOT_RUN;
1902         case TEST_EXECUTE:
1903                 break;
1904         }
1905
1906         /* Test NULL */
1907         actual = stasis_message_to_ami(NULL);
1908         ast_test_validate(test, NULL == actual);
1909
1910         /* Test message with NULL to_ami function */
1911         ast_test_validate(test, stasis_message_type_create("SomeMessage", NULL, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
1912
1913         data = ao2_alloc(strlen(expected) + 1, NULL);
1914         strcpy(data, expected);
1915         uut = stasis_message_create(type, data);
1916         ast_test_validate(test, NULL != uut);
1917
1918         actual = stasis_message_to_ami(uut);
1919         ast_test_validate(test, NULL == actual);
1920
1921         return AST_TEST_PASS;
1922 }
1923
1924 AST_TEST_DEFINE(to_ami)
1925 {
1926         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1927         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1928         RAII_VAR(char *, data, NULL, ao2_cleanup);
1929         RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
1930         const char *expected_text = "SomeData";
1931         const char *expected = "Message: SomeData\r\n";
1932
1933         switch (cmd) {
1934         case TEST_INIT:
1935                 info->name = __func__;
1936                 info->category = test_category;
1937                 info->summary = "Test message to_ami function";
1938                 info->description = "Test message to_ami function";
1939                 return AST_TEST_NOT_RUN;
1940         case TEST_EXECUTE:
1941                 break;
1942         }
1943
1944         ast_test_validate(test, stasis_message_type_create("SomeMessage", &fake_vtable, &type) == STASIS_MESSAGE_TYPE_SUCCESS);
1945
1946         data = ao2_alloc(strlen(expected_text) + 1, NULL);
1947         strcpy(data, expected_text);
1948         uut = stasis_message_create(type, data);
1949         ast_test_validate(test, NULL != uut);
1950
1951         actual = stasis_message_to_ami(uut);
1952         ast_test_validate(test, strcmp(expected, actual->extra_fields) == 0);
1953
1954         return AST_TEST_PASS;
1955 }
1956
1957 static void noop(void *data, struct stasis_subscription *sub,
1958         struct stasis_message *message)
1959 {
1960         /* no-op */
1961 }
1962
1963 AST_TEST_DEFINE(dtor_order)
1964 {
1965         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1966         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
1967
1968         switch (cmd) {
1969         case TEST_INIT:
1970                 info->name = __func__;
1971                 info->category = test_category;
1972                 info->summary = "Test that destruction order doesn't bomb stuff";
1973                 info->description = "Test that destruction order doesn't bomb stuff";
1974                 return AST_TEST_NOT_RUN;
1975         case TEST_EXECUTE:
1976                 break;
1977         }
1978
1979         topic = stasis_topic_create("test-topic");
1980         ast_test_validate(test, NULL != topic);
1981
1982         sub = stasis_subscribe(topic, noop, NULL);
1983         ast_test_validate(test, NULL != sub);
1984
1985         /* With any luck, this won't completely blow everything up */
1986         ao2_cleanup(topic);
1987         stasis_unsubscribe(sub);
1988
1989         /* These refs were cleaned up manually */
1990         topic = NULL;
1991         sub = NULL;
1992
1993         return AST_TEST_PASS;
1994 }
1995
1996 static const char *noop_get_id(struct stasis_message *message)
1997 {
1998         return NULL;
1999 }
2000
2001 AST_TEST_DEFINE(caching_dtor_order)
2002 {
2003         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
2004         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
2005         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL,
2006                 stasis_caching_unsubscribe);
2007         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
2008
2009         switch (cmd) {
2010         case TEST_INIT:
2011                 info->name = __func__;
2012                 info->category = test_category;
2013                 info->summary = "Test that destruction order doesn't bomb stuff";
2014                 info->description = "Test that destruction order doesn't bomb stuff";
2015                 return AST_TEST_NOT_RUN;
2016         case TEST_EXECUTE:
2017                 break;
2018         }
2019
2020         cache = stasis_cache_create(noop_get_id);
2021         ast_test_validate(test, NULL != cache);
2022
2023         topic = stasis_topic_create("test-topic");
2024         ast_test_validate(test, NULL != topic);
2025
2026         caching_topic = stasis_caching_topic_create(topic, cache);
2027         ast_test_validate(test, NULL != caching_topic);
2028
2029         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), noop,
2030                 NULL);
2031         ast_test_validate(test, NULL != sub);
2032
2033         /* With any luck, this won't completely blow everything up */
2034         ao2_cleanup(cache);
2035         ao2_cleanup(topic);
2036         stasis_caching_unsubscribe(caching_topic);
2037         stasis_unsubscribe(sub);
2038
2039         /* These refs were cleaned up manually */
2040         cache = NULL;
2041         topic = NULL;
2042         caching_topic = NULL;
2043         sub = NULL;
2044
2045         return AST_TEST_PASS;
2046 }
2047
2048 static int unload_module(void)
2049 {
2050         AST_TEST_UNREGISTER(message_type);
2051         AST_TEST_UNREGISTER(message);
2052         AST_TEST_UNREGISTER(subscription_messages);
2053         AST_TEST_UNREGISTER(subscription_pool_messages);
2054         AST_TEST_UNREGISTER(publish);
2055         AST_TEST_UNREGISTER(publish_sync);
2056         AST_TEST_UNREGISTER(publish_pool);
2057         AST_TEST_UNREGISTER(unsubscribe_stops_messages);
2058         AST_TEST_UNREGISTER(forward);
2059         AST_TEST_UNREGISTER(cache_filter);
2060         AST_TEST_UNREGISTER(cache);
2061         AST_TEST_UNREGISTER(cache_dump);
2062         AST_TEST_UNREGISTER(cache_eid_aggregate);
2063         AST_TEST_UNREGISTER(router);
2064         AST_TEST_UNREGISTER(router_pool);
2065         AST_TEST_UNREGISTER(router_cache_updates);
2066         AST_TEST_UNREGISTER(interleaving);
2067         AST_TEST_UNREGISTER(subscription_interleaving);
2068         AST_TEST_UNREGISTER(no_to_json);
2069         AST_TEST_UNREGISTER(to_json);
2070         AST_TEST_UNREGISTER(no_to_ami);
2071         AST_TEST_UNREGISTER(to_ami);
2072         AST_TEST_UNREGISTER(dtor_order);
2073         AST_TEST_UNREGISTER(caching_dtor_order);
2074         return 0;
2075 }
2076
2077 static int load_module(void)
2078 {
2079         AST_TEST_REGISTER(message_type);
2080         AST_TEST_REGISTER(message);
2081         AST_TEST_REGISTER(subscription_messages);
2082         AST_TEST_REGISTER(subscription_pool_messages);
2083         AST_TEST_REGISTER(publish);
2084         AST_TEST_REGISTER(publish_sync);
2085         AST_TEST_REGISTER(publish_pool);
2086         AST_TEST_REGISTER(unsubscribe_stops_messages);
2087         AST_TEST_REGISTER(forward);
2088         AST_TEST_REGISTER(cache_filter);
2089         AST_TEST_REGISTER(cache);
2090         AST_TEST_REGISTER(cache_dump);
2091         AST_TEST_REGISTER(cache_eid_aggregate);
2092         AST_TEST_REGISTER(router);
2093         AST_TEST_REGISTER(router_pool);
2094         AST_TEST_REGISTER(router_cache_updates);
2095         AST_TEST_REGISTER(interleaving);
2096         AST_TEST_REGISTER(subscription_interleaving);
2097         AST_TEST_REGISTER(no_to_json);
2098         AST_TEST_REGISTER(to_json);
2099         AST_TEST_REGISTER(no_to_ami);
2100         AST_TEST_REGISTER(to_ami);
2101         AST_TEST_REGISTER(dtor_order);
2102         AST_TEST_REGISTER(caching_dtor_order);
2103         return AST_MODULE_LOAD_SUCCESS;
2104 }
2105
2106 AST_MODULE_INFO(ASTERISK_GPL_KEY, 0, "Stasis testing",
2107                 .load = load_module,
2108                 .unload = unload_module
2109         );