res_rtp_asterisk: Fix a myriad of TURN client issues.
authorJoshua Colp <jcolp@digium.com>
Tue, 16 Sep 2014 11:12:30 +0000 (11:12 +0000)
committerJoshua Colp <jcolp@digium.com>
Tue, 16 Sep 2014 11:12:30 +0000 (11:12 +0000)
1. The number of file descriptors an ioqueue instance can handle is fixed, so we
now spawn the required number to handle the load.
2. Our transport identifiers were exceeding the range supported by pjnath.
3. The TURN client did not set up client binding causing needless bandwidth usage.
4. The code no longer updates address information on each packet.
5. STUN traffic was getting looped back to Asterisk instead of going through the
TURN server.
6. Synchronization now ensures things are completely setup or destroyed.
7. Logging now reflects the target the TURN server is sending to/receiving from
on our behalf.

ASTERISK-23577 #close
Reported by: Jay Jideliov

ASTERISK-23634 #close
Reported by: Roman Skvirsky

Review: https://reviewboard.asterisk.org/r/3982/
........

Merged revisions 423150 from http://svn.asterisk.org/svn/asterisk/branches/11
........

Merged revisions 423151 from http://svn.asterisk.org/svn/asterisk/branches/12
........

Merged revisions 423152 from http://svn.asterisk.org/svn/asterisk/branches/13

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@423153 65c4cc65-6c06-0410-ace0-fbb531ad65f3

include/asterisk/rtp_engine.h
res/res_rtp_asterisk.c

index e5f38ee..db5cd34 100644 (file)
@@ -429,6 +429,10 @@ struct ast_rtp_engine_ice {
        void (*ice_lite)(struct ast_rtp_instance *instance);
        /*! Callback for changing our role in negotiation */
        void (*set_role)(struct ast_rtp_instance *instance, enum ast_rtp_ice_role role);
+       /*! Callback for requesting a TURN session */
+       void (*turn_request)(struct ast_rtp_instance *instance, enum ast_rtp_ice_component_type component,
+               enum ast_transport transport, const char *server, unsigned int port,
+               const char *username, const char *password);
 };
 
 /*! \brief DTLS setup types */
index f192bac..739ae17 100644 (file)
@@ -83,9 +83,9 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
 #define MINIMUM_RTP_PORT 1024 /*!< Minimum port number to accept */
 #define MAXIMUM_RTP_PORT 65535 /*!< Maximum port number to accept */
 
-#define DEFAULT_TURN_PORT 34780
+#define DEFAULT_TURN_PORT 3478
 
-#define TURN_ALLOCATION_WAIT_TIME 2000
+#define TURN_STATE_WAIT_TIME 2000
 
 #define RTCP_PT_FUR     192
 #define RTCP_PT_SR      AST_RTP_RTCP_SR
@@ -149,20 +149,39 @@ static pj_str_t turnpassword;
 /*! \brief Pool factory used by pjlib to allocate memory. */
 static pj_caching_pool cachingpool;
 
-/*! \brief Pool used by pjlib functions which require memory allocation. */
+/*! \brief Global memory pool for configuration and timers */
 static pj_pool_t *pool;
 
-/*! \brief I/O queue for TURN relay traffic */
-static pj_ioqueue_t *ioqueue;
+/*! \brief Global timer heap */
+static pj_timer_heap_t *timer_heap;
 
-/*! \brief Timer heap for ICE and TURN stuff */
-static pj_timer_heap_t *timerheap;
+/*! \brief Thread executing the timer heap */
+static pj_thread_t *timer_thread;
 
-/*! \brief Worker thread for ICE/TURN */
-static pj_thread_t *thread;
+/*! \brief Used to tell the timer thread to terminate */
+static int timer_terminate;
+
+/*! \brief Structure which contains ioqueue thread information */
+struct ast_rtp_ioqueue_thread {
+       /*! \brief Pool used by the thread */
+       pj_pool_t *pool;
+       /*! \brief The thread handling the queue and timer heap */
+       pj_thread_t *thread;
+       /*! \brief Ioqueue which polls on sockets */
+       pj_ioqueue_t *ioqueue;
+       /*! \brief Timer heap for scheduled items */
+       pj_timer_heap_t *timerheap;
+       /*! \brief Termination request */
+       int terminate;
+       /*! \brief Current number of descriptors being waited on */
+       unsigned int count;
+       /*! \brief Linked list information */
+       AST_LIST_ENTRY(ast_rtp_ioqueue_thread) next;
+};
+
+/*! \brief List of ioqueue threads */
+static AST_LIST_HEAD_STATIC(ioqueues, ast_rtp_ioqueue_thread);
 
-/*! \brief Notification that the ICE/TURN worker thread should stop */
-static int worker_terminate;
 #endif
 
 #define FLAG_3389_WARNING               (1 << 0)
@@ -172,10 +191,10 @@ static int worker_terminate;
 #define FLAG_NEED_MARKER_BIT            (1 << 3)
 #define FLAG_DTMF_COMPENSATE            (1 << 4)
 
