doxygen: Fix doxygen errors
[asterisk/asterisk.git] / res / res_stasis_test.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \brief Test infrastructure for dealing with Stasis.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <depend>TEST_FRAMEWORK</depend>
28         <support_level>core</support_level>
29  ***/
30
31 #include "asterisk.h"
32
33 ASTERISK_REGISTER_FILE();
34
35 #include "asterisk/astobj2.h"
36 #include "asterisk/module.h"
37 #include "asterisk/stasis_test.h"
38
39 STASIS_MESSAGE_TYPE_DEFN(stasis_test_message_type);
40
41 static void stasis_message_sink_dtor(void *obj)
42 {
43         struct stasis_message_sink *sink = obj;
44
45         {
46                 SCOPED_MUTEX(lock, &sink->lock);
47                 while (!sink->is_done) {
48                         /* Normally waiting forever is bad, but if we're not
49                          * done, we're not done. */
50                         ast_cond_wait(&sink->cond, &sink->lock);
51                 }
52         }
53
54         ast_mutex_destroy(&sink->lock);
55         ast_cond_destroy(&sink->cond);
56
57         while (sink->num_messages > 0) {
58                 ao2_cleanup(sink->messages[--sink->num_messages]);
59         }
60         ast_free(sink->messages);
61         sink->messages = NULL;
62         sink->max_messages = 0;
63 }
64
65 static struct timespec make_deadline(int timeout_millis)
66 {
67         struct timeval start = ast_tvnow();
68         struct timeval delta = {
69                 .tv_sec = timeout_millis / 1000,
70                 .tv_usec = (timeout_millis % 1000) * 1000,
71         };
72         struct timeval deadline_tv = ast_tvadd(start, delta);
73         struct timespec deadline = {
74                 .tv_sec = deadline_tv.tv_sec,
75                 .tv_nsec = 1000 * deadline_tv.tv_usec,
76         };
77
78         return deadline;
79 }
80
81 struct stasis_message_sink *stasis_message_sink_create(void)
82 {
83         RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
84
85         sink = ao2_alloc(sizeof(*sink), stasis_message_sink_dtor);
86         if (!sink) {
87                 return NULL;
88         }
89         ast_mutex_init(&sink->lock);
90         ast_cond_init(&sink->cond, NULL);
91         sink->max_messages = 4;
92         sink->messages =
93                 ast_malloc(sizeof(*sink->messages) * sink->max_messages);
94         if (!sink->messages) {
95                 return NULL;
96         }
97         ao2_ref(sink, +1);
98         return sink;
99 }
100
101 /*!
102  * \brief Implementation of the stasis_message_sink_cb() callback.
103  *
104  * Why the roundabout way of exposing this via stasis_message_sink_cb()? Well,
105  * it has to do with how we load modules.
106  *
107  * Modules have their own metadata compiled into them in the module info block
108  * at the end of the file.  This includes dependency information in the
109  * \c nonoptreq field.
110  *
111  * Asterisk loads the module, inspects the field, then loads any needed
112  * dependencies. This works because Asterisk passes \c RTLD_LAZY to the initial
113  * dlopen(), which defers binding function references until they are called.
114  *
115  * But when you take the address of a function, that function needs to be
116  * available at load time. So if some module used the address of
117  * message_sink_cb() directly, and \c res_stasis_test.so wasn't loaded yet, then
118  * that module would fail to load.
119  *
120  * The stasis_message_sink_cb() function gives us a layer of indirection so that
121  * the initial lazy binding will still work as expected.
122  */
123 static void message_sink_cb(void *data, struct stasis_subscription *sub,
124         struct stasis_message *message)
125 {
126         struct stasis_message_sink *sink = data;
127
128         SCOPED_MUTEX(lock, &sink->lock);
129
130         if (stasis_subscription_final_message(sub, message)) {
131                 sink->is_done = 1;
132                 ast_cond_signal(&sink->cond);
133                 return;
134         }
135
136         if (stasis_subscription_change_type() == stasis_message_type(message)) {
137                 /* Ignore subscription changes */
138                 return;
139         }
140
141         if (sink->num_messages == sink->max_messages) {
142                 size_t new_max_messages = sink->max_messages * 2;
143                 struct stasis_message **new_messages = ast_realloc(
144                         sink->messages,
145                         sizeof(*new_messages) * new_max_messages);
146                 if (!new_messages) {
147                         return;
148                 }
149                 sink->max_messages = new_max_messages;
150                 sink->messages = new_messages;
151         }
152
153         ao2_ref(message, +1);
154         sink->messages[sink->num_messages++] = message;
155         ast_cond_signal(&sink->cond);
156 }
157
158 stasis_subscription_cb stasis_message_sink_cb(void)
159 {
160         return message_sink_cb;
161 }
162
163
164 int stasis_message_sink_wait_for_count(struct stasis_message_sink *sink,
165         int num_messages, int timeout_millis)
166 {
167         struct timespec deadline = make_deadline(timeout_millis);
168
169         SCOPED_MUTEX(lock, &sink->lock);
170         while (sink->num_messages < num_messages) {
171                 int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
172
173                 if (r == ETIMEDOUT) {
174                         break;
175                 }
176                 if (r != 0) {
177                         ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
178                                 strerror(r));
179                         break;
180                 }
181         }
182         return sink->num_messages;
183 }
184
185 int stasis_message_sink_should_stay(struct stasis_message_sink *sink,
186         int num_messages, int timeout_millis)
187 {
188         struct timespec deadline = make_deadline(timeout_millis);
189
190         SCOPED_MUTEX(lock, &sink->lock);
191         while (sink->num_messages == num_messages) {
192                 int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
193
194                 if (r == ETIMEDOUT) {
195                         break;
196                 }
197                 if (r != 0) {
198                         ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
199                                 strerror(r));
200                         break;
201                 }
202         }
203         return sink->num_messages;
204 }
205
206 int stasis_message_sink_wait_for(struct stasis_message_sink *sink, int start,
207         stasis_wait_cb cmp_cb, const void *data, int timeout_millis)
208 {
209         struct timespec deadline = make_deadline(timeout_millis);
210
211         SCOPED_MUTEX(lock, &sink->lock);
212
213         /* wait for the start */
214         while (sink->num_messages < start + 1) {
215                 int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
216
217                 if (r == ETIMEDOUT) {
218                         /* Timed out waiting for the start */
219                         return -1;
220                 }
221                 if (r != 0) {
222                         ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
223                                 strerror(r));
224                         return -2;
225                 }
226         }
227
228
229         while (!cmp_cb(sink->messages[start], data)) {
230                 ++start;
231
232                 while (sink->num_messages < start + 1) {
233                         int r = ast_cond_timedwait(&sink->cond,
234                                 &sink->lock, &deadline);
235
236                         if (r == ETIMEDOUT) {
237                                 return -1;
238                         }
239                         if (r != 0) {
240                                 ast_log(LOG_ERROR,
241                                         "Unexpected condition error: %s\n",
242                                         strerror(r));
243                                 return -2;
244                         }
245                 }
246         }
247
248         return start;
249 }
250
251 struct stasis_message *stasis_test_message_create(void)
252 {
253         RAII_VAR(void *, data, NULL, ao2_cleanup);
254
255         if (!stasis_test_message_type()) {
256                 return NULL;
257         }
258
259         /* We just need the unique pointer; don't care what's in it */
260         data = ao2_alloc(1, NULL);
261         if (!data) {
262                 return NULL;
263         }
264
265         return stasis_message_create(stasis_test_message_type(), data);
266 }
267
268 static int unload_module(void)
269 {
270         STASIS_MESSAGE_TYPE_CLEANUP(stasis_test_message_type);
271         return 0;
272 }
273
274 static int load_module(void)
275 {
276         if (STASIS_MESSAGE_TYPE_INIT(stasis_test_message_type) != 0) {
277                 return AST_MODULE_LOAD_FAILURE;
278         }
279
280         return AST_MODULE_LOAD_SUCCESS;
281 }
282
283 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "Stasis test utilities",
284         .support_level = AST_MODULE_SUPPORT_CORE,
285         .load = load_module,
286         .unload = unload_module,
287         .load_pri = AST_MODPRI_APP_DEPEND,
288 );