Stasis: Allow message types to be blocked
[asterisk/asterisk.git] / res / res_stasis_test.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file \brief Test infrastructure for dealing with Stasis.
21  *
22  * \author David M. Lee, II <dlee@digium.com>
23  */
24
25 /*** MODULEINFO
26         <depend>TEST_FRAMEWORK</depend>
27         <support_level>core</support_level>
28  ***/
29
30 #include "asterisk.h"
31
32 ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
33
34 #include "asterisk/astobj2.h"
35 #include "asterisk/module.h"
36 #include "asterisk/stasis_test.h"
37
38 STASIS_MESSAGE_TYPE_DEFN(stasis_test_message_type);
39
40 static void stasis_message_sink_dtor(void *obj)
41 {
42         struct stasis_message_sink *sink = obj;
43
44         {
45                 SCOPED_MUTEX(lock, &sink->lock);
46                 while (!sink->is_done) {
47                         /* Normally waiting forever is bad, but if we're not
48                          * done, we're not done. */
49                         ast_cond_wait(&sink->cond, &sink->lock);
50                 }
51         }
52
53         ast_mutex_destroy(&sink->lock);
54         ast_cond_destroy(&sink->cond);
55
56         while (sink->num_messages > 0) {
57                 ao2_cleanup(sink->messages[--sink->num_messages]);
58         }
59         ast_free(sink->messages);
60         sink->messages = NULL;
61         sink->max_messages = 0;
62 }
63
64 static struct timespec make_deadline(int timeout_millis)
65 {
66         struct timeval start = ast_tvnow();
67         struct timeval delta = {
68                 .tv_sec = timeout_millis / 1000,
69                 .tv_usec = (timeout_millis % 1000) * 1000,
70         };
71         struct timeval deadline_tv = ast_tvadd(start, delta);
72         struct timespec deadline = {
73                 .tv_sec = deadline_tv.tv_sec,
74                 .tv_nsec = 1000 * deadline_tv.tv_usec,
75         };
76
77         return deadline;
78 }
79
80 struct stasis_message_sink *stasis_message_sink_create(void)
81 {
82         RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
83
84         sink = ao2_alloc(sizeof(*sink), stasis_message_sink_dtor);
85         if (!sink) {
86                 return NULL;
87         }
88         ast_mutex_init(&sink->lock);
89         ast_cond_init(&sink->cond, NULL);
90         sink->max_messages = 4;
91         sink->messages =
92                 ast_malloc(sizeof(*sink->messages) * sink->max_messages);
93         if (!sink->messages) {
94                 return NULL;
95         }
96         ao2_ref(sink, +1);
97         return sink;
98 }
99
100 /*!
101  * \brief Implementation of the stasis_message_sink_cb() callback.
102  *
103  * Why the roundabout way of exposing this via stasis_message_sink_cb()? Well,
104  * it has to do with how we load modules.
105  *
106  * Modules have their own metadata compiled into them in the module info block
107  * at the end of the file.  This includes dependency information in the
108  * \c nonoptreq field.
109  *
110  * Asterisk loads the module, inspects the field, then loads any needed
111  * dependencies. This works because Asterisk passes \c RTLD_LAZY to the initial
112  * dlopen(), which defers binding function references until they are called.
113  *
114  * But when you take the address of a function, that function needs to be
115  * available at load time. So if some module used the address of
116  * message_sink_cb() directly, and \c res_stasis_test.so wasn't loaded yet, then
117  * that module would fail to load.
118  *
119  * The stasis_message_sink_cb() function gives us a layer of indirection so that
120  * the initial lazy binding will still work as expected.
121  */
122 static void message_sink_cb(void *data, struct stasis_subscription *sub,
123         struct stasis_message *message)
124 {
125         struct stasis_message_sink *sink = data;
126
127         SCOPED_MUTEX(lock, &sink->lock);
128
129         if (stasis_subscription_final_message(sub, message)) {
130                 sink->is_done = 1;
131                 ast_cond_signal(&sink->cond);
132                 return;
133         }
134
135         if (stasis_subscription_change_type() == stasis_message_type(message)) {
136                 /* Ignore subscription changes */
137                 return;
138         }
139
140         if (sink->num_messages == sink->max_messages) {
141                 size_t new_max_messages = sink->max_messages * 2;
142                 struct stasis_message **new_messages = ast_realloc(
143                         sink->messages,
144                         sizeof(*new_messages) * new_max_messages);
145                 if (!new_messages) {
146                         return;
147                 }
148                 sink->max_messages = new_max_messages;
149                 sink->messages = new_messages;
150         }
151
152         ao2_ref(message, +1);
153         sink->messages[sink->num_messages++] = message;
154         ast_cond_signal(&sink->cond);
155 }
156
157 stasis_subscription_cb stasis_message_sink_cb(void)
158 {
159         return message_sink_cb;
160 }
161
162
163 int stasis_message_sink_wait_for_count(struct stasis_message_sink *sink,
164         int num_messages, int timeout_millis)
165 {
166         struct timespec deadline = make_deadline(timeout_millis);
167
168         SCOPED_MUTEX(lock, &sink->lock);
169         while (sink->num_messages < num_messages) {
170                 int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
171
172                 if (r == ETIMEDOUT) {
173                         break;
174                 }
175                 if (r != 0) {
176                         ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
177                                 strerror(r));
178                         break;
179                 }
180         }
181         return sink->num_messages;
182 }
183
184 int stasis_message_sink_should_stay(struct stasis_message_sink *sink,
185         int num_messages, int timeout_millis)
186 {
187         struct timespec deadline = make_deadline(timeout_millis);
188
189         SCOPED_MUTEX(lock, &sink->lock);
190         while (sink->num_messages == num_messages) {
191                 int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
192
193                 if (r == ETIMEDOUT) {
194                         break;
195                 }
196                 if (r != 0) {
197                         ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
198                                 strerror(r));
199                         break;
200                 }
201         }
202         return sink->num_messages;
203 }
204
205 int stasis_message_sink_wait_for(struct stasis_message_sink *sink, int start,
206         stasis_wait_cb cmp_cb, const void *data, int timeout_millis)
207 {
208         struct timespec deadline = make_deadline(timeout_millis);
209
210         SCOPED_MUTEX(lock, &sink->lock);
211
212         /* wait for the start */
213         while (sink->num_messages < start + 1) {
214                 int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
215
216                 if (r == ETIMEDOUT) {
217                         /* Timed out waiting for the start */
218                         return -1;
219                 }
220                 if (r != 0) {
221                         ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
222                                 strerror(r));
223                         return -2;
224                 }
225         }
226
227
228         while (!cmp_cb(sink->messages[start], data)) {
229                 ++start;
230
231                 while (sink->num_messages < start + 1) {
232                         int r = ast_cond_timedwait(&sink->cond,
233                                 &sink->lock, &deadline);
234
235                         if (r == ETIMEDOUT) {
236                                 return -1;
237                         }
238                         if (r != 0) {
239                                 ast_log(LOG_ERROR,
240                                         "Unexpected condition error: %s\n",
241                                         strerror(r));
242                                 return -2;
243                         }
244                 }
245         }
246
247         return start;
248 }
249
250 struct stasis_message *stasis_test_message_create(void)
251 {
252         RAII_VAR(void *, data, NULL, ao2_cleanup);
253
254         if (!stasis_test_message_type()) {
255                 return NULL;
256         }
257
258         /* We just need the unique pointer; don't care what's in it */
259         data = ao2_alloc(1, NULL);
260         if (!data) {
261                 return NULL;
262         }
263
264         return stasis_message_create(stasis_test_message_type(), data);
265 }
266
267 static int unload_module(void)
268 {
269         STASIS_MESSAGE_TYPE_CLEANUP(stasis_test_message_type);
270         return 0;
271 }
272
273 static int load_module(void)
274 {
275         if (STASIS_MESSAGE_TYPE_INIT(stasis_test_message_type) != 0) {
276                 return AST_MODULE_LOAD_FAILURE;
277         }
278
279         return AST_MODULE_LOAD_SUCCESS;
280 }
281
282 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "Stasis test utilities",
283         .support_level = AST_MODULE_SUPPORT_CORE,
284         .load = load_module,
285         .unload = unload_module,
286         .load_pri = AST_MODPRI_APP_DEPEND,
287         );