SIP TCP/TLS: move client connection setup/write into tcp helper thread, various relat...
[asterisk/asterisk.git] / channels / chan_sip.c
index e15b5fb..50756fa 100644 (file)
@@ -2111,13 +2111,26 @@ struct sip_registry {
        char lastmsg[256];              /*!< Last Message sent/received */
 };
 
+enum sip_tcptls_alert {
+       /*! \brief There is new data to be sent out */
+       TCPTLS_ALERT_DATA,
+       /*! \brief A request to stop the tcp_handler thread */
+       TCPTLS_ALERT_STOP,
+};
+
+struct tcptls_packet {
+       AST_LIST_ENTRY(tcptls_packet) entry;
+       struct ast_str *data;
+       size_t len;
+};
 /*! \brief Definition of a thread that handles a socket */
 struct sip_threadinfo {
        int stop;
+       int alert_pipe[2]; /*! Used to alert tcptls thread when packet is ready to be written */
        pthread_t threadid;
        struct ast_tcptls_session_instance *tcptls_session;
        enum sip_transport type;        /*!< We keep a copy of the type here so we can display it in the connection list */
-       AST_LIST_ENTRY(sip_threadinfo) list;
+       AST_LIST_HEAD_NOLOCK(, tcptls_packet) packet_q;
 };
 
 /*! \brief Definition of an MWI subscription to another server */
@@ -2151,8 +2164,8 @@ static int hash_dialog_size = 563;
 static int hash_user_size = 563;
 #endif
 
-/*! \brief  The thread list of TCP threads */
-static AST_LIST_HEAD_STATIC(threadl, sip_threadinfo);
+/*! \brief  The table of TCP threads */
+static struct ao2_container *threadt;
 
 /*! \brief  The peer list: Users, Peers and Friends */
 static struct ao2_container *peers;
@@ -2244,6 +2257,21 @@ static int peer_ipcmp_cb(void *obj, void *arg, int flags)
        return peer->addr.sin_port == peer2->addr.sin_port ? (CMP_MATCH | CMP_STOP) : 0;
 }
 
+
+static int threadt_hash_cb(const void *obj, const int flags)
+{
+       const struct sip_threadinfo *th = obj;
+
+       return (int) th->tcptls_session->remote_address.sin_addr.s_addr;
+}
+
+static int threadt_cmp_cb(void *obj, void *arg, int flags)
+{
+       struct sip_threadinfo *th = obj, *th2 = arg;
+
+       return (th->tcptls_session == th2->tcptls_session) ? CMP_MATCH | CMP_STOP : 0;
+}
+
 /*!
  * \note The only member of the dialog used here callid string
  */
@@ -2890,6 +2918,130 @@ static struct ast_variable *copy_vars(struct ast_variable *src)
        return res;
 }
 