-#define TRANSPORT_SOCKET_RTP 1
-#define TRANSPORT_SOCKET_RTCP 2
-#define TRANSPORT_TURN_RTP 3
-#define TRANSPORT_TURN_RTCP 4
+#define TRANSPORT_SOCKET_RTP 0
+#define TRANSPORT_SOCKET_RTCP 1
+#define TRANSPORT_TURN_RTP 2
+#define TRANSPORT_TURN_RTCP 3
 
 /*! \brief RTP learning mode tracking information */
 struct rtp_learning_info {
@@ -276,7 +295,13 @@ struct ast_rtp {
        pj_turn_sock *turn_rtcp;    /*!< RTCP TURN relay */
        pj_turn_state_t turn_state; /*!< Current state of the TURN relay session */
        unsigned int passthrough:1; /*!< Bit to indicate that the received packet should be passed through */
+       unsigned int rtp_passthrough:1; /*!< Bit to indicate that TURN RTP should be passed through */
+       unsigned int rtcp_passthrough:1; /*!< Bit to indicate that TURN RTCP should be passed through */
        unsigned int ice_port;      /*!< Port that ICE was started with if it was previously started */
+       struct ast_sockaddr rtp_loop; /*!< Loopback address for forwarding RTP from TURN */
+       struct ast_sockaddr rtcp_loop; /*!< Loopback address for forwarding RTCP from TURN */
+
+       struct ast_rtp_ioqueue_thread *ioqueue; /*!< The ioqueue thread handling us */
 
        char remote_ufrag[256];  /*!< The remote ICE username */
        char remote_passwd[256]; /*!< The remote ICE password */
@@ -423,10 +448,11 @@ static void dtls_srtp_check_pending(struct ast_rtp_instance *instance, struct as
 
 static int __rtp_sendto(struct ast_rtp_instance *instance, void *buf, size_t size, int flags, struct ast_sockaddr *sa, int rtcp, int *ice, int use_srtp);
 
+#ifdef HAVE_PJPROJECT
 /*! \brief Helper function which updates an ast_sockaddr with the candidate used for the component */
-static void update_address_with_ice_candidate(struct ast_rtp *rtp, int component, struct ast_sockaddr *cand_address)
+static void update_address_with_ice_candidate(struct ast_rtp *rtp, enum ast_rtp_ice_component_type component,
+       struct ast_sockaddr *cand_address)
 {
-#ifdef HAVE_PJPROJECT
        char address[PJ_INET6_ADDRSTRLEN];
 
        if (!rtp->ice || (component < 1) || !rtp->ice->comp[component - 1].valid_check) {
@@ -435,10 +461,20 @@ static void update_address_with_ice_candidate(struct ast_rtp *rtp, int component
 
        ast_sockaddr_parse(cand_address, pj_sockaddr_print(&rtp->ice->comp[component - 1].valid_check->rcand->addr, address, sizeof(address), 0), 0);
        ast_sockaddr_set_port(cand_address, pj_sockaddr_get_port(&rtp->ice->comp[component - 1].valid_check->rcand->addr));
-#endif
 }
 
-#ifdef HAVE_PJPROJECT
+/*! \brief Helper function which sets up channel binding on a TURN session if applicable */
+static void turn_enable_bind_channel(struct ast_rtp *rtp, pj_turn_sock *turn, int component, int transport)
+{
+       if (!rtp->ice || !turn || (component < 1) || !rtp->ice->comp[component - 1].valid_check ||
+               (rtp->ice->comp[component - 1].valid_check->lcand->transport_id != transport)) {
+               return;
+       }
+
+       pj_turn_sock_bind_channel(turn, &rtp->ice->comp[component - 1].valid_check->rcand->addr,
+               sizeof(rtp->ice->comp[component - 1].valid_check->rcand->addr));
+}
+
 /*! \brief Destructor for locally created ICE candidates */
 static void ast_rtp_ice_candidate_destroy(void *obj)
 {
@@ -669,7 +705,7 @@ static void ast_rtp_ice_start(struct ast_rtp_instance *instance)
        if (pj_ice_sess_create_check_list(rtp->ice, &ufrag, &passwd, ao2_container_count(rtp->ice_active_remote_candidates), &candidates[0]) == PJ_SUCCESS) {
                ast_test_suite_event_notify("ICECHECKLISTCREATE", "Result: SUCCESS");
                pj_ice_sess_start_check(rtp->ice);
-               pj_timer_heap_poll(timerheap, NULL);
+               pj_timer_heap_poll(timer_heap, NULL);
                rtp->strict_rtp_state = STRICT_RTP_OPEN;
                return;
        }
@@ -795,6 +831,335 @@ static void ast_rtp_ice_add_cand(struct ast_rtp *rtp, unsigned comp_id, unsigned
        ao2_ref(candidate, -1);
 }
 
+static void ast_rtp_on_turn_rx_rtp_data(pj_turn_sock *turn_sock, void *pkt, unsigned pkt_len, const pj_sockaddr_t *peer_addr, unsigned addr_len)
+{
+       struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
+       struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+       pj_status_t status;
+
+       status = pj_ice_sess_on_rx_pkt(rtp->ice, AST_RTP_ICE_COMPONENT_RTP, TRANSPORT_TURN_RTP, pkt, pkt_len, peer_addr,
+               addr_len);
+       if (status != PJ_SUCCESS) {
+               char buf[100];
+
+               pj_strerror(status, buf, sizeof(buf));
+               ast_log(LOG_WARNING, "PJ ICE Rx error status code: %d '%s'.\n",
+                       (int)status, buf);
+               return;
+       }
+       if (!rtp->rtp_passthrough) {
+               return;
+       }
+       rtp->rtp_passthrough = 0;
+
+       ast_sendto(rtp->s, pkt, pkt_len, 0, &rtp->rtp_loop);
+}
+
+static void ast_rtp_on_turn_rtp_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_state_t new_state)
+{
+       struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
+       struct ast_rtp *rtp;
+
+       /* If this is a leftover from an already notified RTP instance just ignore the state change */
+       if (!instance) {
+               return;
+       }
+
+       rtp = ast_rtp_instance_get_data(instance);
+
+       /* We store the new state so the other thread can actually handle it */
+       ast_mutex_lock(&rtp->lock);
+       rtp->turn_state = new_state;
+       ast_cond_signal(&rtp->cond);
+
+       if (new_state == PJ_TURN_STATE_DESTROYING) {
+               pj_turn_sock_set_user_data(rtp->turn_rtp, NULL);
+               rtp->turn_rtp = NULL;
+       }
+
+       ast_mutex_unlock(&rtp->lock);
+}
+
+/* RTP TURN Socket interface declaration */
+static pj_turn_sock_cb ast_rtp_turn_rtp_sock_cb = {
+       .on_rx_data = ast_rtp_on_turn_rx_rtp_data,
+       .on_state = ast_rtp_on_turn_rtp_state,
+};
+
+static void ast_rtp_on_turn_rx_rtcp_data(pj_turn_sock *turn_sock, void *pkt, unsigned pkt_len, const pj_sockaddr_t *peer_addr, unsigned addr_len)
+{
+       struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
+       struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+       pj_status_t status;
+
+       status = pj_ice_sess_on_rx_pkt(rtp->ice, AST_RTP_ICE_COMPONENT_RTCP, TRANSPORT_TURN_RTCP, pkt, pkt_len, peer_addr,
+               addr_len);
+       if (status != PJ_SUCCESS) {
+               char buf[100];
+
+               pj_strerror(status, buf, sizeof(buf));
+               ast_log(LOG_WARNING, "PJ ICE Rx error status code: %d '%s'.\n",
+                       (int)status, buf);
+               return;
+       }
+       if (!rtp->rtcp_passthrough) {
+               return;
+       }
+       rtp->rtcp_passthrough = 0;
+
+       ast_sendto(rtp->rtcp->s, pkt, pkt_len, 0, &rtp->rtcp_loop);
+}
+
+static void ast_rtp_on_turn_rtcp_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_state_t new_state)
+{
+       struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
+       struct ast_rtp *rtp = NULL;
+
+       /* If this is a leftover from an already destroyed RTP instance just ignore the state change */
+       if (!instance) {
+               return;
+       }
+
+       rtp = ast_rtp_instance_get_data(instance);
+
+       /* We store the new state so the other thread can actually handle it */
+       ast_mutex_lock(&rtp->lock);
+       rtp->turn_state = new_state;
+       ast_cond_signal(&rtp->cond);
+
+       if (new_state == PJ_TURN_STATE_DESTROYING) {
+               pj_turn_sock_set_user_data(rtp->turn_rtcp, NULL);
+               rtp->turn_rtcp = NULL;
+       }
+
+       ast_mutex_unlock(&rtp->lock);
+}
+
+/* RTCP TURN Socket interface declaration */
+static pj_turn_sock_cb ast_rtp_turn_rtcp_sock_cb = {
+       .on_rx_data = ast_rtp_on_turn_rx_rtcp_data,
+       .on_state = ast_rtp_on_turn_rtcp_state,
+};
+
+/*! \brief Worker thread for ioqueue and timerheap */
+static int ioqueue_worker_thread(void *data)
+{
+       struct ast_rtp_ioqueue_thread *ioqueue = data;
+
+       while (!ioqueue->terminate) {
+               const pj_time_val delay = {0, 10};
+
+               pj_ioqueue_poll(ioqueue->ioqueue, &delay);
+
+               pj_timer_heap_poll(ioqueue->timerheap, NULL);
+       }
+
+       return 0;
+}
+
+/*! \brief Destroyer for ioqueue thread */
+static void rtp_ioqueue_thread_destroy(struct ast_rtp_ioqueue_thread *ioqueue)
+{
+       if (ioqueue->thread) {
+               ioqueue->terminate = 1;
+               pj_thread_join(ioqueue->thread);
+               pj_thread_destroy(ioqueue->thread);
+       }
+
+       pj_pool_release(ioqueue->pool);
+       ast_free(ioqueue);
+}
+
+/*! \brief Removal function for ioqueue thread, determines if it should be terminated and destroyed */
+static void rtp_ioqueue_thread_remove(struct ast_rtp_ioqueue_thread *ioqueue)
+{
+       int destroy = 0;
+
+       /* If nothing is using this ioqueue thread destroy it */
+       AST_LIST_LOCK(&ioqueues);
+       if ((ioqueue->count - 2) == 0) {
+               destroy = 1;
+               AST_LIST_REMOVE(&ioqueues, ioqueue, next);
+       }
+       AST_LIST_UNLOCK(&ioqueues);
+
+       if (!destroy) {
+               return;
+       }
+
+       rtp_ioqueue_thread_destroy(ioqueue);
+}
+
+/*! \brief Finder and allocator for an ioqueue thread */
+static struct ast_rtp_ioqueue_thread *rtp_ioqueue_thread_get_or_create(void)
+{
+       struct ast_rtp_ioqueue_thread *ioqueue;
+       pj_lock_t *lock;
+
+       AST_LIST_LOCK(&ioqueues);
+
+       /* See if an ioqueue thread exists that can handle more */
+       AST_LIST_TRAVERSE(&ioqueues, ioqueue, next) {
+               if ((ioqueue->count + 2) < PJ_IOQUEUE_MAX_HANDLES) {
+                       break;
+               }
+       }
+
+       /* If we found one bump it up and return it */
+       if (ioqueue) {
+               ioqueue->count += 2;
+               goto end;
+       }
+
+       ioqueue = ast_calloc(1, sizeof(*ioqueue));
+       if (!ioqueue) {
+               goto end;
+       }
+
+       ioqueue->pool = pj_pool_create(&cachingpool.factory, "rtp", 512, 512, NULL);
+
+       /* We use a timer on the ioqueue thread for TURN so that two threads aren't operating
+        * on a session at the same time
+        */
+       if (pj_timer_heap_create(ioqueue->pool, 4, &ioqueue->timerheap) != PJ_SUCCESS) {
+               goto fatal;
+       }
+
+       if (pj_lock_create_recursive_mutex(ioqueue->pool, "rtp%p", &lock) != PJ_SUCCESS) {
+               goto fatal;
+       }
+
+       pj_timer_heap_set_lock(ioqueue->timerheap, lock, PJ_TRUE);
+
+       if (pj_ioqueue_create(ioqueue->pool, 16, &ioqueue->ioqueue) != PJ_SUCCESS) {
+               goto fatal;
+       }
+
+       if (pj_thread_create(ioqueue->pool, "ice", &ioqueue_worker_thread, ioqueue, 0, 0, &ioqueue->thread) != PJ_SUCCESS) {
+               goto fatal;
+       }
+
+       AST_LIST_INSERT_HEAD(&ioqueues, ioqueue, next);
+
+       /* Since this is being returned to an active session the count always starts at 2 */
+       ioqueue->count = 2;
+
+       goto end;
+
+fatal:
+       rtp_ioqueue_thread_destroy(ioqueue);
+       ioqueue = NULL;
+
+end:
+       AST_LIST_UNLOCK(&ioqueues);
+       return ioqueue;
+}
+
+static void ast_rtp_ice_turn_request(struct ast_rtp_instance *instance, enum ast_rtp_ice_component_type component,
+               enum ast_transport transport, const char *server, unsigned int port, const char *username, const char *password)
+{
+       struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+       pj_turn_sock **turn_sock;
+       const pj_turn_sock_cb *turn_cb;
+       pj_turn_tp_type conn_type;
+       int conn_transport;
+       pj_stun_auth_cred cred = { 0, };
+       pj_str_t turn_addr;
+       struct ast_sockaddr addr = { { 0, } };
+       pj_stun_config stun_config;
+       struct timeval wait = ast_tvadd(ast_tvnow(), ast_samp2tv(TURN_STATE_WAIT_TIME, 1000));
+       struct timespec ts = { .tv_sec = wait.tv_sec, .tv_nsec = wait.tv_usec * 1000, };
+       pj_turn_session_info info;
+       struct ast_sockaddr local, loop;
+
+       ast_rtp_instance_get_local_address(instance, &local);
+       if (ast_sockaddr_is_ipv4(&local)) {
+               ast_sockaddr_parse(&loop, "127.0.0.1", PARSE_PORT_FORBID);
+       } else {
+               ast_sockaddr_parse(&loop, "::1", PARSE_PORT_FORBID);
+       }
+
+       /* Determine what component we are requesting a TURN session for */
+       if (component == AST_RTP_ICE_COMPONENT_RTP) {
+               turn_sock = &rtp->turn_rtp;
+               turn_cb = &ast_rtp_turn_rtp_sock_cb;
+               conn_transport = TRANSPORT_TURN_RTP;
+               ast_sockaddr_set_port(&loop, ast_sockaddr_port(&local));
+       } else if (component == AST_RTP_ICE_COMPONENT_RTCP) {
+               turn_sock = &rtp->turn_rtcp;
+               turn_cb = &ast_rtp_turn_rtcp_sock_cb;
+               conn_transport = TRANSPORT_TURN_RTCP;
+               ast_sockaddr_set_port(&loop, ast_sockaddr_port(&rtp->rtcp->us));
+       } else {
+               return;
+       }
+
+       if (transport == AST_TRANSPORT_UDP) {
+               conn_type = PJ_TURN_TP_UDP;
+       } else if (transport == AST_TRANSPORT_TCP) {
+               conn_type = PJ_TURN_TP_TCP;
+       } else {
+               ast_assert(0);
+               return;
+       }
+
+       ast_sockaddr_parse(&addr, server, PARSE_PORT_FORBID);
+
+       ast_mutex_lock(&rtp->lock);
+       if (*turn_sock) {
+               pj_turn_sock_destroy(*turn_sock);
+               rtp->turn_state = PJ_TURN_STATE_NULL;
+               while (rtp->turn_state != PJ_TURN_STATE_DESTROYING) {
+                       ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts);
+               }
+       }
+       ast_mutex_unlock(&rtp->lock);
+
+       if (component == AST_RTP_ICE_COMPONENT_RTP && !rtp->ioqueue) {
+               rtp->ioqueue = rtp_ioqueue_thread_get_or_create();
+               if (!rtp->ioqueue) {
+                       return;
+               }
+       }
+
+       pj_stun_config_init(&stun_config, &cachingpool.factory, 0, rtp->ioqueue->ioqueue, rtp->ioqueue->timerheap);
+
+       if (pj_turn_sock_create(&stun_config, ast_sockaddr_is_ipv4(&addr) ? pj_AF_INET() : pj_AF_INET6(), conn_type,
+               turn_cb, NULL, instance, turn_sock) != PJ_SUCCESS) {
+               ast_log(LOG_WARNING, "Could not create a TURN client socket\n");
+               return;
+       }
+
+       cred.type = PJ_STUN_AUTH_CRED_STATIC;
+       pj_strset2(&cred.data.static_cred.username, (char*)username);
+       cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
+       pj_strset2(&cred.data.static_cred.data, (char*)password);
+
+       /* Because the TURN socket is asynchronous but we are synchronous we need to wait until it is done */
+       ast_mutex_lock(&rtp->lock);
+       pj_turn_sock_alloc(*turn_sock, pj_cstr(&turn_addr, server), port, NULL, &cred, NULL);
+       while (rtp->turn_state < PJ_TURN_STATE_READY) {
+               ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts);
+       }
+       ast_mutex_unlock(&rtp->lock);
+
+       /* If a TURN session was allocated add it as a candidate */
+       if (rtp->turn_state != PJ_TURN_STATE_READY) {
+               return;
+       }
+
+       pj_turn_sock_get_info(*turn_sock, &info);
+
+       ast_rtp_ice_add_cand(rtp, component, conn_transport, PJ_ICE_CAND_TYPE_RELAYED, 65535, &info.relay_addr,
+               &info.relay_addr, NULL, pj_sockaddr_get_len(&info.relay_addr));
+
+       if (component == AST_RTP_ICE_COMPONENT_RTP) {
+               ast_sockaddr_copy(&rtp->rtp_loop, &loop);
+       } else if (component == AST_RTP_ICE_COMPONENT_RTCP) {
+               ast_sockaddr_copy(&rtp->rtcp_loop, &loop);
+       }
+}
+
 static char *generate_random_string(char *buf, size_t size)
 {
         long val[4];
@@ -819,6 +1184,7 @@ static struct ast_rtp_engine_ice ast_rtp_ice = {
        .get_local_candidates = ast_rtp_ice_get_local_candidates,
        .ice_lite = ast_rtp_ice_lite,
        .set_role = ast_rtp_ice_set_role,
+       .turn_request = ast_rtp_ice_turn_request,
 };
 #endif
 
@@ -1239,6 +1605,22 @@ static void ast_rtp_on_ice_complete(pj_ice_sess *ice, pj_status_t status)
 {
        struct ast_rtp_instance *instance = ice->user_data;
        struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+
+       if (status == PJ_SUCCESS) {
+               struct ast_sockaddr remote_address;
+
+               /* Symmetric RTP must be disabled for the remote address to not get overwritten */
+               ast_rtp_instance_set_prop(instance, AST_RTP_PROPERTY_NAT, 0);
+
+               update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
+               ast_rtp_instance_set_remote_address(instance, &remote_address);
+               turn_enable_bind_channel(rtp, rtp->turn_rtp, AST_RTP_ICE_COMPONENT_RTP, TRANSPORT_TURN_RTP);
+
+               if (rtp->rtcp) {
+                       update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTCP, &rtp->rtcp->them);
+                       turn_enable_bind_channel(rtp, rtp->turn_rtcp, AST_RTP_ICE_COMPONENT_RTCP, TRANSPORT_TURN_RTCP);
+               }
+       }
  
 #ifdef HAVE_OPENSSL_SRTP
        dtls_perform_handshake(instance, &rtp->dtls, 0);
@@ -1263,7 +1645,13 @@ static void ast_rtp_on_ice_rx_data(pj_ice_sess *ice, unsigned comp_id, unsigned
 
        /* Instead of handling the packet here (which really doesn't work with our architecture) we set a bit to indicate that it should be handled after pj_ice_sess_on_rx_pkt
         * returns */
-       rtp->passthrough = 1;
+       if (transport_id == TRANSPORT_SOCKET_RTP || transport_id == TRANSPORT_SOCKET_RTCP) {
+               rtp->passthrough = 1;
+       } else if (transport_id == TRANSPORT_TURN_RTP) {
+               rtp->rtp_passthrough = 1;
+       } else if (transport_id == TRANSPORT_TURN_RTCP) {
+               rtp->rtcp_passthrough = 1;
+       }
 }
 
 static pj_status_t ast_rtp_on_ice_tx_pkt(pj_ice_sess *ice, unsigned comp_id, unsigned transport_id, const void *pkt, pj_size_t size, const pj_sockaddr_t *dst_addr, unsigned dst_addr_len)
@@ -1309,106 +1697,11 @@ static pj_ice_sess_cb ast_rtp_ice_sess_cb = {
        .on_tx_pkt = ast_rtp_on_ice_tx_pkt,
 };
 
-static void ast_rtp_on_turn_rx_rtp_data(pj_turn_sock *turn_sock, void *pkt, unsigned pkt_len, const pj_sockaddr_t *peer_addr, unsigned addr_len)
-{
-       struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
-       struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
-       struct ast_sockaddr dest = { { 0, }, };
-
-       ast_rtp_instance_get_local_address(instance, &dest);
-
-       ast_sendto(rtp->s, pkt, pkt_len, 0, &dest);
-}
-
-static void ast_rtp_on_turn_rtp_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_state_t new_state)
-{
-       struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
-       struct ast_rtp *rtp = NULL;
-
-       /* If this is a leftover from an already destroyed RTP instance just ignore the state change */
-       if (!instance) {
-               return;
-       }
-
-       rtp = ast_rtp_instance_get_data(instance);
-
-       /* If the TURN session is being destroyed we need to remove it from the RTP instance */
-       if (new_state == PJ_TURN_STATE_DESTROYING) {
-               rtp->turn_rtp = NULL;
-               return;
-       }
-
-       /* We store the new state so the other thread can actually handle it */
-       ast_mutex_lock(&rtp->lock);
-       rtp->turn_state = new_state;
-
-       /* If this is a state that the main thread should be notified about do so */
-       if (new_state == PJ_TURN_STATE_READY || new_state == PJ_TURN_STATE_DEALLOCATING || new_state == PJ_TURN_STATE_DEALLOCATED) {
-               ast_cond_signal(&rtp->cond);
-       }
-
-       ast_mutex_unlock(&rtp->lock);
-}
-
-/* RTP TURN Socket interface declaration */
-static pj_turn_sock_cb ast_rtp_turn_rtp_sock_cb = {
-       .on_rx_data = ast_rtp_on_turn_rx_rtp_data,
-       .on_state = ast_rtp_on_turn_rtp_state,
-};
-
-static void ast_rtp_on_turn_rx_rtcp_data(pj_turn_sock *turn_sock, void *pkt, unsigned pkt_len, const pj_sockaddr_t *peer_addr, unsigned addr_len)
-{
-       struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
-       struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
-
-       ast_sendto(rtp->rtcp->s, pkt, pkt_len, 0, &rtp->rtcp->us);
-}
-
-static void ast_rtp_on_turn_rtcp_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_state_t new_state)
-{
-       struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
-       struct ast_rtp *rtp = NULL;
-
-       /* If this is a leftover from an already destroyed RTP instance just ignore the state change */
-       if (!instance) {
-               return;
-       }
-
-       rtp = ast_rtp_instance_get_data(instance);
-
-       /* If the TURN session is being destroyed we need to remove it from the RTP instance */
-       if (new_state == PJ_TURN_STATE_DESTROYING) {
-               rtp->turn_rtcp = NULL;
-               return;
-       }
-
-       /* We store the new state so the other thread can actually handle it */
-       ast_mutex_lock(&rtp->lock);
-       rtp->turn_state = new_state;
-
-       /* If this is a state that the main thread should be notified about do so */
-       if (new_state == PJ_TURN_STATE_READY || new_state == PJ_TURN_STATE_DEALLOCATING || new_state == PJ_TURN_STATE_DEALLOCATED) {
-               ast_cond_signal(&rtp->cond);
-       }
-
-       ast_mutex_unlock(&rtp->lock);
-}
-
-/* RTCP TURN Socket interface declaration */
-static pj_turn_sock_cb ast_rtp_turn_rtcp_sock_cb = {
-       .on_rx_data = ast_rtp_on_turn_rx_rtcp_data,
-       .on_state = ast_rtp_on_turn_rtcp_state,
-};
-
-/*! \brief Worker thread for I/O queue and timerheap */
-static int ice_worker_thread(void *data)
+/*! \brief Worker thread for timerheap */
+static int timer_worker_thread(void *data)
 {
-       while (!worker_terminate) {
-               const pj_time_val delay = {0, 10};
-
-               pj_ioqueue_poll(ioqueue, &delay);
-
-               pj_timer_heap_poll(timerheap, NULL);
+       while (!timer_terminate) {
+               pj_timer_heap_poll(timer_heap, NULL);
        }
 
        return 0;
@@ -1697,6 +1990,9 @@ static int __rtp_recvfrom(struct ast_rtp_instance *instance, void *buf, size_t s
        struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
        struct ast_srtp *srtp = ast_rtp_instance_get_srtp(instance);
        char *in = buf;
+#ifdef HAVE_PJPROJECT
+       struct ast_sockaddr *loop = rtcp ? &rtp->rtcp_loop : &rtp->rtp_loop;
+#endif
 
        if ((len = ast_recvfrom(rtcp ? rtp->rtcp->s : rtp->s, buf, size, flags, sa)) < 0) {
           return len;
@@ -1752,7 +2048,16 @@ static int __rtp_recvfrom(struct ast_rtp_instance *instance, void *buf, size_t s
 #endif
 
 #ifdef HAVE_PJPROJECT
-       if (rtp->ice) {
+       if (!ast_sockaddr_isnull(loop) && !ast_sockaddr_cmp(loop, sa)) {
+               /* ICE traffic will have been handled in the TURN callback, so skip it but update the address
+                * so it reflects the actual source and not the loopback
+                */
+               if (rtcp) {
+                       ast_sockaddr_copy(sa, &rtp->rtcp->them);
+               } else {
+                       ast_rtp_instance_get_remote_address(instance, sa);
+               }
+       } else if (rtp->ice) {
                pj_str_t combined = pj_str(ast_sockaddr_stringify(sa));
                pj_sockaddr address;
                pj_status_t status;
@@ -1769,7 +2074,7 @@ static int __rtp_recvfrom(struct ast_rtp_instance *instance, void *buf, size_t s
 
                        pj_strerror(status, buf, sizeof(buf));
                        ast_log(LOG_WARNING, "PJ ICE Rx error status code: %d '%s'.\n",
-                               (int) status, buf);
+                               (int)status, buf);
                        return -1;
                }
                if (!rtp->passthrough) {
@@ -1942,7 +2247,7 @@ static int rtp_learning_rtp_seq_update(struct rtp_learning_info *info, uint16_t
 
 #ifdef HAVE_PJPROJECT
 static void rtp_add_candidates_to_ice(struct ast_rtp_instance *instance, struct ast_rtp *rtp, struct ast_sockaddr *addr, int port, int component,
-                                     int transport, const pj_turn_sock_cb *turn_cb, pj_turn_sock **turn_sock)
+                                     int transport)
 {
        pj_sockaddr address[16];
        unsigned int count = PJ_ARRAY_SIZE(address), pos = 0;
@@ -1981,38 +2286,9 @@ static void rtp_add_candidates_to_ice(struct ast_rtp_instance *instance, struct
        }
 
        /* If configured to use a TURN relay create a session and allocate */
-       if (pj_strlen(&turnaddr) && pj_turn_sock_create(&rtp->ice->stun_cfg, ast_sockaddr_is_ipv4(addr) ? pj_AF_INET() : pj_AF_INET6(), PJ_TURN_TP_TCP,
-                                                       turn_cb, NULL, instance, turn_sock) == PJ_SUCCESS) {
-               pj_stun_auth_cred cred = { 0, };
-               struct timeval wait = ast_tvadd(ast_tvnow(), ast_samp2tv(TURN_ALLOCATION_WAIT_TIME, 1000));
-               struct timespec ts = { .tv_sec = wait.tv_sec, .tv_nsec = wait.tv_usec * 1000, };
-
-               cred.type = PJ_STUN_AUTH_CRED_STATIC;
-               cred.data.static_cred.username = turnusername;
-               cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
-               cred.data.static_cred.data = turnpassword;
-
-               /* Because the TURN socket is asynchronous but we are synchronous we need to wait until it is done */
-               ast_mutex_lock(&rtp->lock);
-               pj_turn_sock_alloc(*turn_sock, &turnaddr, turnport, NULL, &cred, NULL);
-               ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts);
-               ast_mutex_unlock(&rtp->lock);
-
-               /* If a TURN session was allocated add it as a candidate */
-               if (rtp->turn_state == PJ_TURN_STATE_READY) {
-                       pj_turn_session_info info;
-
-                       pj_turn_sock_get_info(*turn_sock, &info);
-
-                       if (transport == TRANSPORT_SOCKET_RTP) {
-                               transport = TRANSPORT_TURN_RTP;
-                       } else if (transport == TRANSPORT_SOCKET_RTCP) {
-                               transport = TRANSPORT_TURN_RTCP;
-                       }
-
-                       ast_rtp_ice_add_cand(rtp, component, transport, PJ_ICE_CAND_TYPE_RELAYED, 65535, &info.relay_addr, &info.relay_addr,
-                                            NULL, pj_sockaddr_get_len(&info.relay_addr));
-               }
+       if (pj_strlen(&turnaddr)) {
+               ast_rtp_ice_turn_request(instance, component, AST_TRANSPORT_TCP, pj_strbuf(&turnaddr), turnport,
+                       pj_strbuf(&turnusername), pj_strbuf(&turnpassword));
        }
 }
 #endif
@@ -2071,7 +2347,7 @@ static int ice_create(struct ast_rtp_instance *instance, struct ast_sockaddr *ad
 
        pj_thread_register_check();
 
-       pj_stun_config_init(&stun_config, &cachingpool.factory, 0, ioqueue, timerheap);
+       pj_stun_config_init(&stun_config, &cachingpool.factory, 0, NULL, timer_heap);
 
        ufrag = pj_str(rtp->local_ufrag);
        passwd = pj_str(rtp->local_passwd);
@@ -2084,14 +2360,14 @@ static int ice_create(struct ast_rtp_instance *instance, struct ast_sockaddr *ad
 
                /* Add all of the available candidates to the ICE session */
                rtp_add_candidates_to_ice(instance, rtp, addr, port, AST_RTP_ICE_COMPONENT_RTP,
-                       TRANSPORT_SOCKET_RTP, &ast_rtp_turn_rtp_sock_cb, &rtp->turn_rtp);
+                       TRANSPORT_SOCKET_RTP);
 
                /* Only add the RTCP candidates to ICE when replacing the session. New sessions
                 * handle this in a separate part of the setup phase */
                if (replace && rtp->rtcp) {
                        rtp_add_candidates_to_ice(instance, rtp, &rtp->rtcp->us,
                                ast_sockaddr_port(&rtp->rtcp->us), AST_RTP_ICE_COMPONENT_RTCP,
-                               TRANSPORT_SOCKET_RTCP, &ast_rtp_turn_rtcp_sock_cb, &rtp->turn_rtcp);
+                               TRANSPORT_SOCKET_RTCP);
                }
 
                return 0;
@@ -2200,6 +2476,8 @@ static int ast_rtp_new(struct ast_rtp_instance *instance,
 static int ast_rtp_destroy(struct ast_rtp_instance *instance)
 {
        struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+       struct timeval wait = ast_tvadd(ast_tvnow(), ast_samp2tv(TURN_STATE_WAIT_TIME, 1000));
+       struct timespec ts = { .tv_sec = wait.tv_sec, .tv_nsec = wait.tv_usec * 1000, };
 
        /* Destroy the smoother that was smoothing out audio if present */
        if (rtp->smoother) {
@@ -2236,21 +2514,33 @@ static int ast_rtp_destroy(struct ast_rtp_instance *instance)
 #ifdef HAVE_PJPROJECT
        pj_thread_register_check();
 
-       /* Destroy the ICE session if being used */
-       if (rtp->ice) {
-               pj_ice_sess_destroy(rtp->ice);
-       }
-
        /* Destroy the RTP TURN relay if being used */
+       ast_mutex_lock(&rtp->lock);
        if (rtp->turn_rtp) {
-               pj_turn_sock_set_user_data(rtp->turn_rtp, NULL);
                pj_turn_sock_destroy(rtp->turn_rtp);
+               rtp->turn_state = PJ_TURN_STATE_NULL;
+               while (rtp->turn_state != PJ_TURN_STATE_DESTROYING) {
+                       ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts);
+               }
        }
 
        /* Destroy the RTCP TURN relay if being used */
        if (rtp->turn_rtcp) {
-               pj_turn_sock_set_user_data(rtp->turn_rtcp, NULL);
                pj_turn_sock_destroy(rtp->turn_rtcp);
+               rtp->turn_state = PJ_TURN_STATE_NULL;
+               while (rtp->turn_state != PJ_TURN_STATE_DESTROYING) {
+                       ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts);
+               }
+       }
+       ast_mutex_unlock(&rtp->lock);
+
+       if (rtp->ioqueue) {
+               rtp_ioqueue_thread_remove(rtp->ioqueue);
+       }
+
+       /* Destroy the ICE session if being used */
+       if (rtp->ice) {
+               pj_ice_sess_destroy(rtp->ice);
        }
 
        /* Destroy any candidates */
@@ -2357,7 +2647,6 @@ static int ast_rtp_dtmf_begin(struct ast_rtp_instance *instance, char digit)
                                ast_sockaddr_stringify(&remote_address),
                                strerror(errno));
                }
-               update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
                if (rtp_debug_test_addr(&remote_address)) {
                        ast_verbose("Sent RTP DTMF packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d)\n",
                                    ast_sockaddr_stringify(&remote_address),
@@ -2407,8 +2696,6 @@ static int ast_rtp_dtmf_continuation(struct ast_rtp_instance *instance)
                        strerror(errno));
        }
 
-       update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
-
        if (rtp_debug_test_addr(&remote_address)) {
                ast_verbose("Sent RTP DTMF packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d)\n",
                            ast_sockaddr_stringify(&remote_address),
@@ -2483,8 +2770,6 @@ static int ast_rtp_dtmf_end_with_duration(struct ast_rtp_instance *instance, cha
                                strerror(errno));
                }
 
-               update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
-
                if (rtp_debug_test_addr(&remote_address)) {
                        ast_verbose("Sent RTP DTMF packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d)\n",
                                    ast_sockaddr_stringify(&remote_address),
@@ -2745,8 +3030,6 @@ static int ast_rtcp_write_report(struct ast_rtp_instance *instance, int sr)
                rtp->rtcp->rr_count++;
        }
 
-       update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTCP, &remote_address);
-
        if (rtcp_debug_test_addr(&rtp->rtcp->them)) {
                ast_verbose("* Sent RTCP %s to %s%s\n", sr ? "SR" : "RR",
                                ast_sockaddr_stringify(&remote_address), ice ? " (via ICE)" : "");
@@ -2940,8 +3223,6 @@ static int ast_rtp_raw_write(struct ast_rtp_instance *instance, struct ast_frame
                        }
                }
 
-               update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
-
                if (rtp_debug_test_addr(&remote_address)) {
                        ast_verbose("Sent RTP packet to      %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d)\n",
                                    ast_sockaddr_stringify(&remote_address),
@@ -3910,8 +4191,6 @@ static int bridge_p2p_rtp_write(struct ast_rtp_instance *instance, unsigned int
                return 0;
        }
 
-       update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
-
        if (rtp_debug_test_addr(&remote_address)) {
                ast_verbose("Sent RTP P2P packet to %s%s (type %-2.2d, len %-6.6d)\n",
                            ast_sockaddr_stringify(&remote_address),
@@ -4388,8 +4667,7 @@ static void ast_rtp_prop_set(struct ast_rtp_instance *instance, enum ast_rtp_pro
 
 #ifdef HAVE_PJPROJECT
                        if (rtp->ice) {
-                               rtp_add_candidates_to_ice(instance, rtp, &rtp->rtcp->us, ast_sockaddr_port(&rtp->rtcp->us), AST_RTP_ICE_COMPONENT_RTCP, TRANSPORT_SOCKET_RTCP,
-                                                         &ast_rtp_turn_rtcp_sock_cb, &rtp->turn_rtcp);
+                               rtp_add_candidates_to_ice(instance, rtp, &rtp->rtcp->us, ast_sockaddr_port(&rtp->rtcp->us), AST_RTP_ICE_COMPONENT_RTCP, TRANSPORT_SOCKET_RTCP);
                        }
 #endif
 
@@ -4681,8 +4959,6 @@ static int ast_rtp_sendcng(struct ast_rtp_instance *instance, int level)
                return res;
        }
 
-       update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
-
        if (rtp_debug_test_addr(&remote_address)) {
                ast_verbose("Sent Comfort Noise RTP packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d)\n",
                            ast_sockaddr_stringify(&remote_address),
@@ -4949,17 +5225,17 @@ static int rtp_reload(int reload)
                        if (ast_parse_arg(s, PARSE_INADDR, &addr)) {
                                ast_log(LOG_WARNING, "Invalid TURN server address: %s\n", s);
                        } else {
-                               pj_strdup2(pool, &turnaddr, ast_inet_ntoa(addr.sin_addr));
+                               pj_strdup2_with_null(pool, &turnaddr, ast_inet_ntoa(addr.sin_addr));
                                /* ntohs() is not a bug here. The port number is used in host byte order with
                                 * a pjnat API. */
                                turnport = ntohs(addr.sin_port);
                        }
                }
                if ((s = ast_variable_retrieve(cfg, "general", "turnusername"))) {
-                       pj_strdup2(pool, &turnusername, s);
+                       pj_strdup2_with_null(pool, &turnusername, s);
                }
                if ((s = ast_variable_retrieve(cfg, "general", "turnpassword"))) {
-                       pj_strdup2(pool, &turnpassword, s);
+                       pj_strdup2_with_null(pool, &turnpassword, s);
                }
 #endif
                ast_config_destroy(cfg);
@@ -4979,6 +5255,20 @@ static int reload_module(void)
        return 0;
 }
 
+#ifdef HAVE_PJPROJECT
+static void rtp_terminate_pjproject(void)
+{
+       if (timer_thread) {
+               timer_terminate = 1;
+               pj_thread_join(timer_thread);
+               pj_thread_destroy(timer_thread);
+       }
+
+       pj_caching_pool_destroy(&cachingpool);
+       pj_shutdown();
+}
+#endif
+
 static int load_module(void)
 {
 #ifdef HAVE_PJPROJECT
@@ -4989,65 +5279,49 @@ static int load_module(void)
        }
 
        if (pjlib_util_init() != PJ_SUCCESS) {
-               pj_shutdown();
+               rtp_terminate_pjproject();
                return AST_MODULE_LOAD_DECLINE;
        }
 
        if (pjnath_init() != PJ_SUCCESS) {
-               pj_shutdown();
+               rtp_terminate_pjproject();
                return AST_MODULE_LOAD_DECLINE;
        }
 
        pj_caching_pool_init(&cachingpool, &pj_pool_factory_default_policy, 0);
 
-       pool = pj_pool_create(&cachingpool.factory, "rtp", 512, 512, NULL);
+       pool = pj_pool_create(&cachingpool.factory, "timer", 512, 512, NULL);
 
-       if (pj_timer_heap_create(pool, 100, &timerheap) != PJ_SUCCESS) {
-               pj_caching_pool_destroy(&cachingpool);
-               pj_shutdown();
+       if (pj_timer_heap_create(pool, 100, &timer_heap) != PJ_SUCCESS) {
+               rtp_terminate_pjproject();
                return AST_MODULE_LOAD_DECLINE;
        }
 
        if (pj_lock_create_recursive_mutex(pool, "rtp%p", &lock) != PJ_SUCCESS) {
-               pj_caching_pool_destroy(&cachingpool);
-               pj_shutdown();
+               rtp_terminate_pjproject();
                return AST_MODULE_LOAD_DECLINE;
        }
 
-       pj_timer_heap_set_lock(timerheap, lock, PJ_TRUE);
+       pj_timer_heap_set_lock(timer_heap, lock, PJ_TRUE);
 
-       if (pj_ioqueue_create(pool, 16, &ioqueue) != PJ_SUCCESS) {
-               pj_caching_pool_destroy(&cachingpool);
-               pj_shutdown();
+       if (pj_thread_create(pool, "timer", &timer_worker_thread, NULL, 0, 0, &timer_thread) != PJ_SUCCESS) {
+               rtp_terminate_pjproject();
                return AST_MODULE_LOAD_DECLINE;
        }
 
-       if (pj_thread_create(pool, "ice", &ice_worker_thread, NULL, 0, 0, &thread) != PJ_SUCCESS) {
-               pj_caching_pool_destroy(&cachingpool);
-               pj_shutdown();
-               return AST_MODULE_LOAD_DECLINE;
-       }
 #endif
 
        if (ast_rtp_engine_register(&asterisk_rtp_engine)) {
 #ifdef HAVE_PJPROJECT
-               worker_terminate = 1;
-               pj_thread_join(thread);
-               pj_thread_destroy(thread);
-               pj_caching_pool_destroy(&cachingpool);
-               pj_shutdown();
+               rtp_terminate_pjproject();
 #endif
                return AST_MODULE_LOAD_DECLINE;
        }
 
        if (ast_cli_register_multiple(cli_rtp, ARRAY_LEN(cli_rtp))) {
 #ifdef HAVE_PJPROJECT
-               worker_terminate = 1;
-               pj_thread_join(thread);
-               pj_thread_destroy(thread);
                ast_rtp_engine_unregister(&asterisk_rtp_engine);
-               pj_caching_pool_destroy(&cachingpool);
-               pj_shutdown();
+               rtp_terminate_pjproject();
 #endif
                return AST_MODULE_LOAD_DECLINE;
        }
@@ -5063,15 +5337,8 @@ static int unload_module(void)
        ast_cli_unregister_multiple(cli_rtp, ARRAY_LEN(cli_rtp));
 
 #ifdef HAVE_PJPROJECT
-       worker_terminate = 1;
-
        pj_thread_register_check();
-
-       pj_thread_join(thread);
-       pj_thread_destroy(thread);
-
-       pj_caching_pool_destroy(&cachingpool);
-       pj_shutdown();
+       rtp_terminate_pjproject();
 #endif
 
        return 0;