Merge "res_pjsip: New endpoint option "refer_blind_progress""
[asterisk/asterisk.git] / res / res_stasis_test.c
1 /*
2  * Asterisk -- An open source telephony toolkit.
3  *
4  * Copyright (C) 2013, Digium, Inc.
5  *
6  * David M. Lee, II <dlee@digium.com>
7  *
8  * See http://www.asterisk.org for more information about
9  * the Asterisk project. Please do not directly contact
10  * any of the maintainers of this project for assistance;
11  * the project provides a web site, mailing lists and IRC
12  * channels for your use.
13  *
14  * This program is free software, distributed under the terms of
15  * the GNU General Public License Version 2. See the LICENSE file
16  * at the top of the source tree.
17  */
18
19 /*!
20  * \file
21  * \brief Test infrastructure for dealing with Stasis.
22  *
23  * \author David M. Lee, II <dlee@digium.com>
24  */
25
26 /*** MODULEINFO
27         <depend>TEST_FRAMEWORK</depend>
28         <support_level>core</support_level>
29  ***/
30
31 #include "asterisk.h"
32
33 #include "asterisk/astobj2.h"
34 #include "asterisk/module.h"
35 #include "asterisk/stasis_test.h"
36
37 STASIS_MESSAGE_TYPE_DEFN(stasis_test_message_type);
38
39 static void stasis_message_sink_dtor(void *obj)
40 {
41         struct stasis_message_sink *sink = obj;
42
43         {
44                 SCOPED_MUTEX(lock, &sink->lock);
45                 while (!sink->is_done) {
46                         /* Normally waiting forever is bad, but if we're not
47                          * done, we're not done. */
48                         ast_cond_wait(&sink->cond, &sink->lock);
49                 }
50         }
51
52         ast_mutex_destroy(&sink->lock);
53         ast_cond_destroy(&sink->cond);
54
55         while (sink->num_messages > 0) {
56                 ao2_cleanup(sink->messages[--sink->num_messages]);
57         }
58         ast_free(sink->messages);
59         sink->messages = NULL;
60         sink->max_messages = 0;
61 }
62
63 static struct timespec make_deadline(int timeout_millis)
64 {
65         struct timeval start = ast_tvnow();
66         struct timeval delta = {
67                 .tv_sec = timeout_millis / 1000,
68                 .tv_usec = (timeout_millis % 1000) * 1000,
69         };
70         struct timeval deadline_tv = ast_tvadd(start, delta);
71         struct timespec deadline = {
72                 .tv_sec = deadline_tv.tv_sec,
73                 .tv_nsec = 1000 * deadline_tv.tv_usec,
74         };
75
76         return deadline;
77 }
78
79 struct stasis_message_sink *stasis_message_sink_create(void)
80 {
81         RAII_VAR(struct stasis_message_sink *, sink, NULL, ao2_cleanup);
82
83         sink = ao2_alloc(sizeof(*sink), stasis_message_sink_dtor);
84         if (!sink) {
85                 return NULL;
86         }
87         ast_mutex_init(&sink->lock);
88         ast_cond_init(&sink->cond, NULL);
89         sink->max_messages = 4;
90         sink->messages =
91                 ast_malloc(sizeof(*sink->messages) * sink->max_messages);
92         if (!sink->messages) {
93                 return NULL;
94         }
95         ao2_ref(sink, +1);
96         return sink;
97 }
98
99 /*!
100  * \brief Implementation of the stasis_message_sink_cb() callback.
101  *
102  * Why the roundabout way of exposing this via stasis_message_sink_cb()? Well,
103  * it has to do with how we load modules.
104  *
105  * Modules have their own metadata compiled into them in the module info block
106  * at the end of the file.  This includes dependency information in the
107  * \c nonoptreq field.
108  *
109  * Asterisk loads the module, inspects the field, then loads any needed
110  * dependencies. This works because Asterisk passes \c RTLD_LAZY to the initial
111  * dlopen(), which defers binding function references until they are called.
112  *
113  * But when you take the address of a function, that function needs to be
114  * available at load time. So if some module used the address of
115  * message_sink_cb() directly, and \c res_stasis_test.so wasn't loaded yet, then
116  * that module would fail to load.
117  *
118  * The stasis_message_sink_cb() function gives us a layer of indirection so that
119  * the initial lazy binding will still work as expected.
120  */
121 static void message_sink_cb(void *data, struct stasis_subscription *sub,
122         struct stasis_message *message)
123 {
124         struct stasis_message_sink *sink = data;
125
126         SCOPED_MUTEX(lock, &sink->lock);
127
128         if (stasis_subscription_final_message(sub, message)) {
129                 sink->is_done = 1;
130                 ast_cond_signal(&sink->cond);
131                 return;
132         }
133
134         if (stasis_subscription_change_type() == stasis_message_type(message)) {
135                 /* Ignore subscription changes */
136                 return;
137         }
138
139         if (sink->num_messages == sink->max_messages) {
140                 size_t new_max_messages = sink->max_messages * 2;
141                 struct stasis_message **new_messages = ast_realloc(
142                         sink->messages,
143                         sizeof(*new_messages) * new_max_messages);
144                 if (!new_messages) {
145                         return;
146                 }
147                 sink->max_messages = new_max_messages;
148                 sink->messages = new_messages;
149         }
150
151         ao2_ref(message, +1);
152         sink->messages[sink->num_messages++] = message;
153         ast_cond_signal(&sink->cond);
154 }
155
156 stasis_subscription_cb stasis_message_sink_cb(void)
157 {
158         return message_sink_cb;
159 }
160
161
162 int stasis_message_sink_wait_for_count(struct stasis_message_sink *sink,
163         int num_messages, int timeout_millis)
164 {
165         struct timespec deadline = make_deadline(timeout_millis);
166
167         SCOPED_MUTEX(lock, &sink->lock);
168         while (sink->num_messages < num_messages) {
169                 int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
170
171                 if (r == ETIMEDOUT) {
172                         break;
173                 }
174                 if (r != 0) {
175                         ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
176                                 strerror(r));
177                         break;
178                 }
179         }
180         return sink->num_messages;
181 }
182
183 int stasis_message_sink_should_stay(struct stasis_message_sink *sink,
184         int num_messages, int timeout_millis)
185 {
186         struct timespec deadline = make_deadline(timeout_millis);
187
188         SCOPED_MUTEX(lock, &sink->lock);
189         while (sink->num_messages == num_messages) {
190                 int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
191
192                 if (r == ETIMEDOUT) {
193                         break;
194                 }
195                 if (r != 0) {
196                         ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
197                                 strerror(r));
198                         break;
199                 }
200         }
201         return sink->num_messages;
202 }
203
204 int stasis_message_sink_wait_for(struct stasis_message_sink *sink, int start,
205         stasis_wait_cb cmp_cb, const void *data, int timeout_millis)
206 {
207         struct timespec deadline = make_deadline(timeout_millis);
208
209         SCOPED_MUTEX(lock, &sink->lock);
210
211         /* wait for the start */
212         while (sink->num_messages < start + 1) {
213                 int r = ast_cond_timedwait(&sink->cond, &sink->lock, &deadline);
214
215                 if (r == ETIMEDOUT) {
216                         /* Timed out waiting for the start */
217                         return -1;
218                 }
219                 if (r != 0) {
220                         ast_log(LOG_ERROR, "Unexpected condition error: %s\n",
221                                 strerror(r));
222                         return -2;
223                 }
224         }
225
226
227         while (!cmp_cb(sink->messages[start], data)) {
228                 ++start;
229
230                 while (sink->num_messages < start + 1) {
231                         int r = ast_cond_timedwait(&sink->cond,
232                                 &sink->lock, &deadline);
233
234                         if (r == ETIMEDOUT) {
235                                 return -1;
236                         }
237                         if (r != 0) {
238                                 ast_log(LOG_ERROR,
239                                         "Unexpected condition error: %s\n",
240                                         strerror(r));
241                                 return -2;
242                         }
243                 }
244         }
245
246         return start;
247 }
248
249 struct stasis_message *stasis_test_message_create(void)
250 {
251         RAII_VAR(void *, data, NULL, ao2_cleanup);
252
253         if (!stasis_test_message_type()) {
254                 return NULL;
255         }
256
257         /* We just need the unique pointer; don't care what's in it */
258         data = ao2_alloc(1, NULL);
259         if (!data) {
260                 return NULL;
261         }
262
263         return stasis_message_create(stasis_test_message_type(), data);
264 }
265
266 static int unload_module(void)
267 {
268         STASIS_MESSAGE_TYPE_CLEANUP(stasis_test_message_type);
269         return 0;
270 }
271
272 static int load_module(void)
273 {
274         if (STASIS_MESSAGE_TYPE_INIT(stasis_test_message_type) != 0) {
275                 return AST_MODULE_LOAD_DECLINE;
276         }
277
278         return AST_MODULE_LOAD_SUCCESS;
279 }
280
281 AST_MODULE_INFO(ASTERISK_GPL_KEY, AST_MODFLAG_GLOBAL_SYMBOLS | AST_MODFLAG_LOAD_ORDER, "Stasis test utilities",
282         .support_level = AST_MODULE_SUPPORT_CORE,
283         .load = load_module,
284         .unload = unload_module,
285         .load_pri = AST_MODPRI_APP_DEPEND,
286 );