SIP TCP/TLS: move client connection setup/write into tcp helper thread, various relat...
authorDavid Vossel <dvossel@digium.com>
Thu, 22 Oct 2009 19:55:51 +0000 (19:55 +0000)
committerDavid Vossel <dvossel@digium.com>
Thu, 22 Oct 2009 19:55:51 +0000 (19:55 +0000)
        What this patch fixes
1.Moves sip TCP/TLS connection setup into the TCP helper thread:
  Connection setup takes awhile and before this it was being
  done while holding the monitor lock.
2.Moves TCP/TLS writing to the TCP helper thread:  Through the
  use of a packet queue and an alert pipe, the TCP helper thread
  can now be woken up to write data as well as read data.
3.Locking error: sip_xmit returned an XMIT_ERROR without giving
  up the tcptls_session lock.  This lock has been completely removed
  from sip_xmit and placed in the new sip_tcptls_write() function.
4.Memory leak:  When creating a tcptls_client the tls_cfg was alloced
  but never freed unless the tcptls_session failed to start.  Now the
  session_args for a sip client are an ao2 object which frees the
  tls_cfg on destruction.
5.Pointer to stack variable: During sip_prepare_socket the creation
  of a client's ast_tcptls_session_args was done on the stack and
  stored as a pointer in the newly created tcptls_session.  Depending
  on the events that followed, there was a slight possibility that
  pointer could have been accessed after the stack returned.  Given
  the new changes, it is always accessed after the stack returns
  which is why I found it.

Notable code changes
1.I broke tcptls.c's ast_tcptls_client_start() function into two
  functions.  One for creating and allocating the new tcptls_session,
  and a separate one for starting and handling the new connection.
  This allowed me to create the tcptls_session, launch the helper
  thread, and then establish the connection within the helper thread.
2.Writes to a tcptls_session are now done within the helper thread.
  This is done by using an alert pipe to wake up the thread if new
  data needs to be sent.  The thread's sip_threadinfo object contains
  the alert pipe as well as the packet queue.
3.Since the threadinfo object contains the alert pipe, it must now be
  accessed outside of the helper thread for every write (queuing of a
  packet).  For easy lookup, I moved the threadinfo objects from a
  linked list to an ao2_container.

