#define MINIMUM_RTP_PORT 1024 /*!< Minimum port number to accept */
#define MAXIMUM_RTP_PORT 65535 /*!< Maximum port number to accept */
-#define DEFAULT_TURN_PORT 34780
+#define DEFAULT_TURN_PORT 3478
-#define TURN_ALLOCATION_WAIT_TIME 2000
+#define TURN_STATE_WAIT_TIME 2000
#define RTCP_PT_FUR 192
#define RTCP_PT_SR AST_RTP_RTCP_SR
/*! \brief Pool factory used by pjlib to allocate memory. */
static pj_caching_pool cachingpool;
-/*! \brief Pool used by pjlib functions which require memory allocation. */
+/*! \brief Global memory pool for configuration and timers */
static pj_pool_t *pool;
-/*! \brief I/O queue for TURN relay traffic */
-static pj_ioqueue_t *ioqueue;
+/*! \brief Global timer heap */
+static pj_timer_heap_t *timer_heap;
-/*! \brief Timer heap for ICE and TURN stuff */
-static pj_timer_heap_t *timerheap;
+/*! \brief Thread executing the timer heap */
+static pj_thread_t *timer_thread;
-/*! \brief Worker thread for ICE/TURN */
-static pj_thread_t *thread;
+/*! \brief Used to tell the timer thread to terminate */
+static int timer_terminate;
+
+/*! \brief Structure which contains ioqueue thread information */
+struct ast_rtp_ioqueue_thread {
+ /*! \brief Pool used by the thread */
+ pj_pool_t *pool;
+ /*! \brief The thread handling the queue and timer heap */
+ pj_thread_t *thread;
+ /*! \brief Ioqueue which polls on sockets */
+ pj_ioqueue_t *ioqueue;
+ /*! \brief Timer heap for scheduled items */
+ pj_timer_heap_t *timerheap;
+ /*! \brief Termination request */
+ int terminate;
+ /*! \brief Current number of descriptors being waited on */
+ unsigned int count;
+ /*! \brief Linked list information */
+ AST_LIST_ENTRY(ast_rtp_ioqueue_thread) next;
+};
+
+/*! \brief List of ioqueue threads */
+static AST_LIST_HEAD_STATIC(ioqueues, ast_rtp_ioqueue_thread);
-/*! \brief Notification that the ICE/TURN worker thread should stop */
-static int worker_terminate;
#endif
#define FLAG_3389_WARNING (1 << 0)
#define FLAG_NEED_MARKER_BIT (1 << 3)
#define FLAG_DTMF_COMPENSATE (1 << 4)
-#define TRANSPORT_SOCKET_RTP 1
-#define TRANSPORT_SOCKET_RTCP 2
-#define TRANSPORT_TURN_RTP 3
-#define TRANSPORT_TURN_RTCP 4
+#define TRANSPORT_SOCKET_RTP 0
+#define TRANSPORT_SOCKET_RTCP 1
+#define TRANSPORT_TURN_RTP 2
+#define TRANSPORT_TURN_RTCP 3
/*! \brief RTP learning mode tracking information */
struct rtp_learning_info {
pj_turn_sock *turn_rtcp; /*!< RTCP TURN relay */
pj_turn_state_t turn_state; /*!< Current state of the TURN relay session */
unsigned int passthrough:1; /*!< Bit to indicate that the received packet should be passed through */
+ unsigned int rtp_passthrough:1; /*!< Bit to indicate that TURN RTP should be passed through */
+ unsigned int rtcp_passthrough:1; /*!< Bit to indicate that TURN RTCP should be passed through */
unsigned int ice_port; /*!< Port that ICE was started with if it was previously started */
+ struct ast_sockaddr rtp_loop; /*!< Loopback address for forwarding RTP from TURN */
+ struct ast_sockaddr rtcp_loop; /*!< Loopback address for forwarding RTCP from TURN */
+
+ struct ast_rtp_ioqueue_thread *ioqueue; /*!< The ioqueue thread handling us */
char remote_ufrag[256]; /*!< The remote ICE username */
char remote_passwd[256]; /*!< The remote ICE password */
static int __rtp_sendto(struct ast_rtp_instance *instance, void *buf, size_t size, int flags, struct ast_sockaddr *sa, int rtcp, int *ice, int use_srtp);
+#ifdef HAVE_PJPROJECT
/*! \brief Helper function which updates an ast_sockaddr with the candidate used for the component */
-static void update_address_with_ice_candidate(struct ast_rtp *rtp, int component, struct ast_sockaddr *cand_address)
+static void update_address_with_ice_candidate(struct ast_rtp *rtp, enum ast_rtp_ice_component_type component,
+ struct ast_sockaddr *cand_address)
{
-#ifdef HAVE_PJPROJECT
char address[PJ_INET6_ADDRSTRLEN];
if (!rtp->ice || (component < 1) || !rtp->ice->comp[component - 1].valid_check) {
ast_sockaddr_parse(cand_address, pj_sockaddr_print(&rtp->ice->comp[component - 1].valid_check->rcand->addr, address, sizeof(address), 0), 0);
ast_sockaddr_set_port(cand_address, pj_sockaddr_get_port(&rtp->ice->comp[component - 1].valid_check->rcand->addr));
-#endif
}
-#ifdef HAVE_PJPROJECT
+/*! \brief Helper function which sets up channel binding on a TURN session if applicable */
+static void turn_enable_bind_channel(struct ast_rtp *rtp, pj_turn_sock *turn, int component, int transport)
+{
+ if (!rtp->ice || !turn || (component < 1) || !rtp->ice->comp[component - 1].valid_check ||
+ (rtp->ice->comp[component - 1].valid_check->lcand->transport_id != transport)) {
+ return;
+ }
+
+ pj_turn_sock_bind_channel(turn, &rtp->ice->comp[component - 1].valid_check->rcand->addr,
+ sizeof(rtp->ice->comp[component - 1].valid_check->rcand->addr));
+}
+
/*! \brief Destructor for locally created ICE candidates */
static void ast_rtp_ice_candidate_destroy(void *obj)
{
if (pj_ice_sess_create_check_list(rtp->ice, &ufrag, &passwd, ao2_container_count(rtp->ice_active_remote_candidates), &candidates[0]) == PJ_SUCCESS) {
ast_test_suite_event_notify("ICECHECKLISTCREATE", "Result: SUCCESS");
pj_ice_sess_start_check(rtp->ice);
- pj_timer_heap_poll(timerheap, NULL);
+ pj_timer_heap_poll(timer_heap, NULL);
rtp->strict_rtp_state = STRICT_RTP_OPEN;
return;
}
ao2_ref(candidate, -1);
}
+static void ast_rtp_on_turn_rx_rtp_data(pj_turn_sock *turn_sock, void *pkt, unsigned pkt_len, const pj_sockaddr_t *peer_addr, unsigned addr_len)
+{
+ struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
+ struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+ pj_status_t status;
+
+ status = pj_ice_sess_on_rx_pkt(rtp->ice, AST_RTP_ICE_COMPONENT_RTP, TRANSPORT_TURN_RTP, pkt, pkt_len, peer_addr,
+ addr_len);
+ if (status != PJ_SUCCESS) {
+ char buf[100];
+
+ pj_strerror(status, buf, sizeof(buf));
+ ast_log(LOG_WARNING, "PJ ICE Rx error status code: %d '%s'.\n",
+ (int)status, buf);
+ return;
+ }
+ if (!rtp->rtp_passthrough) {
+ return;
+ }
+ rtp->rtp_passthrough = 0;
+
+ ast_sendto(rtp->s, pkt, pkt_len, 0, &rtp->rtp_loop);
+}
+
+static void ast_rtp_on_turn_rtp_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_state_t new_state)
+{
+ struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
+ struct ast_rtp *rtp;
+
+ /* If this is a leftover from an already notified RTP instance just ignore the state change */
+ if (!instance) {
+ return;
+ }
+
+ rtp = ast_rtp_instance_get_data(instance);
+
+ /* We store the new state so the other thread can actually handle it */
+ ast_mutex_lock(&rtp->lock);
+ rtp->turn_state = new_state;
+ ast_cond_signal(&rtp->cond);
+
+ if (new_state == PJ_TURN_STATE_DESTROYING) {
+ pj_turn_sock_set_user_data(rtp->turn_rtp, NULL);
+ rtp->turn_rtp = NULL;
+ }
+
+ ast_mutex_unlock(&rtp->lock);
+}
+
+/* RTP TURN Socket interface declaration */
+static pj_turn_sock_cb ast_rtp_turn_rtp_sock_cb = {
+ .on_rx_data = ast_rtp_on_turn_rx_rtp_data,
+ .on_state = ast_rtp_on_turn_rtp_state,
+};
+
+static void ast_rtp_on_turn_rx_rtcp_data(pj_turn_sock *turn_sock, void *pkt, unsigned pkt_len, const pj_sockaddr_t *peer_addr, unsigned addr_len)
+{
+ struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
+ struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+ pj_status_t status;
+
+ status = pj_ice_sess_on_rx_pkt(rtp->ice, AST_RTP_ICE_COMPONENT_RTCP, TRANSPORT_TURN_RTCP, pkt, pkt_len, peer_addr,
+ addr_len);
+ if (status != PJ_SUCCESS) {
+ char buf[100];
+
+ pj_strerror(status, buf, sizeof(buf));
+ ast_log(LOG_WARNING, "PJ ICE Rx error status code: %d '%s'.\n",
+ (int)status, buf);
+ return;
+ }
+ if (!rtp->rtcp_passthrough) {
+ return;
+ }
+ rtp->rtcp_passthrough = 0;
+
+ ast_sendto(rtp->rtcp->s, pkt, pkt_len, 0, &rtp->rtcp_loop);
+}
+
+static void ast_rtp_on_turn_rtcp_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_state_t new_state)
+{
+ struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
+ struct ast_rtp *rtp = NULL;
+
+ /* If this is a leftover from an already destroyed RTP instance just ignore the state change */
+ if (!instance) {
+ return;
+ }
+
+ rtp = ast_rtp_instance_get_data(instance);
+
+ /* We store the new state so the other thread can actually handle it */
+ ast_mutex_lock(&rtp->lock);
+ rtp->turn_state = new_state;
+ ast_cond_signal(&rtp->cond);
+
+ if (new_state == PJ_TURN_STATE_DESTROYING) {
+ pj_turn_sock_set_user_data(rtp->turn_rtcp, NULL);
+ rtp->turn_rtcp = NULL;
+ }
+
+ ast_mutex_unlock(&rtp->lock);
+}
+
+/* RTCP TURN Socket interface declaration */
+static pj_turn_sock_cb ast_rtp_turn_rtcp_sock_cb = {
+ .on_rx_data = ast_rtp_on_turn_rx_rtcp_data,
+ .on_state = ast_rtp_on_turn_rtcp_state,
+};
+
+/*! \brief Worker thread for ioqueue and timerheap */
+static int ioqueue_worker_thread(void *data)
+{
+ struct ast_rtp_ioqueue_thread *ioqueue = data;
+
+ while (!ioqueue->terminate) {
+ const pj_time_val delay = {0, 10};
+
+ pj_ioqueue_poll(ioqueue->ioqueue, &delay);
+
+ pj_timer_heap_poll(ioqueue->timerheap, NULL);
+ }
+
+ return 0;
+}
+
+/*! \brief Destroyer for ioqueue thread */
+static void rtp_ioqueue_thread_destroy(struct ast_rtp_ioqueue_thread *ioqueue)
+{
+ if (ioqueue->thread) {
+ ioqueue->terminate = 1;
+ pj_thread_join(ioqueue->thread);
+ pj_thread_destroy(ioqueue->thread);
+ }
+
+ pj_pool_release(ioqueue->pool);
+ ast_free(ioqueue);
+}
+
+/*! \brief Removal function for ioqueue thread, determines if it should be terminated and destroyed */
+static void rtp_ioqueue_thread_remove(struct ast_rtp_ioqueue_thread *ioqueue)
+{
+ int destroy = 0;
+
+ /* If nothing is using this ioqueue thread destroy it */
+ AST_LIST_LOCK(&ioqueues);
+ if ((ioqueue->count - 2) == 0) {
+ destroy = 1;
+ AST_LIST_REMOVE(&ioqueues, ioqueue, next);
+ }
+ AST_LIST_UNLOCK(&ioqueues);
+
+ if (!destroy) {
+ return;
+ }
+
+ rtp_ioqueue_thread_destroy(ioqueue);
+}
+
+/*! \brief Finder and allocator for an ioqueue thread */
+static struct ast_rtp_ioqueue_thread *rtp_ioqueue_thread_get_or_create(void)
+{
+ struct ast_rtp_ioqueue_thread *ioqueue;
+ pj_lock_t *lock;
+
+ AST_LIST_LOCK(&ioqueues);
+
+ /* See if an ioqueue thread exists that can handle more */
+ AST_LIST_TRAVERSE(&ioqueues, ioqueue, next) {
+ if ((ioqueue->count + 2) < PJ_IOQUEUE_MAX_HANDLES) {
+ break;
+ }
+ }
+
+ /* If we found one bump it up and return it */
+ if (ioqueue) {
+ ioqueue->count += 2;
+ goto end;
+ }
+
+ ioqueue = ast_calloc(1, sizeof(*ioqueue));
+ if (!ioqueue) {
+ goto end;
+ }
+
+ ioqueue->pool = pj_pool_create(&cachingpool.factory, "rtp", 512, 512, NULL);
+
+ /* We use a timer on the ioqueue thread for TURN so that two threads aren't operating
+ * on a session at the same time
+ */
+ if (pj_timer_heap_create(ioqueue->pool, 4, &ioqueue->timerheap) != PJ_SUCCESS) {
+ goto fatal;
+ }
+
+ if (pj_lock_create_recursive_mutex(ioqueue->pool, "rtp%p", &lock) != PJ_SUCCESS) {
+ goto fatal;
+ }
+
+ pj_timer_heap_set_lock(ioqueue->timerheap, lock, PJ_TRUE);
+
+ if (pj_ioqueue_create(ioqueue->pool, 16, &ioqueue->ioqueue) != PJ_SUCCESS) {
+ goto fatal;
+ }
+
+ if (pj_thread_create(ioqueue->pool, "ice", &ioqueue_worker_thread, ioqueue, 0, 0, &ioqueue->thread) != PJ_SUCCESS) {
+ goto fatal;
+ }
+
+ AST_LIST_INSERT_HEAD(&ioqueues, ioqueue, next);
+
+ /* Since this is being returned to an active session the count always starts at 2 */
+ ioqueue->count = 2;
+
+ goto end;
+
+fatal:
+ rtp_ioqueue_thread_destroy(ioqueue);
+ ioqueue = NULL;
+
+end:
+ AST_LIST_UNLOCK(&ioqueues);
+ return ioqueue;
+}
+
+static void ast_rtp_ice_turn_request(struct ast_rtp_instance *instance, enum ast_rtp_ice_component_type component,
+ enum ast_transport transport, const char *server, unsigned int port, const char *username, const char *password)
+{
+ struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+ pj_turn_sock **turn_sock;
+ const pj_turn_sock_cb *turn_cb;
+ pj_turn_tp_type conn_type;
+ int conn_transport;
+ pj_stun_auth_cred cred = { 0, };
+ pj_str_t turn_addr;
+ struct ast_sockaddr addr = { { 0, } };
+ pj_stun_config stun_config;
+ struct timeval wait = ast_tvadd(ast_tvnow(), ast_samp2tv(TURN_STATE_WAIT_TIME, 1000));
+ struct timespec ts = { .tv_sec = wait.tv_sec, .tv_nsec = wait.tv_usec * 1000, };
+ pj_turn_session_info info;
+ struct ast_sockaddr local, loop;
+
+ ast_rtp_instance_get_local_address(instance, &local);
+ if (ast_sockaddr_is_ipv4(&local)) {
+ ast_sockaddr_parse(&loop, "127.0.0.1", PARSE_PORT_FORBID);
+ } else {
+ ast_sockaddr_parse(&loop, "::1", PARSE_PORT_FORBID);
+ }
+
+ /* Determine what component we are requesting a TURN session for */
+ if (component == AST_RTP_ICE_COMPONENT_RTP) {
+ turn_sock = &rtp->turn_rtp;
+ turn_cb = &ast_rtp_turn_rtp_sock_cb;
+ conn_transport = TRANSPORT_TURN_RTP;
+ ast_sockaddr_set_port(&loop, ast_sockaddr_port(&local));
+ } else if (component == AST_RTP_ICE_COMPONENT_RTCP) {
+ turn_sock = &rtp->turn_rtcp;
+ turn_cb = &ast_rtp_turn_rtcp_sock_cb;
+ conn_transport = TRANSPORT_TURN_RTCP;
+ ast_sockaddr_set_port(&loop, ast_sockaddr_port(&rtp->rtcp->us));
+ } else {
+ return;
+ }
+
+ if (transport == AST_TRANSPORT_UDP) {
+ conn_type = PJ_TURN_TP_UDP;
+ } else if (transport == AST_TRANSPORT_TCP) {
+ conn_type = PJ_TURN_TP_TCP;
+ } else {
+ ast_assert(0);
+ return;
+ }
+
+ ast_sockaddr_parse(&addr, server, PARSE_PORT_FORBID);
+
+ ast_mutex_lock(&rtp->lock);
+ if (*turn_sock) {
+ pj_turn_sock_destroy(*turn_sock);
+ rtp->turn_state = PJ_TURN_STATE_NULL;
+ while (rtp->turn_state != PJ_TURN_STATE_DESTROYING) {
+ ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts);
+ }
+ }
+ ast_mutex_unlock(&rtp->lock);
+
+ if (component == AST_RTP_ICE_COMPONENT_RTP && !rtp->ioqueue) {
+ rtp->ioqueue = rtp_ioqueue_thread_get_or_create();
+ if (!rtp->ioqueue) {
+ return;
+ }
+ }
+
+ pj_stun_config_init(&stun_config, &cachingpool.factory, 0, rtp->ioqueue->ioqueue, rtp->ioqueue->timerheap);
+
+ if (pj_turn_sock_create(&stun_config, ast_sockaddr_is_ipv4(&addr) ? pj_AF_INET() : pj_AF_INET6(), conn_type,
+ turn_cb, NULL, instance, turn_sock) != PJ_SUCCESS) {
+ ast_log(LOG_WARNING, "Could not create a TURN client socket\n");
+ return;
+ }
+
+ cred.type = PJ_STUN_AUTH_CRED_STATIC;
+ pj_strset2(&cred.data.static_cred.username, (char*)username);
+ cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
+ pj_strset2(&cred.data.static_cred.data, (char*)password);
+
+ /* Because the TURN socket is asynchronous but we are synchronous we need to wait until it is done */
+ ast_mutex_lock(&rtp->lock);
+ pj_turn_sock_alloc(*turn_sock, pj_cstr(&turn_addr, server), port, NULL, &cred, NULL);
+ while (rtp->turn_state < PJ_TURN_STATE_READY) {
+ ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts);
+ }
+ ast_mutex_unlock(&rtp->lock);
+
+ /* If a TURN session was allocated add it as a candidate */
+ if (rtp->turn_state != PJ_TURN_STATE_READY) {
+ return;
+ }
+
+ pj_turn_sock_get_info(*turn_sock, &info);
+
+ ast_rtp_ice_add_cand(rtp, component, conn_transport, PJ_ICE_CAND_TYPE_RELAYED, 65535, &info.relay_addr,
+ &info.relay_addr, NULL, pj_sockaddr_get_len(&info.relay_addr));
+
+ if (component == AST_RTP_ICE_COMPONENT_RTP) {
+ ast_sockaddr_copy(&rtp->rtp_loop, &loop);
+ } else if (component == AST_RTP_ICE_COMPONENT_RTCP) {
+ ast_sockaddr_copy(&rtp->rtcp_loop, &loop);
+ }
+}
+
static char *generate_random_string(char *buf, size_t size)
{
long val[4];
.get_local_candidates = ast_rtp_ice_get_local_candidates,
.ice_lite = ast_rtp_ice_lite,
.set_role = ast_rtp_ice_set_role,
+ .turn_request = ast_rtp_ice_turn_request,
};
#endif
{
struct ast_rtp_instance *instance = ice->user_data;
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+
+ if (status == PJ_SUCCESS) {
+ struct ast_sockaddr remote_address;
+
+ /* Symmetric RTP must be disabled for the remote address to not get overwritten */
+ ast_rtp_instance_set_prop(instance, AST_RTP_PROPERTY_NAT, 0);
+
+ update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
+ ast_rtp_instance_set_remote_address(instance, &remote_address);
+ turn_enable_bind_channel(rtp, rtp->turn_rtp, AST_RTP_ICE_COMPONENT_RTP, TRANSPORT_TURN_RTP);
+
+ if (rtp->rtcp) {
+ update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTCP, &rtp->rtcp->them);
+ turn_enable_bind_channel(rtp, rtp->turn_rtcp, AST_RTP_ICE_COMPONENT_RTCP, TRANSPORT_TURN_RTCP);
+ }
+ }
#ifdef HAVE_OPENSSL_SRTP
dtls_perform_handshake(instance, &rtp->dtls, 0);
/* Instead of handling the packet here (which really doesn't work with our architecture) we set a bit to indicate that it should be handled after pj_ice_sess_on_rx_pkt
* returns */
- rtp->passthrough = 1;
+ if (transport_id == TRANSPORT_SOCKET_RTP || transport_id == TRANSPORT_SOCKET_RTCP) {
+ rtp->passthrough = 1;
+ } else if (transport_id == TRANSPORT_TURN_RTP) {
+ rtp->rtp_passthrough = 1;
+ } else if (transport_id == TRANSPORT_TURN_RTCP) {
+ rtp->rtcp_passthrough = 1;
+ }
}
static pj_status_t ast_rtp_on_ice_tx_pkt(pj_ice_sess *ice, unsigned comp_id, unsigned transport_id, const void *pkt, pj_size_t size, const pj_sockaddr_t *dst_addr, unsigned dst_addr_len)
.on_tx_pkt = ast_rtp_on_ice_tx_pkt,
};
-static void ast_rtp_on_turn_rx_rtp_data(pj_turn_sock *turn_sock, void *pkt, unsigned pkt_len, const pj_sockaddr_t *peer_addr, unsigned addr_len)
-{
- struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
- struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
- struct ast_sockaddr dest = { { 0, }, };
-
- ast_rtp_instance_get_local_address(instance, &dest);
-
- ast_sendto(rtp->s, pkt, pkt_len, 0, &dest);
-}
-
-static void ast_rtp_on_turn_rtp_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_state_t new_state)
-{
- struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
- struct ast_rtp *rtp = NULL;
-
- /* If this is a leftover from an already destroyed RTP instance just ignore the state change */
- if (!instance) {
- return;
- }
-
- rtp = ast_rtp_instance_get_data(instance);
-
- /* If the TURN session is being destroyed we need to remove it from the RTP instance */
- if (new_state == PJ_TURN_STATE_DESTROYING) {
- rtp->turn_rtp = NULL;
- return;
- }
-
- /* We store the new state so the other thread can actually handle it */
- ast_mutex_lock(&rtp->lock);
- rtp->turn_state = new_state;
-
- /* If this is a state that the main thread should be notified about do so */
- if (new_state == PJ_TURN_STATE_READY || new_state == PJ_TURN_STATE_DEALLOCATING || new_state == PJ_TURN_STATE_DEALLOCATED) {
- ast_cond_signal(&rtp->cond);
- }
-
- ast_mutex_unlock(&rtp->lock);
-}
-
-/* RTP TURN Socket interface declaration */
-static pj_turn_sock_cb ast_rtp_turn_rtp_sock_cb = {
- .on_rx_data = ast_rtp_on_turn_rx_rtp_data,
- .on_state = ast_rtp_on_turn_rtp_state,
-};
-
-static void ast_rtp_on_turn_rx_rtcp_data(pj_turn_sock *turn_sock, void *pkt, unsigned pkt_len, const pj_sockaddr_t *peer_addr, unsigned addr_len)
-{
- struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
- struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
-
- ast_sendto(rtp->rtcp->s, pkt, pkt_len, 0, &rtp->rtcp->us);
-}
-
-static void ast_rtp_on_turn_rtcp_state(pj_turn_sock *turn_sock, pj_turn_state_t old_state, pj_turn_state_t new_state)
-{
- struct ast_rtp_instance *instance = pj_turn_sock_get_user_data(turn_sock);
- struct ast_rtp *rtp = NULL;
-
- /* If this is a leftover from an already destroyed RTP instance just ignore the state change */
- if (!instance) {
- return;
- }
-
- rtp = ast_rtp_instance_get_data(instance);
-
- /* If the TURN session is being destroyed we need to remove it from the RTP instance */
- if (new_state == PJ_TURN_STATE_DESTROYING) {
- rtp->turn_rtcp = NULL;
- return;
- }
-
- /* We store the new state so the other thread can actually handle it */
- ast_mutex_lock(&rtp->lock);
- rtp->turn_state = new_state;
-
- /* If this is a state that the main thread should be notified about do so */
- if (new_state == PJ_TURN_STATE_READY || new_state == PJ_TURN_STATE_DEALLOCATING || new_state == PJ_TURN_STATE_DEALLOCATED) {
- ast_cond_signal(&rtp->cond);
- }
-
- ast_mutex_unlock(&rtp->lock);
-}
-
-/* RTCP TURN Socket interface declaration */
-static pj_turn_sock_cb ast_rtp_turn_rtcp_sock_cb = {
- .on_rx_data = ast_rtp_on_turn_rx_rtcp_data,
- .on_state = ast_rtp_on_turn_rtcp_state,
-};
-
-/*! \brief Worker thread for I/O queue and timerheap */
-static int ice_worker_thread(void *data)
+/*! \brief Worker thread for timerheap */
+static int timer_worker_thread(void *data)
{
- while (!worker_terminate) {
- const pj_time_val delay = {0, 10};
-
- pj_ioqueue_poll(ioqueue, &delay);
-
- pj_timer_heap_poll(timerheap, NULL);
+ while (!timer_terminate) {
+ pj_timer_heap_poll(timer_heap, NULL);
}
return 0;
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
struct ast_srtp *srtp = ast_rtp_instance_get_srtp(instance);
char *in = buf;
+#ifdef HAVE_PJPROJECT
+ struct ast_sockaddr *loop = rtcp ? &rtp->rtcp_loop : &rtp->rtp_loop;
+#endif
if ((len = ast_recvfrom(rtcp ? rtp->rtcp->s : rtp->s, buf, size, flags, sa)) < 0) {
return len;
#endif
#ifdef HAVE_PJPROJECT
- if (rtp->ice) {
+ if (!ast_sockaddr_isnull(loop) && !ast_sockaddr_cmp(loop, sa)) {
+ /* ICE traffic will have been handled in the TURN callback, so skip it but update the address
+ * so it reflects the actual source and not the loopback
+ */
+ if (rtcp) {
+ ast_sockaddr_copy(sa, &rtp->rtcp->them);
+ } else {
+ ast_rtp_instance_get_remote_address(instance, sa);
+ }
+ } else if (rtp->ice) {
pj_str_t combined = pj_str(ast_sockaddr_stringify(sa));
pj_sockaddr address;
pj_status_t status;
pj_strerror(status, buf, sizeof(buf));
ast_log(LOG_WARNING, "PJ ICE Rx error status code: %d '%s'.\n",
- (int) status, buf);
+ (int)status, buf);
return -1;
}
if (!rtp->passthrough) {
#ifdef HAVE_PJPROJECT
static void rtp_add_candidates_to_ice(struct ast_rtp_instance *instance, struct ast_rtp *rtp, struct ast_sockaddr *addr, int port, int component,
- int transport, const pj_turn_sock_cb *turn_cb, pj_turn_sock **turn_sock)
+ int transport)
{
pj_sockaddr address[16];
unsigned int count = PJ_ARRAY_SIZE(address), pos = 0;
}
/* If configured to use a TURN relay create a session and allocate */
- if (pj_strlen(&turnaddr) && pj_turn_sock_create(&rtp->ice->stun_cfg, ast_sockaddr_is_ipv4(addr) ? pj_AF_INET() : pj_AF_INET6(), PJ_TURN_TP_TCP,
- turn_cb, NULL, instance, turn_sock) == PJ_SUCCESS) {
- pj_stun_auth_cred cred = { 0, };
- struct timeval wait = ast_tvadd(ast_tvnow(), ast_samp2tv(TURN_ALLOCATION_WAIT_TIME, 1000));
- struct timespec ts = { .tv_sec = wait.tv_sec, .tv_nsec = wait.tv_usec * 1000, };
-
- cred.type = PJ_STUN_AUTH_CRED_STATIC;
- cred.data.static_cred.username = turnusername;
- cred.data.static_cred.data_type = PJ_STUN_PASSWD_PLAIN;
- cred.data.static_cred.data = turnpassword;
-
- /* Because the TURN socket is asynchronous but we are synchronous we need to wait until it is done */
- ast_mutex_lock(&rtp->lock);
- pj_turn_sock_alloc(*turn_sock, &turnaddr, turnport, NULL, &cred, NULL);
- ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts);
- ast_mutex_unlock(&rtp->lock);
-
- /* If a TURN session was allocated add it as a candidate */
- if (rtp->turn_state == PJ_TURN_STATE_READY) {
- pj_turn_session_info info;
-
- pj_turn_sock_get_info(*turn_sock, &info);
-
- if (transport == TRANSPORT_SOCKET_RTP) {
- transport = TRANSPORT_TURN_RTP;
- } else if (transport == TRANSPORT_SOCKET_RTCP) {
- transport = TRANSPORT_TURN_RTCP;
- }
-
- ast_rtp_ice_add_cand(rtp, component, transport, PJ_ICE_CAND_TYPE_RELAYED, 65535, &info.relay_addr, &info.relay_addr,
- NULL, pj_sockaddr_get_len(&info.relay_addr));
- }
+ if (pj_strlen(&turnaddr)) {
+ ast_rtp_ice_turn_request(instance, component, AST_TRANSPORT_TCP, pj_strbuf(&turnaddr), turnport,
+ pj_strbuf(&turnusername), pj_strbuf(&turnpassword));
}
}
#endif
pj_thread_register_check();
- pj_stun_config_init(&stun_config, &cachingpool.factory, 0, ioqueue, timerheap);
+ pj_stun_config_init(&stun_config, &cachingpool.factory, 0, NULL, timer_heap);
ufrag = pj_str(rtp->local_ufrag);
passwd = pj_str(rtp->local_passwd);
/* Add all of the available candidates to the ICE session */
rtp_add_candidates_to_ice(instance, rtp, addr, port, AST_RTP_ICE_COMPONENT_RTP,
- TRANSPORT_SOCKET_RTP, &ast_rtp_turn_rtp_sock_cb, &rtp->turn_rtp);
+ TRANSPORT_SOCKET_RTP);
/* Only add the RTCP candidates to ICE when replacing the session. New sessions
* handle this in a separate part of the setup phase */
if (replace && rtp->rtcp) {
rtp_add_candidates_to_ice(instance, rtp, &rtp->rtcp->us,
ast_sockaddr_port(&rtp->rtcp->us), AST_RTP_ICE_COMPONENT_RTCP,
- TRANSPORT_SOCKET_RTCP, &ast_rtp_turn_rtcp_sock_cb, &rtp->turn_rtcp);
+ TRANSPORT_SOCKET_RTCP);
}
return 0;
static int ast_rtp_destroy(struct ast_rtp_instance *instance)
{
struct ast_rtp *rtp = ast_rtp_instance_get_data(instance);
+ struct timeval wait = ast_tvadd(ast_tvnow(), ast_samp2tv(TURN_STATE_WAIT_TIME, 1000));
+ struct timespec ts = { .tv_sec = wait.tv_sec, .tv_nsec = wait.tv_usec * 1000, };
/* Destroy the smoother that was smoothing out audio if present */
if (rtp->smoother) {
#ifdef HAVE_PJPROJECT
pj_thread_register_check();
- /* Destroy the ICE session if being used */
- if (rtp->ice) {
- pj_ice_sess_destroy(rtp->ice);
- }
-
/* Destroy the RTP TURN relay if being used */
+ ast_mutex_lock(&rtp->lock);
if (rtp->turn_rtp) {
- pj_turn_sock_set_user_data(rtp->turn_rtp, NULL);
pj_turn_sock_destroy(rtp->turn_rtp);
+ rtp->turn_state = PJ_TURN_STATE_NULL;
+ while (rtp->turn_state != PJ_TURN_STATE_DESTROYING) {
+ ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts);
+ }
}
/* Destroy the RTCP TURN relay if being used */
if (rtp->turn_rtcp) {
- pj_turn_sock_set_user_data(rtp->turn_rtcp, NULL);
pj_turn_sock_destroy(rtp->turn_rtcp);
+ rtp->turn_state = PJ_TURN_STATE_NULL;
+ while (rtp->turn_state != PJ_TURN_STATE_DESTROYING) {
+ ast_cond_timedwait(&rtp->cond, &rtp->lock, &ts);
+ }
+ }
+ ast_mutex_unlock(&rtp->lock);
+
+ if (rtp->ioqueue) {
+ rtp_ioqueue_thread_remove(rtp->ioqueue);
+ }
+
+ /* Destroy the ICE session if being used */
+ if (rtp->ice) {
+ pj_ice_sess_destroy(rtp->ice);
}
/* Destroy any candidates */
ast_sockaddr_stringify(&remote_address),
strerror(errno));
}
- update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
if (rtp_debug_test_addr(&remote_address)) {
ast_verbose("Sent RTP DTMF packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d)\n",
ast_sockaddr_stringify(&remote_address),
strerror(errno));
}
- update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
-
if (rtp_debug_test_addr(&remote_address)) {
ast_verbose("Sent RTP DTMF packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d)\n",
ast_sockaddr_stringify(&remote_address),
strerror(errno));
}
- update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
-
if (rtp_debug_test_addr(&remote_address)) {
ast_verbose("Sent RTP DTMF packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d)\n",
ast_sockaddr_stringify(&remote_address),
rtp->rtcp->rr_count++;
}
- update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTCP, &remote_address);
-
if (rtcp_debug_test_addr(&rtp->rtcp->them)) {
ast_verbose("* Sent RTCP %s to %s%s\n", sr ? "SR" : "RR",
ast_sockaddr_stringify(&remote_address), ice ? " (via ICE)" : "");
}
}
- update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
-
if (rtp_debug_test_addr(&remote_address)) {
ast_verbose("Sent RTP packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d)\n",
ast_sockaddr_stringify(&remote_address),
return 0;
}
- update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
-
if (rtp_debug_test_addr(&remote_address)) {
ast_verbose("Sent RTP P2P packet to %s%s (type %-2.2d, len %-6.6d)\n",
ast_sockaddr_stringify(&remote_address),
#ifdef HAVE_PJPROJECT
if (rtp->ice) {
- rtp_add_candidates_to_ice(instance, rtp, &rtp->rtcp->us, ast_sockaddr_port(&rtp->rtcp->us), AST_RTP_ICE_COMPONENT_RTCP, TRANSPORT_SOCKET_RTCP,
- &ast_rtp_turn_rtcp_sock_cb, &rtp->turn_rtcp);
+ rtp_add_candidates_to_ice(instance, rtp, &rtp->rtcp->us, ast_sockaddr_port(&rtp->rtcp->us), AST_RTP_ICE_COMPONENT_RTCP, TRANSPORT_SOCKET_RTCP);
}
#endif
return res;
}
- update_address_with_ice_candidate(rtp, AST_RTP_ICE_COMPONENT_RTP, &remote_address);
-
if (rtp_debug_test_addr(&remote_address)) {
ast_verbose("Sent Comfort Noise RTP packet to %s%s (type %-2.2d, seq %-6.6d, ts %-6.6u, len %-6.6d)\n",
ast_sockaddr_stringify(&remote_address),
if (ast_parse_arg(s, PARSE_INADDR, &addr)) {
ast_log(LOG_WARNING, "Invalid TURN server address: %s\n", s);
} else {
- pj_strdup2(pool, &turnaddr, ast_inet_ntoa(addr.sin_addr));
+ pj_strdup2_with_null(pool, &turnaddr, ast_inet_ntoa(addr.sin_addr));
/* ntohs() is not a bug here. The port number is used in host byte order with
* a pjnat API. */
turnport = ntohs(addr.sin_port);
}
}
if ((s = ast_variable_retrieve(cfg, "general", "turnusername"))) {
- pj_strdup2(pool, &turnusername, s);
+ pj_strdup2_with_null(pool, &turnusername, s);
}
if ((s = ast_variable_retrieve(cfg, "general", "turnpassword"))) {
- pj_strdup2(pool, &turnpassword, s);
+ pj_strdup2_with_null(pool, &turnpassword, s);
}
#endif
ast_config_destroy(cfg);
return 0;
}
+#ifdef HAVE_PJPROJECT
+static void rtp_terminate_pjproject(void)
+{
+ if (timer_thread) {
+ timer_terminate = 1;
+ pj_thread_join(timer_thread);
+ pj_thread_destroy(timer_thread);
+ }
+
+ pj_caching_pool_destroy(&cachingpool);
+ pj_shutdown();
+}
+#endif
+
static int load_module(void)
{
#ifdef HAVE_PJPROJECT
}
if (pjlib_util_init() != PJ_SUCCESS) {
- pj_shutdown();
+ rtp_terminate_pjproject();
return AST_MODULE_LOAD_DECLINE;
}
if (pjnath_init() != PJ_SUCCESS) {
- pj_shutdown();
+ rtp_terminate_pjproject();
return AST_MODULE_LOAD_DECLINE;
}
pj_caching_pool_init(&cachingpool, &pj_pool_factory_default_policy, 0);
- pool = pj_pool_create(&cachingpool.factory, "rtp", 512, 512, NULL);
+ pool = pj_pool_create(&cachingpool.factory, "timer", 512, 512, NULL);
- if (pj_timer_heap_create(pool, 100, &timerheap) != PJ_SUCCESS) {
- pj_caching_pool_destroy(&cachingpool);
- pj_shutdown();
+ if (pj_timer_heap_create(pool, 100, &timer_heap) != PJ_SUCCESS) {
+ rtp_terminate_pjproject();
return AST_MODULE_LOAD_DECLINE;
}
if (pj_lock_create_recursive_mutex(pool, "rtp%p", &lock) != PJ_SUCCESS) {
- pj_caching_pool_destroy(&cachingpool);
- pj_shutdown();
+ rtp_terminate_pjproject();
return AST_MODULE_LOAD_DECLINE;
}
- pj_timer_heap_set_lock(timerheap, lock, PJ_TRUE);
+ pj_timer_heap_set_lock(timer_heap, lock, PJ_TRUE);
- if (pj_ioqueue_create(pool, 16, &ioqueue) != PJ_SUCCESS) {
- pj_caching_pool_destroy(&cachingpool);
- pj_shutdown();
+ if (pj_thread_create(pool, "timer", &timer_worker_thread, NULL, 0, 0, &timer_thread) != PJ_SUCCESS) {
+ rtp_terminate_pjproject();
return AST_MODULE_LOAD_DECLINE;
}
- if (pj_thread_create(pool, "ice", &ice_worker_thread, NULL, 0, 0, &thread) != PJ_SUCCESS) {
- pj_caching_pool_destroy(&cachingpool);
- pj_shutdown();
- return AST_MODULE_LOAD_DECLINE;
- }
#endif
if (ast_rtp_engine_register(&asterisk_rtp_engine)) {
#ifdef HAVE_PJPROJECT
- worker_terminate = 1;
- pj_thread_join(thread);
- pj_thread_destroy(thread);
- pj_caching_pool_destroy(&cachingpool);
- pj_shutdown();
+ rtp_terminate_pjproject();
#endif
return AST_MODULE_LOAD_DECLINE;
}
if (ast_cli_register_multiple(cli_rtp, ARRAY_LEN(cli_rtp))) {
#ifdef HAVE_PJPROJECT
- worker_terminate = 1;
- pj_thread_join(thread);
- pj_thread_destroy(thread);
ast_rtp_engine_unregister(&asterisk_rtp_engine);
- pj_caching_pool_destroy(&cachingpool);
- pj_shutdown();
+ rtp_terminate_pjproject();
#endif
return AST_MODULE_LOAD_DECLINE;
}
ast_cli_unregister_multiple(cli_rtp, ARRAY_LEN(cli_rtp));
#ifdef HAVE_PJPROJECT
- worker_terminate = 1;
-
pj_thread_register_check();
-
- pj_thread_join(thread);
- pj_thread_destroy(thread);
-
- pj_caching_pool_destroy(&cachingpool);
- pj_shutdown();
+ rtp_terminate_pjproject();
#endif
return 0;