Add support for ICE/STUN/TURN in res_rtp_asterisk and chan_sip.
[asterisk/asterisk.git] / res / pjproject / pjlib / src / pj / ioqueue_select.c
1 /* $Id$ */
2 /* 
3  * Copyright (C) 2008-2011 Teluu Inc. (http://www.teluu.com)
4  * Copyright (C) 2003-2008 Benny Prijono <benny@prijono.org>
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA 
19  */
20
21 /*
22  * sock_select.c
23  *
24  * This is the implementation of IOQueue using pj_sock_select().
25  * It runs anywhere where pj_sock_select() is available (currently
26  * Win32, Linux, Linux kernel, etc.).
27  */
28
29 #include <pj/ioqueue.h>
30 #include <pj/os.h>
31 #include <pj/lock.h>
32 #include <pj/log.h>
33 #include <pj/list.h>
34 #include <pj/pool.h>
35 #include <pj/string.h>
36 #include <pj/assert.h>
37 #include <pj/sock.h>
38 #include <pj/compat/socket.h>
39 #include <pj/sock_select.h>
40 #include <pj/sock_qos.h>
41 #include <pj/errno.h>
42
43 /* Now that we have access to OS'es <sys/select>, lets check again that
44  * PJ_IOQUEUE_MAX_HANDLES is not greater than FD_SETSIZE
45  */
46 #if PJ_IOQUEUE_MAX_HANDLES > FD_SETSIZE
47 #   error "PJ_IOQUEUE_MAX_HANDLES cannot be greater than FD_SETSIZE"
48 #endif
49
50
51 /*
52  * Include declaration from common abstraction.
53  */
54 #include "ioqueue_common_abs.h"
55
56 /*
57  * ISSUES with ioqueue_select()
58  *
59  * EAGAIN/EWOULDBLOCK error in recv():
60  *  - when multiple threads are working with the ioqueue, application
61  *    may receive EAGAIN or EWOULDBLOCK in the receive callback.
62  *    This error happens because more than one thread is watching for
63  *    the same descriptor set, so when all of them call recv() or recvfrom()
64  *    simultaneously, only one will succeed and the rest will get the error.
65  *
66  */
67 #define THIS_FILE   "ioq_select"
68
69 /*
70  * The select ioqueue relies on socket functions (pj_sock_xxx()) to return
71  * the correct error code.
72  */
73 #if PJ_RETURN_OS_ERROR(100) != PJ_STATUS_FROM_OS(100)
74 #   error "Error reporting must be enabled for this function to work!"
75 #endif
76
77 /*
78  * During debugging build, VALIDATE_FD_SET is set.
79  * This will check the validity of the fd_sets.
80  */
81 /*
82 #if defined(PJ_DEBUG) && PJ_DEBUG != 0
83 #  define VALIDATE_FD_SET               1
84 #else
85 #  define VALIDATE_FD_SET               0
86 #endif
87 */
88 #define VALIDATE_FD_SET     0
89
90 #if 0
91 #  define TRACE__(args) PJ_LOG(3,args)
92 #else
93 #  define TRACE__(args)
94 #endif
95
96 /*
97  * This describes each key.
98  */
99 struct pj_ioqueue_key_t
100 {
101     DECLARE_COMMON_KEY
102 };
103
104 /*
105  * This describes the I/O queue itself.
106  */
107 struct pj_ioqueue_t
108 {
109     DECLARE_COMMON_IOQUEUE
110
111     unsigned            max, count;     /* Max and current key count        */
112     int                 nfds;           /* The largest fd value (for select)*/
113     pj_ioqueue_key_t    active_list;    /* List of active keys.             */
114     pj_fd_set_t         rfdset;
115     pj_fd_set_t         wfdset;
116 #if PJ_HAS_TCP
117     pj_fd_set_t         xfdset;
118 #endif
119
120 #if PJ_IOQUEUE_HAS_SAFE_UNREG
121     pj_mutex_t         *ref_cnt_mutex;
122     pj_ioqueue_key_t    closing_list;
123     pj_ioqueue_key_t    free_list;
124 #endif
125 };
126
127 /* Proto */
128 #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
129             PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
130 static pj_status_t replace_udp_sock(pj_ioqueue_key_t *h);
131 #endif
132
133 /* Include implementation for common abstraction after we declare
134  * pj_ioqueue_key_t and pj_ioqueue_t.
135  */
136 #include "ioqueue_common_abs.c"
137
138 #if PJ_IOQUEUE_HAS_SAFE_UNREG
139 /* Scan closing keys to be put to free list again */
140 static void scan_closing_keys(pj_ioqueue_t *ioqueue);
141 #endif
142
143 /*
144  * pj_ioqueue_name()
145  */
146 PJ_DEF(const char*) pj_ioqueue_name(void)
147 {
148     return "select";
149 }
150
151 /* 
152  * Scan the socket descriptor sets for the largest descriptor.
153  * This value is needed by select().
154  */
155 #if defined(PJ_SELECT_NEEDS_NFDS) && PJ_SELECT_NEEDS_NFDS!=0
156 static void rescan_fdset(pj_ioqueue_t *ioqueue)
157 {
158     pj_ioqueue_key_t *key = ioqueue->active_list.next;
159     int max = 0;
160
161     while (key != &ioqueue->active_list) {
162         if (key->fd > max)
163             max = key->fd;
164         key = key->next;
165     }
166
167     ioqueue->nfds = max;
168 }
169 #else
170 static void rescan_fdset(pj_ioqueue_t *ioqueue)
171 {
172     ioqueue->nfds = FD_SETSIZE-1;
173 }
174 #endif
175
176
177 /*
178  * pj_ioqueue_create()
179  *
180  * Create select ioqueue.
181  */
182 PJ_DEF(pj_status_t) pj_ioqueue_create( pj_pool_t *pool, 
183                                        pj_size_t max_fd,
184                                        pj_ioqueue_t **p_ioqueue)
185 {
186     pj_ioqueue_t *ioqueue;
187     pj_lock_t *lock;
188     unsigned i;
189     pj_status_t rc;
190
191     /* Check that arguments are valid. */
192     PJ_ASSERT_RETURN(pool != NULL && p_ioqueue != NULL && 
193                      max_fd > 0 && max_fd <= PJ_IOQUEUE_MAX_HANDLES, 
194                      PJ_EINVAL);
195
196     /* Check that size of pj_ioqueue_op_key_t is sufficient */
197     PJ_ASSERT_RETURN(sizeof(pj_ioqueue_op_key_t)-sizeof(void*) >=
198                      sizeof(union operation_key), PJ_EBUG);
199
200     /* Create and init common ioqueue stuffs */
201     ioqueue = PJ_POOL_ALLOC_T(pool, pj_ioqueue_t);
202     ioqueue_init(ioqueue);
203
204     ioqueue->max = max_fd;
205     ioqueue->count = 0;
206     PJ_FD_ZERO(&ioqueue->rfdset);
207     PJ_FD_ZERO(&ioqueue->wfdset);
208 #if PJ_HAS_TCP
209     PJ_FD_ZERO(&ioqueue->xfdset);
210 #endif
211     pj_list_init(&ioqueue->active_list);
212
213     rescan_fdset(ioqueue);
214
215 #if PJ_IOQUEUE_HAS_SAFE_UNREG
216     /* When safe unregistration is used (the default), we pre-create
217      * all keys and put them in the free list.
218      */
219
220     /* Mutex to protect key's reference counter 
221      * We don't want to use key's mutex or ioqueue's mutex because
222      * that would create deadlock situation in some cases.
223      */
224     rc = pj_mutex_create_simple(pool, NULL, &ioqueue->ref_cnt_mutex);
225     if (rc != PJ_SUCCESS)
226         return rc;
227
228
229     /* Init key list */
230     pj_list_init(&ioqueue->free_list);
231     pj_list_init(&ioqueue->closing_list);
232
233
234     /* Pre-create all keys according to max_fd */
235     for (i=0; i<max_fd; ++i) {
236         pj_ioqueue_key_t *key;
237
238         key = PJ_POOL_ALLOC_T(pool, pj_ioqueue_key_t);
239         key->ref_count = 0;
240         rc = pj_mutex_create_recursive(pool, NULL, &key->mutex);
241         if (rc != PJ_SUCCESS) {
242             key = ioqueue->free_list.next;
243             while (key != &ioqueue->free_list) {
244                 pj_mutex_destroy(key->mutex);
245                 key = key->next;
246             }
247             pj_mutex_destroy(ioqueue->ref_cnt_mutex);
248             return rc;
249         }
250
251         pj_list_push_back(&ioqueue->free_list, key);
252     }
253 #endif
254
255     /* Create and init ioqueue mutex */
256     rc = pj_lock_create_simple_mutex(pool, "ioq%p", &lock);
257     if (rc != PJ_SUCCESS)
258         return rc;
259
260     rc = pj_ioqueue_set_lock(ioqueue, lock, PJ_TRUE);
261     if (rc != PJ_SUCCESS)
262         return rc;
263
264     PJ_LOG(4, ("pjlib", "select() I/O Queue created (%p)", ioqueue));
265
266     *p_ioqueue = ioqueue;
267     return PJ_SUCCESS;
268 }
269
270 /*
271  * pj_ioqueue_destroy()
272  *
273  * Destroy ioqueue.
274  */
275 PJ_DEF(pj_status_t) pj_ioqueue_destroy(pj_ioqueue_t *ioqueue)
276 {
277     pj_ioqueue_key_t *key;
278
279     PJ_ASSERT_RETURN(ioqueue, PJ_EINVAL);
280
281     pj_lock_acquire(ioqueue->lock);
282
283 #if PJ_IOQUEUE_HAS_SAFE_UNREG
284     /* Destroy reference counters */
285     key = ioqueue->active_list.next;
286     while (key != &ioqueue->active_list) {
287         pj_mutex_destroy(key->mutex);
288         key = key->next;
289     }
290
291     key = ioqueue->closing_list.next;
292     while (key != &ioqueue->closing_list) {
293         pj_mutex_destroy(key->mutex);
294         key = key->next;
295     }
296
297     key = ioqueue->free_list.next;
298     while (key != &ioqueue->free_list) {
299         pj_mutex_destroy(key->mutex);
300         key = key->next;
301     }
302
303     pj_mutex_destroy(ioqueue->ref_cnt_mutex);
304 #endif
305
306     return ioqueue_destroy(ioqueue);
307 }
308
309
310 /*
311  * pj_ioqueue_register_sock()
312  *
313  * Register socket handle to ioqueue.
314  */
315 PJ_DEF(pj_status_t) pj_ioqueue_register_sock( pj_pool_t *pool,
316                                               pj_ioqueue_t *ioqueue,
317                                               pj_sock_t sock,
318                                               void *user_data,
319                                               const pj_ioqueue_callback *cb,
320                                               pj_ioqueue_key_t **p_key)
321 {
322     pj_ioqueue_key_t *key = NULL;
323 #if defined(PJ_WIN32) && PJ_WIN32!=0 || \
324     defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
325     u_long value;
326 #else
327     pj_uint32_t value;
328 #endif
329     pj_status_t rc = PJ_SUCCESS;
330     
331     PJ_ASSERT_RETURN(pool && ioqueue && sock != PJ_INVALID_SOCKET &&
332                      cb && p_key, PJ_EINVAL);
333
334     pj_lock_acquire(ioqueue->lock);
335
336     if (ioqueue->count >= ioqueue->max) {
337         rc = PJ_ETOOMANY;
338         goto on_return;
339     }
340
341     /* If safe unregistration (PJ_IOQUEUE_HAS_SAFE_UNREG) is used, get
342      * the key from the free list. Otherwise allocate a new one. 
343      */
344 #if PJ_IOQUEUE_HAS_SAFE_UNREG
345
346     /* Scan closing_keys first to let them come back to free_list */
347     scan_closing_keys(ioqueue);
348
349     pj_assert(!pj_list_empty(&ioqueue->free_list));
350     if (pj_list_empty(&ioqueue->free_list)) {
351         rc = PJ_ETOOMANY;
352         goto on_return;
353     }
354
355     key = ioqueue->free_list.next;
356     pj_list_erase(key);
357 #else
358     key = (pj_ioqueue_key_t*)pj_pool_zalloc(pool, sizeof(pj_ioqueue_key_t));
359 #endif
360
361     rc = ioqueue_init_key(pool, ioqueue, key, sock, user_data, cb);
362     if (rc != PJ_SUCCESS) {
363         key = NULL;
364         goto on_return;
365     }
366
367     /* Set socket to nonblocking. */
368     value = 1;
369 #if defined(PJ_WIN32) && PJ_WIN32!=0 || \
370     defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
371     if (ioctlsocket(sock, FIONBIO, &value)) {
372 #else
373     if (ioctl(sock, FIONBIO, &value)) {
374 #endif
375         rc = pj_get_netos_error();
376         goto on_return;
377     }
378
379
380     /* Put in active list. */
381     pj_list_insert_before(&ioqueue->active_list, key);
382     ++ioqueue->count;
383
384     /* Rescan fdset to get max descriptor */
385     rescan_fdset(ioqueue);
386
387 on_return:
388     /* On error, socket may be left in non-blocking mode. */
389     *p_key = key;
390     pj_lock_release(ioqueue->lock);
391     
392     return rc;
393 }
394
395 #if PJ_IOQUEUE_HAS_SAFE_UNREG
396 /* Increment key's reference counter */
397 static void increment_counter(pj_ioqueue_key_t *key)
398 {
399     pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
400     ++key->ref_count;
401     pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
402 }
403
404 /* Decrement the key's reference counter, and when the counter reach zero,
405  * destroy the key.
406  *
407  * Note: MUST NOT CALL THIS FUNCTION WHILE HOLDING ioqueue's LOCK.
408  */
409 static void decrement_counter(pj_ioqueue_key_t *key)
410 {
411     pj_lock_acquire(key->ioqueue->lock);
412     pj_mutex_lock(key->ioqueue->ref_cnt_mutex);
413     --key->ref_count;
414     if (key->ref_count == 0) {
415
416         pj_assert(key->closing == 1);
417         pj_gettickcount(&key->free_time);
418         key->free_time.msec += PJ_IOQUEUE_KEY_FREE_DELAY;
419         pj_time_val_normalize(&key->free_time);
420
421         pj_list_erase(key);
422         pj_list_push_back(&key->ioqueue->closing_list, key);
423         /* Rescan fdset to get max descriptor */
424         rescan_fdset(key->ioqueue);
425     }
426     pj_mutex_unlock(key->ioqueue->ref_cnt_mutex);
427     pj_lock_release(key->ioqueue->lock);
428 }
429 #endif
430
431
432 /*
433  * pj_ioqueue_unregister()
434  *
435  * Unregister handle from ioqueue.
436  */
437 PJ_DEF(pj_status_t) pj_ioqueue_unregister( pj_ioqueue_key_t *key)
438 {
439     pj_ioqueue_t *ioqueue;
440
441     PJ_ASSERT_RETURN(key, PJ_EINVAL);
442
443     ioqueue = key->ioqueue;
444
445     /* Lock the key to make sure no callback is simultaneously modifying
446      * the key. We need to lock the key before ioqueue here to prevent
447      * deadlock.
448      */
449     pj_mutex_lock(key->mutex);
450
451     /* Also lock ioqueue */
452     pj_lock_acquire(ioqueue->lock);
453
454     pj_assert(ioqueue->count > 0);
455     --ioqueue->count;
456 #if !PJ_IOQUEUE_HAS_SAFE_UNREG
457     /* Ticket #520, key will be erased more than once */
458     pj_list_erase(key);
459 #endif
460     PJ_FD_CLR(key->fd, &ioqueue->rfdset);
461     PJ_FD_CLR(key->fd, &ioqueue->wfdset);
462 #if PJ_HAS_TCP
463     PJ_FD_CLR(key->fd, &ioqueue->xfdset);
464 #endif
465
466     /* Close socket. */
467     pj_sock_close(key->fd);
468
469     /* Clear callback */
470     key->cb.on_accept_complete = NULL;
471     key->cb.on_connect_complete = NULL;
472     key->cb.on_read_complete = NULL;
473     key->cb.on_write_complete = NULL;
474
475     /* Must release ioqueue lock first before decrementing counter, to
476      * prevent deadlock.
477      */
478     pj_lock_release(ioqueue->lock);
479
480 #if PJ_IOQUEUE_HAS_SAFE_UNREG
481     /* Mark key is closing. */
482     key->closing = 1;
483
484     /* Decrement counter. */
485     decrement_counter(key);
486
487     /* Done. */
488     pj_mutex_unlock(key->mutex);
489 #else
490     pj_mutex_destroy(key->mutex);
491 #endif
492
493     return PJ_SUCCESS;
494 }
495
496
497 /* This supposed to check whether the fd_set values are consistent
498  * with the operation currently set in each key.
499  */
500 #if VALIDATE_FD_SET
501 static void validate_sets(const pj_ioqueue_t *ioqueue,
502                           const pj_fd_set_t *rfdset,
503                           const pj_fd_set_t *wfdset,
504                           const pj_fd_set_t *xfdset)
505 {
506     pj_ioqueue_key_t *key;
507
508     /*
509      * This basicly would not work anymore.
510      * We need to lock key before performing the check, but we can't do
511      * so because we're holding ioqueue mutex. If we acquire key's mutex
512      * now, the will cause deadlock.
513      */
514     pj_assert(0);
515
516     key = ioqueue->active_list.next;
517     while (key != &ioqueue->active_list) {
518         if (!pj_list_empty(&key->read_list)
519 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
520             || !pj_list_empty(&key->accept_list)
521 #endif
522             ) 
523         {
524             pj_assert(PJ_FD_ISSET(key->fd, rfdset));
525         } 
526         else {
527             pj_assert(PJ_FD_ISSET(key->fd, rfdset) == 0);
528         }
529         if (!pj_list_empty(&key->write_list)
530 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
531             || key->connecting
532 #endif
533            )
534         {
535             pj_assert(PJ_FD_ISSET(key->fd, wfdset));
536         }
537         else {
538             pj_assert(PJ_FD_ISSET(key->fd, wfdset) == 0);
539         }
540 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP != 0
541         if (key->connecting)
542         {
543             pj_assert(PJ_FD_ISSET(key->fd, xfdset));
544         }
545         else {
546             pj_assert(PJ_FD_ISSET(key->fd, xfdset) == 0);
547         }
548 #endif /* PJ_HAS_TCP */
549
550         key = key->next;
551     }
552 }
553 #endif  /* VALIDATE_FD_SET */
554
555
556 /* ioqueue_remove_from_set()
557  * This function is called from ioqueue_dispatch_event() to instruct
558  * the ioqueue to remove the specified descriptor from ioqueue's descriptor
559  * set for the specified event.
560  */
561 static void ioqueue_remove_from_set( pj_ioqueue_t *ioqueue,
562                                      pj_ioqueue_key_t *key, 
563                                      enum ioqueue_event_type event_type)
564 {
565     pj_lock_acquire(ioqueue->lock);
566
567     if (event_type == READABLE_EVENT)
568         PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->rfdset);
569     else if (event_type == WRITEABLE_EVENT)
570         PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->wfdset);
571 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
572     else if (event_type == EXCEPTION_EVENT)
573         PJ_FD_CLR((pj_sock_t)key->fd, &ioqueue->xfdset);
574 #endif
575     else
576         pj_assert(0);
577
578     pj_lock_release(ioqueue->lock);
579 }
580
581 /*
582  * ioqueue_add_to_set()
583  * This function is called from pj_ioqueue_recv(), pj_ioqueue_send() etc
584  * to instruct the ioqueue to add the specified handle to ioqueue's descriptor
585  * set for the specified event.
586  */
587 static void ioqueue_add_to_set( pj_ioqueue_t *ioqueue,
588                                 pj_ioqueue_key_t *key,
589                                 enum ioqueue_event_type event_type )
590 {
591     pj_lock_acquire(ioqueue->lock);
592
593     if (event_type == READABLE_EVENT)
594         PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->rfdset);
595     else if (event_type == WRITEABLE_EVENT)
596         PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->wfdset);
597 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
598     else if (event_type == EXCEPTION_EVENT)
599         PJ_FD_SET((pj_sock_t)key->fd, &ioqueue->xfdset);
600 #endif
601     else
602         pj_assert(0);
603
604     pj_lock_release(ioqueue->lock);
605 }
606
607 #if PJ_IOQUEUE_HAS_SAFE_UNREG
608 /* Scan closing keys to be put to free list again */
609 static void scan_closing_keys(pj_ioqueue_t *ioqueue)
610 {
611     pj_time_val now;
612     pj_ioqueue_key_t *h;
613
614     pj_gettickcount(&now);
615     h = ioqueue->closing_list.next;
616     while (h != &ioqueue->closing_list) {
617         pj_ioqueue_key_t *next = h->next;
618
619         pj_assert(h->closing != 0);
620
621         if (PJ_TIME_VAL_GTE(now, h->free_time)) {
622             pj_list_erase(h);
623             pj_list_push_back(&ioqueue->free_list, h);
624         }
625         h = next;
626     }
627 }
628 #endif
629
630 #if defined(PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT) && \
631     PJ_IPHONE_OS_HAS_MULTITASKING_SUPPORT!=0
632 static pj_status_t replace_udp_sock(pj_ioqueue_key_t *h)
633 {
634     enum flags {
635         HAS_PEER_ADDR = 1,
636         HAS_QOS = 2
637     };
638     pj_sock_t old_sock, new_sock = PJ_INVALID_SOCKET;
639     pj_sockaddr local_addr, rem_addr;
640     int val, addr_len;
641     pj_fd_set_t *fds[3];
642     unsigned i, fds_cnt, flags=0;
643     pj_qos_params qos_params;
644     unsigned msec;
645     pj_status_t status;
646
647     pj_lock_acquire(h->ioqueue->lock);
648
649     old_sock = h->fd;
650
651     /* Can only replace UDP socket */
652     pj_assert(h->fd_type == pj_SOCK_DGRAM());
653
654     PJ_LOG(4,(THIS_FILE, "Attempting to replace UDP socket %d", old_sock));
655
656     /* Investigate the old socket */
657     addr_len = sizeof(local_addr);
658     status = pj_sock_getsockname(old_sock, &local_addr, &addr_len);
659     if (status != PJ_SUCCESS)
660         goto on_error;
661     
662     addr_len = sizeof(rem_addr);
663     status = pj_sock_getpeername(old_sock, &rem_addr, &addr_len);
664     if (status == PJ_SUCCESS)
665         flags |= HAS_PEER_ADDR;
666
667     status = pj_sock_get_qos_params(old_sock, &qos_params);
668     if (status == PJ_SUCCESS)
669         flags |= HAS_QOS;
670
671     /* We're done with the old socket, close it otherwise we'll get
672      * error in bind()
673      */
674     pj_sock_close(old_sock);
675
676     /* Prepare the new socket */
677     status = pj_sock_socket(local_addr.addr.sa_family, PJ_SOCK_DGRAM, 0,
678                             &new_sock);
679     if (status != PJ_SUCCESS)
680         goto on_error;
681
682     /* Even after the socket is closed, we'll still get "Address in use"
683      * errors, so force it with SO_REUSEADDR
684      */
685     val = 1;
686     status = pj_sock_setsockopt(new_sock, SOL_SOCKET, SO_REUSEADDR,
687                                 &val, sizeof(val));
688     if (status != PJ_SUCCESS)
689         goto on_error;
690
691     /* The loop is silly, but what else can we do? */
692     addr_len = pj_sockaddr_get_len(&local_addr);
693     for (msec=20; ; msec<1000? msec=msec*2 : 1000) {
694         status = pj_sock_bind(new_sock, &local_addr, addr_len);
695         if (status != PJ_STATUS_FROM_OS(EADDRINUSE))
696             break;
697         PJ_LOG(4,(THIS_FILE, "Address is still in use, retrying.."));
698         pj_thread_sleep(msec);
699     }
700
701     if (status != PJ_SUCCESS)
702         goto on_error;
703
704     if (flags & HAS_QOS) {
705         status = pj_sock_set_qos_params(new_sock, &qos_params);
706         if (status != PJ_SUCCESS)
707             goto on_error;
708     }
709
710     if (flags & HAS_PEER_ADDR) {
711         status = pj_sock_connect(new_sock, &rem_addr, addr_len);
712         if (status != PJ_SUCCESS)
713             goto on_error;
714     }
715
716     /* Set socket to nonblocking. */
717     val = 1;
718 #if defined(PJ_WIN32) && PJ_WIN32!=0 || \
719     defined(PJ_WIN32_WINCE) && PJ_WIN32_WINCE!=0
720     if (ioctlsocket(new_sock, FIONBIO, &val)) {
721 #else
722     if (ioctl(new_sock, FIONBIO, &val)) {
723 #endif
724         status = pj_get_netos_error();
725         goto on_error;
726     }
727
728     /* Replace the occurrence of old socket with new socket in the
729      * fd sets.
730      */
731     fds_cnt = 0;
732     fds[fds_cnt++] = &h->ioqueue->rfdset;
733     fds[fds_cnt++] = &h->ioqueue->wfdset;
734 #if PJ_HAS_TCP
735     fds[fds_cnt++] = &h->ioqueue->xfdset;
736 #endif
737
738     for (i=0; i<fds_cnt; ++i) {
739         if (PJ_FD_ISSET(old_sock, fds[i])) {
740             PJ_FD_CLR(old_sock, fds[i]);
741             PJ_FD_SET(new_sock, fds[i]);
742         }
743     }
744
745     /* And finally replace the fd in the key */
746     h->fd = new_sock;
747
748     PJ_LOG(4,(THIS_FILE, "UDP has been replaced successfully!"));
749
750     pj_lock_release(h->ioqueue->lock);
751
752     return PJ_SUCCESS;
753
754 on_error:
755     if (new_sock != PJ_INVALID_SOCKET)
756         pj_sock_close(new_sock);
757     PJ_PERROR(1,(THIS_FILE, status, "Error replacing socket"));
758     pj_lock_release(h->ioqueue->lock);
759     return status;
760 }
761 #endif
762
763
764 /*
765  * pj_ioqueue_poll()
766  *
767  * Few things worth written:
768  *
769  *  - we used to do only one callback called per poll, but it didn't go
770  *    very well. The reason is because on some situation, the write 
771  *    callback gets called all the time, thus doesn't give the read
772  *    callback to get called. This happens, for example, when user
773  *    submit write operation inside the write callback.
774  *    As the result, we changed the behaviour so that now multiple
775  *    callbacks are called in a single poll. It should be fast too,
776  *    just that we need to be carefull with the ioqueue data structs.
777  *
778  *  - to guarantee preemptiveness etc, the poll function must strictly
779  *    work on fd_set copy of the ioqueue (not the original one).
780  */
781 PJ_DEF(int) pj_ioqueue_poll( pj_ioqueue_t *ioqueue, const pj_time_val *timeout)
782 {
783     pj_fd_set_t rfdset, wfdset, xfdset;
784     int count, counter;
785     pj_ioqueue_key_t *h;
786     struct event
787     {
788         pj_ioqueue_key_t        *key;
789         enum ioqueue_event_type  event_type;
790     } event[PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL];
791
792     PJ_ASSERT_RETURN(ioqueue, -PJ_EINVAL);
793
794     /* Lock ioqueue before making fd_set copies */
795     pj_lock_acquire(ioqueue->lock);
796
797     /* We will only do select() when there are sockets to be polled.
798      * Otherwise select() will return error.
799      */
800     if (PJ_FD_COUNT(&ioqueue->rfdset)==0 &&
801         PJ_FD_COUNT(&ioqueue->wfdset)==0 
802 #if defined(PJ_HAS_TCP) && PJ_HAS_TCP!=0
803         && PJ_FD_COUNT(&ioqueue->xfdset)==0
804 #endif
805         )
806     {
807 #if PJ_IOQUEUE_HAS_SAFE_UNREG
808         scan_closing_keys(ioqueue);
809 #endif
810         pj_lock_release(ioqueue->lock);
811         TRACE__((THIS_FILE, "     poll: no fd is set"));
812         if (timeout)
813             pj_thread_sleep(PJ_TIME_VAL_MSEC(*timeout));
814         return 0;
815     }
816
817     /* Copy ioqueue's pj_fd_set_t to local variables. */
818     pj_memcpy(&rfdset, &ioqueue->rfdset, sizeof(pj_fd_set_t));
819     pj_memcpy(&wfdset, &ioqueue->wfdset, sizeof(pj_fd_set_t));
820 #if PJ_HAS_TCP
821     pj_memcpy(&xfdset, &ioqueue->xfdset, sizeof(pj_fd_set_t));
822 #else
823     PJ_FD_ZERO(&xfdset);
824 #endif
825
826 #if VALIDATE_FD_SET
827     validate_sets(ioqueue, &rfdset, &wfdset, &xfdset);
828 #endif
829
830     /* Unlock ioqueue before select(). */
831     pj_lock_release(ioqueue->lock);
832
833     count = pj_sock_select(ioqueue->nfds+1, &rfdset, &wfdset, &xfdset, 
834                            timeout);
835     
836     if (count == 0)
837         return 0;
838     else if (count < 0)
839         return -pj_get_netos_error();
840     else if (count > PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL)
841         count = PJ_IOQUEUE_MAX_EVENTS_IN_SINGLE_POLL;
842
843     /* Scan descriptor sets for event and add the events in the event
844      * array to be processed later in this function. We do this so that
845      * events can be processed in parallel without holding ioqueue lock.
846      */
847     pj_lock_acquire(ioqueue->lock);
848
849     counter = 0;
850
851     /* Scan for writable sockets first to handle piggy-back data
852      * coming with accept().
853      */
854     h = ioqueue->active_list.next;
855     for ( ; h!=&ioqueue->active_list && counter<count; h = h->next) {
856
857         if ( (key_has_pending_write(h) || key_has_pending_connect(h))
858              && PJ_FD_ISSET(h->fd, &wfdset) && !IS_CLOSING(h))
859         {
860 #if PJ_IOQUEUE_HAS_SAFE_UNREG
861             increment_counter(h);
862 #endif
863             event[counter].key = h;
864             event[counter].event_type = WRITEABLE_EVENT;
865             ++counter;
866         }
867
868         /* Scan for readable socket. */
869         if ((key_has_pending_read(h) || key_has_pending_accept(h))
870             && PJ_FD_ISSET(h->fd, &rfdset) && !IS_CLOSING(h) &&
871             counter<count)
872         {
873 #if PJ_IOQUEUE_HAS_SAFE_UNREG
874             increment_counter(h);
875 #endif
876             event[counter].key = h;
877             event[counter].event_type = READABLE_EVENT;
878             ++counter;
879         }
880
881 #if PJ_HAS_TCP
882         if (key_has_pending_connect(h) && PJ_FD_ISSET(h->fd, &xfdset) &&
883             !IS_CLOSING(h) && counter<count) 
884         {
885 #if PJ_IOQUEUE_HAS_SAFE_UNREG
886             increment_counter(h);
887 #endif
888             event[counter].key = h;
889             event[counter].event_type = EXCEPTION_EVENT;
890             ++counter;
891         }
892 #endif
893     }
894
895     pj_lock_release(ioqueue->lock);
896
897     count = counter;
898
899     /* Now process all events. The dispatch functions will take care
900      * of locking in each of the key
901      */
902     for (counter=0; counter<count; ++counter) {
903         switch (event[counter].event_type) {
904         case READABLE_EVENT:
905             ioqueue_dispatch_read_event(ioqueue, event[counter].key);
906             break;
907         case WRITEABLE_EVENT:
908             ioqueue_dispatch_write_event(ioqueue, event[counter].key);
909             break;
910         case EXCEPTION_EVENT:
911             ioqueue_dispatch_exception_event(ioqueue, event[counter].key);
912             break;
913         case NO_EVENT:
914             pj_assert(!"Invalid event!");
915             break;
916         }
917
918 #if PJ_IOQUEUE_HAS_SAFE_UNREG
919         decrement_counter(event[counter].key);
920 #endif
921     }
922
923
924     return count;
925 }
926