(closes issue #13136)
Reported by: pabelanger
Tested by: dvossel, whys

(closes issue #15894)
Reported by: dvossel
Tested by: dvossel

Review: https://reviewboard.asterisk.org/r/380/

git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@225445 65c4cc65-6c06-0410-ace0-fbb531ad65f3

apps/app_externalivr.c
channels/chan_sip.c
include/asterisk/tcptls.h
main/tcptls.c

index 3a36449..1dd1fc4 100644 (file)
@@ -457,9 +457,7 @@ static int app_exec(struct ast_channel *chan, const char *data)
                ivr_desc.local_address.sin_family = AF_INET;
                ivr_desc.local_address.sin_port = htons(port);
                memcpy(&ivr_desc.local_address.sin_addr.s_addr, hp.hp.h_addr, hp.hp.h_length);
-               ser = ast_tcptls_client_start(&ivr_desc);
-
-               if (!ser) {
+               if (!(ser = ast_tcptls_client_create(&ivr_desc)) || !(ser = ast_tcptls_client_start(ser))) {
                        goto exit;
                }
                res = eivr_comm(chan, u, ser->fd, ser->fd, -1, pipe_delim_args, flags);
index e15b5fb..50756fa 100644 (file)
@@ -2111,13 +2111,26 @@ struct sip_registry {
        char lastmsg[256];              /*!< Last Message sent/received */
 };
 
+enum sip_tcptls_alert {
+       /*! \brief There is new data to be sent out */
+       TCPTLS_ALERT_DATA,
+       /*! \brief A request to stop the tcp_handler thread */
+       TCPTLS_ALERT_STOP,
+};
+
+struct tcptls_packet {
+       AST_LIST_ENTRY(tcptls_packet) entry;
+       struct ast_str *data;
+       size_t len;
+};
 /*! \brief Definition of a thread that handles a socket */
 struct sip_threadinfo {
        int stop;
+       int alert_pipe[2]; /*! Used to alert tcptls thread when packet is ready to be written */
        pthread_t threadid;
        struct ast_tcptls_session_instance *tcptls_session;
        enum sip_transport type;        /*!< We keep a copy of the type here so we can display it in the connection list */
-       AST_LIST_ENTRY(sip_threadinfo) list;
+       AST_LIST_HEAD_NOLOCK(, tcptls_packet) packet_q;
 };
 
 /*! \brief Definition of an MWI subscription to another server */
@@ -2151,8 +2164,8 @@ static int hash_dialog_size = 563;
 static int hash_user_size = 563;
 #endif
 
-/*! \brief  The thread list of TCP threads */
-static AST_LIST_HEAD_STATIC(threadl, sip_threadinfo);
+/*! \brief  The table of TCP threads */
+static struct ao2_container *threadt;
 
 /*! \brief  The peer list: Users, Peers and Friends */
 static struct ao2_container *peers;
@@ -2244,6 +2257,21 @@ static int peer_ipcmp_cb(void *obj, void *arg, int flags)
        return peer->addr.sin_port == peer2->addr.sin_port ? (CMP_MATCH | CMP_STOP) : 0;
 }
 
+
+static int threadt_hash_cb(const void *obj, const int flags)
+{
+       const struct sip_threadinfo *th = obj;
+
+       return (int) th->tcptls_session->remote_address.sin_addr.s_addr;
+}
+
+static int threadt_cmp_cb(void *obj, void *arg, int flags)
+{
+       struct sip_threadinfo *th = obj, *th2 = arg;
+
+       return (th->tcptls_session == th2->tcptls_session) ? CMP_MATCH | CMP_STOP : 0;
+}
+
 /*!
  * \note The only member of the dialog used here callid string
  */
@@ -2890,6 +2918,130 @@ static struct ast_variable *copy_vars(struct ast_variable *src)
        return res;
 }
 
+static void tcptls_packet_destructor(void *obj)
+{
+       struct tcptls_packet *packet = obj;
+
+       ast_free(packet->data);
+}
+
+static void sip_tcptls_client_args_destructor(void *obj)
+{
+       struct ast_tcptls_session_args *args = obj;
+       if (args->tls_cfg) {
+               ast_free(args->tls_cfg->certfile);
+               ast_free(args->tls_cfg->pvtfile);
+               ast_free(args->tls_cfg->cipher);
+               ast_free(args->tls_cfg->cafile);
+               ast_free(args->tls_cfg->capath);
+       }
+       ast_free(args->tls_cfg);
+       ast_free((char *) args->name);
+}
+
+static void sip_threadinfo_destructor(void *obj)
+{
+       struct sip_threadinfo *th = obj;
+       struct tcptls_packet *packet;
+       if (th->alert_pipe[1] > -1) {
+               close(th->alert_pipe[0]);
+       }
+       if (th->alert_pipe[1] > -1) {
+               close(th->alert_pipe[1]);
+       }
+       th->alert_pipe[0] = th->alert_pipe[1] = -1;
+
+       while ((packet = AST_LIST_REMOVE_HEAD(&th->packet_q, entry))) {
+               ao2_t_ref(packet, -1, "thread destruction, removing packet from frame queue");
+       }
+
+       if (th->tcptls_session) {
+               ao2_t_ref(th->tcptls_session, -1, "remove tcptls_session for sip_threadinfo object");
+       }
+}
+
+/*! \brief creates a sip_threadinfo object and links it into the threadt table. */
+static struct sip_threadinfo *sip_threadinfo_create(struct ast_tcptls_session_instance *tcptls_session, int transport)
+{
+       struct sip_threadinfo *th;
+
+       if (!tcptls_session || !(th = ao2_alloc(sizeof(*th), sip_threadinfo_destructor))) {
+               return NULL;
+       }
+
+       th->alert_pipe[0] = th->alert_pipe[1] = -1;
+
+       if (pipe(th->alert_pipe) == -1) {
+               ao2_t_ref(th, -1, "Failed to open alert pipe on sip_threadinfo");
+               ast_log(LOG_ERROR, "Could not create sip alert pipe in tcptls thread, error %s\n", strerror(errno));
+               return NULL;
+       }
+       ao2_t_ref(tcptls_session, +1, "tcptls_session ref for sip_threadinfo object");
+       th->tcptls_session = tcptls_session;
+       th->type = transport ? transport : (tcptls_session->ssl ? SIP_TRANSPORT_TLS: SIP_TRANSPORT_TCP);
+       ao2_t_link(threadt, th, "Adding new tcptls helper thread");
+       ao2_t_ref(th, -1, "Decrementing threadinfo ref from alloc, only table ref remains");
+       return th;
+}
+
+/*! \brief used to indicate to a tcptls thread that data is ready to be written */
+static int sip_tcptls_write(struct ast_tcptls_session_instance *tcptls_session, const void *buf, size_t len)
+{
+       int res = len;
+       struct sip_threadinfo *th = NULL;
+       struct tcptls_packet *packet = NULL;
+       struct sip_threadinfo tmp = {
+               .tcptls_session = tcptls_session,
+       };
+       enum sip_tcptls_alert alert = TCPTLS_ALERT_DATA;
+
+       if (!tcptls_session) {
+               return XMIT_ERROR;
+       }
+
+       ast_mutex_lock(&tcptls_session->lock);
+
+       if ((tcptls_session->fd == -1) ||
+               !(th = ao2_t_find(threadt, &tmp, OBJ_POINTER, "ao2_find, getting sip_threadinfo in tcp helper thread")) ||
+               !(packet = ao2_alloc(sizeof(*packet), tcptls_packet_destructor)) ||
+               !(packet->data = ast_str_create(len))) {
+               goto tcptls_write_setup_error;
+       }
+
+       /* goto tcptls_write_error should _NOT_ be used beyond this point */
+       ast_str_set(&packet->data, 0, "%s", (char *) buf);
+       packet->len = len;
+
+       /* alert tcptls thread handler that there is a packet to be sent.
+        * must lock the thread info object to guarantee control of the
+        * packet queue */
+       ao2_lock(th);
+       if (write(th->alert_pipe[1], &alert, sizeof(alert)) == -1) {
+               ast_log(LOG_ERROR, "write() to alert pipe failed: %s\n", strerror(errno));
+               ao2_t_ref(packet, -1, "could not write to alert pipe, remove packet");
+               packet = NULL;
+               res = XMIT_ERROR;
+       } else { /* it is safe to queue the frame after issuing the alert when we hold the threadinfo lock */
+               AST_LIST_INSERT_TAIL(&th->packet_q, packet, entry);
+       }
+       ao2_unlock(th);
+
+       ast_mutex_unlock(&tcptls_session->lock);
+       ao2_t_ref(th, -1, "In sip_tcptls_write, unref threadinfo object after finding it");
+       return res;
+
+tcptls_write_setup_error:
+       if (th) {
+               ao2_t_ref(th, -1, "In sip_tcptls_write, unref threadinfo obj, could not create packet");
+       }
+       if (packet) {
+               ao2_t_ref(packet, -1, "could not allocate packet's data");
+       }
+       ast_mutex_unlock(&tcptls_session->lock);
+
+       return XMIT_ERROR;
+}
+
 /*! \brief SIP TCP connection handler */
 static void *sip_tcp_worker_fn(void *data)
 {
@@ -2905,26 +3057,45 @@ static void *_sip_tcp_helper_thread(struct sip_pvt *pvt, struct ast_tcptls_sessi
 {
        int res, cl;
        struct sip_request req = { 0, } , reqcpy = { 0, };
-       struct sip_threadinfo *me;
+       struct sip_threadinfo *me = NULL;
        char buf[1024] = "";
+       struct pollfd fds[2] = { { 0 }, { 0 }, };
+       struct ast_tcptls_session_args *ca = NULL;
+
+       /* If this is a server session, then the connection has already been setup,
+        * simply create the threadinfo object so we can access this thread for writing.
+        * 
+        * if this is a client connection more work must be done.
+        * 1. We own the parent session args for a client connection.  This pointer needs
+        *    to be held on to so we can decrement it's ref count on thread destruction.
+        * 2. The threadinfo object was created before this thread was launched, however
+        *    it must be found within the threadt table.
+        * 3. Last, the tcptls_session must be started.
+        */
+       if (!tcptls_session->client) {
+               if (!(me = sip_threadinfo_create(tcptls_session, tcptls_session->ssl ? SIP_TRANSPORT_TLS : SIP_TRANSPORT_TCP))) {
+                       goto cleanup;
+               }
+               ao2_t_ref(me, +1, "Adding threadinfo ref for tcp_helper_thread");
+       } else {
+               struct sip_threadinfo tmp = {
+                       .tcptls_session = tcptls_session,
+               };
 
-       me = ast_calloc(1, sizeof(*me));
-
-       if (!me)
-               goto cleanup2;
+               if ((!(ca = tcptls_session->parent)) ||
+                       (!(me = ao2_t_find(threadt, &tmp, OBJ_POINTER, "ao2_find, getting sip_threadinfo in tcp helper thread"))) ||
+                       (!(tcptls_session = ast_tcptls_client_start(tcptls_session)))) {
+                       goto cleanup;
+               }
+       }
 
        me->threadid = pthread_self();
-       me->tcptls_session = tcptls_session;
-       if (tcptls_session->ssl)
-               me->type = SIP_TRANSPORT_TLS;
-       else
-               me->type = SIP_TRANSPORT_TCP;
-
        ast_debug(2, "Starting thread for %s server\n", tcptls_session->ssl ? "SSL" : "TCP");
 
-       AST_LIST_LOCK(&threadl);
-       AST_LIST_INSERT_TAIL(&threadl, me, list);
-       AST_LIST_UNLOCK(&threadl);
+       /* set up pollfd to watch for reads on both the socket and the alert_pipe */
+       fds[0].fd = tcptls_session->fd;
+       fds[1].fd = me->alert_pipe[0];
+       fds[0].events = fds[1].events = POLLIN | POLLPRI;
 
        if (!(req.data = ast_str_create(SIP_MIN_PACKET)))
                goto cleanup;
@@ -2934,81 +3105,120 @@ static void *_sip_tcp_helper_thread(struct sip_pvt *pvt, struct ast_tcptls_sessi
        for (;;) {
                struct ast_str *str_save;
 
-               str_save = req.data;
-               memset(&req, 0, sizeof(req));
-               req.data = str_save;
-               ast_str_reset(req.data);
-
-               str_save = reqcpy.data;
-               memset(&reqcpy, 0, sizeof(reqcpy));
-               reqcpy.data = str_save;
-               ast_str_reset(reqcpy.data);
-
-               memset(buf, 0, sizeof(buf));
-
-               if (tcptls_session->ssl) {
-                       set_socket_transport(&req.socket, SIP_TRANSPORT_TLS);
-                       req.socket.port = htons(ourport_tls);
-               } else {
-                       set_socket_transport(&req.socket, SIP_TRANSPORT_TCP);
-                       req.socket.port = htons(ourport_tcp);
-               }
-               req.socket.fd = tcptls_session->fd;
-               res = ast_wait_for_input(tcptls_session->fd, -1);
+               res = ast_poll(fds, 2, -1); /* polls for both socket and alert_pipe */
                if (res < 0) {
                        ast_debug(2, "SIP %s server :: ast_wait_for_input returned %d\n", tcptls_session->ssl ? "SSL": "TCP", res);
                        goto cleanup;
                }
 
-               /* Read in headers one line at a time */
-               while (req.len < 4 || strncmp(REQ_OFFSET_TO_STR(&req, len - 4), "\r\n\r\n", 4)) {
-                       ast_mutex_lock(&tcptls_session->lock);
-                       if (!fgets(buf, sizeof(buf), tcptls_session->f)) {
-                               ast_mutex_unlock(&tcptls_session->lock);
-                               goto cleanup;
+               /* handle the socket event, check for both reads from the socket fd,
+                * and writes from alert_pipe fd */
+               if (fds[0].revents) { /* there is data on the socket to be read */
+
+                       fds[0].revents = 0;
+
+                       /* clear request structure */
+                       str_save = req.data;
+                       memset(&req, 0, sizeof(req));
+                       req.data = str_save;
+                       ast_str_reset(req.data);
+
+                       str_save = reqcpy.data;
+                       memset(&reqcpy, 0, sizeof(reqcpy));
+                       reqcpy.data = str_save;
+                       ast_str_reset(reqcpy.data);
+
+                       memset(buf, 0, sizeof(buf));
+
+                       if (tcptls_session->ssl) {
+                               set_socket_transport(&req.socket, SIP_TRANSPORT_TLS);
+                               req.socket.port = htons(ourport_tls);
+                       } else {
+                               set_socket_transport(&req.socket, SIP_TRANSPORT_TCP);
+                               req.socket.port = htons(ourport_tcp);
                        }
-                       ast_mutex_unlock(&tcptls_session->lock);
-                       if (me->stop)
-                                goto cleanup;
-                       ast_str_append(&req.data, 0, "%s", buf);
-                       req.len = req.data->used;
-               }
-               copy_request(&reqcpy, &req);
-               parse_request(&reqcpy);
-               /* In order to know how much to read, we need the content-length header */
-               if (sscanf(get_header(&reqcpy, "Content-Length"), "%30d", &cl)) {
-                       while (cl > 0) {
-                               size_t bytes_read;
+                       req.socket.fd = tcptls_session->fd;
+
+                       /* Read in headers one line at a time */
+                       while (req.len < 4 || strncmp(REQ_OFFSET_TO_STR(&req, len - 4), "\r\n\r\n", 4)) {
                                ast_mutex_lock(&tcptls_session->lock);
-                               if (!(bytes_read = fread(buf, 1, MIN(sizeof(buf) - 1, cl), tcptls_session->f))) {
+                               if (!fgets(buf, sizeof(buf), tcptls_session->f)) {
                                        ast_mutex_unlock(&tcptls_session->lock);
                                        goto cleanup;
                                }
-                               buf[bytes_read] = '\0';
                                ast_mutex_unlock(&tcptls_session->lock);
                                if (me->stop)
-                                       goto cleanup;
-                               cl -= strlen(buf);
+                                        goto cleanup;
                                ast_str_append(&req.data, 0, "%s", buf);
                                req.len = req.data->used;
                        }
+                       copy_request(&reqcpy, &req);
+                       parse_request(&reqcpy);
+                       /* In order to know how much to read, we need the content-length header */
+                       if (sscanf(get_header(&reqcpy, "Content-Length"), "%30d", &cl)) {
+                               while (cl > 0) {
+                                       size_t bytes_read;
+                                       ast_mutex_lock(&tcptls_session->lock);
+                                       if (!(bytes_read = fread(buf, 1, MIN(sizeof(buf) - 1, cl), tcptls_session->f))) {
+                                               ast_mutex_unlock(&tcptls_session->lock);
+                                               goto cleanup;
+                                       }
+                                       buf[bytes_read] = '\0';
+                                       ast_mutex_unlock(&tcptls_session->lock);
+                                       if (me->stop)
+                                               goto cleanup;
+                                       cl -= strlen(buf);
+                                       ast_str_append(&req.data, 0, "%s", buf);
+                                       req.len = req.data->used;
+                               }
+                       }
+                       /*! \todo XXX If there's no Content-Length or if the content-length and what
+                                       we receive is not the same - we should generate an error */
+
+                       req.socket.tcptls_session = tcptls_session;
+                       handle_request_do(&req, &tcptls_session->remote_address);
                }
-               /*! \todo XXX If there's no Content-Length or if the content-length and what
-                               we receive is not the same - we should generate an error */
 
-               req.socket.tcptls_session = tcptls_session;
-               handle_request_do(&req, &tcptls_session->remote_address);
+               if (fds[1].revents) { /* alert_pipe indicates there is data in the send queue to be sent */
+                       enum sip_tcptls_alert alert;
+                       struct tcptls_packet *packet;
+
+                       fds[1].revents = 0;
+
+                       if (read(me->alert_pipe[0], &alert, sizeof(alert)) == -1) {
+                               ast_log(LOG_ERROR, "read() failed: %s\n", strerror(errno));
+                               continue;
+                       }
+
+                       switch (alert) {
+                       case TCPTLS_ALERT_STOP:
+                               goto cleanup;
+                       case TCPTLS_ALERT_DATA:
+                               ao2_lock(me);
+                               if (!(packet = AST_LIST_REMOVE_HEAD(&me->packet_q, entry))) {
+                                       ast_log(LOG_WARNING, "TCPTLS thread alert_pipe indicated packet should be sent, but frame_q is empty");
+                               } else if (ast_tcptls_server_write(tcptls_session, ast_str_buffer(packet->data), packet->len) == -1) {
+                                       ast_log(LOG_WARNING, "Failure to write to tcp/tls socket\n");
+                               }
+
+                               if (packet) {
+                                       ao2_t_ref(packet, -1, "tcptls packet sent, this is no longer needed");
+                               }
+                               ao2_unlock(me);
+                               break;
+                       default:
+                               ast_log(LOG_ERROR, "Unknown tcptls thread alert '%d'\n", alert);
+                       }
+               }
        }
 
+       ast_debug(2, "Shutting down thread for %s server\n", tcptls_session->ssl ? "SSL" : "TCP");
+
 cleanup:
-       AST_LIST_LOCK(&threadl);
-       AST_LIST_REMOVE(&threadl, me, list);
-       AST_LIST_UNLOCK(&threadl);
-       ast_free(me);
-cleanup2:
-       fclose(tcptls_session->f);
-       tcptls_session->f = NULL;
-       tcptls_session->fd = -1;
+       if (me) {
+               ao2_t_unlink(threadt, me, "Removing tcptls helper thread, thread is closing");
+               ao2_t_ref(me, -1, "Removing tcp_helper_threads threadinfo ref");
+       }
        if (reqcpy.data) {
                ast_free(reqcpy.data);
        }
@@ -3018,12 +3228,27 @@ cleanup2:
                req.data = NULL;
        }
 
-       ast_debug(2, "Shutting down thread for %s server\n", tcptls_session->ssl ? "SSL" : "TCP");
-       
+       /* if client, we own the parent session arguments and must decrement ref */
+       if (ca) {
+               ao2_t_ref(ca, -1, "closing tcptls thread, getting rid of client tcptls_session arguments");
+       }
 
-       ao2_ref(tcptls_session, -1);
-       tcptls_session = NULL;
+       if (tcptls_session) {
+               ast_mutex_lock(&tcptls_session->lock);
+               if (tcptls_session->f) {
+                       fclose(tcptls_session->f);
+                       tcptls_session->f = NULL;
+               }
+               if (tcptls_session->fd != -1) {
+                       close(tcptls_session->fd);
+                       tcptls_session->fd = -1;
+               }
+               tcptls_session->parent = NULL;
+               ast_mutex_unlock(&tcptls_session->lock);
 
+               ao2_ref(tcptls_session, -1);
+               tcptls_session = NULL;
+       }
        return NULL;
 }
 
@@ -3480,26 +3705,15 @@ static int __sip_xmit(struct sip_pvt *p, struct ast_str *data, int len)
        if (sip_prepare_socket(p) < 0)
                return XMIT_ERROR;
 
-       if (p->socket.tcptls_session)
-               ast_mutex_lock(&p->socket.tcptls_session->lock);
-
-       if (p->socket.type & SIP_TRANSPORT_UDP) {
+       if (p->socket.type == SIP_TRANSPORT_UDP) {
                res = sendto(p->socket.fd, data->str, len, 0, (const struct sockaddr *)dst, sizeof(struct sockaddr_in));
        } else if (p->socket.tcptls_session) {
-               if (p->socket.tcptls_session->f) {
-                       res = ast_tcptls_server_write(p->socket.tcptls_session, data->str, len);
-               } else {
-                       ast_debug(2, "No p->socket.tcptls_session->f len=%d\n", len);
-                       return XMIT_ERROR;
-               }
+               res = sip_tcptls_write(p->socket.tcptls_session, data->str, len);
        } else {
                ast_debug(2, "Socket type is TCP but no tcptls_session is present to write to\n");
                return XMIT_ERROR;
        }
 
-       if (p->socket.tcptls_session)
-               ast_mutex_unlock(&p->socket.tcptls_session->lock);
-
        if (res == -1) {
                switch (errno) {
                case EBADF:             /* Bad file descriptor - seems like this is generated when the host exist, but doesn't accept the UDP packet */
@@ -12233,6 +12447,11 @@ static int expire_register(const void *data)
        destroy_association(peer);      /* remove registration data from storage */
        set_socket_transport(&peer->socket, peer->default_outbound_transport);
 
+       if (peer->socket.tcptls_session) {
+               ao2_ref(peer->socket.tcptls_session, -1);
+               peer->socket.tcptls_session = NULL;
+       }
+
        manager_event(EVENT_FLAG_SYSTEM, "PeerStatus", "ChannelType: SIP\r\nPeer: SIP/%s\r\nPeerStatus: Unregistered\r\nCause: Expired\r\n", peer->name);
        register_peer_exten(peer, FALSE);       /* Remove regexten */
        ast_devstate_changed(AST_DEVICE_UNKNOWN, "SIP/%s", peer->name);
@@ -14732,6 +14951,7 @@ static const char *cli_yesno(int x)
 static char *sip_show_tcp(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
 {
        struct sip_threadinfo *th;
+       struct ao2_iterator i;
 
 #define FORMAT2 "%-30.30s %3.6s %9.9s %6.6s\n"
 #define FORMAT  "%-30.30s %-6d %-9.9s %-6.6s\n"
@@ -14751,15 +14971,16 @@ static char *sip_show_tcp(struct ast_cli_entry *e, int cmd, struct ast_cli_args
                return CLI_SHOWUSAGE;
 
        ast_cli(a->fd, FORMAT2, "Host", "Port", "Transport", "Type");
-       AST_LIST_LOCK(&threadl);
-       AST_LIST_TRAVERSE(&threadl, th, list) {
+
+       i = ao2_iterator_init(threadt, 0);
+       while ((th = ao2_t_iterator_next(&i, "iterate through tcp threads for 'sip show tcp'"))) {
                ast_cli(a->fd, FORMAT, ast_inet_ntoa(th->tcptls_session->remote_address.sin_addr),
                        ntohs(th->tcptls_session->remote_address.sin_port),
                        get_transport(th->type),
                        (th->tcptls_session->client ? "Client" : "Server"));
-
+               ao2_t_ref(th, -1, "decrement ref from iterator");
        }
-       AST_LIST_UNLOCK(&threadl);
+
        return CLI_SUCCESS;
 #undef FORMAT
 #undef FORMAT2
@@ -22678,6 +22899,18 @@ static int sip_standard_port(enum sip_transport type, int port)
                return port == STANDARD_SIP_PORT;
 }
 
+static int threadinfo_locate_cb(void *obj, void *arg, int flags)
+{
+       struct sip_threadinfo *th = obj;
+       struct sockaddr_in *s = arg;
+
+       if (!inaddrcmp(&th->tcptls_session->remote_address, s)) {
+               return CMP_MATCH | CMP_STOP;
+       }
+
+       return 0;
+}
+
 /*!
  * \brief Find thread for TCP/TLS session (based on IP/Port
  *
@@ -22688,16 +22921,10 @@ static struct ast_tcptls_session_instance *sip_tcp_locate(struct sockaddr_in *s)
        struct sip_threadinfo *th;
        struct ast_tcptls_session_instance *tcptls_instance = NULL;
 
-       AST_LIST_LOCK(&threadl);
-       AST_LIST_TRAVERSE(&threadl, th, list) {
-               if ((s->sin_family == th->tcptls_session->remote_address.sin_family) &&
-                       (s->sin_addr.s_addr == th->tcptls_session->remote_address.sin_addr.s_addr) &&
-                       (s->sin_port == th->tcptls_session->remote_address.sin_port))  {
-                               tcptls_instance = (ao2_ref(th->tcptls_session, +1), th->tcptls_session);
-                               break;
-                       }
+       if ((th = ao2_callback(threadt, 0, threadinfo_locate_cb, s))) {
+               tcptls_instance = (ao2_ref(th->tcptls_session, +1), th->tcptls_session);
+               ao2_t_ref(th, -1, "decrement ref from callback");
        }
-       AST_LIST_UNLOCK(&threadl);
 
        return tcptls_instance;
 }
@@ -22707,14 +22934,23 @@ static int sip_prepare_socket(struct sip_pvt *p)
 {
        struct sip_socket *s = &p->socket;
        static const char name[] = "SIP socket";
+       struct sip_threadinfo *th;
        struct ast_tcptls_session_instance *tcptls_session;
-       struct ast_tcptls_session_args ca = {
+       struct ast_tcptls_session_args tmp_ca = {
                .name = name,
                .accept_fd = -1,
        };
+       struct ast_tcptls_session_args *ca;
 
-       if (s->fd != -1)
-               return s->fd;   /* This socket is already active */
+       /* check to see if a socket is already active */
+       if ((s->fd != -1) && (s->type == SIP_TRANSPORT_UDP)) {
+               return s->fd;
+       }
+       if ((s->type & (SIP_TRANSPORT_TCP | SIP_TRANSPORT_TLS)) &&
+                       (s->tcptls_session) &&
+                       (s->tcptls_session->fd != -1)) {
+               return s->tcptls_session->fd;
+       }
 
        /*! \todo Check this... This might be wrong, depending on the proxy configuration
                If proxy is in "force" mode its correct.
@@ -22723,14 +22959,23 @@ static int sip_prepare_socket(struct sip_pvt *p)
                s->type = p->outboundproxy->transport;
        }
 
-       if (s->type & SIP_TRANSPORT_UDP) {
+       if (s->type == SIP_TRANSPORT_UDP) {
                s->fd = sipsock;
                return s->fd;
        }
 
-       ca.remote_address = *(sip_real_dst(p));
+       /* At this point we are dealing with a TCP/TLS connection
+        * 1. We need to check to see if a connectin thread exists
+        *    for this address, if so use that.
+        * 2. If a thread does not exist for this address, but the tcptls_session
+        *    exists on the socket, the connection was closed.
+        * 3. If no tcptls_session thread exists for the address, and no tcptls_session
+        *    already exists on the socket, create a new one and launch a new thread.
+        */
 
-       if ((tcptls_session = sip_tcp_locate(&ca.remote_address))) {    /* Check if we have a thread handling a socket connected to this IP/port */
+       /* 1.  check for existing threads */
+       tmp_ca.remote_address = *(sip_real_dst(p));
+       if ((tcptls_session = sip_tcp_locate(&tmp_ca.remote_address))) {
                s->fd = tcptls_session->fd;
                if (s->tcptls_session) {
                        ao2_ref(s->tcptls_session, -1);
@@ -22738,46 +22983,82 @@ static int sip_prepare_socket(struct sip_pvt *p)
                }
                s->tcptls_session = tcptls_session;
                return s->fd;
+       /* 2.  Thread not found, if tcptls_session already exists, it once had a thread and is now terminated */
+       } else if (s->tcptls_session) {
+               return s->fd; /* XXX whether reconnection is ever necessary here needs to be investigated further */
        }
 
-       if (s->tcptls_session && s->tcptls_session->parent->tls_cfg) {
-               ca.tls_cfg = s->tcptls_session->parent->tls_cfg;
-       } else {
-               if (s->type & SIP_TRANSPORT_TLS) {
-                       ca.tls_cfg = ast_calloc(1, sizeof(*ca.tls_cfg));
-                       if (!ca.tls_cfg)
-                               return -1;
-                       memcpy(ca.tls_cfg, &default_tls_cfg, sizeof(*ca.tls_cfg));
-                       if (!ast_strlen_zero(p->tohost))
-                               ast_copy_string(ca.hostname, p->tohost, sizeof(ca.hostname));
-               }
+       /* 3.  Create a new TCP/TLS client connection */
+       /* create new session arguments for the client connection */
+       if (!(ca = ao2_alloc(sizeof(*ca), sip_tcptls_client_args_destructor)) ||
+               !(ca->name = ast_strdup(name))) {
+               goto create_tcptls_session_fail;
        }
-       
-       if (s->tcptls_session) {
-               /* the pvt socket already has a server instance ... */
-       } else {
-               s->tcptls_session = ast_tcptls_client_start(&ca); /* Start a client connection to this address */
+       ca->accept_fd = -1;
+       ca->remote_address = *(sip_real_dst(p));
+       /* if type is TLS, we need to create a tls cfg for this session arg */
+       if (s->type == SIP_TRANSPORT_TLS) {
+               if (!(ca->tls_cfg = ast_calloc(1, sizeof(*ca->tls_cfg)))) {
+                       goto create_tcptls_session_fail;
+               }
+               memcpy(ca->tls_cfg, &default_tls_cfg, sizeof(*ca->tls_cfg));
+
+               if (!(ca->tls_cfg->certfile = ast_strdup(default_tls_cfg.certfile)) ||
+                       !(ca->tls_cfg->pvtfile = ast_strdup(default_tls_cfg.pvtfile)) ||
+                       !(ca->tls_cfg->cipher = ast_strdup(default_tls_cfg.cipher)) ||
+                       !(ca->tls_cfg->cafile = ast_strdup(default_tls_cfg.cafile)) ||
+                       !(ca->tls_cfg->capath = ast_strdup(default_tls_cfg.capath))) {
+
+                       goto create_tcptls_session_fail;
+               }
+
+               /* this host is used as the common name in ssl/tls */
+               if (!ast_strlen_zero(p->tohost)) {
+                       ast_copy_string(ca->hostname, p->tohost, sizeof(ca->hostname));
+               }
        }
 
-       if (!s->tcptls_session) {
-               if (ca.tls_cfg)
-                       ast_free(ca.tls_cfg);
-               return -1;
+       /* Create a client connection for address, this does not start the connection, just sets it up. */
+       if (!(s->tcptls_session = ast_tcptls_client_create(ca))) {
+               goto create_tcptls_session_fail;
        }
 
-       s->fd = ca.accept_fd;
+       s->fd = s->tcptls_session->fd;
 
-       /* Give the new thread a reference */
+       /* client connections need to have the sip_threadinfo object created before
+        * the thread is detached.  This ensures the alert_pipe is up before it will
+        * be used.  Note that this function links the new threadinfo object into the
+        * threadt container. */
+       if (!(th = sip_threadinfo_create(s->tcptls_session, s->type))) {
+               goto create_tcptls_session_fail;
+       }
+
+       /* Give the new thread a reference to the tcptls_session */
        ao2_ref(s->tcptls_session, +1);
 
-       if (ast_pthread_create_background(&ca.master, NULL, sip_tcp_worker_fn, s->tcptls_session)) {
-               ast_debug(1, "Unable to launch '%s'.", ca.name);
-               ao2_ref(s->tcptls_session, -1);
-               close(ca.accept_fd);
-               s->fd = ca.accept_fd = -1;
+       if (ast_pthread_create_background(&ca->master, NULL, sip_tcp_worker_fn, s->tcptls_session)) {
+               ast_debug(1, "Unable to launch '%s'.", ca->name);
+               ao2_ref(s->tcptls_session, -1); /* take away the thread ref we just gave it */
+               goto create_tcptls_session_fail;
        }
 
        return s->fd;
+
+create_tcptls_session_fail:
+       if (ca) {
+               ao2_t_ref(ca, -1, "failed to create client, getting rid of client tcptls_session arguments");
+       }
+       if (s->tcptls_session) {
+               close(tcptls_session->fd);
+               s->fd = tcptls_session->fd = -1;
+               ao2_ref(s->tcptls_session, -1);
+               s->tcptls_session = NULL;
+       }
+       if (th) {
+               ao2_t_unlink(threadt, th, "Removing tcptls thread info object, thread failed to open");
+       }
+
+       return -1;
 }
 
 /*!
@@ -26362,6 +26643,7 @@ static int load_module(void)
        peers = ao2_t_container_alloc(hash_peer_size, peer_hash_cb, peer_cmp_cb, "allocate peers");
        peers_by_ip = ao2_t_container_alloc(hash_peer_size, peer_iphash_cb, peer_ipcmp_cb, "allocate peers_by_ip");
        dialogs = ao2_t_container_alloc(hash_dialog_size, dialog_hash_cb, dialog_cmp_cb, "allocate dialogs");
+       threadt = ao2_t_container_alloc(hash_dialog_size, threadt_hash_cb, threadt_cmp_cb, "allocate threadt table");
        
        ASTOBJ_CONTAINER_INIT(&regl); /* Registry object list -- not searched for anything */
        ASTOBJ_CONTAINER_INIT(&submwil); /* MWI subscription object list */
@@ -26492,17 +26774,15 @@ static int unload_module(void)
                ast_tcptls_server_stop(&sip_tls_desc);
 
        /* Kill all existing TCP/TLS threads */
-       AST_LIST_LOCK(&threadl);
-       AST_LIST_TRAVERSE_SAFE_BEGIN(&threadl, th, list) {
+       i = ao2_iterator_init(threadt, 0);
+       while ((th = ao2_t_iterator_next(&i, "iterate through tcp threads for 'sip show tcp'"))) {
                pthread_t thread = th->threadid;
                th->stop = 1;
-               AST_LIST_UNLOCK(&threadl);
                pthread_kill(thread, SIGURG);
                pthread_join(thread, NULL);
-               AST_LIST_LOCK(&threadl);
+               ao2_t_ref(th, -1, "decrement ref from iterator");
        }
-       AST_LIST_TRAVERSE_SAFE_END;
-       AST_LIST_UNLOCK(&threadl);
+       ao2_iterator_destroy(&i);
 
        /* Hangup all dialogs if they have an owner */
        i = ao2_iterator_init(dialogs, 0);
@@ -26555,6 +26835,7 @@ static int unload_module(void)
        ao2_t_ref(peers, -1, "unref the peers table");
        ao2_t_ref(peers_by_ip, -1, "unref the peers_by_ip table");
        ao2_t_ref(dialogs, -1, "unref the dialogs table");
+       ao2_t_ref(threadt, -1, "unref the thread table");
 
        clear_sip_domains();
        close(sipsock);
index b6cc9a3..ad04385 100644 (file)
@@ -156,12 +156,14 @@ struct ast_tcptls_session_instance {
 #define LEN_T size_t
 #endif
 
-/*!
- * \brief A generic client routine for a TCP client
- * and starts a thread for handling accept()
- * \version 1.6.1 changed desc parameter to be of ast_tcptls_session_args type
- */
-struct ast_tcptls_session_instance *ast_tcptls_client_start(struct ast_tcptls_session_args *desc);
+/*! 
+  * \brief attempts to connect and start tcptls session, on error the tcptls_session's
+  * ref count is decremented, fd and file are closed, and NULL is returned.
+  */
+struct ast_tcptls_session_instance *ast_tcptls_client_start(struct ast_tcptls_session_instance *tcptls_session);
+
+/* \brief Creates a client connection's ast_tcptls_session_instance. */
+struct ast_tcptls_session_instance *ast_tcptls_client_create(struct ast_tcptls_session_args *desc);
 
 void *ast_tcptls_server_root(void *);
 
index e92209b..05e59f6 100644 (file)
@@ -125,7 +125,7 @@ static void session_instance_destructor(void *obj)
 *
 * \note must decrement ref count before returning NULL on error
 */
-static void *handle_tls_connection(void *data)
+static void *handle_tcptls_connection(void *data)
 {
        struct ast_tcptls_session_instance *tcptls_session = data;
 #ifdef DO_SSL
@@ -197,6 +197,7 @@ static void *handle_tls_connection(void *data)
                                                ast_log(LOG_ERROR, "Certificate common name did not match (%s)\n", tcptls_session->parent->hostname);
                                                if (peer)
                                                        X509_free(peer);
+                                               close(tcptls_session->fd);
                                                fclose(tcptls_session->f);
                                                ao2_ref(tcptls_session, -1);
                                                return NULL;
@@ -266,7 +267,7 @@ void *ast_tcptls_server_root(void *data)
                tcptls_session->client = 0;
 
                /* This thread is now the only place that controls the single ref to tcptls_session */
-               if (ast_pthread_create_detached_background(&launched, NULL, handle_tls_connection, tcptls_session)) {
+               if (ast_pthread_create_detached_background(&launched, NULL, handle_tcptls_connection, tcptls_session)) {
                        ast_log(LOG_WARNING, "Unable to launch helper thread: %s\n", strerror(errno));
                        close(tcptls_session->fd);
                        ao2_ref(tcptls_session, -1);
@@ -357,9 +358,45 @@ int ast_ssl_setup(struct ast_tls_config *cfg)
        return __ssl_setup(cfg, 0);
 }
 
-struct ast_tcptls_session_instance *ast_tcptls_client_start(struct ast_tcptls_session_args *desc)
+struct ast_tcptls_session_instance *ast_tcptls_client_start(struct ast_tcptls_session_instance *tcptls_session)
 {
+       struct ast_tcptls_session_args *desc;
        int flags;
+
+       if (!(desc = tcptls_session->parent)) {
+               goto client_start_error;
+       }
+
+       if (connect(desc->accept_fd, (const struct sockaddr *) &desc->remote_address, sizeof(desc->remote_address))) {
+               ast_log(LOG_ERROR, "Unable to connect %s to %s:%d: %s\n",
+                       desc->name,
+                       ast_inet_ntoa(desc->remote_address.sin_addr), ntohs(desc->remote_address.sin_port),
+                       strerror(errno));
+               goto client_start_error;
+       }
+
+       flags = fcntl(desc->accept_fd, F_GETFL);
+       fcntl(desc->accept_fd, F_SETFL, flags & ~O_NONBLOCK);
+
+       if (desc->tls_cfg) {
+               desc->tls_cfg->enabled = 1;
+               __ssl_setup(desc->tls_cfg, 1);
+       }
+
+       return handle_tcptls_connection(tcptls_session);
+
+client_start_error:
+       close(desc->accept_fd);
+       desc->accept_fd = -1;
+       if (tcptls_session) {
+               ao2_ref(tcptls_session, -1);
+       }
+       return NULL;
+
+}
+
+struct ast_tcptls_session_instance *ast_tcptls_client_create(struct ast_tcptls_session_args *desc)
+{
        int x = 1;
        struct ast_tcptls_session_instance *tcptls_session = NULL;
 
@@ -394,39 +431,16 @@ struct ast_tcptls_session_instance *ast_tcptls_client_start(struct ast_tcptls_se
                }
        }
 
-       if (connect(desc->accept_fd, (const struct sockaddr *) &desc->remote_address, sizeof(desc->remote_address))) {
-               ast_log(LOG_ERROR, "Unable to connect %s to %s:%d: %s\n",
-                       desc->name,
-                       ast_inet_ntoa(desc->remote_address.sin_addr), ntohs(desc->remote_address.sin_port),
-                       strerror(errno));
-               goto error;
-       }
-
        if (!(tcptls_session = ao2_alloc(sizeof(*tcptls_session), session_instance_destructor)))
                goto error;
 
        ast_mutex_init(&tcptls_session->lock);
-
-       flags = fcntl(desc->accept_fd, F_GETFL);
-       fcntl(desc->accept_fd, F_SETFL, flags & ~O_NONBLOCK);
-
+       tcptls_session->client = 1;
        tcptls_session->fd = desc->accept_fd;
        tcptls_session->parent = desc;
        tcptls_session->parent->worker_fn = NULL;
        memcpy(&tcptls_session->remote_address, &desc->remote_address, sizeof(tcptls_session->remote_address));
 
-       tcptls_session->client = 1;
-
-       if (desc->tls_cfg) {
-               desc->tls_cfg->enabled = 1;
-               __ssl_setup(desc->tls_cfg, 1);
-       }
-
-       /* handle_tls_connection controls the single ref to tcptls_session. If
-        * tcptls_session returns NULL then the session has been destroyed */
-       if (!(tcptls_session = handle_tls_connection(tcptls_session)))
-               goto error;
-
        return tcptls_session;
 
 error: