Add support for ICE/STUN/TURN in res_rtp_asterisk and chan_sip.
[asterisk/asterisk.git] / res / pjproject / pjlib / src / pjlib-test / ioq_perf.c
1 /* $Id$ */
2 /* 
3  * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4  * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
19  */
20 #include "test.h"
21 #include <pjlib.h>
22 #include <pj/compat/high_precision.h>
23
24 /**
25  * \page page_pjlib_ioqueue_perf_test Test: I/O Queue Performance
26  *
27  * Test the performance of the I/O queue, using typical producer
28  * consumer test. The test should examine the effect of using multiple
29  * threads on the performance.
30  *
31  * This file is <b>pjlib-test/ioq_perf.c</b>
32  *
33  * \include pjlib-test/ioq_perf.c
34  */
35
36 #if INCLUDE_IOQUEUE_PERF_TEST
37
38 #ifdef _MSC_VER
39 #   pragma warning ( disable: 4204)     // non-constant aggregate initializer
40 #endif
41
42 #define THIS_FILE       "ioq_perf"
43 //#define TRACE_(expr)  PJ_LOG(3,expr)
44 #define TRACE_(expr)
45
46
47 static pj_bool_t thread_quit_flag;
48 static pj_status_t last_error;
49 static unsigned last_error_counter;
50
51 /* Descriptor for each producer/consumer pair. */
52 typedef struct test_item
53 {
54     pj_sock_t            server_fd, 
55                          client_fd;
56     pj_ioqueue_t        *ioqueue;
57     pj_ioqueue_key_t    *server_key,
58                         *client_key;
59     pj_ioqueue_op_key_t  recv_op,
60                          send_op;
61     int                  has_pending_send;
62     pj_size_t            buffer_size;
63     char                *outgoing_buffer;
64     char                *incoming_buffer;
65     pj_size_t            bytes_sent, 
66                          bytes_recv;
67 } test_item;
68
69 /* Callback when data has been read.
70  * Increment item->bytes_recv and ready to read the next data.
71  */
72 static void on_read_complete(pj_ioqueue_key_t *key, 
73                              pj_ioqueue_op_key_t *op_key,
74                              pj_ssize_t bytes_read)
75 {
76     test_item *item = (test_item*)pj_ioqueue_get_user_data(key);
77     pj_status_t rc;
78     int data_is_available = 1;
79
80     //TRACE_((THIS_FILE, "     read complete, bytes_read=%d", bytes_read));
81
82     do {
83         if (thread_quit_flag)
84             return;
85
86         if (bytes_read < 0) {
87             pj_status_t rc = -bytes_read;
88             char errmsg[PJ_ERR_MSG_SIZE];
89
90             if (rc != last_error) {
91                 //last_error = rc;
92                 pj_strerror(rc, errmsg, sizeof(errmsg));
93                 PJ_LOG(3,(THIS_FILE,"...error: read error, bytes_read=%d (%s)", 
94                           bytes_read, errmsg));
95                 PJ_LOG(3,(THIS_FILE, 
96                           ".....additional info: total read=%u, total sent=%u",
97                           item->bytes_recv, item->bytes_sent));
98             } else {
99                 last_error_counter++;
100             }
101             bytes_read = 0;
102
103         } else if (bytes_read == 0) {
104             PJ_LOG(3,(THIS_FILE, "...socket has closed!"));
105         }
106
107         item->bytes_recv += bytes_read;
108     
109         /* To assure that the test quits, even if main thread
110          * doesn't have time to run.
111          */
112         if (item->bytes_recv > item->buffer_size * 10000) 
113             thread_quit_flag = 1;
114
115         bytes_read = item->buffer_size;
116         rc = pj_ioqueue_recv( key, op_key,
117                               item->incoming_buffer, &bytes_read, 0 );
118
119         if (rc == PJ_SUCCESS) {
120             data_is_available = 1;
121         } else if (rc == PJ_EPENDING) {
122             data_is_available = 0;
123         } else {
124             data_is_available = 0;
125             if (rc != last_error) {
126                 last_error = rc;
127                 app_perror("...error: read error(1)", rc);
128             } else {
129                 last_error_counter++;
130             }
131         }
132
133         if (!item->has_pending_send) {
134             pj_ssize_t sent = item->buffer_size;
135             rc = pj_ioqueue_send(item->client_key, &item->send_op,
136                                  item->outgoing_buffer, &sent, 0);
137             if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
138                 app_perror("...error: write error", rc);
139             }
140
141             item->has_pending_send = (rc==PJ_EPENDING);
142         }
143
144     } while (data_is_available);
145 }
146
147 /* Callback when data has been written.
148  * Increment item->bytes_sent and write the next data.
149  */
150 static void on_write_complete(pj_ioqueue_key_t *key, 
151                               pj_ioqueue_op_key_t *op_key,
152                               pj_ssize_t bytes_sent)
153 {
154     test_item *item = (test_item*) pj_ioqueue_get_user_data(key);
155     
156     //TRACE_((THIS_FILE, "     write complete: sent = %d", bytes_sent));
157
158     if (thread_quit_flag)
159         return;
160
161     item->has_pending_send = 0;
162     item->bytes_sent += bytes_sent;
163
164     if (bytes_sent <= 0) {
165         PJ_LOG(3,(THIS_FILE, "...error: sending stopped. bytes_sent=%d", 
166                   bytes_sent));
167     } 
168     else {
169         pj_status_t rc;
170
171         bytes_sent = item->buffer_size;
172         rc = pj_ioqueue_send( item->client_key, op_key,
173                               item->outgoing_buffer, &bytes_sent, 0);
174         if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
175             app_perror("...error: write error", rc);
176         }
177
178         item->has_pending_send = (rc==PJ_EPENDING);
179     }
180 }
181
182 struct thread_arg
183 {
184     int           id;
185     pj_ioqueue_t *ioqueue;
186     unsigned      counter;
187 };
188
189 /* The worker thread. */
190 static int worker_thread(void *p)
191 {
192     struct thread_arg *arg = (struct thread_arg*) p;
193     const pj_time_val timeout = {0, 100};
194     int rc;
195
196     while (!thread_quit_flag) {
197
198         ++arg->counter;
199         rc = pj_ioqueue_poll(arg->ioqueue, &timeout);
200         //TRACE_((THIS_FILE, "     thread: poll returned rc=%d", rc));
201         if (rc < 0) {
202             char errmsg[PJ_ERR_MSG_SIZE];
203             pj_strerror(-rc, errmsg, sizeof(errmsg));
204             PJ_LOG(3, (THIS_FILE, 
205                        "...error in pj_ioqueue_poll() in thread %d "
206                        "after %d loop: %s [pj_status_t=%d]", 
207                        arg->id, arg->counter, errmsg, -rc));
208             //return -1;
209         }
210     }
211     return 0;
212 }
213
214 /* Calculate the bandwidth for the specific test configuration.
215  * The test is simple:
216  *  - create sockpair_cnt number of producer-consumer socket pair.
217  *  - create thread_cnt number of worker threads.
218  *  - each producer will send buffer_size bytes data as fast and
219  *    as soon as it can.
220  *  - each consumer will read buffer_size bytes of data as fast 
221  *    as it could.
222  *  - measure the total bytes received by all consumers during a
223  *    period of time.
224  */
225 static int perform_test(pj_bool_t allow_concur,
226                         int sock_type, const char *type_name,
227                         unsigned thread_cnt, unsigned sockpair_cnt,
228                         pj_size_t buffer_size, 
229                         pj_size_t *p_bandwidth)
230 {
231     enum { MSEC_DURATION = 5000 };
232     pj_pool_t *pool;
233     test_item *items;
234     pj_thread_t **thread;
235     pj_ioqueue_t *ioqueue;
236     pj_status_t rc;
237     pj_ioqueue_callback ioqueue_callback;
238     pj_uint32_t total_elapsed_usec, total_received;
239     pj_highprec_t bandwidth;
240     pj_timestamp start, stop;
241     unsigned i;
242
243     TRACE_((THIS_FILE, "    starting test.."));
244
245     ioqueue_callback.on_read_complete = &on_read_complete;
246     ioqueue_callback.on_write_complete = &on_write_complete;
247
248     thread_quit_flag = 0;
249
250     pool = pj_pool_create(mem, NULL, 4096, 4096, NULL);
251     if (!pool)
252         return -10;
253
254     items = (test_item*) pj_pool_alloc(pool, sockpair_cnt*sizeof(test_item));
255     thread = (pj_thread_t**)
256              pj_pool_alloc(pool, thread_cnt*sizeof(pj_thread_t*));
257
258     TRACE_((THIS_FILE, "     creating ioqueue.."));
259     rc = pj_ioqueue_create(pool, sockpair_cnt*2, &ioqueue);
260     if (rc != PJ_SUCCESS) {
261         app_perror("...error: unable to create ioqueue", rc);
262         return -15;
263     }
264
265     rc = pj_ioqueue_set_default_concurrency(ioqueue, allow_concur);
266     if (rc != PJ_SUCCESS) {
267         app_perror("...error: pj_ioqueue_set_default_concurrency()", rc);
268         return -16;
269     }
270
271     /* Initialize each producer-consumer pair. */
272     for (i=0; i<sockpair_cnt; ++i) {
273         pj_ssize_t bytes;
274
275         items[i].ioqueue = ioqueue;
276         items[i].buffer_size = buffer_size;
277         items[i].outgoing_buffer = (char*) pj_pool_alloc(pool, buffer_size);
278         items[i].incoming_buffer = (char*) pj_pool_alloc(pool, buffer_size);
279         items[i].bytes_recv = items[i].bytes_sent = 0;
280
281         /* randomize outgoing buffer. */
282         pj_create_random_string(items[i].outgoing_buffer, buffer_size);
283
284         /* Create socket pair. */
285         TRACE_((THIS_FILE, "      calling socketpair.."));
286         rc = app_socketpair(pj_AF_INET(), sock_type, 0, 
287                             &items[i].server_fd, &items[i].client_fd);
288         if (rc != PJ_SUCCESS) {
289             app_perror("...error: unable to create socket pair", rc);
290             return -20;
291         }
292
293         /* Register server socket to ioqueue. */
294         TRACE_((THIS_FILE, "      register(1).."));
295         rc = pj_ioqueue_register_sock(pool, ioqueue, 
296                                       items[i].server_fd,
297                                       &items[i], &ioqueue_callback,
298                                       &items[i].server_key);
299         if (rc != PJ_SUCCESS) {
300             app_perror("...error: registering server socket to ioqueue", rc);
301             return -60;
302         }
303
304         /* Register client socket to ioqueue. */
305         TRACE_((THIS_FILE, "      register(2).."));
306         rc = pj_ioqueue_register_sock(pool, ioqueue, 
307                                       items[i].client_fd,
308                                       &items[i],  &ioqueue_callback,
309                                       &items[i].client_key);
310         if (rc != PJ_SUCCESS) {
311             app_perror("...error: registering server socket to ioqueue", rc);
312             return -70;
313         }
314
315         /* Start reading. */
316         TRACE_((THIS_FILE, "      pj_ioqueue_recv.."));
317         bytes = items[i].buffer_size;
318         rc = pj_ioqueue_recv(items[i].server_key, &items[i].recv_op,
319                              items[i].incoming_buffer, &bytes,
320                              0);
321         if (rc != PJ_EPENDING) {
322             app_perror("...error: pj_ioqueue_recv", rc);
323             return -73;
324         }
325
326         /* Start writing. */
327         TRACE_((THIS_FILE, "      pj_ioqueue_write.."));
328         bytes = items[i].buffer_size;
329         rc = pj_ioqueue_send(items[i].client_key, &items[i].send_op,
330                              items[i].outgoing_buffer, &bytes, 0);
331         if (rc != PJ_SUCCESS && rc != PJ_EPENDING) {
332             app_perror("...error: pj_ioqueue_write", rc);
333             return -76;
334         }
335
336         items[i].has_pending_send = (rc==PJ_EPENDING);
337     }
338
339     /* Create the threads. */
340     for (i=0; i<thread_cnt; ++i) {
341         struct thread_arg *arg;
342
343         arg = (struct thread_arg*) pj_pool_zalloc(pool, sizeof(*arg));
344         arg->id = i;
345         arg->ioqueue = ioqueue;
346         arg->counter = 0;
347
348         rc = pj_thread_create( pool, NULL, 
349                                &worker_thread, 
350                                arg, 
351                                PJ_THREAD_DEFAULT_STACK_SIZE, 
352                                PJ_THREAD_SUSPENDED, &thread[i] );
353         if (rc != PJ_SUCCESS) {
354             app_perror("...error: unable to create thread", rc);
355             return -80;
356         }
357     }
358
359     /* Mark start time. */
360     rc = pj_get_timestamp(&start);
361     if (rc != PJ_SUCCESS)
362         return -90;
363
364     /* Start the thread. */
365     TRACE_((THIS_FILE, "     resuming all threads.."));
366     for (i=0; i<thread_cnt; ++i) {
367         rc = pj_thread_resume(thread[i]);
368         if (rc != 0)
369             return -100;
370     }
371
372     /* Wait for MSEC_DURATION seconds. 
373      * This should be as simple as pj_thread_sleep(MSEC_DURATION) actually,
374      * but unfortunately it doesn't work when system doesn't employ
375      * timeslicing for threads.
376      */
377     TRACE_((THIS_FILE, "     wait for few seconds.."));
378     do {
379         pj_thread_sleep(1);
380
381         /* Mark end time. */
382         rc = pj_get_timestamp(&stop);
383
384         if (thread_quit_flag) {
385             TRACE_((THIS_FILE, "      transfer limit reached.."));
386             break;
387         }
388
389         if (pj_elapsed_usec(&start,&stop)<MSEC_DURATION * 1000) {
390             TRACE_((THIS_FILE, "      time limit reached.."));
391             break;
392         }
393
394     } while (1);
395
396     /* Terminate all threads. */
397     TRACE_((THIS_FILE, "     terminating all threads.."));
398     thread_quit_flag = 1;
399
400     for (i=0; i<thread_cnt; ++i) {
401         TRACE_((THIS_FILE, "      join thread %d..", i));
402         pj_thread_join(thread[i]);
403     }
404
405     /* Close all sockets. */
406     TRACE_((THIS_FILE, "     closing all sockets.."));
407     for (i=0; i<sockpair_cnt; ++i) {
408         pj_ioqueue_unregister(items[i].server_key);
409         pj_ioqueue_unregister(items[i].client_key);
410     }
411
412     /* Destroy threads */
413     for (i=0; i<thread_cnt; ++i) {
414         pj_thread_destroy(thread[i]);
415     }
416
417     /* Destroy ioqueue. */
418     TRACE_((THIS_FILE, "     destroying ioqueue.."));
419     pj_ioqueue_destroy(ioqueue);
420
421     /* Calculate actual time in usec. */
422     total_elapsed_usec = pj_elapsed_usec(&start, &stop);
423
424     /* Calculate total bytes received. */
425     total_received = 0;
426     for (i=0; i<sockpair_cnt; ++i) {
427         total_received = items[i].bytes_recv;
428     }
429
430     /* bandwidth = total_received*1000/total_elapsed_usec */
431     bandwidth = total_received;
432     pj_highprec_mul(bandwidth, 1000);
433     pj_highprec_div(bandwidth, total_elapsed_usec);
434     
435     *p_bandwidth = (pj_uint32_t)bandwidth;
436
437     PJ_LOG(3,(THIS_FILE, "   %.4s    %2d        %2d       %8d KB/s",
438               type_name, thread_cnt, sockpair_cnt,
439               *p_bandwidth));
440
441     /* Done. */
442     pj_pool_release(pool);
443
444     TRACE_((THIS_FILE, "    done.."));
445     return 0;
446 }
447
448 static int ioqueue_perf_test_imp(pj_bool_t allow_concur)
449 {
450     enum { BUF_SIZE = 512 };
451     int i, rc;
452     struct {
453         int         type;
454         const char *type_name;
455         int         thread_cnt;
456         int         sockpair_cnt;
457     } test_param[] = 
458     {
459         { pj_SOCK_DGRAM(), "udp", 1, 1},
460         { pj_SOCK_DGRAM(), "udp", 1, 2},
461         { pj_SOCK_DGRAM(), "udp", 1, 4},
462         { pj_SOCK_DGRAM(), "udp", 1, 8},
463         { pj_SOCK_DGRAM(), "udp", 2, 1},
464         { pj_SOCK_DGRAM(), "udp", 2, 2},
465         { pj_SOCK_DGRAM(), "udp", 2, 4},
466         { pj_SOCK_DGRAM(), "udp", 2, 8},
467         { pj_SOCK_DGRAM(), "udp", 4, 1},
468         { pj_SOCK_DGRAM(), "udp", 4, 2},
469         { pj_SOCK_DGRAM(), "udp", 4, 4},
470         { pj_SOCK_DGRAM(), "udp", 4, 8},
471         { pj_SOCK_DGRAM(), "udp", 4, 16},
472         { pj_SOCK_STREAM(), "tcp", 1, 1},
473         { pj_SOCK_STREAM(), "tcp", 1, 2},
474         { pj_SOCK_STREAM(), "tcp", 1, 4},
475         { pj_SOCK_STREAM(), "tcp", 1, 8},
476         { pj_SOCK_STREAM(), "tcp", 2, 1},
477         { pj_SOCK_STREAM(), "tcp", 2, 2},
478         { pj_SOCK_STREAM(), "tcp", 2, 4},
479         { pj_SOCK_STREAM(), "tcp", 2, 8},
480         { pj_SOCK_STREAM(), "tcp", 4, 1},
481         { pj_SOCK_STREAM(), "tcp", 4, 2},
482         { pj_SOCK_STREAM(), "tcp", 4, 4},
483         { pj_SOCK_STREAM(), "tcp", 4, 8},
484         { pj_SOCK_STREAM(), "tcp", 4, 16},
485 /*
486         { pj_SOCK_DGRAM(), "udp", 32, 1},
487         { pj_SOCK_DGRAM(), "udp", 32, 1},
488         { pj_SOCK_DGRAM(), "udp", 32, 1},
489         { pj_SOCK_DGRAM(), "udp", 32, 1},
490         { pj_SOCK_DGRAM(), "udp", 1, 32},
491         { pj_SOCK_DGRAM(), "udp", 1, 32},
492         { pj_SOCK_DGRAM(), "udp", 1, 32},
493         { pj_SOCK_DGRAM(), "udp", 1, 32},
494         { pj_SOCK_STREAM(), "tcp", 32, 1},
495         { pj_SOCK_STREAM(), "tcp", 32, 1},
496         { pj_SOCK_STREAM(), "tcp", 32, 1},
497         { pj_SOCK_STREAM(), "tcp", 32, 1},
498         { pj_SOCK_STREAM(), "tcp", 1, 32},
499         { pj_SOCK_STREAM(), "tcp", 1, 32},
500         { pj_SOCK_STREAM(), "tcp", 1, 32},
501         { pj_SOCK_STREAM(), "tcp", 1, 32},
502 */
503     };
504     pj_size_t best_bandwidth;
505     int best_index = 0;
506
507     PJ_LOG(3,(THIS_FILE, "   Benchmarking %s ioqueue:", pj_ioqueue_name()));
508     PJ_LOG(3,(THIS_FILE, "   Testing with concurency=%d", allow_concur));
509     PJ_LOG(3,(THIS_FILE, "   ======================================="));
510     PJ_LOG(3,(THIS_FILE, "   Type  Threads  Skt.Pairs      Bandwidth"));
511     PJ_LOG(3,(THIS_FILE, "   ======================================="));
512
513     best_bandwidth = 0;
514     for (i=0; i<(int)(sizeof(test_param)/sizeof(test_param[0])); ++i) {
515         pj_size_t bandwidth;
516
517         rc = perform_test(allow_concur,
518                           test_param[i].type, 
519                           test_param[i].type_name,
520                           test_param[i].thread_cnt, 
521                           test_param[i].sockpair_cnt, 
522                           BUF_SIZE, 
523                           &bandwidth);
524         if (rc != 0)
525             return rc;
526
527         if (bandwidth > best_bandwidth)
528             best_bandwidth = bandwidth, best_index = i;
529
530         /* Give it a rest before next test, to allow system to close the
531          * sockets properly. 
532          */
533         pj_thread_sleep(500);
534     }
535
536     PJ_LOG(3,(THIS_FILE, 
537               "   Best: Type=%s Threads=%d, Skt.Pairs=%d, Bandwidth=%u KB/s",
538               test_param[best_index].type_name,
539               test_param[best_index].thread_cnt,
540               test_param[best_index].sockpair_cnt,
541               best_bandwidth));
542     PJ_LOG(3,(THIS_FILE, "   (Note: packet size=%d, total errors=%u)", 
543                          BUF_SIZE, last_error_counter));
544     return 0;
545 }
546
547 /*
548  * main test entry.
549  */
550 int ioqueue_perf_test(void)
551 {
552     int rc;
553
554     rc = ioqueue_perf_test_imp(PJ_TRUE);
555     if (rc != 0)
556         return rc;
557
558     rc = ioqueue_perf_test_imp(PJ_FALSE);
559     if (rc != 0)
560         return rc;
561
562     return 0;
563 }
564
565 #else
566 /* To prevent warning about "translation unit is empty"
567  * when this test is disabled. 
568  */
569 int dummy_uiq_perf_test;
570 #endif  /* INCLUDE_IOQUEUE_PERF_TEST */
571
572