+static void tcptls_packet_destructor(void *obj)
+{
+       struct tcptls_packet *packet = obj;
+
+       ast_free(packet->data);
+}
+
+static void sip_tcptls_client_args_destructor(void *obj)
+{
+       struct ast_tcptls_session_args *args = obj;
+       if (args->tls_cfg) {
+               ast_free(args->tls_cfg->certfile);
+               ast_free(args->tls_cfg->pvtfile);
+               ast_free(args->tls_cfg->cipher);
+               ast_free(args->tls_cfg->cafile);
+               ast_free(args->tls_cfg->capath);
+       }
+       ast_free(args->tls_cfg);
+       ast_free((char *) args->name);
+}
+
+static void sip_threadinfo_destructor(void *obj)
+{
+       struct sip_threadinfo *th = obj;
+       struct tcptls_packet *packet;
+       if (th->alert_pipe[1] > -1) {
+               close(th->alert_pipe[0]);
+       }
+       if (th->alert_pipe[1] > -1) {
+               close(th->alert_pipe[1]);
+       }
+       th->alert_pipe[0] = th->alert_pipe[1] = -1;
+
+       while ((packet = AST_LIST_REMOVE_HEAD(&th->packet_q, entry))) {
+               ao2_t_ref(packet, -1, "thread destruction, removing packet from frame queue");
+       }
+
+       if (th->tcptls_session) {
+               ao2_t_ref(th->tcptls_session, -1, "remove tcptls_session for sip_threadinfo object");
+       }
+}
+
+/*! \brief creates a sip_threadinfo object and links it into the threadt table. */
+static struct sip_threadinfo *sip_threadinfo_create(struct ast_tcptls_session_instance *tcptls_session, int transport)
+{
+       struct sip_threadinfo *th;
+
+       if (!tcptls_session || !(th = ao2_alloc(sizeof(*th), sip_threadinfo_destructor))) {
+               return NULL;
+       }
+
+       th->alert_pipe[0] = th->alert_pipe[1] = -1;
+
+       if (pipe(th->alert_pipe) == -1) {
+               ao2_t_ref(th, -1, "Failed to open alert pipe on sip_threadinfo");
+               ast_log(LOG_ERROR, "Could not create sip alert pipe in tcptls thread, error %s\n", strerror(errno));
+               return NULL;
+       }
+       ao2_t_ref(tcptls_session, +1, "tcptls_session ref for sip_threadinfo object");
+       th->tcptls_session = tcptls_session;
+       th->type = transport ? transport : (tcptls_session->ssl ? SIP_TRANSPORT_TLS: SIP_TRANSPORT_TCP);
+       ao2_t_link(threadt, th, "Adding new tcptls helper thread");
+       ao2_t_ref(th, -1, "Decrementing threadinfo ref from alloc, only table ref remains");
+       return th;
+}
+
+/*! \brief used to indicate to a tcptls thread that data is ready to be written */
+static int sip_tcptls_write(struct ast_tcptls_session_instance *tcptls_session, const void *buf, size_t len)
+{
+       int res = len;
+       struct sip_threadinfo *th = NULL;
+       struct tcptls_packet *packet = NULL;
+       struct sip_threadinfo tmp = {
+               .tcptls_session = tcptls_session,
+       };
+       enum sip_tcptls_alert alert = TCPTLS_ALERT_DATA;
+
+       if (!tcptls_session) {
+               return XMIT_ERROR;
+       }
+
+       ast_mutex_lock(&tcptls_session->lock);
+
+       if ((tcptls_session->fd == -1) ||
+               !(th = ao2_t_find(threadt, &tmp, OBJ_POINTER, "ao2_find, getting sip_threadinfo in tcp helper thread")) ||
+               !(packet = ao2_alloc(sizeof(*packet), tcptls_packet_destructor)) ||
+               !(packet->data = ast_str_create(len))) {
+               goto tcptls_write_setup_error;
+       }
+
+       /* goto tcptls_write_error should _NOT_ be used beyond this point */
+       ast_str_set(&packet->data, 0, "%s", (char *) buf);
+       packet->len = len;
+
+       /* alert tcptls thread handler that there is a packet to be sent.
+        * must lock the thread info object to guarantee control of the
+        * packet queue */
+       ao2_lock(th);
+       if (write(th->alert_pipe[1], &alert, sizeof(alert)) == -1) {
+               ast_log(LOG_ERROR, "write() to alert pipe failed: %s\n", strerror(errno));
+               ao2_t_ref(packet, -1, "could not write to alert pipe, remove packet");
+               packet = NULL;
+               res = XMIT_ERROR;
+       } else { /* it is safe to queue the frame after issuing the alert when we hold the threadinfo lock */
+               AST_LIST_INSERT_TAIL(&th->packet_q, packet, entry);
+       }
+       ao2_unlock(th);
+
+       ast_mutex_unlock(&tcptls_session->lock);
+       ao2_t_ref(th, -1, "In sip_tcptls_write, unref threadinfo object after finding it");
+       return res;
+
+tcptls_write_setup_error:
+       if (th) {
+               ao2_t_ref(th, -1, "In sip_tcptls_write, unref threadinfo obj, could not create packet");
+       }
+       if (packet) {
+               ao2_t_ref(packet, -1, "could not allocate packet's data");
+       }
+       ast_mutex_unlock(&tcptls_session->lock);
+
+       return XMIT_ERROR;
+}
+
 /*! \brief SIP TCP connection handler */
 static void *sip_tcp_worker_fn(void *data)
 {
@@ -2905,26 +3057,45 @@ static void *_sip_tcp_helper_thread(struct sip_pvt *pvt, struct ast_tcptls_sessi
 {
        int res, cl;
        struct sip_request req = { 0, } , reqcpy = { 0, };
-       struct sip_threadinfo *me;
+       struct sip_threadinfo *me = NULL;
        char buf[1024] = "";
+       struct pollfd fds[2] = { { 0 }, { 0 }, };
+       struct ast_tcptls_session_args *ca = NULL;
+
+       /* If this is a server session, then the connection has already been setup,
+        * simply create the threadinfo object so we can access this thread for writing.
+        * 
+        * if this is a client connection more work must be done.
+        * 1. We own the parent session args for a client connection.  This pointer needs
+        *    to be held on to so we can decrement it's ref count on thread destruction.
+        * 2. The threadinfo object was created before this thread was launched, however
+        *    it must be found within the threadt table.
+        * 3. Last, the tcptls_session must be started.
+        */
+       if (!tcptls_session->client) {
+               if (!(me = sip_threadinfo_create(tcptls_session, tcptls_session->ssl ? SIP_TRANSPORT_TLS : SIP_TRANSPORT_TCP))) {
+                       goto cleanup;
+               }
+               ao2_t_ref(me, +1, "Adding threadinfo ref for tcp_helper_thread");
+       } else {
+               struct sip_threadinfo tmp = {
+                       .tcptls_session = tcptls_session,
+               };
 
-       me = ast_calloc(1, sizeof(*me));
-
-       if (!me)
-               goto cleanup2;
+               if ((!(ca = tcptls_session->parent)) ||
+                       (!(me = ao2_t_find(threadt, &tmp, OBJ_POINTER, "ao2_find, getting sip_threadinfo in tcp helper thread"))) ||
+                       (!(tcptls_session = ast_tcptls_client_start(tcptls_session)))) {
+                       goto cleanup;
+               }
+       }
 
        me->threadid = pthread_self();
-       me->tcptls_session = tcptls_session;
-       if (tcptls_session->ssl)
-               me->type = SIP_TRANSPORT_TLS;
-       else
-               me->type = SIP_TRANSPORT_TCP;
-
        ast_debug(2, "Starting thread for %s server\n", tcptls_session->ssl ? "SSL" : "TCP");
 
-       AST_LIST_LOCK(&threadl);
-       AST_LIST_INSERT_TAIL(&threadl, me, list);
-       AST_LIST_UNLOCK(&threadl);
+       /* set up pollfd to watch for reads on both the socket and the alert_pipe */
+       fds[0].fd = tcptls_session->fd;
+       fds[1].fd = me->alert_pipe[0];
+       fds[0].events = fds[1].events = POLLIN | POLLPRI;
 
        if (!(req.data = ast_str_create(SIP_MIN_PACKET)))
                goto cleanup;
@@ -2934,81 +3105,120 @@ static void *_sip_tcp_helper_thread(struct sip_pvt *pvt, struct ast_tcptls_sessi
        for (;;) {
                struct ast_str *str_save;
 
-               str_save = req.data;
-               memset(&req, 0, sizeof(req));
-               req.data = str_save;
-               ast_str_reset(req.data);
-
-               str_save = reqcpy.data;
-               memset(&reqcpy, 0, sizeof(reqcpy));
-               reqcpy.data = str_save;
-               ast_str_reset(reqcpy.data);
-
-               memset(buf, 0, sizeof(buf));
-
-               if (tcptls_session->ssl) {
-                       set_socket_transport(&req.socket, SIP_TRANSPORT_TLS);
-                       req.socket.port = htons(ourport_tls);
-               } else {
-                       set_socket_transport(&req.socket, SIP_TRANSPORT_TCP);
-                       req.socket.port = htons(ourport_tcp);
-               }
-               req.socket.fd = tcptls_session->fd;
-               res = ast_wait_for_input(tcptls_session->fd, -1);
+               res = ast_poll(fds, 2, -1); /* polls for both socket and alert_pipe */
                if (res < 0) {
                        ast_debug(2, "SIP %s server :: ast_wait_for_input returned %d\n", tcptls_session->ssl ? "SSL": "TCP", res);
                        goto cleanup;
                }
 
-               /* Read in headers one line at a time */
-               while (req.len < 4 || strncmp(REQ_OFFSET_TO_STR(&req, len - 4), "\r\n\r\n", 4)) {
-                       ast_mutex_lock(&tcptls_session->lock);
-                       if (!fgets(buf, sizeof(buf), tcptls_session->f)) {
-                               ast_mutex_unlock(&tcptls_session->lock);
-                               goto cleanup;
+               /* handle the socket event, check for both reads from the socket fd,
+                * and writes from alert_pipe fd */
+               if (fds[0].revents) { /* there is data on the socket to be read */
+
+                       fds[0].revents = 0;
+
+                       /* clear request structure */
+                       str_save = req.data;
+                       memset(&req, 0, sizeof(req));
+                       req.data = str_save;
+                       ast_str_reset(req.data);
+
+                       str_save = reqcpy.data;
+                       memset(&reqcpy, 0, sizeof(reqcpy));
+                       reqcpy.data = str_save;
+                       ast_str_reset(reqcpy.data);
+
+                       memset(buf, 0, sizeof(buf));
+
+                       if (tcptls_session->ssl) {
+                               set_socket_transport(&req.socket, SIP_TRANSPORT_TLS);
+                               req.socket.port = htons(ourport_tls);
+                       } else {
+                               set_socket_transport(&req.socket, SIP_TRANSPORT_TCP);
+                               req.socket.port = htons(ourport_tcp);
                        }
-                       ast_mutex_unlock(&tcptls_session->lock);
-                       if (me->stop)
-                                goto cleanup;
-                       ast_str_append(&req.data, 0, "%s", buf);
-                       req.len = req.data->used;
-               }
-               copy_request(&reqcpy, &req);
-               parse_request(&reqcpy);
-               /* In order to know how much to read, we need the content-length header */
-               if (sscanf(get_header(&reqcpy, "Content-Length"), "%30d", &cl)) {
-                       while (cl > 0) {
-                               size_t bytes_read;
+                       req.socket.fd = tcptls_session->fd;
+
+                       /* Read in headers one line at a time */
+                       while (req.len < 4 || strncmp(REQ_OFFSET_TO_STR(&req, len - 4), "\r\n\r\n", 4)) {
                                ast_mutex_lock(&tcptls_session->lock);
-                               if (!(bytes_read = fread(buf, 1, MIN(sizeof(buf) - 1, cl), tcptls_session->f))) {
+                               if (!fgets(buf, sizeof(buf), tcptls_session->f)) {
                                        ast_mutex_unlock(&tcptls_session->lock);
                                        goto cleanup;
                                }
-                               buf[bytes_read] = '\0';
                                ast_mutex_unlock(&tcptls_session->lock);
                                if (me->stop)
-                                       goto cleanup;
-                               cl -= strlen(buf);
+                                        goto cleanup;
                                ast_str_append(&req.data, 0, "%s", buf);
                                req.len = req.data->used;
                        }
+                       copy_request(&reqcpy, &req);
+                       parse_request(&reqcpy);
+                       /* In order to know how much to read, we need the content-length header */
+                       if (sscanf(get_header(&reqcpy, "Content-Length"), "%30d", &cl)) {
+                               while (cl > 0) {
+                                       size_t bytes_read;
+                                       ast_mutex_lock(&tcptls_session->lock);
+                                       if (!(bytes_read = fread(buf, 1, MIN(sizeof(buf) - 1, cl), tcptls_session->f))) {
+                                               ast_mutex_unlock(&tcptls_session->lock);
+                                               goto cleanup;
+                                       }
+                                       buf[bytes_read] = '\0';
+                                       ast_mutex_unlock(&tcptls_session->lock);
+                                       if (me->stop)
+                                               goto cleanup;
+                                       cl -= strlen(buf);
+                                       ast_str_append(&req.data, 0, "%s", buf);
+                                       req.len = req.data->used;
+                               }
+                       }
+                       /*! \todo XXX If there's no Content-Length or if the content-length and what
+                                       we receive is not the same - we should generate an error */
+
+                       req.socket.tcptls_session = tcptls_session;
+                       handle_request_do(&req, &tcptls_session->remote_address);
                }
-               /*! \todo XXX If there's no Content-Length or if the content-length and what
-                               we receive is not the same - we should generate an error */
 
-               req.socket.tcptls_session = tcptls_session;
-               handle_request_do(&req, &tcptls_session->remote_address);
+               if (fds[1].revents) { /* alert_pipe indicates there is data in the send queue to be sent */
+                       enum sip_tcptls_alert alert;
+                       struct tcptls_packet *packet;
+
+                       fds[1].revents = 0;
+
+                       if (read(me->alert_pipe[0], &alert, sizeof(alert)) == -1) {
+                               ast_log(LOG_ERROR, "read() failed: %s\n", strerror(errno));
+                               continue;
+                       }
+
+                       switch (alert) {
+                       case TCPTLS_ALERT_STOP:
+                               goto cleanup;
+                       case TCPTLS_ALERT_DATA:
+                               ao2_lock(me);
+                               if (!(packet = AST_LIST_REMOVE_HEAD(&me->packet_q, entry))) {
+                                       ast_log(LOG_WARNING, "TCPTLS thread alert_pipe indicated packet should be sent, but frame_q is empty");
+                               } else if (ast_tcptls_server_write(tcptls_session, ast_str_buffer(packet->data), packet->len) == -1) {
+                                       ast_log(LOG_WARNING, "Failure to write to tcp/tls socket\n");
+                               }
+
+                               if (packet) {
+                                       ao2_t_ref(packet, -1, "tcptls packet sent, this is no longer needed");
+                               }
+                               ao2_unlock(me);
+                               break;
+                       default:
+                               ast_log(LOG_ERROR, "Unknown tcptls thread alert '%d'\n", alert);
+                       }
+               }
        }
 
+       ast_debug(2, "Shutting down thread for %s server\n", tcptls_session->ssl ? "SSL" : "TCP");
+
 cleanup:
-       AST_LIST_LOCK(&threadl);
-       AST_LIST_REMOVE(&threadl, me, list);
-       AST_LIST_UNLOCK(&threadl);
-       ast_free(me);
-cleanup2:
-       fclose(tcptls_session->f);
-       tcptls_session->f = NULL;
-       tcptls_session->fd = -1;
+       if (me) {
+               ao2_t_unlink(threadt, me, "Removing tcptls helper thread, thread is closing");
+               ao2_t_ref(me, -1, "Removing tcp_helper_threads threadinfo ref");
+       }
        if (reqcpy.data) {
                ast_free(reqcpy.data);
        }
@@ -3018,12 +3228,27 @@ cleanup2:
                req.data = NULL;
        }
 
-       ast_debug(2, "Shutting down thread for %s server\n", tcptls_session->ssl ? "SSL" : "TCP");
-       
+       /* if client, we own the parent session arguments and must decrement ref */
+       if (ca) {
+               ao2_t_ref(ca, -1, "closing tcptls thread, getting rid of client tcptls_session arguments");
+       }
 
-       ao2_ref(tcptls_session, -1);
-       tcptls_session = NULL;
+       if (tcptls_session) {
+               ast_mutex_lock(&tcptls_session->lock);
+               if (tcptls_session->f) {
+                       fclose(tcptls_session->f);
+                       tcptls_session->f = NULL;
+               }
+               if (tcptls_session->fd != -1) {
+                       close(tcptls_session->fd);
+                       tcptls_session->fd = -1;
+               }
+               tcptls_session->parent = NULL;
+               ast_mutex_unlock(&tcptls_session->lock);
 
+               ao2_ref(tcptls_session, -1);
+               tcptls_session = NULL;
+       }
        return NULL;
 }
 
@@ -3480,26 +3705,15 @@ static int __sip_xmit(struct sip_pvt *p, struct ast_str *data, int len)
        if (sip_prepare_socket(p) < 0)
                return XMIT_ERROR;
 
-       if (p->socket.tcptls_session)
-               ast_mutex_lock(&p->socket.tcptls_session->lock);
-
-       if (p->socket.type & SIP_TRANSPORT_UDP) {
+       if (p->socket.type == SIP_TRANSPORT_UDP) {
                res = sendto(p->socket.fd, data->str, len, 0, (const struct sockaddr *)dst, sizeof(struct sockaddr_in));
        } else if (p->socket.tcptls_session) {
-               if (p->socket.tcptls_session->f) {
-                       res = ast_tcptls_server_write(p->socket.tcptls_session, data->str, len);
-               } else {
-                       ast_debug(2, "No p->socket.tcptls_session->f len=%d\n", len);
-                       return XMIT_ERROR;
-               }
+               res = sip_tcptls_write(p->socket.tcptls_session, data->str, len);
        } else {
                ast_debug(2, "Socket type is TCP but no tcptls_session is present to write to\n");
                return XMIT_ERROR;
        }
 
-       if (p->socket.tcptls_session)
-               ast_mutex_unlock(&p->socket.tcptls_session->lock);
-
        if (res == -1) {
                switch (errno) {
                case EBADF:             /* Bad file descriptor - seems like this is generated when the host exist, but doesn't accept the UDP packet */
@@ -12233,6 +12447,11 @@ static int expire_register(const void *data)
        destroy_association(peer);      /* remove registration data from storage */
        set_socket_transport(&peer->socket, peer->default_outbound_transport);
 
+       if (peer->socket.tcptls_session) {
+               ao2_ref(peer->socket.tcptls_session, -1);
+               peer->socket.tcptls_session = NULL;
+       }
+
        manager_event(EVENT_FLAG_SYSTEM, "PeerStatus", "ChannelType: SIP\r\nPeer: SIP/%s\r\nPeerStatus: Unregistered\r\nCause: Expired\r\n", peer->name);
        register_peer_exten(peer, FALSE);       /* Remove regexten */
        ast_devstate_changed(AST_DEVICE_UNKNOWN, "SIP/%s", peer->name);
@@ -14732,6 +14951,7 @@ static const char *cli_yesno(int x)
 static char *sip_show_tcp(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 {
        struct sip_threadinfo *th;
+       struct ao2_iterator i;
 
 #define FORMAT2 "%-30.30s %3.6s %9.9s %6.6s\n"
 #define FORMAT  "%-30.30s %-6d %-9.9s %-6.6s\n"
@@ -14751,15 +14971,16 @@ static char *sip_show_tcp(struct ast_cli_entry *e, int cmd, struct ast_cli_args
                return CLI_SHOWUSAGE;
 
        ast_cli(a->fd, FORMAT2, "Host", "Port", "Transport", "Type");
-       AST_LIST_LOCK(&threadl);
-       AST_LIST_TRAVERSE(&threadl, th, list) {
+
+       i = ao2_iterator_init(threadt, 0);
+       while ((th = ao2_t_iterator_next(&i, "iterate through tcp threads for 'sip show tcp'"))) {
                ast_cli(a->fd, FORMAT, ast_inet_ntoa(th->tcptls_session->remote_address.sin_addr),
                        ntohs(th->tcptls_session->remote_address.sin_port),
                        get_transport(th->type),
                        (th->tcptls_session->client ? "Client" : "Server"));
-
+               ao2_t_ref(th, -1, "decrement ref from iterator");
        }
-       AST_LIST_UNLOCK(&threadl);
+
        return CLI_SUCCESS;
 #undef FORMAT
 #undef FORMAT2
@@ -22678,6 +22899,18 @@ static int sip_standard_port(enum sip_transport type, int port)
                return port == STANDARD_SIP_PORT;
 }
 
+static int threadinfo_locate_cb(void *obj, void *arg, int flags)
+{
+       struct sip_threadinfo *th = obj;
+       struct sockaddr_in *s = arg;
+
+       if (!inaddrcmp(&th->tcptls_session->remote_address, s)) {
+               return CMP_MATCH | CMP_STOP;
+       }
+
+       return 0;
+}
+
 /*!
  * \brief Find thread for TCP/TLS session (based on IP/Port
  *
@@ -22688,16 +22921,10 @@ static struct ast_tcptls_session_instance *sip_tcp_locate(struct sockaddr_in *s)
        struct sip_threadinfo *th;
        struct ast_tcptls_session_instance *tcptls_instance = NULL;
 
-       AST_LIST_LOCK(&threadl);
-       AST_LIST_TRAVERSE(&threadl, th, list) {
-               if ((s->sin_family == th->tcptls_session->remote_address.sin_family) &&
-                       (s->sin_addr.s_addr == th->tcptls_session->remote_address.sin_addr.s_addr) &&
-                       (s->sin_port == th->tcptls_session->remote_address.sin_port))  {
-                               tcptls_instance = (ao2_ref(th->tcptls_session, +1), th->tcptls_session);
-                               break;
-                       }
+       if ((th = ao2_callback(threadt, 0, threadinfo_locate_cb, s))) {
+               tcptls_instance = (ao2_ref(th->tcptls_session, +1), th->tcptls_session);
+               ao2_t_ref(th, -1, "decrement ref from callback");
        }
-       AST_LIST_UNLOCK(&threadl);
 
        return tcptls_instance;
 }
@@ -22707,14 +22934,23 @@ static int sip_prepare_socket(struct sip_pvt *p)
 {
        struct sip_socket *s = &p->socket;
        static const char name[] = "SIP socket";
+       struct sip_threadinfo *th;
        struct ast_tcptls_session_instance *tcptls_session;
-       struct ast_tcptls_session_args ca = {
+       struct ast_tcptls_session_args tmp_ca = {
                .name = name,
                .accept_fd = -1,
        };
+       struct ast_tcptls_session_args *ca;
 
-       if (s->fd != -1)
-               return s->fd;   /* This socket is already active */
+       /* check to see if a socket is already active */
+       if ((s->fd != -1) && (s->type == SIP_TRANSPORT_UDP)) {
+               return s->fd;
+       }
+       if ((s->type & (SIP_TRANSPORT_TCP | SIP_TRANSPORT_TLS)) &&
+                       (s->tcptls_session) &&
+                       (s->tcptls_session->fd != -1)) {
+               return s->tcptls_session->fd;
+       }
 
        /*! \todo Check this... This might be wrong, depending on the proxy configuration
                If proxy is in "force" mode its correct.
@@ -22723,14 +22959,23 @@ static int sip_prepare_socket(struct sip_pvt *p)
                s->type = p->outboundproxy->transport;
        }
 
-       if (s->type & SIP_TRANSPORT_UDP) {
+       if (s->type == SIP_TRANSPORT_UDP) {
                s->fd = sipsock;
                return s->fd;
        }
 
-       ca.remote_address = *(sip_real_dst(p));
+       /* At this point we are dealing with a TCP/TLS connection
+        * 1. We need to check to see if a connectin thread exists
+        *    for this address, if so use that.
+        * 2. If a thread does not exist for this address, but the tcptls_session
+        *    exists on the socket, the connection was closed.
+        * 3. If no tcptls_session thread exists for the address, and no tcptls_session
+        *    already exists on the socket, create a new one and launch a new thread.
+        */
 
-       if ((tcptls_session = sip_tcp_locate(&ca.remote_address))) {    /* Check if we have a thread handling a socket connected to this IP/port */
+       /* 1.  check for existing threads */
+       tmp_ca.remote_address = *(sip_real_dst(p));
+       if ((tcptls_session = sip_tcp_locate(&tmp_ca.remote_address))) {
                s->fd = tcptls_session->fd;
                if (s->tcptls_session) {
                        ao2_ref(s->tcptls_session, -1);
@@ -22738,46 +22983,82 @@ static int sip_prepare_socket(struct sip_pvt *p)
                }
                s->tcptls_session = tcptls_session;
                return s->fd;
+       /* 2.  Thread not found, if tcptls_session already exists, it once had a thread and is now terminated */
+       } else if (s->tcptls_session) {
+               return s->fd; /* XXX whether reconnection is ever necessary here needs to be investigated further */
        }
 
-       if (s->tcptls_session && s->tcptls_session->parent->tls_cfg) {
-               ca.tls_cfg = s->tcptls_session->parent->tls_cfg;
-       } else {
-               if (s->type & SIP_TRANSPORT_TLS) {
-                       ca.tls_cfg = ast_calloc(1, sizeof(*ca.tls_cfg));
-                       if (!ca.tls_cfg)
-                               return -1;
-                       memcpy(ca.tls_cfg, &default_tls_cfg, sizeof(*ca.tls_cfg));
-                       if (!ast_strlen_zero(p->tohost))
-                               ast_copy_string(ca.hostname, p->tohost, sizeof(ca.hostname));
-               }
+       /* 3.  Create a new TCP/TLS client connection */
+       /* create new session arguments for the client connection */
+       if (!(ca = ao2_alloc(sizeof(*ca), sip_tcptls_client_args_destructor)) ||
+               !(ca->name = ast_strdup(name))) {
+               goto create_tcptls_session_fail;
        }
-       
-       if (s->tcptls_session) {
-               /* the pvt socket already has a server instance ... */
-       } else {
-               s->tcptls_session = ast_tcptls_client_start(&ca); /* Start a client connection to this address */
+       ca->accept_fd = -1;
+       ca->remote_address = *(sip_real_dst(p));
+       /* if type is TLS, we need to create a tls cfg for this session arg */
+       if (s->type == SIP_TRANSPORT_TLS) {
+               if (!(ca->tls_cfg = ast_calloc(1, sizeof(*ca->tls_cfg)))) {
+                       goto create_tcptls_session_fail;
+               }
+               memcpy(ca->tls_cfg, &default_tls_cfg, sizeof(*ca->tls_cfg));
+
+               if (!(ca->tls_cfg->certfile = ast_strdup(default_tls_cfg.certfile)) ||
+                       !(ca->tls_cfg->pvtfile = ast_strdup(default_tls_cfg.pvtfile)) ||
+                       !(ca->tls_cfg->cipher = ast_strdup(default_tls_cfg.cipher)) ||
+                       !(ca->tls_cfg->cafile = ast_strdup(default_tls_cfg.cafile)) ||
+                       !(ca->tls_cfg->capath = ast_strdup(default_tls_cfg.capath))) {
+
+                       goto create_tcptls_session_fail;
+               }
+
+               /* this host is used as the common name in ssl/tls */
+               if (!ast_strlen_zero(p->tohost)) {
+                       ast_copy_string(ca->hostname, p->tohost, sizeof(ca->hostname));
+               }
        }
 
-       if (!s->tcptls_session) {
-               if (ca.tls_cfg)
-                       ast_free(ca.tls_cfg);
-               return -1;
+       /* Create a client connection for address, this does not start the connection, just sets it up. */
+       if (!(s->tcptls_session = ast_tcptls_client_create(ca))) {
+               goto create_tcptls_session_fail;
        }
 
-       s->fd = ca.accept_fd;
+       s->fd = s->tcptls_session->fd;
 
-       /* Give the new thread a reference */
+       /* client connections need to have the sip_threadinfo object created before
+        * the thread is detached.  This ensures the alert_pipe is up before it will
+        * be used.  Note that this function links the new threadinfo object into the
+        * threadt container. */
+       if (!(th = sip_threadinfo_create(s->tcptls_session, s->type))) {
+               goto create_tcptls_session_fail;
+       }
+
+       /* Give the new thread a reference to the tcptls_session */
        ao2_ref(s->tcptls_session, +1);
 
-       if (ast_pthread_create_background(&ca.master, NULL, sip_tcp_worker_fn, s->tcptls_session)) {
-               ast_debug(1, "Unable to launch '%s'.", ca.name);
-               ao2_ref(s->tcptls_session, -1);
-               close(ca.accept_fd);
-               s->fd = ca.accept_fd = -1;
+       if (ast_pthread_create_background(&ca->master, NULL, sip_tcp_worker_fn, s->tcptls_session)) {
+               ast_debug(1, "Unable to launch '%s'.", ca->name);
+               ao2_ref(s->tcptls_session, -1); /* take away the thread ref we just gave it */
+               goto create_tcptls_session_fail;
        }
 
        return s->fd;
+
+create_tcptls_session_fail:
+       if (ca) {
+               ao2_t_ref(ca, -1, "failed to create client, getting rid of client tcptls_session arguments");
+       }
+       if (s->tcptls_session) {
+               close(tcptls_session->fd);
+               s->fd = tcptls_session->fd = -1;
+               ao2_ref(s->tcptls_session, -1);
+               s->tcptls_session = NULL;
+       }
+       if (th) {
+               ao2_t_unlink(threadt, th, "Removing tcptls thread info object, thread failed to open");
+       }
+
+       return -1;
 }
 
 /*!
@@ -26362,6 +26643,7 @@ static int load_module(void)
        peers = ao2_t_container_alloc(hash_peer_size, peer_hash_cb, peer_cmp_cb, "allocate peers");
        peers_by_ip = ao2_t_container_alloc(hash_peer_size, peer_iphash_cb, peer_ipcmp_cb, "allocate peers_by_ip");
        dialogs = ao2_t_container_alloc(hash_dialog_size, dialog_hash_cb, dialog_cmp_cb, "allocate dialogs");
+       threadt = ao2_t_container_alloc(hash_dialog_size, threadt_hash_cb, threadt_cmp_cb, "allocate threadt table");
        
        ASTOBJ_CONTAINER_INIT(&regl); /* Registry object list -- not searched for anything */
        ASTOBJ_CONTAINER_INIT(&submwil); /* MWI subscription object list */
@@ -26492,17 +26774,15 @@ static int unload_module(void)
                ast_tcptls_server_stop(&sip_tls_desc);
 
        /* Kill all existing TCP/TLS threads */
-       AST_LIST_LOCK(&threadl);
-       AST_LIST_TRAVERSE_SAFE_BEGIN(&threadl, th, list) {
+       i = ao2_iterator_init(threadt, 0);
+       while ((th = ao2_t_iterator_next(&i, "iterate through tcp threads for 'sip show tcp'"))) {
                pthread_t thread = th->threadid;
                th->stop = 1;
-               AST_LIST_UNLOCK(&threadl);
                pthread_kill(thread, SIGURG);
                pthread_join(thread, NULL);
-               AST_LIST_LOCK(&threadl);
+               ao2_t_ref(th, -1, "decrement ref from iterator");
        }
-       AST_LIST_TRAVERSE_SAFE_END;
-       AST_LIST_UNLOCK(&threadl);
+       ao2_iterator_destroy(&i);
 
        /* Hangup all dialogs if they have an owner */
        i = ao2_iterator_init(dialogs, 0);
@@ -26555,6 +26835,7 @@ static int unload_module(void)
        ao2_t_ref(peers, -1, "unref the peers table");
        ao2_t_ref(peers_by_ip, -1, "unref the peers_by_ip table");
        ao2_t_ref(dialogs, -1, "unref the dialogs table");
+       ao2_t_ref(threadt, -1, "unref the thread table");
 
        clear_sip_domains();
        close(sipsock);