Multiple revisions 399887,400138,400178,400180-400181
[asterisk/asterisk.git] / tests / test_stasis.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file \brief Test Stasis message bus.
21  *
22  * \author\verbatim David M. Lee, II <dlee@digium.com> \endverbatim
23  *
24  * \ingroup tests
25  */
26
27 /*** MODULEINFO
28         <depend>TEST_FRAMEWORK</depend>
29         <support_level>core</support_level>
30  ***/
31
32 #include "asterisk.h"
33
34 ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
35
36 #include "asterisk/astobj2.h"
37 #include "asterisk/module.h"
38 #include "asterisk/stasis.h"
39 #include "asterisk/stasis_message_router.h"
40 #include "asterisk/test.h"
41
42 static const char *test_category = "/stasis/core/";
43
44 static struct ast_json *fake_json(struct stasis_message *message)
45 {
46         const char *text = stasis_message_data(message);
47
48         return ast_json_string_create(text);
49 }
50
51 static struct ast_manager_event_blob *fake_ami(struct stasis_message *message)
52 {
53         RAII_VAR(struct ast_manager_event_blob *, res, NULL, ao2_cleanup);
54         const char *text = stasis_message_data(message);
55
56         res = ast_manager_event_blob_create(EVENT_FLAG_TEST, "FakeMI",
57                 "Message: %s\r\n", text);
58
59         if (res == NULL) {
60                 return NULL;
61         }
62
63         ao2_ref(res, +1);
64         return res;
65 }
66
67 static struct stasis_message_vtable fake_vtable = {
68         .to_json = fake_json,
69         .to_ami = fake_ami
70 };
71
72 AST_TEST_DEFINE(message_type)
73 {
74         RAII_VAR(struct stasis_message_type *, uut, NULL, ao2_cleanup);
75
76         switch (cmd) {
77         case TEST_INIT:
78                 info->name = __func__;
79                 info->category = test_category;
80                 info->summary = "Test basic message_type functions";
81                 info->description = "Test basic message_type functions";
82                 return AST_TEST_NOT_RUN;
83         case TEST_EXECUTE:
84                 break;
85         }
86
87         ast_test_validate(test, NULL == stasis_message_type_create(NULL, NULL));
88         uut = stasis_message_type_create("SomeMessage", NULL);
89         ast_test_validate(test, 0 == strcmp(stasis_message_type_name(uut), "SomeMessage"));
90
91         return AST_TEST_PASS;
92 }
93
94 AST_TEST_DEFINE(message)
95 {
96         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
97         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
98         RAII_VAR(char *, data, NULL, ao2_cleanup);
99         char *expected = "SomeData";
100         struct timeval expected_timestamp;
101         struct timeval time_diff;
102
103         switch (cmd) {
104         case TEST_INIT:
105                 info->name = __func__;
106                 info->category = test_category;
107                 info->summary = "Test basic message functions";
108                 info->description = "Test basic message functions";
109                 return AST_TEST_NOT_RUN;
110         case TEST_EXECUTE:
111                 break;
112         }
113
114
115         type = stasis_message_type_create("SomeMessage", NULL);
116
117         ast_test_validate(test, NULL == stasis_message_create(NULL, NULL));
118         ast_test_validate(test, NULL == stasis_message_create(type, NULL));
119
120         data = ao2_alloc(strlen(expected) + 1, NULL);
121         strcpy(data, expected);
122         expected_timestamp = ast_tvnow();
123         uut = stasis_message_create(type, data);
124
125         ast_test_validate(test, NULL != uut);
126         ast_test_validate(test, type == stasis_message_type(uut));
127         ast_test_validate(test, 0 == strcmp(expected, stasis_message_data(uut)));
128         ast_test_validate(test, 2 == ao2_ref(data, 0)); /* uut has ref to data */
129
130         time_diff = ast_tvsub(*stasis_message_timestamp(uut), expected_timestamp);
131         /* 10ms is certainly long enough for the two calls to complete */
132         ast_test_validate(test, time_diff.tv_sec == 0);
133         ast_test_validate(test, time_diff.tv_usec < 10000);
134
135         ao2_ref(uut, -1);
136         uut = NULL;
137         ast_test_validate(test, 1 == ao2_ref(data, 0)); /* uut unreffed data */
138
139         return AST_TEST_PASS;
140 }
141
142 struct consumer {
143         ast_mutex_t lock;
144         ast_cond_t out;
145         struct stasis_message **messages_rxed;
146         size_t messages_rxed_len;
147         int ignore_subscriptions;
148         int complete;
149 };
150
151 static void consumer_dtor(void *obj) {
152         struct consumer *consumer = obj;
153
154         ast_mutex_destroy(&consumer->lock);
155         ast_cond_destroy(&consumer->out);
156
157         while (consumer->messages_rxed_len > 0) {
158                 ao2_cleanup(consumer->messages_rxed[--consumer->messages_rxed_len]);
159         }
160         ast_free(consumer->messages_rxed);
161         consumer->messages_rxed = NULL;
162 }
163
164 static struct consumer *consumer_create(int ignore_subscriptions) {
165         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
166
167         consumer = ao2_alloc(sizeof(*consumer), consumer_dtor);
168
169         if (!consumer) {
170                 return NULL;
171         }
172
173         consumer->ignore_subscriptions = ignore_subscriptions;
174         consumer->messages_rxed = ast_malloc(0);
175         if (!consumer->messages_rxed) {
176                 return NULL;
177         }
178
179         ast_mutex_init(&consumer->lock);
180         ast_cond_init(&consumer->out, NULL);
181
182         ao2_ref(consumer, +1);
183         return consumer;
184 }
185
186 static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
187 {
188         struct consumer *consumer = data;
189         RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
190         SCOPED_MUTEX(lock, &consumer->lock);
191
192         if (!consumer->ignore_subscriptions || stasis_message_type(message) != stasis_subscription_change_type()) {
193
194                 ++consumer->messages_rxed_len;
195                 consumer->messages_rxed = ast_realloc(consumer->messages_rxed, sizeof(*consumer->messages_rxed) * consumer->messages_rxed_len);
196                 ast_assert(consumer->messages_rxed != NULL);
197                 consumer->messages_rxed[consumer->messages_rxed_len - 1] = message;
198                 ao2_ref(message, +1);
199         }
200
201         if (stasis_subscription_final_message(sub, message)) {
202                 consumer->complete = 1;
203                 consumer_needs_cleanup = consumer;
204         }
205
206         ast_cond_signal(&consumer->out);
207 }
208
209 static int consumer_wait_for(struct consumer *consumer, size_t expected_len)
210 {
211         struct timeval start = ast_tvnow();
212         struct timespec end = {
213                 .tv_sec = start.tv_sec + 30,
214                 .tv_nsec = start.tv_usec * 1000
215         };
216
217         SCOPED_MUTEX(lock, &consumer->lock);
218
219         while (consumer->messages_rxed_len < expected_len) {
220                 int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
221                 if (r == ETIMEDOUT) {
222                         break;
223                 }
224                 ast_assert(r == 0); /* Not expecting any othet types of errors */
225         }
226         return consumer->messages_rxed_len;
227 }
228
229 static int consumer_wait_for_completion(struct consumer *consumer)
230 {
231         struct timeval start = ast_tvnow();
232         struct timespec end = {
233                 .tv_sec = start.tv_sec + 3,
234                 .tv_nsec = start.tv_usec * 1000
235         };
236
237         SCOPED_MUTEX(lock, &consumer->lock);
238
239         while (!consumer->complete) {
240                 int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
241                 if (r == ETIMEDOUT) {
242                         break;
243                 }
244                 ast_assert(r == 0); /* Not expecting any othet types of errors */
245         }
246         return consumer->complete;
247 }
248
249 static int consumer_should_stay(struct consumer *consumer, size_t expected_len)
250 {
251         struct timeval start = ast_tvnow();
252         struct timeval diff = {
253                 .tv_sec = 0,
254                 .tv_usec = 100000 /* wait for 100ms */
255         };
256         struct timeval end_tv = ast_tvadd(start, diff);
257         struct timespec end = {
258                 .tv_sec = end_tv.tv_sec,
259                 .tv_nsec = end_tv.tv_usec * 1000
260         };
261
262         SCOPED_MUTEX(lock, &consumer->lock);
263
264         while (consumer->messages_rxed_len == expected_len) {
265                 int r = ast_cond_timedwait(&consumer->out, &consumer->lock, &end);
266                 if (r == ETIMEDOUT) {
267                         break;
268                 }
269                 ast_assert(r == 0); /* Not expecting any othet types of errors */
270         }
271         return consumer->messages_rxed_len;
272 }
273
274 AST_TEST_DEFINE(subscription_messages)
275 {
276         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
277         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
278         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
279         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
280         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
281         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
282         RAII_VAR(char *, expected_uniqueid, NULL, ast_free);
283         int complete;
284         struct stasis_subscription_change *change;
285
286         switch (cmd) {
287         case TEST_INIT:
288                 info->name = __func__;
289                 info->category = test_category;
290                 info->summary = "Test subscribe/unsubscribe messages";
291                 info->description = "Test subscribe/unsubscribe messages";
292                 return AST_TEST_NOT_RUN;
293         case TEST_EXECUTE:
294                 break;
295         }
296
297         topic = stasis_topic_create("TestTopic");
298         ast_test_validate(test, NULL != topic);
299
300         consumer = consumer_create(0);
301         ast_test_validate(test, NULL != consumer);
302
303         uut = stasis_subscribe(topic, consumer_exec, consumer);
304         ast_test_validate(test, NULL != uut);
305         ao2_ref(consumer, +1);
306         expected_uniqueid = ast_strdup(stasis_subscription_uniqueid(uut));
307
308         uut = stasis_unsubscribe(uut);
309         complete = consumer_wait_for_completion(consumer);
310         ast_test_validate(test, 1 == complete);
311
312         ast_test_validate(test, 2 == consumer->messages_rxed_len);
313         ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[0]));
314         ast_test_validate(test, stasis_subscription_change_type() == stasis_message_type(consumer->messages_rxed[1]));
315
316         change = stasis_message_data(consumer->messages_rxed[0]);
317         ast_test_validate(test, topic == change->topic);
318         ast_test_validate(test, 0 == strcmp("Subscribe", change->description));
319         ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
320
321         change = stasis_message_data(consumer->messages_rxed[1]);
322         ast_test_validate(test, topic == change->topic);
323         ast_test_validate(test, 0 == strcmp("Unsubscribe", change->description));
324         ast_test_validate(test, 0 == strcmp(expected_uniqueid, change->uniqueid));
325
326         return AST_TEST_PASS;
327 }
328
329 AST_TEST_DEFINE(publish)
330 {
331         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
332         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
333         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
334         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
335         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
336         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
337         int actual_len;
338         const char *actual;
339
340         switch (cmd) {
341         case TEST_INIT:
342                 info->name = __func__;
343                 info->category = test_category;
344                 info->summary = "Test simple subscriptions";
345                 info->description = "Test simple subscriptions";
346                 return AST_TEST_NOT_RUN;
347         case TEST_EXECUTE:
348                 break;
349         }
350
351         topic = stasis_topic_create("TestTopic");
352         ast_test_validate(test, NULL != topic);
353
354         consumer = consumer_create(1);
355         ast_test_validate(test, NULL != consumer);
356
357         uut = stasis_subscribe(topic, consumer_exec, consumer);
358         ast_test_validate(test, NULL != uut);
359         ao2_ref(consumer, +1);
360
361         test_data = ao2_alloc(1, NULL);
362         ast_test_validate(test, NULL != test_data);
363         test_message_type = stasis_message_type_create("TestMessage", NULL);
364         test_message = stasis_message_create(test_message_type, test_data);
365
366         stasis_publish(topic, test_message);
367
368         actual_len = consumer_wait_for(consumer, 1);
369         ast_test_validate(test, 1 == actual_len);
370         actual = stasis_message_data(consumer->messages_rxed[0]);
371         ast_test_validate(test, test_data == actual);
372
373         return AST_TEST_PASS;
374 }
375
376 AST_TEST_DEFINE(unsubscribe_stops_messages)
377 {
378         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
379         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
380         RAII_VAR(struct stasis_subscription *, uut, NULL, stasis_unsubscribe);
381         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
382         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
383         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
384         int actual_len;
385
386         switch (cmd) {
387         case TEST_INIT:
388                 info->name = __func__;
389                 info->category = test_category;
390                 info->summary = "Test simple subscriptions";
391                 info->description = "Test simple subscriptions";
392                 return AST_TEST_NOT_RUN;
393         case TEST_EXECUTE:
394                 break;
395         }
396
397         topic = stasis_topic_create("TestTopic");
398         ast_test_validate(test, NULL != topic);
399
400         consumer = consumer_create(1);
401         ast_test_validate(test, NULL != consumer);
402
403         uut = stasis_subscribe(topic, consumer_exec, consumer);
404         ast_test_validate(test, NULL != uut);
405         ao2_ref(consumer, +1);
406
407         uut = stasis_unsubscribe(uut);
408
409         test_data = ao2_alloc(1, NULL);
410         ast_test_validate(test, NULL != test_data);
411         test_message_type = stasis_message_type_create("TestMessage", NULL);
412         test_message = stasis_message_create(test_message_type, test_data);
413
414         stasis_publish(topic, test_message);
415
416         actual_len = consumer_should_stay(consumer, 0);
417         ast_test_validate(test, 0 == actual_len);
418
419         return AST_TEST_PASS;
420 }
421
422 AST_TEST_DEFINE(forward)
423 {
424         RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
425         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
426
427         RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup);
428         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
429
430         RAII_VAR(struct stasis_forward *, forward_sub, NULL, stasis_forward_cancel);
431         RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe);
432         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
433
434         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
435         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
436         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
437         int actual_len;
438
439         switch (cmd) {
440         case TEST_INIT:
441                 info->name = __func__;
442                 info->category = test_category;
443                 info->summary = "Test sending events to a parent topic";
444                 info->description = "Test sending events to a parent topic.\n"
445                         "This test creates three topics (one parent, two children)\n"
446                         "and publishes a message to one child, and verifies it's\n"
447                         "only seen by that child and the parent";
448                 return AST_TEST_NOT_RUN;
449         case TEST_EXECUTE:
450                 break;
451         }
452
453         parent_topic = stasis_topic_create("ParentTestTopic");
454         ast_test_validate(test, NULL != parent_topic);
455         topic = stasis_topic_create("TestTopic");
456         ast_test_validate(test, NULL != topic);
457
458         forward_sub = stasis_forward_all(topic, parent_topic);
459         ast_test_validate(test, NULL != forward_sub);
460
461         parent_consumer = consumer_create(1);
462         ast_test_validate(test, NULL != parent_consumer);
463         consumer = consumer_create(1);
464         ast_test_validate(test, NULL != consumer);
465
466         parent_sub = stasis_subscribe(parent_topic, consumer_exec, parent_consumer);
467         ast_test_validate(test, NULL != parent_sub);
468         ao2_ref(parent_consumer, +1);
469         sub = stasis_subscribe(topic, consumer_exec, consumer);
470         ast_test_validate(test, NULL != sub);
471         ao2_ref(consumer, +1);
472
473         test_data = ao2_alloc(1, NULL);
474         ast_test_validate(test, NULL != test_data);
475         test_message_type = stasis_message_type_create("TestMessage", NULL);
476         test_message = stasis_message_create(test_message_type, test_data);
477
478         stasis_publish(topic, test_message);
479
480         actual_len = consumer_wait_for(consumer, 1);
481         ast_test_validate(test, 1 == actual_len);
482         actual_len = consumer_wait_for(parent_consumer, 1);
483         ast_test_validate(test, 1 == actual_len);
484
485         return AST_TEST_PASS;
486 }
487
488 AST_TEST_DEFINE(interleaving)
489 {
490         RAII_VAR(struct stasis_topic *, parent_topic, NULL, ao2_cleanup);
491         RAII_VAR(struct stasis_topic *, topic1, NULL, ao2_cleanup);
492         RAII_VAR(struct stasis_topic *, topic2, NULL, ao2_cleanup);
493
494         RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
495
496         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
497
498         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
499         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
500         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
501
502         RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
503         RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
504         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
505
506         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
507
508         int actual_len;
509
510         switch (cmd) {
511         case TEST_INIT:
512                 info->name = __func__;
513                 info->category = test_category;
514                 info->summary = "Test sending interleaved events to a parent topic";
515                 info->description = "Test sending events to a parent topic.\n"
516                         "This test creates three topics (one parent, two children)\n"
517                         "and publishes messages alternately between the children.\n"
518                         "It verifies that the messages are received in the expected\n"
519                         "order.";
520                 return AST_TEST_NOT_RUN;
521         case TEST_EXECUTE:
522                 break;
523         }
524
525         test_message_type = stasis_message_type_create("test", NULL);
526         ast_test_validate(test, NULL != test_message_type);
527
528         test_data = ao2_alloc(1, NULL);
529         ast_test_validate(test, NULL != test_data);
530
531         test_message1 = stasis_message_create(test_message_type, test_data);
532         ast_test_validate(test, NULL != test_message1);
533         test_message2 = stasis_message_create(test_message_type, test_data);
534         ast_test_validate(test, NULL != test_message2);
535         test_message3 = stasis_message_create(test_message_type, test_data);
536         ast_test_validate(test, NULL != test_message3);
537
538         parent_topic = stasis_topic_create("ParentTestTopic");
539         ast_test_validate(test, NULL != parent_topic);
540         topic1 = stasis_topic_create("Topic1");
541         ast_test_validate(test, NULL != topic1);
542         topic2 = stasis_topic_create("Topic2");
543         ast_test_validate(test, NULL != topic2);
544
545         forward_sub1 = stasis_forward_all(topic1, parent_topic);
546         ast_test_validate(test, NULL != forward_sub1);
547         forward_sub2 = stasis_forward_all(topic2, parent_topic);
548         ast_test_validate(test, NULL != forward_sub2);
549
550         consumer = consumer_create(1);
551         ast_test_validate(test, NULL != consumer);
552
553         sub = stasis_subscribe(parent_topic, consumer_exec, consumer);
554         ast_test_validate(test, NULL != sub);
555         ao2_ref(consumer, +1);
556
557         stasis_publish(topic1, test_message1);
558         stasis_publish(topic2, test_message2);
559         stasis_publish(topic1, test_message3);
560
561         actual_len = consumer_wait_for(consumer, 3);
562         ast_test_validate(test, 3 == actual_len);
563
564         ast_test_validate(test, test_message1 == consumer->messages_rxed[0]);
565         ast_test_validate(test, test_message2 == consumer->messages_rxed[1]);
566         ast_test_validate(test, test_message3 == consumer->messages_rxed[2]);
567
568         return AST_TEST_PASS;
569 }
570
571 struct cache_test_data {
572         char *id;
573         char *value;
574 };
575
576 static void cache_test_data_dtor(void *obj)
577 {
578         struct cache_test_data *data = obj;
579         ast_free(data->id);
580         ast_free(data->value);
581 }
582
583 static struct stasis_message *cache_test_message_create(struct stasis_message_type *type, const char *name, const char *value)
584 {
585         RAII_VAR(struct cache_test_data *, data, NULL, ao2_cleanup);
586
587         data = ao2_alloc(sizeof(*data), cache_test_data_dtor);
588         if (data == NULL) {
589                 return NULL;
590         }
591
592         ast_assert(name != NULL);
593         ast_assert(value != NULL);
594
595         data->id = ast_strdup(name);
596         data->value = ast_strdup(value);
597         if (!data->id || !data->value) {
598                 return NULL;
599         }
600
601         return stasis_message_create(type, data);
602 }
603
604 static const char *cache_test_data_id(struct stasis_message *message) {
605         struct cache_test_data *cachable = stasis_message_data(message);
606
607         if (0 != strcmp("Cacheable", stasis_message_type_name(stasis_message_type(message)))) {
608                 return NULL;
609         }
610         return cachable->id;
611 }
612
613 AST_TEST_DEFINE(cache_filter)
614 {
615         RAII_VAR(struct stasis_message_type *, non_cache_type, NULL, ao2_cleanup);
616         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
617         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
618         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
619         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
620         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
621         RAII_VAR(struct stasis_message *, test_message, NULL, ao2_cleanup);
622         int actual_len;
623
624         switch (cmd) {
625         case TEST_INIT:
626                 info->name = __func__;
627                 info->category = test_category;
628                 info->summary = "Test caching topics only forward cache_update messages.";
629                 info->description = "Test caching topics only forward cache_update messages.";
630                 return AST_TEST_NOT_RUN;
631         case TEST_EXECUTE:
632                 break;
633         }
634
635         non_cache_type = stasis_message_type_create("NonCacheable", NULL);
636         ast_test_validate(test, NULL != non_cache_type);
637         topic = stasis_topic_create("SomeTopic");
638         ast_test_validate(test, NULL != topic);
639         cache = stasis_cache_create(cache_test_data_id);
640         ast_test_validate(test, NULL != cache);
641         caching_topic = stasis_caching_topic_create(topic, cache);
642         ast_test_validate(test, NULL != caching_topic);
643         consumer = consumer_create(1);
644         ast_test_validate(test, NULL != consumer);
645         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
646         ast_test_validate(test, NULL != sub);
647         ao2_ref(consumer, +1);
648
649         test_message = cache_test_message_create(non_cache_type, "1", "1");
650         ast_test_validate(test, NULL != test_message);
651
652         stasis_publish(topic, test_message);
653
654         actual_len = consumer_should_stay(consumer, 0);
655         ast_test_validate(test, 0 == actual_len);
656
657         return AST_TEST_PASS;
658 }
659
660 AST_TEST_DEFINE(cache)
661 {
662         RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
663         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
664         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
665         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
666         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
667         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
668         RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
669         RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
670         RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
671         RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
672         int actual_len;
673         struct stasis_cache_update *actual_update;
674
675         switch (cmd) {
676         case TEST_INIT:
677                 info->name = __func__;
678                 info->category = test_category;
679                 info->summary = "Test passing messages through cache topic unscathed.";
680                 info->description = "Test passing messages through cache topic unscathed.";
681                 return AST_TEST_NOT_RUN;
682         case TEST_EXECUTE:
683                 break;
684         }
685
686         cache_type = stasis_message_type_create("Cacheable", NULL);
687         ast_test_validate(test, NULL != cache_type);
688         topic = stasis_topic_create("SomeTopic");
689         ast_test_validate(test, NULL != topic);
690         cache = stasis_cache_create(cache_test_data_id);
691         ast_test_validate(test, NULL != cache);
692         caching_topic = stasis_caching_topic_create(topic, cache);
693         ast_test_validate(test, NULL != caching_topic);
694         consumer = consumer_create(1);
695         ast_test_validate(test, NULL != consumer);
696         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
697         ast_test_validate(test, NULL != sub);
698         ao2_ref(consumer, +1);
699
700         test_message1_1 = cache_test_message_create(cache_type, "1", "1");
701         ast_test_validate(test, NULL != test_message1_1);
702         test_message2_1 = cache_test_message_create(cache_type, "2", "1");
703         ast_test_validate(test, NULL != test_message2_1);
704
705         /* Post a couple of snapshots */
706         stasis_publish(topic, test_message1_1);
707         stasis_publish(topic, test_message2_1);
708         actual_len = consumer_wait_for(consumer, 2);
709         ast_test_validate(test, 2 == actual_len);
710
711         /* Check for new snapshot messages */
712         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[0]));
713         actual_update = stasis_message_data(consumer->messages_rxed[0]);
714         ast_test_validate(test, NULL == actual_update->old_snapshot);
715         ast_test_validate(test, test_message1_1 == actual_update->new_snapshot);
716         ast_test_validate(test, test_message1_1 == stasis_cache_get(cache, cache_type, "1"));
717         /* stasis_cache_get returned a ref, so unref test_message1_1 */
718         ao2_ref(test_message1_1, -1);
719
720         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[1]));
721         actual_update = stasis_message_data(consumer->messages_rxed[1]);
722         ast_test_validate(test, NULL == actual_update->old_snapshot);
723         ast_test_validate(test, test_message2_1 == actual_update->new_snapshot);
724         ast_test_validate(test, test_message2_1 == stasis_cache_get(cache, cache_type, "2"));
725         /* stasis_cache_get returned a ref, so unref test_message2_1 */
726         ao2_ref(test_message2_1, -1);
727
728         /* Update snapshot 2 */
729         test_message2_2 = cache_test_message_create(cache_type, "2", "2");
730         ast_test_validate(test, NULL != test_message2_2);
731         stasis_publish(topic, test_message2_2);
732
733         actual_len = consumer_wait_for(consumer, 3);
734         ast_test_validate(test, 3 == actual_len);
735
736         actual_update = stasis_message_data(consumer->messages_rxed[2]);
737         ast_test_validate(test, test_message2_1 == actual_update->old_snapshot);
738         ast_test_validate(test, test_message2_2 == actual_update->new_snapshot);
739         ast_test_validate(test, test_message2_2 == stasis_cache_get(cache, cache_type, "2"));
740         /* stasis_cache_get returned a ref, so unref test_message2_2 */
741         ao2_ref(test_message2_2, -1);
742
743         /* Clear snapshot 1 */
744         test_message1_clear = stasis_cache_clear_create(test_message1_1);
745         ast_test_validate(test, NULL != test_message1_clear);
746         stasis_publish(topic, test_message1_clear);
747
748         actual_len = consumer_wait_for(consumer, 4);
749         ast_test_validate(test, 4 == actual_len);
750
751         actual_update = stasis_message_data(consumer->messages_rxed[3]);
752         ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
753         ast_test_validate(test, NULL == actual_update->new_snapshot);
754         ast_test_validate(test, NULL == stasis_cache_get(cache, cache_type, "1"));
755
756         return AST_TEST_PASS;
757 }
758
759 AST_TEST_DEFINE(cache_dump)
760 {
761         RAII_VAR(struct stasis_message_type *, cache_type, NULL, ao2_cleanup);
762         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
763         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
764         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe);
765         RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
766         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
767         RAII_VAR(struct stasis_message *, test_message1_1, NULL, ao2_cleanup);
768         RAII_VAR(struct stasis_message *, test_message2_1, NULL, ao2_cleanup);
769         RAII_VAR(struct stasis_message *, test_message2_2, NULL, ao2_cleanup);
770         RAII_VAR(struct stasis_message *, test_message1_clear, NULL, ao2_cleanup);
771         RAII_VAR(struct ao2_container *, cache_dump, NULL, ao2_cleanup);
772         int actual_len;
773         struct ao2_iterator i;
774         void *obj;
775
776         switch (cmd) {
777         case TEST_INIT:
778                 info->name = __func__;
779                 info->category = test_category;
780                 info->summary = "Test passing messages through cache topic unscathed.";
781                 info->description = "Test passing messages through cache topic unscathed.";
782                 return AST_TEST_NOT_RUN;
783         case TEST_EXECUTE:
784                 break;
785         }
786
787         cache_type = stasis_message_type_create("Cacheable", NULL);
788         ast_test_validate(test, NULL != cache_type);
789         topic = stasis_topic_create("SomeTopic");
790         ast_test_validate(test, NULL != topic);
791         cache = stasis_cache_create(cache_test_data_id);
792         ast_test_validate(test, NULL != cache);
793         caching_topic = stasis_caching_topic_create(topic, cache);
794         ast_test_validate(test, NULL != caching_topic);
795         consumer = consumer_create(1);
796         ast_test_validate(test, NULL != consumer);
797         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), consumer_exec, consumer);
798         ast_test_validate(test, NULL != sub);
799         ao2_ref(consumer, +1);
800
801         test_message1_1 = cache_test_message_create(cache_type, "1", "1");
802         ast_test_validate(test, NULL != test_message1_1);
803         test_message2_1 = cache_test_message_create(cache_type, "2", "1");
804         ast_test_validate(test, NULL != test_message2_1);
805
806         /* Post a couple of snapshots */
807         stasis_publish(topic, test_message1_1);
808         stasis_publish(topic, test_message2_1);
809         actual_len = consumer_wait_for(consumer, 2);
810         ast_test_validate(test, 2 == actual_len);
811
812         /* Check the cache */
813         cache_dump = stasis_cache_dump(cache, NULL);
814         ast_test_validate(test, NULL != cache_dump);
815         ast_test_validate(test, 2 == ao2_container_count(cache_dump));
816         i = ao2_iterator_init(cache_dump, 0);
817         while ((obj = ao2_iterator_next(&i))) {
818                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
819                 ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_1);
820         }
821
822         /* Update snapshot 2 */
823         test_message2_2 = cache_test_message_create(cache_type, "2", "2");
824         ast_test_validate(test, NULL != test_message2_2);
825         stasis_publish(topic, test_message2_2);
826
827         actual_len = consumer_wait_for(consumer, 3);
828         ast_test_validate(test, 3 == actual_len);
829
830         /* Check the cache */
831         cache_dump = stasis_cache_dump(cache, NULL);
832         ast_test_validate(test, NULL != cache_dump);
833         ast_test_validate(test, 2 == ao2_container_count(cache_dump));
834         i = ao2_iterator_init(cache_dump, 0);
835         while ((obj = ao2_iterator_next(&i))) {
836                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
837                 ast_test_validate(test, actual_cache_entry == test_message1_1 || actual_cache_entry == test_message2_2);
838         }
839
840         /* Clear snapshot 1 */
841         test_message1_clear = stasis_cache_clear_create(test_message1_1);
842         ast_test_validate(test, NULL != test_message1_clear);
843         stasis_publish(topic, test_message1_clear);
844
845         actual_len = consumer_wait_for(consumer, 4);
846         ast_test_validate(test, 4 == actual_len);
847
848         /* Check the cache */
849         cache_dump = stasis_cache_dump(cache, NULL);
850         ast_test_validate(test, NULL != cache_dump);
851         ast_test_validate(test, 1 == ao2_container_count(cache_dump));
852         i = ao2_iterator_init(cache_dump, 0);
853         while ((obj = ao2_iterator_next(&i))) {
854                 RAII_VAR(struct stasis_message *, actual_cache_entry, obj, ao2_cleanup);
855                 ast_test_validate(test, actual_cache_entry == test_message2_2);
856         }
857
858         /* Dump the cache to ensure that it has no subscription change items in it since those aren't cached */
859         ao2_cleanup(cache_dump);
860         cache_dump = stasis_cache_dump(cache, stasis_subscription_change_type());
861         ast_test_validate(test, 0 == ao2_container_count(cache_dump));
862
863         return AST_TEST_PASS;
864 }
865
866 AST_TEST_DEFINE(router)
867 {
868         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
869         RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
870         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
871         RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
872         RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
873         RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
874         RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
875         RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
876         RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
877         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
878         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
879         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
880         int actual_len, ret;
881         struct stasis_message *actual;
882
883         switch (cmd) {
884         case TEST_INIT:
885                 info->name = __func__;
886                 info->category = test_category;
887                 info->summary = "Test simple message routing";
888                 info->description = "Test simple message routing";
889                 return AST_TEST_NOT_RUN;
890         case TEST_EXECUTE:
891                 break;
892         }
893
894         topic = stasis_topic_create("TestTopic");
895         ast_test_validate(test, NULL != topic);
896
897         consumer1 = consumer_create(1);
898         ast_test_validate(test, NULL != consumer1);
899         consumer2 = consumer_create(1);
900         ast_test_validate(test, NULL != consumer2);
901         consumer3 = consumer_create(1);
902         ast_test_validate(test, NULL != consumer3);
903
904         test_message_type1 = stasis_message_type_create("TestMessage1", NULL);
905         ast_test_validate(test, NULL != test_message_type1);
906         test_message_type2 = stasis_message_type_create("TestMessage2", NULL);
907         ast_test_validate(test, NULL != test_message_type2);
908         test_message_type3 = stasis_message_type_create("TestMessage3", NULL);
909         ast_test_validate(test, NULL != test_message_type3);
910
911         uut = stasis_message_router_create(topic);
912         ast_test_validate(test, NULL != uut);
913
914         ret = stasis_message_router_add(
915                 uut, test_message_type1, consumer_exec, consumer1);
916         ast_test_validate(test, 0 == ret);
917         ao2_ref(consumer1, +1);
918         ret = stasis_message_router_add(
919                 uut, test_message_type2, consumer_exec, consumer2);
920         ast_test_validate(test, 0 == ret);
921         ao2_ref(consumer2, +1);
922         ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
923         ast_test_validate(test, 0 == ret);
924         ao2_ref(consumer3, +1);
925
926         test_data = ao2_alloc(1, NULL);
927         ast_test_validate(test, NULL != test_data);
928         test_message1 = stasis_message_create(test_message_type1, test_data);
929         ast_test_validate(test, NULL != test_message1);
930         test_message2 = stasis_message_create(test_message_type2, test_data);
931         ast_test_validate(test, NULL != test_message2);
932         test_message3 = stasis_message_create(test_message_type3, test_data);
933         ast_test_validate(test, NULL != test_message3);
934
935         stasis_publish(topic, test_message1);
936         stasis_publish(topic, test_message2);
937         stasis_publish(topic, test_message3);
938
939         actual_len = consumer_wait_for(consumer1, 1);
940         ast_test_validate(test, 1 == actual_len);
941         actual_len = consumer_wait_for(consumer2, 1);
942         ast_test_validate(test, 1 == actual_len);
943         actual_len = consumer_wait_for(consumer3, 1);
944         ast_test_validate(test, 1 == actual_len);
945
946         actual = consumer1->messages_rxed[0];
947         ast_test_validate(test, test_message1 == actual);
948
949         actual = consumer2->messages_rxed[0];
950         ast_test_validate(test, test_message2 == actual);
951
952         actual = consumer3->messages_rxed[0];
953         ast_test_validate(test, test_message3 == actual);
954
955         /* consumer1 and consumer2 do not get the final message. */
956         ao2_cleanup(consumer1);
957         ao2_cleanup(consumer2);
958
959         return AST_TEST_PASS;
960 }
961
962 static const char *cache_simple(struct stasis_message *message) {
963         const char *type_name =
964                 stasis_message_type_name(stasis_message_type(message));
965         if (!ast_begins_with(type_name, "Cache")) {
966                 return NULL;
967         }
968
969         return "cached";
970 }
971
972 AST_TEST_DEFINE(router_cache_updates)
973 {
974         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
975         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
976         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL, stasis_caching_unsubscribe_and_join);
977         RAII_VAR(struct stasis_message_type *, test_message_type1, NULL, ao2_cleanup);
978         RAII_VAR(struct stasis_message_type *, test_message_type2, NULL, ao2_cleanup);
979         RAII_VAR(struct stasis_message_type *, test_message_type3, NULL, ao2_cleanup);
980         RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
981         RAII_VAR(char *, test_data, NULL, ao2_cleanup);
982         RAII_VAR(struct stasis_message *, test_message1, NULL, ao2_cleanup);
983         RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
984         RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
985         RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
986         RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
987         RAII_VAR(struct consumer *, consumer3, NULL, ao2_cleanup);
988         RAII_VAR(struct stasis_message *, message1, NULL, ao2_cleanup);
989         RAII_VAR(struct stasis_message *, message2, NULL, ao2_cleanup);
990         struct stasis_cache_update *update;
991         int actual_len, ret;
992         struct stasis_message *actual;
993
994         switch (cmd) {
995         case TEST_INIT:
996                 info->name = __func__;
997                 info->category = test_category;
998                 info->summary = "Test special handling cache_update messages";
999                 info->description = "Test special handling cache_update messages";
1000                 return AST_TEST_NOT_RUN;
1001         case TEST_EXECUTE:
1002                 break;
1003         }
1004
1005         topic = stasis_topic_create("TestTopic");
1006         ast_test_validate(test, NULL != topic);
1007
1008         cache = stasis_cache_create(cache_simple);
1009         ast_test_validate(test, NULL != cache);
1010         caching_topic = stasis_caching_topic_create(topic, cache);
1011         ast_test_validate(test, NULL != caching_topic);
1012
1013         consumer1 = consumer_create(1);
1014         ast_test_validate(test, NULL != consumer1);
1015         consumer2 = consumer_create(1);
1016         ast_test_validate(test, NULL != consumer2);
1017         consumer3 = consumer_create(1);
1018         ast_test_validate(test, NULL != consumer3);
1019
1020         test_message_type1 = stasis_message_type_create("Cache1", NULL);
1021         ast_test_validate(test, NULL != test_message_type1);
1022         test_message_type2 = stasis_message_type_create("Cache2", NULL);
1023         ast_test_validate(test, NULL != test_message_type2);
1024         test_message_type3 = stasis_message_type_create("NonCache", NULL);
1025         ast_test_validate(test, NULL != test_message_type3);
1026
1027         uut = stasis_message_router_create(
1028                 stasis_caching_get_topic(caching_topic));
1029         ast_test_validate(test, NULL != uut);
1030
1031         ret = stasis_message_router_add_cache_update(
1032                 uut, test_message_type1, consumer_exec, consumer1);
1033         ast_test_validate(test, 0 == ret);
1034         ao2_ref(consumer1, +1);
1035         ret = stasis_message_router_add(
1036                 uut, stasis_cache_update_type(), consumer_exec, consumer2);
1037         ast_test_validate(test, 0 == ret);
1038         ao2_ref(consumer2, +1);
1039         ret = stasis_message_router_set_default(uut, consumer_exec, consumer3);
1040         ast_test_validate(test, 0 == ret);
1041         ao2_ref(consumer3, +1);
1042
1043         test_data = ao2_alloc(1, NULL);
1044         ast_test_validate(test, NULL != test_data);
1045         test_message1 = stasis_message_create(test_message_type1, test_data);
1046         ast_test_validate(test, NULL != test_message1);
1047         test_message2 = stasis_message_create(test_message_type2, test_data);
1048         ast_test_validate(test, NULL != test_message2);
1049         test_message3 = stasis_message_create(test_message_type3, test_data);
1050         ast_test_validate(test, NULL != test_message3);
1051
1052         stasis_publish(topic, test_message1);
1053         stasis_publish(topic, test_message2);
1054         stasis_publish(topic, test_message3);
1055
1056         actual_len = consumer_wait_for(consumer1, 1);
1057         ast_test_validate(test, 1 == actual_len);
1058         actual_len = consumer_wait_for(consumer2, 1);
1059         ast_test_validate(test, 1 == actual_len);
1060         /* Uncacheable message should not be passed through */
1061         actual_len = consumer_should_stay(consumer3, 0);
1062         ast_test_validate(test, 0 == actual_len);
1063
1064         actual = consumer1->messages_rxed[0];
1065         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
1066         update = stasis_message_data(actual);
1067         ast_test_validate(test, test_message_type1 == update->type);
1068         ast_test_validate(test, test_message1 == update->new_snapshot);
1069
1070         actual = consumer2->messages_rxed[0];
1071         ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(actual));
1072         update = stasis_message_data(actual);
1073         ast_test_validate(test, test_message_type2 == update->type);
1074         ast_test_validate(test, test_message2 == update->new_snapshot);
1075
1076         /* consumer1 and consumer2 do not get the final message. */
1077         ao2_cleanup(consumer1);
1078         ao2_cleanup(consumer2);
1079
1080         return AST_TEST_PASS;
1081 }
1082
1083 AST_TEST_DEFINE(no_to_json)
1084 {
1085         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1086         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1087         RAII_VAR(char *, data, NULL, ao2_cleanup);
1088         RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
1089         char *expected = "SomeData";
1090
1091         switch (cmd) {
1092         case TEST_INIT:
1093                 info->name = __func__;
1094                 info->category = test_category;
1095                 info->summary = "Test message to_json function";
1096                 info->description = "Test message to_json function";
1097                 return AST_TEST_NOT_RUN;
1098         case TEST_EXECUTE:
1099                 break;
1100         }
1101
1102         /* Test NULL */
1103         actual = stasis_message_to_json(NULL);
1104         ast_test_validate(test, NULL == actual);
1105
1106         /* Test message with NULL to_json function */
1107         type = stasis_message_type_create("SomeMessage", NULL);
1108
1109         data = ao2_alloc(strlen(expected) + 1, NULL);
1110         strcpy(data, expected);
1111         uut = stasis_message_create(type, data);
1112         ast_test_validate(test, NULL != uut);
1113
1114         actual = stasis_message_to_json(uut);
1115         ast_test_validate(test, NULL == actual);
1116
1117         return AST_TEST_PASS;
1118 }
1119
1120 AST_TEST_DEFINE(to_json)
1121 {
1122         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1123         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1124         RAII_VAR(char *, data, NULL, ao2_cleanup);
1125         RAII_VAR(struct ast_json *, actual, NULL, ast_json_unref);
1126         const char *expected_text = "SomeData";
1127         RAII_VAR(struct ast_json *, expected, NULL, ast_json_unref);
1128
1129         switch (cmd) {
1130         case TEST_INIT:
1131                 info->name = __func__;
1132                 info->category = test_category;
1133                 info->summary = "Test message to_json function when NULL";
1134                 info->description = "Test message to_json function when NULL";
1135                 return AST_TEST_NOT_RUN;
1136         case TEST_EXECUTE:
1137                 break;
1138         }
1139
1140         type = stasis_message_type_create("SomeMessage", &fake_vtable);
1141
1142         data = ao2_alloc(strlen(expected_text) + 1, NULL);
1143         strcpy(data, expected_text);
1144         uut = stasis_message_create(type, data);
1145         ast_test_validate(test, NULL != uut);
1146
1147         expected = ast_json_string_create(expected_text);
1148         actual = stasis_message_to_json(uut);
1149         ast_test_validate(test, ast_json_equal(expected, actual));
1150
1151         return AST_TEST_PASS;
1152 }
1153
1154 AST_TEST_DEFINE(no_to_ami)
1155 {
1156         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1157         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1158         RAII_VAR(char *, data, NULL, ao2_cleanup);
1159         RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
1160         char *expected = "SomeData";
1161
1162         switch (cmd) {
1163         case TEST_INIT:
1164                 info->name = __func__;
1165                 info->category = test_category;
1166                 info->summary = "Test message to_ami function when NULL";
1167                 info->description = "Test message to_ami function when NULL";
1168                 return AST_TEST_NOT_RUN;
1169         case TEST_EXECUTE:
1170                 break;
1171         }
1172
1173         /* Test NULL */
1174         actual = stasis_message_to_ami(NULL);
1175         ast_test_validate(test, NULL == actual);
1176
1177         /* Test message with NULL to_ami function */
1178         type = stasis_message_type_create("SomeMessage", NULL);
1179
1180         data = ao2_alloc(strlen(expected) + 1, NULL);
1181         strcpy(data, expected);
1182         uut = stasis_message_create(type, data);
1183         ast_test_validate(test, NULL != uut);
1184
1185         actual = stasis_message_to_ami(uut);
1186         ast_test_validate(test, NULL == actual);
1187
1188         return AST_TEST_PASS;
1189 }
1190
1191 AST_TEST_DEFINE(to_ami)
1192 {
1193         RAII_VAR(struct stasis_message_type *, type, NULL, ao2_cleanup);
1194         RAII_VAR(struct stasis_message *, uut, NULL, ao2_cleanup);
1195         RAII_VAR(char *, data, NULL, ao2_cleanup);
1196         RAII_VAR(struct ast_manager_event_blob *, actual, NULL, ao2_cleanup);
1197         const char *expected_text = "SomeData";
1198         const char *expected = "Message: SomeData\r\n";
1199
1200         switch (cmd) {
1201         case TEST_INIT:
1202                 info->name = __func__;
1203                 info->category = test_category;
1204                 info->summary = "Test message to_ami function";
1205                 info->description = "Test message to_ami function";
1206                 return AST_TEST_NOT_RUN;
1207         case TEST_EXECUTE:
1208                 break;
1209         }
1210
1211         type = stasis_message_type_create("SomeMessage", &fake_vtable);
1212
1213         data = ao2_alloc(strlen(expected_text) + 1, NULL);
1214         strcpy(data, expected_text);
1215         uut = stasis_message_create(type, data);
1216         ast_test_validate(test, NULL != uut);
1217
1218         actual = stasis_message_to_ami(uut);
1219         ast_test_validate(test, strcmp(expected, actual->extra_fields) == 0);
1220
1221         return AST_TEST_PASS;
1222 }
1223
1224 static void noop(void *data, struct stasis_subscription *sub,
1225         struct stasis_message *message)
1226 {
1227         /* no-op */
1228 }
1229
1230 AST_TEST_DEFINE(dtor_order)
1231 {
1232         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1233         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
1234
1235         switch (cmd) {
1236         case TEST_INIT:
1237                 info->name = __func__;
1238                 info->category = test_category;
1239                 info->summary = "Test that destruction order doesn't bomb stuff";
1240                 info->description = "Test that destruction order doesn't bomb stuff";
1241                 return AST_TEST_NOT_RUN;
1242         case TEST_EXECUTE:
1243                 break;
1244         }
1245
1246         topic = stasis_topic_create("test-topic");
1247         ast_test_validate(test, NULL != topic);
1248
1249         sub = stasis_subscribe(topic, noop, NULL);
1250         ast_test_validate(test, NULL != sub);
1251
1252         /* With any luck, this won't completely blow everything up */
1253         ao2_cleanup(topic);
1254         stasis_unsubscribe(sub);
1255
1256         /* These refs were cleaned up manually */
1257         topic = NULL;
1258         sub = NULL;
1259
1260         return AST_TEST_PASS;
1261 }
1262
1263 static const char *noop_get_id(struct stasis_message *message)
1264 {
1265         return NULL;
1266 }
1267
1268 AST_TEST_DEFINE(caching_dtor_order)
1269 {
1270         RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
1271         RAII_VAR(struct stasis_cache *, cache, NULL, ao2_cleanup);
1272         RAII_VAR(struct stasis_caching_topic *, caching_topic, NULL,
1273                 stasis_caching_unsubscribe);
1274         RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
1275
1276         switch (cmd) {
1277         case TEST_INIT:
1278                 info->name = __func__;
1279                 info->category = test_category;
1280                 info->summary = "Test that destruction order doesn't bomb stuff";
1281                 info->description = "Test that destruction order doesn't bomb stuff";
1282                 return AST_TEST_NOT_RUN;
1283         case TEST_EXECUTE:
1284                 break;
1285         }
1286
1287         cache = stasis_cache_create(noop_get_id);
1288         ast_test_validate(test, NULL != cache);
1289
1290         topic = stasis_topic_create("test-topic");
1291         ast_test_validate(test, NULL != topic);
1292
1293         caching_topic = stasis_caching_topic_create(topic, cache);
1294         ast_test_validate(test, NULL != caching_topic);
1295
1296         sub = stasis_subscribe(stasis_caching_get_topic(caching_topic), noop,
1297                 NULL);
1298         ast_test_validate(test, NULL != sub);
1299
1300         /* With any luck, this won't completely blow everything up */
1301         ao2_cleanup(cache);
1302         ao2_cleanup(topic);
1303         stasis_caching_unsubscribe(caching_topic);
1304         stasis_unsubscribe(sub);
1305
1306         /* These refs were cleaned up manually */
1307         cache = NULL;
1308         topic = NULL;
1309         caching_topic = NULL;
1310         sub = NULL;
1311
1312         return AST_TEST_PASS;
1313 }
1314
1315 static int unload_module(void)
1316 {
1317         AST_TEST_UNREGISTER(message_type);
1318         AST_TEST_UNREGISTER(message);
1319         AST_TEST_UNREGISTER(subscription_messages);
1320         AST_TEST_UNREGISTER(publish);
1321         AST_TEST_UNREGISTER(unsubscribe_stops_messages);
1322         AST_TEST_UNREGISTER(forward);
1323         AST_TEST_UNREGISTER(cache_filter);
1324         AST_TEST_UNREGISTER(cache);
1325         AST_TEST_UNREGISTER(cache_dump);
1326         AST_TEST_UNREGISTER(router);
1327         AST_TEST_UNREGISTER(router_cache_updates);
1328         AST_TEST_UNREGISTER(interleaving);
1329         AST_TEST_UNREGISTER(no_to_json);
1330         AST_TEST_UNREGISTER(to_json);
1331         AST_TEST_UNREGISTER(no_to_ami);
1332         AST_TEST_UNREGISTER(to_ami);
1333         AST_TEST_UNREGISTER(dtor_order);
1334         AST_TEST_UNREGISTER(caching_dtor_order);
1335         return 0;
1336 }
1337
1338 static int load_module(void)
1339 {
1340         AST_TEST_REGISTER(message_type);
1341         AST_TEST_REGISTER(message);
1342         AST_TEST_REGISTER(subscription_messages);
1343         AST_TEST_REGISTER(publish);
1344         AST_TEST_REGISTER(unsubscribe_stops_messages);
1345         AST_TEST_REGISTER(forward);
1346         AST_TEST_REGISTER(cache_filter);
1347         AST_TEST_REGISTER(cache);
1348         AST_TEST_REGISTER(cache_dump);
1349         AST_TEST_REGISTER(router);
1350         AST_TEST_REGISTER(router_cache_updates);
1351         AST_TEST_REGISTER(interleaving);
1352         AST_TEST_REGISTER(no_to_json);
1353         AST_TEST_REGISTER(to_json);
1354         AST_TEST_REGISTER(no_to_ami);
1355         AST_TEST_REGISTER(to_ami);
1356         AST_TEST_REGISTER(dtor_order);
1357         AST_TEST_REGISTER(caching_dtor_order);
1358         return AST_MODULE_LOAD_SUCCESS;
1359 }
1360
1361 AST_MODULE_INFO(ASTERISK_GPL_KEY, 0, "Stasis testing",
1362                 .load = load_module,
1363                 .unload = unload_module
1364         );