[Pkg-privacy-commits] [onioncat] 177/241: SOCKS connector completely rewritten. Works now without dynamic thread pool. This is more memory efficient.

Intrigeri intrigeri at moszumanska.debian.org
Wed Aug 26 16:17:02 UTC 2015


This is an automated email from the git hooks/post-receive script.

intrigeri pushed a commit to branch upstream-master
in repository onioncat.

commit 3c7e9c0bb4814c435a639b9d747f7e36fdd84113
Author: eagle <eagle at 58e1ccc2-750e-0410-8d0d-f93ca75ab447>
Date:   Thu Mar 26 16:36:44 2009 +0000

    SOCKS connector completely rewritten. Works now without dynamic thread pool. This is more memory efficient.
    
    
    
    git-svn-id: https://www.cypherpunk.at/svn/onioncat/trunk@497 58e1ccc2-750e-0410-8d0d-f93ca75ab447
---
 ChangeLog       |   2 +
 configure       |  22 +-
 glob_id.txt     |  12 ++
 src/ocat.c      |   7 +-
 src/ocat.h      |  15 +-
 src/ocatctrl.c  |  15 +-
 src/ocatsetup.c |   3 +
 src/ocatsocks.c | 645 ++++++++++++++++++++++++++++++++------------------------
 8 files changed, 425 insertions(+), 296 deletions(-)

diff --git a/ChangeLog b/ChangeLog
index b3ff9a4..5361afa 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,4 +1,6 @@
 * version 0.1.12
+ - socks connector completely rewritten. Works now
+   without dynamic thread pool. This is more memory efficient.
  - Windows branch merges into trunk
  - syslogging implemented
  - idle timeout increased to 3 mins
diff --git a/configure b/configure
index 1309a6b..c382d9c 100755
--- a/configure
+++ b/configure
@@ -1,6 +1,6 @@
 #! /bin/sh
 # Guess values for system-dependent variables and create Makefiles.
-# Generated by GNU Autoconf 2.63 for onioncat 0.1.12.r493.
+# Generated by GNU Autoconf 2.63 for onioncat 0.1.12.r496.
 #
 # Report bugs to <rahra at cypherpunk.at>.
 #
@@ -596,8 +596,8 @@ SHELL=${CONFIG_SHELL-/bin/sh}
 # Identity of this package.
 PACKAGE_NAME='onioncat'
 PACKAGE_TARNAME='onioncat'
-PACKAGE_VERSION='0.1.12.r493'
-PACKAGE_STRING='onioncat 0.1.12.r493'
+PACKAGE_VERSION='0.1.12.r496'
+PACKAGE_STRING='onioncat 0.1.12.r496'
 PACKAGE_BUGREPORT='rahra at cypherpunk.at'
 
 ac_subst_vars='LTLIBOBJS
@@ -1249,7 +1249,7 @@ if test "$ac_init_help" = "long"; then
   # Omit some internal or obsolete options to make the list less imposing.
   # This message is too long to be a string in the A/UX 3.1 sh.
   cat <<_ACEOF
-\`configure' configures onioncat 0.1.12.r493 to adapt to many kinds of systems.
+\`configure' configures onioncat 0.1.12.r496 to adapt to many kinds of systems.
 
 Usage: $0 [OPTION]... [VAR=VALUE]...
 
@@ -1315,7 +1315,7 @@ fi
 
 if test -n "$ac_init_help"; then
   case $ac_init_help in
-     short | recursive ) echo "Configuration of onioncat 0.1.12.r493:";;
+     short | recursive ) echo "Configuration of onioncat 0.1.12.r496:";;
    esac
   cat <<\_ACEOF
 
@@ -1407,7 +1407,7 @@ fi
 test -n "$ac_init_help" && exit $ac_status
 if $ac_init_version; then
   cat <<\_ACEOF
-onioncat configure 0.1.12.r493
+onioncat configure 0.1.12.r496
 generated by GNU Autoconf 2.63
 
 Copyright (C) 1992, 1993, 1994, 1995, 1996, 1998, 1999, 2000, 2001,
@@ -1421,7 +1421,7 @@ cat >config.log <<_ACEOF
 This file contains any messages produced by compilers while
 running configure, to aid debugging if configure makes a mistake.
 
-It was created by onioncat $as_me 0.1.12.r493, which was
+It was created by onioncat $as_me 0.1.12.r496, which was
 generated by GNU Autoconf 2.63.  Invocation command line was
 
   $ $0 $@
@@ -2137,7 +2137,7 @@ fi
 
 # Define the identity of the package.
  PACKAGE='onioncat'
- VERSION='0.1.12.r493'
+ VERSION='0.1.12.r496'
 
 
 cat >>confdefs.h <<_ACEOF
@@ -2284,7 +2284,7 @@ ac_config_headers="$ac_config_headers config.h"
 
 
 cat >>confdefs.h <<\_ACEOF
-#define SVN_REVISION "493"
+#define SVN_REVISION "496"
 _ACEOF
 
 
@@ -4803,7 +4803,7 @@ exec 6>&1
 # report actual input values of CONFIG_FILES etc. instead of their
 # values after options handling.
 ac_log="
-This file was extended by onioncat $as_me 0.1.12.r493, which was
+This file was extended by onioncat $as_me 0.1.12.r496, which was
 generated by GNU Autoconf 2.63.  Invocation command line was
 
   CONFIG_FILES    = $CONFIG_FILES
@@ -4866,7 +4866,7 @@ Report bugs to <bug-autoconf at gnu.org>."
 _ACEOF
 cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
 ac_cs_version="\\
-onioncat config.status 0.1.12.r493
+onioncat config.status 0.1.12.r496
 configured by $0, generated by GNU Autoconf 2.63,
   with options \\"`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`\\"
 
diff --git a/glob_id.txt b/glob_id.txt
index bc2e381..7493155 100644
--- a/glob_id.txt
+++ b/glob_id.txt
@@ -7,3 +7,15 @@ EUI-64: 21b:24ff:fe73:cd0e
 => global ID: 87d87eeb43
 => IPv6: FD87:D87E:EB43::/48
 
+/* 
+   DO NOT USE THIS!
+   IT IS JUST A PERSONAL REMARK...
+generation for I2P
+NTP timestamp: 0xcd6cd83f947d805e
+EUI-64: 21b:24ff:fece:d108
+=> key: cd6cd83f947d805e21b24fffeced108
+=> SHA1: 366168b7aa3fd7326df894b907991b60db4dddb5
+=> global ID: 60db4dddb5
+=> IPv6: FD60:DB4D:DDB5::/48
+*/
+
diff --git a/src/ocat.c b/src/ocat.c
index e34096b..cd0f3f8 100644
--- a/src/ocat.c
+++ b/src/ocat.c
@@ -467,8 +467,11 @@ int main(int argc, char *argv[])
          log_msg(LOG_ERR, "could not reconnect stdio to /dev/null: \"%s\"", strerror(errno));
    }
 
-   // create socks connector thread
-   run_ocat_thread("connector", socks_connector, NULL);
+   // create socks connector thread and communication queue
+   if (pipe(CNF(socksfd)) == -1)
+      log_msg(LOG_EMERG, "couldn't create socks connector pipe: \"%s\"", strerror(errno)), exit(1);
+   run_ocat_thread("connector", socks_connector_sel, NULL);
+
 #ifdef PACKET_QUEUE
    // start packet dequeuer
    run_ocat_thread("dequeuer", packet_dequeuer, NULL);
diff --git a/src/ocat.h b/src/ocat.h
index 431c96a..bd444d4 100644
--- a/src/ocat.h
+++ b/src/ocat.h
@@ -186,7 +186,12 @@
 //! thread stack size (default stack size on OpenBSD is too small)
 #define THREAD_STACK_SIZE 262144
 
+#define SOCKS_NEW 0
 #define SOCKS_CONNECTING 1
+#define SOCKS_4AREQ_SENT 2
+#define SOCKS_4ARESPONSE 3
+#define SOCKS_DELETE 127
+
 #define SOCKS_MAX_RETRY 3
 
 #define E_RT_NOMEM -1
@@ -217,6 +222,8 @@
 //! RECONN_ATTEMPTS must not be faster than MIN_RECONNECT_TIME
 #define MIN_RECONNECT_TIME 30
 
+#define MFD_SET(f,s,m) {FD_SET(f, s); m = f > m ? f : m;}
+
 //! copy an IPv6 address from b to a
 #define IN6_ADDR_COPY(a,b) *((struct in6_addr*)a)=*(struct in6_addr*)b
 
@@ -296,6 +303,8 @@ struct OcatSetup
    struct sockaddr **ctrl_listen;
    int *ctrl_listen_fd;
    int ctrl_listen_cnt;
+   //! communication pipe for socks "selected" connector
+   int socksfd[2];
 };
 
 #ifdef PACKET_QUEUE
@@ -368,6 +377,10 @@ typedef struct SocksQueue
    struct in6_addr addr;
    int state;
    int perm;
+   int fd;
+   time_t restart_time;
+   time_t connect_time;
+   int retry;
 } SocksQueue_t;
 
 //! IPv4 routing table entry
@@ -506,7 +519,6 @@ void packet_forwarder(void);
 void *packet_dequeuer(void *);
 #endif
 void *socket_acceptor(void *);
-void *socks_connector(void *);
 void *socket_cleaner(void *);
 void *ocat_controller(void *);
 void *ctrl_handler(void *);
@@ -577,6 +589,7 @@ uint16_t *malloc_ckbuf(const struct in6_addr *, const struct in6_addr *, uint16_
 void socks_queue(struct in6_addr, int);
 void print_socks_queue(FILE *);
 void sig_socks_connector(void);
+void *socks_connector_sel(void *);
 
 /* ocatlibe.c */
 void oe_close(int);
diff --git a/src/ocatctrl.c b/src/ocatctrl.c
index 737d7ea..2afb46d 100644
--- a/src/ocatctrl.c
+++ b/src/ocatctrl.c
@@ -34,16 +34,20 @@
  */
 void *ctrl_handler(void *p)
 {
-   int fd, c;
+   int fd, c, i;
    FILE *ff, *fo;
    char buf[FRAME_SIZE], addrstr[INET6_ADDRSTRLEN], onionstr[ONION_NAME_SIZE], timestr[32], *s, *tokbuf;
    int rlen, cfd;
    struct tm *tm;
    OcatPeer_t *peer;
    struct in6_addr in6;
+   int pfd[2];
 
    detach_thread();
 
+   if (pipe(pfd) == -1)
+      log_msg(LOG_EMERG, "couldn't create pipe: \"%s\"", strerror(errno)), exit(1);
+
    fd = (int) p;
    if (CNF(config_read))
    {
@@ -239,7 +243,14 @@ void *ctrl_handler(void *p)
       }
       else if (!strcmp(buf, "queue"))
       {
-         print_socks_queue(ff);
+         print_socks_queue((FILE*) pfd[1]);
+         for (;;)
+         {
+            read(pfd[0], buf, 1);
+            if (!buf[0])
+               break;
+            fprintf(ff, "%c", buf[0]);
+         }
       }
       else if (!strcmp(buf, "setup"))
       {
diff --git a/src/ocatsetup.c b/src/ocatsetup.c
index 4534e8d..8e8fd1c 100644
--- a/src/ocatsetup.c
+++ b/src/ocatsetup.c
@@ -97,6 +97,9 @@ struct OcatSetup setup_ =
 #else
    2
 #endif
+   ,
+   // socksfd
+   {-1, -1}
 };
 
 
diff --git a/src/ocatsocks.c b/src/ocatsocks.c
index 2967a3c..c38c27f 100644
--- a/src/ocatsocks.c
+++ b/src/ocatsocks.c
@@ -29,237 +29,188 @@
 
 // SOCKS connector queue vars
 static SocksQueue_t *socks_queue_ = NULL;
-static int socks_connect_cnt_ = 0;
-static int socks_thread_cnt_ = 0;
-static pthread_mutex_t socks_queue_mutex_ = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t socks_queue_cond_ = PTHREAD_COND_INITIALIZER;
 
+#define SOCKS_BUFLEN (sizeof(SocksHdr_t) + ONION_NAME_SIZE + strlen(CNF(usrname)) + 2)
 
-int socks_srv_con(void)
+
+int socks_send_request(const SocksQueue_t *sq)
 {
-   int fd, t, maxfd, so_err;
-   socklen_t err_len;
-   char addr[INET6_ADDRSTRLEN];
-   fd_set cset;
-   struct timeval tv;
+   int len, ret;
+   char buf[SOCKS_BUFLEN], onion[ONION_NAME_SIZE];
+   SocksHdr_t *shdr = (SocksHdr_t*) buf;
 
-   if ((fd = socket(CNF(socks_dst)->sin_family == AF_INET ? PF_INET : PF_INET6, SOCK_STREAM, 0)) < 0)
-      return E_SOCKS_SOCK;
-   set_nonblock(fd);
+   ipv6tonion(&sq->addr, onion);
+   strlcat(onion, ".onion", sizeof(onion));
+   log_msg(LOG_INFO, "trying to connect to \"%s\" [%s]", onion, inet_ntop(AF_INET6, &sq->addr, buf, SOCKS_BUFLEN));
 
-   t = time(NULL);
-   if (connect(fd, (struct sockaddr*) CNF(socks_dst), SOCKADDR_SIZE(CNF(socks_dst))) == -1)
+   log_debug("doing SOCKS4a handshake");
+   shdr->ver = 4;
+   shdr->cmd = 1;
+   shdr->port = htons(CNF(ocat_dest_port));
+   shdr->addr.s_addr = htonl(0x00000001);
+   memcpy(buf + sizeof(SocksHdr_t), CNF(usrname), strlen(CNF(usrname)) + 1);
+   memcpy(buf + sizeof(SocksHdr_t) + strlen(CNF(usrname)) + 1, onion, strlen(onion) + 1);
+   len = sizeof(SocksHdr_t) + strlen(CNF(usrname)) + strlen(onion) + 2;
+   if ((ret = write(sq->fd, shdr, len)) == -1)
    {
-      if (errno != EINPROGRESS)
-      {
-         log_msg(LOG_ERR, "connect() to SOCKS port %s:%d failed: \"%s\". Sleeping for %d seconds.", 
-            inet_ntop(CNF(socks_dst)->sin_family, 
-               CNF(socks_dst)->sin_family == AF_INET ? (char*) &CNF(socks_dst)->sin_addr : (char*) &CNF(socks_dst6)->sin6_addr, addr, sizeof(addr)), 
-            ntohs(CNF(socks_dst)->sin_port), strerror(errno), TOR_SOCKS_CONN_TIMEOUT);
-         oe_close(fd);
-         sleep(TOR_SOCKS_CONN_TIMEOUT);
-         return E_SOCKS_CONN;
-      }
-      log_debug("connection in progress");
+      log_msg(LOG_ERR, "error writing %d bytes to fd %d: \"%s\"", len, sq->fd, strerror(errno));
+      return -1;
    }
-   else
+   if (ret < len)
    {
-      log_debug("connected");
-      return fd;
+      log_msg(LOG_ERR, "SOCKS request truncated to %d of %d bytes", ret, len);
+      return -1;
    }
+   log_debug("SOCKS request sent successfully");
+   return 0;
+}
 
-   for (;;)
-   {
-      if (term_req())
-      {
-         oe_close(fd);
-         return -1;
-      }
-
-      FD_ZERO(&cset);
-      FD_SET(fd, &cset);
-      maxfd = fd;
-
-      set_select_timeout(&tv);
-      log_debug("selecting (maxfd = %d)", maxfd);
-      if ((maxfd = select(maxfd + 1, NULL, &cset, NULL, &tv)) == -1)
-      {
-         log_msg(LOG_EMERG, "select encountered error: \"%s\", restarting", strerror(errno));
-         continue;
-      }
-      log_debug("select returned %d", maxfd);
 
-      if (!maxfd)
-      {
-         log_debug("select timed out, restarting");
-         continue;
-      }
+int socks_rec_response(SocksQueue_t *sq)
+{
+   SocksHdr_t shdr;
+   int ret, len;
 
-      if (!FD_ISSET(fd, &cset))
-      {
-         log_msg(LOG_ERR, "fd %d not in fd_set! Restarting.", fd);
-         continue;
-      }
+   len = sizeof(SocksHdr_t);
+   if ((ret = read(sq->fd, &shdr, len)) == -1)
+   {
+      log_msg(LOG_ERR, "reading SOCKS response on fd %d failed: \"%s\"", sq->fd, strerror(errno));
+      return -1;
+   }
+   if (ret < len)
+   {
+      log_msg(LOG_ERR, "SOCKS response truncated to %d of %d bytes", ret, len);
+      return -1;
+   }
 
-      // test if connect() worked
-      log_debug("check socket error");
-      err_len = sizeof(so_err);
-      if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_err, &err_len) == -1)
-      {
-         log_msg(LOG_ERR, "getsockopt failed: \"%s\"", strerror(errno));
-         oe_close(fd);
-         return -1;
-      }
-      if (so_err)
-      {
-         log_msg(LOG_ERR, "getsockopt returned %d (\"%s\")", so_err, strerror(so_err));
-         oe_close(fd);
-         return -1;
-      }
-      // everything seems to be ok, break loop
-      break;
-   } // for (;;)
+   log_debug("SOCKS response received");
+   if (shdr.ver || (shdr.cmd != 90))
+   {
+      log_msg(LOG_ERR, "SOCKS request failed, reason = %d", shdr.cmd);
+      return -1;
+   }
 
-   log_debug("connected");
-   return fd;
+   log_msg(LOG_INFO | LOG_FCONN, "SOCKS connection successfully opened on fd %d", sq->fd);
+   return 0;
 }
 
 
-int socks_connect(const SocksQueue_t *sq)
+int socks_activate_peer(SocksQueue_t *sq)
 {
-   int fd, t, len;
-   char buf[FRAME_SIZE], onion[ONION_NAME_SIZE];
-   SocksHdr_t *shdr = (SocksHdr_t*) buf;
    OcatPeer_t *peer;
-   int maxfd;
-   fd_set rset;
-   struct timeval tv;
 
-   ipv6tonion(&sq->addr, onion);
-   strlcat(onion, ".onion", sizeof(onion));
+   insert_peer(sq->fd, sq, time(NULL) - sq->connect_time);
 
-   log_msg(LOG_INFO, "trying to connect to \"%s\" [%s]", onion, inet_ntop(AF_INET6, &sq->addr, buf, FRAME_SIZE));
-   t = time(NULL);
-   if ((fd = socks_srv_con()) < 0)
-      return fd;
+   // Send first keepalive immediately
+   lock_peers();
+   if ((peer = search_peer(&sq->addr)))
+      lock_peer(peer);
+   else
+      log_msg(LOG_EMERG, "newly inserted peer not found, fd = %d", sq->fd);
+   unlock_peers();
+   if (peer)
+   {
+      send_keepalive(peer);
+      unlock_peer(peer);
+   }
 
-   log_debug("doing SOCKS4a handshake");
+   return 0;
+}
 
-   shdr->ver = 4;
-   shdr->cmd = 1;
-   shdr->port = htons(CNF(ocat_dest_port));
-   shdr->addr.s_addr = htonl(0x00000001);
-   /*
-   strlcpy(buf + sizeof(SocksHdr_t), usrname_, strlen(usrname_) + 1);
-   strlcpy(buf + sizeof(SocksHdr_t) + strlen(usrname_) + 1, onion, sizeof(onion));
-   */
-   memcpy(buf + sizeof(SocksHdr_t), CNF(usrname), strlen(CNF(usrname)) + 1);
-   memcpy(buf + sizeof(SocksHdr_t) + strlen(CNF(usrname)) + 1, onion, strlen(onion) + 1);
-   len = sizeof(SocksHdr_t) + strlen(CNF(usrname)) + strlen(onion) + 2;
-   if (write(fd, shdr, len) != len)
-      // FIXME: there should be some additional error handling
-      log_msg(LOG_ERR, "couldn't write %d bytes to SOCKS connection %d", len, fd);
-   log_debug("connect request sent");
 
-   for (;;)
-   {
-      if (term_req())
-      {
-         oe_close(fd);
-         return E_SOCKS_TERMREQ;
-      }
+void socks_pipe_request(const SocksQueue_t *sq)
+{
+   fd_set wset;
+   int maxfd;
+   int len = sizeof(*sq), ret;
 
-      FD_ZERO(&rset);
-      FD_SET(fd, &rset);
-      maxfd = fd;
+   FD_ZERO(&wset);
+   FD_SET(CNF(socksfd[1]), &wset);
+   maxfd = CNF(socksfd[1]);
+
+   log_debug("selecting until socks request pipe gets ready");
 
-      set_select_timeout(&tv);
       log_debug("selecting (maxfd = %d)", maxfd);
-      if ((maxfd = select(maxfd + 1, &rset, NULL, NULL, &tv)) == -1)
+      if ((maxfd = select(maxfd + 1, NULL, &wset, NULL, NULL)) == -1)
       {
          log_msg(LOG_EMERG, "select encountered error: \"%s\", restarting", strerror(errno));
-         continue;
+         return;
       }
-
       log_debug("select returned %d", maxfd);
-      if (!maxfd)
-      {
-         log_debug("select timed out, restarting");
-         continue;
-      }
 
-      if (FD_ISSET(fd, &rset))
-         break;
 
-      log_msg(LOG_ERR, "fd %d not in fd_set! Restarting.", fd);
-      continue;
-   } // for (;;)
+      if (maxfd && FD_ISSET(CNF(socksfd[1]), &wset))
+      {
 
-   if (read(fd, shdr, sizeof(SocksHdr_t)) < sizeof(SocksHdr_t))
+   log_debug("writing %d bytes to fd %d", len, CNF(socksfd[1]));
+   if ((ret = write(CNF(socksfd[1]), sq, len) == -1))
    {
-      log_msg(LOG_ERR | LOG_FCONN, "short read, closing.");
-      oe_close(fd);
-      return E_SOCKS_REQ;
+      log_msg(LOG_WARNING, "error writing to SOCKS request pipe fd %d: \"%s\"", CNF(socksfd[1]), strerror(errno));
    }
-   log_debug("SOCKS response received");
-
-   if (shdr->ver || (shdr->cmd != 90))
+   else if (ret < len)
    {
-      log_msg(LOG_ERR, "request failed, reason = %d", shdr->cmd);
-      oe_close(fd);
-      return E_SOCKS_RQFAIL;
+      log_msg(LOG_WARNING, "write to SOCKS request pipe fd %d truncated to %d bytes of %d", CNF(socksfd[1]), ret, len);
    }
-   log_msg(LOG_INFO | LOG_FCONN, "connection to %s successfully opened on fd %d", onion, fd);
-
-   insert_peer(fd, sq, time(NULL) - t);
-
-   // Send first keepalive immediately
-   lock_peers();
-   if ((peer = search_peer(&sq->addr)))
-      lock_peer(peer);
    else
-      log_msg(LOG_EMERG, "newly inserted peer not found, fd = %d", fd);
-   unlock_peers();
-   if (peer)
    {
-      send_keepalive(peer);
-      unlock_peer(peer);
+      log_debug("wrote %d bytes to SOCKS request pipe fd %d", len, CNF(socksfd[1]));
    }
-
-   // return new file descriptor
-   return fd;
+      }
+      else
+         log_msg(LOG_WARNING, "fd %d not in write set", CNF(socksfd[1]));
 }
 
 
 void sig_socks_connector(void)
 {
-   pthread_cond_signal(&socks_queue_cond_);
+   SocksQueue_t sq;
+
+   memset(&sq, 0, sizeof(sq));
+   socks_pipe_request(&sq);
 }
 
 
-void socks_queue(struct in6_addr addr, int perm)
+/*! Add and link a SOCKS request to the SOCKS queue.
+ *  @param sq Request structure to add.
+ */
+void socks_enqueue(const SocksQueue_t *sq)
 {
    SocksQueue_t *squeue;
 
-   pthread_mutex_lock(&socks_queue_mutex_);
+   log_debug("queueing new SOCKS connection request");
+   if (!(squeue = malloc(sizeof(SocksQueue_t))))
+      log_msg(LOG_EMERG, "could not get memory for SocksQueue entry: \"%s\"", strerror(errno)), exit(1);
+   memcpy(squeue, sq, sizeof(*squeue));
+
+   squeue->next = socks_queue_;
+   socks_queue_ = squeue;
+}
+
+
+/*! Send a SOCKS request to the request pipe in order to get
+ *  added to the SOCKS queue with socks_enqueue()
+ *  @param addr IPv6 address to be requested
+ *  @param perm 1 if connection should kept opened inifitely after successful request, 0 else.
+ */
+void socks_queue(struct in6_addr addr, int perm)
+{
+   SocksQueue_t *squeue, sq;
+
    for (squeue = socks_queue_; squeue; squeue = squeue->next)
       if (IN6_ARE_ADDR_EQUAL(&squeue->addr, &addr))
          break;
+
    if (!squeue)
    {
       log_debug("queueing new SOCKS connection request");
-      if (!(squeue = calloc(1, sizeof(SocksQueue_t))))
-         log_msg(LOG_EMERG, "could not get memory for SocksQueue entry: \"%s\"", strerror(errno)), exit(1);
-      IN6_ADDR_COPY(&squeue->addr, &addr);
-      squeue->perm = perm;
-      squeue->next = socks_queue_;
-      socks_queue_ = squeue;
+      memset(&sq, 0, sizeof(sq));
+      IN6_ADDR_COPY(&sq.addr, &addr);
+      sq.perm = perm;
       log_debug("signalling connector");
-      sig_socks_connector();
+      socks_pipe_request(&sq);
    }
    else
       log_debug("connection already exists, not queueing SOCKS connection");
-   pthread_mutex_unlock(&socks_queue_mutex_);
 }
 
 
@@ -281,121 +232,22 @@ void socks_unqueue(SocksQueue_t *squeue)
 }
 
 
-void *socks_connector(void *p)
+void print_socks_queue(FILE *f)
 {
-   OcatPeer_t *peer;
-   SocksQueue_t *squeue;
-   int i, ps, run = 1, t_abs, t_diff;
-   char thn[THREAD_NAME_LEN] = "cn:", on[ONION_NAME_LEN];
+   SocksQueue_t sq;
 
-   detach_thread();
-
-   pthread_mutex_lock(&socks_queue_mutex_);
-   socks_thread_cnt_++;
-   log_debug("%d connector threads running", socks_thread_cnt_);
-   pthread_mutex_unlock(&socks_queue_mutex_);
-
-   while (run)
-   {
-      pthread_mutex_lock(&socks_queue_mutex_);
-      for (;;)
-      {
-         for (squeue = socks_queue_; squeue; squeue = squeue->next)
-            if (!squeue->state)
-            {
-               log_debug("unhandled queue entry found");
-               break;
-            }
-
-         if (squeue)
-            break;
-
-         log_debug("waiting for new queue entry");
-         pthread_cond_wait(&socks_queue_cond_, &socks_queue_mutex_);
-         // check termination request
-         if (term_req())
-         {
-            pthread_mutex_unlock(&socks_queue_mutex_);
-            return NULL;
-         }
-      }
-
-      // spawn spare thread if there is no one left
-      log_debug("change queue element state to CONNECTING");
-      squeue->state = SOCKS_CONNECTING;
-      socks_connect_cnt_++;
-      if (socks_thread_cnt_ <= socks_connect_cnt_)
-      {
-         log_debug("spawning new connector threads");
-         run_ocat_thread("connector", socks_connector, NULL);
-      }
-      pthread_mutex_unlock(&socks_queue_mutex_);
-
-      // changing thread name
-      log_debug("changing thread name");
-      ipv6tonion(&squeue->addr, on);
-      strlcat(thn, on, THREAD_NAME_LEN);
-      set_thread_name(thn);
-
-      // search for existing peer
-      lock_peers();
-      peer = search_peer(&squeue->addr);
-      unlock_peers();
-
-      // connect via SOCKS if no peer exists
-      if (!peer)
-         for (i = 0, ps = -1, t_abs = 0; ((i < SOCKS_MAX_RETRY) || squeue->perm) && ps < 0; i++)
-         {
-            // FIXME: term_req should be checked here
-
-            // every third connection attempt
-            if (!(i % RECONN_ATTEMPTS))
-            {
-               // check that it does not reconnect too fast
-               t_diff = time(NULL) - t_abs;
-               if (t_diff < MIN_RECONNECT_TIME)
-               {
-                  // and sleep if necessary
-                  log_msg(LOG_WARNING, "reconnecting too fast. sleeping %d seconds", MIN_RECONNECT_TIME - t_diff);
-                  sleep(MIN_RECONNECT_TIME - t_diff);
-               }
-               t_abs = time(NULL);
-            }
-            log_debug("%d. SOCKS connection attempt", i + 1);
-            ps = socks_connect(squeue);
-         }
-      else
-         log_msg(LOG_INFO, "peer already exists, ignoring");
-
-      // remove request from queue after connect
-      log_debug("removing destination from SOCKS queue");
-      pthread_mutex_lock(&socks_queue_mutex_);
-
-      socks_unqueue(squeue);
-      socks_connect_cnt_--;
-
-      // if there are more threads then pending connections
-      // terminate thread
-      if (socks_connect_cnt_ < socks_thread_cnt_ - 1)
-      {
-         log_debug("going to terminate connector thread");
-         socks_thread_cnt_--;
-         run = 0;
-      }
-      pthread_mutex_unlock(&socks_queue_mutex_);
-   }
-   return NULL;
+   memset(&sq, 0, sizeof(sq));
+   sq.next = (SocksQueue_t*) f;
+   socks_pipe_request(&sq);
 }
 
 
-void print_socks_queue(FILE *f)
+void socks_output_queue(FILE *f)
 {
    int i;
-   char addrstr[INET6_ADDRSTRLEN], onstr[ONION_NAME_LEN];
+   char addrstr[INET6_ADDRSTRLEN], onstr[ONION_NAME_LEN], buf[SIZE_1K];
    SocksQueue_t *squeue;
 
-   pthread_mutex_lock(&socks_queue_mutex_);
-
    for (squeue = socks_queue_, i = 0; squeue; squeue = squeue->next, i++)
    {
       if (!inet_ntop(AF_INET6, &squeue->addr, addrstr, INET6_ADDRSTRLEN))
@@ -404,20 +256,27 @@ void print_socks_queue(FILE *f)
          strlcpy(addrstr, "ERROR", INET6_ADDRSTRLEN);
       }
 
-      fprintf(f, "%d %39s %s.onion %s(%d) %s(%d)\n",
+      snprintf(buf, SIZE_1K, "%d: %39s, %s.onion, state = %d, %s(%d), retry = %d, connect_time = %ld, restart_time = %ld",
             i, 
             addrstr, 
             ipv6tonion(&squeue->addr, onstr),
-            squeue->state == SOCKS_CONNECTING ? "CONNECTING" : "QUEUED", 
             squeue->state,
             squeue->perm ? "PERMANENT" : "TEMPORARY",
-            squeue->perm);
+            squeue->perm,
+            squeue->retry,
+            squeue->connect_time,
+            squeue->restart_time
+            );
+//      log_debug("%s", buf);
+      write((int) f, buf, strlen(buf));
+      write((int) f, "\n", 1);
    }
-
-   pthread_mutex_unlock(&socks_queue_mutex_);
+   write((int) f, "\0", 1);
+   log_debug("socks_output_queue() finished");
 }
 
 
+#if 0
 int socks5_connect(const SocksQueue_t *sq)
 {
    char buf[256 + sizeof(uint16_t) + sizeof(Socks5Hdr_t)];
@@ -441,4 +300,230 @@ int socks5_connect(const SocksQueue_t *sq)
 
    return fd;
 }
+#endif
+
+
+int socks_tcp_connect(int fd, struct sockaddr *addr, int len)
+{
+   char astr[INET6_ADDRSTRLEN];
+   if (connect(fd, addr, len) == -1)
+   {
+      if (errno != EINPROGRESS)
+      {
+         log_msg(LOG_ERR, "connect() to SOCKS port %s:%d failed: \"%s\". Sleeping for %d seconds.", 
+            inet_ntop(CNF(socks_dst)->sin_family, 
+               CNF(socks_dst)->sin_family == AF_INET ? (char*) &CNF(socks_dst)->sin_addr : (char*) &CNF(socks_dst6)->sin6_addr, astr, sizeof(astr)), 
+            ntohs(CNF(socks_dst)->sin_port), strerror(errno), TOR_SOCKS_CONN_TIMEOUT);
+         return -1;
+      }
+      log_debug("connection in progress");
+   }
+   else
+      log_debug("connected");
+
+   return 0;
+}
+
+
+void socks_reschedule(SocksQueue_t *squeue)
+{
+   log_msg(LOG_ERR, "rescheduling SOCKS request");
+   if (squeue->fd > 0)
+   {
+      oe_close(squeue->fd);
+      squeue->fd = 0;
+   }
+   squeue->restart_time = time(NULL) + TOR_SOCKS_CONN_TIMEOUT;
+   squeue->state = SOCKS_NEW;
+}
+
+ 
+void *socks_connector_sel(void *p)
+{
+   fd_set rset, wset;
+   int maxfd = 0, len, so_err;
+   SocksQueue_t *squeue, sq;
+   time_t t;
+   struct timeval tv;
+   socklen_t err_len;
+
+   for (;;)
+   {
+      if (term_req())
+         return NULL;
+
+      FD_ZERO(&rset);
+      FD_ZERO(&wset);
+      MFD_SET(CNF(socksfd[0]), &rset, maxfd);
+      t = time(NULL);
+
+      for (squeue = socks_queue_; squeue; squeue = squeue->next)
+      {
+         switch (squeue->state)
+         {
+            case SOCKS_NEW:
+               /*if (!squeue->fd)
+               {
+                  log_msg(LOG_CRIT, "SOCKS_NEW and fd = %d, but should be 0", squeue->fd);
+                  squeue->state = SOCKS_DELETE;
+                  continue;
+               }*/
+
+               if (t < squeue->restart_time)
+               {
+                  log_debug("SOCKS request is scheduled for connection not before %ds", squeue->restart_time - t);
+                  continue;
+               }
+
+               // check and increase retry counter
+               squeue->retry++;
+               if (!squeue->perm && (squeue->retry > SOCKS_MAX_RETRY))
+               {
+                  log_msg(LOG_NOTICE, "temporary request failed %d times and will be removed", squeue->retry - 1);
+                  squeue->state = SOCKS_DELETE;
+                  continue;
+               }
+
+               log_debug("creating socket for unconnected SOCKS request");
+               if ((squeue->fd = socket(CNF(socks_dst)->sin_family == AF_INET ? PF_INET : PF_INET6, SOCK_STREAM, 0)) == -1)
+               {
+                  log_msg(LOG_ERR, "cannot create socket for new SOCKS request: \"%s\"", strerror(errno));
+                  continue;
+               }
+
+               set_nonblock(squeue->fd);
+               log_debug("queueing fd %d for connect", squeue->fd);
+               squeue->connect_time = t;
+               if (socks_tcp_connect(squeue->fd, (struct sockaddr*) CNF(socks_dst), SOCKADDR_SIZE(CNF(socks_dst))) == -1)
+               {
+                  socks_reschedule(squeue);
+                  continue;
+               }
+
+               squeue->state = SOCKS_CONNECTING;
+               MFD_SET(squeue->fd, &wset, maxfd);
+
+               break;
+
+            case SOCKS_4AREQ_SENT:
+               MFD_SET(squeue->fd, &rset, maxfd);
+               break;
+         }
+      }
+
+      // select all file descriptors
+      set_select_timeout(&tv);
+      log_debug("selecting (maxfd = %d)", maxfd);
+      if ((maxfd = select(maxfd + 1, &rset, &wset, NULL, &tv)) == -1)
+      {
+         log_msg(LOG_EMERG, "select encountered error: \"%s\", restarting", strerror(errno));
+         continue;
+      }
+      log_debug("select returned %d", maxfd);
+
+      // check socks request pipe
+      if (FD_ISSET(CNF(socksfd[0]), &rset))
+      {
+         maxfd--;
+         if ((len = read(CNF(socksfd[0]), &sq, sizeof(sq))) == -1)
+            log_msg(LOG_ERR, "failed to read from SOCKS request pipe, fd = %d: \"%s\"", 
+                  CNF(socksfd[0]), strerror(errno));
+         if (len < sizeof(sq))
+            log_msg(LOG_ERR, "read from SOCKS request pipe truncated to %d of %d bytes, ignoring.", 
+                  len, sizeof(sq));
+         else
+         {
+            log_debug("received %d bytes on SOCKS request pipe fd %d", len, CNF(socksfd[0]));
+            if (sq.next)
+            {
+               log_debug("output of SOCKS request queue triggered");
+               socks_output_queue((FILE*) sq.next);
+            }
+            else if (IN6_IS_ADDR_UNSPECIFIED(&sq.addr))
+            {
+               log_debug("termination request on SOCKS request queue received");
+            }
+            else
+            {
+               log_debug("SOCKS queuing request received");
+               socks_enqueue(&sq);
+            }
+         }
+      }
+
+      // handle all other file descriptors
+      t = time(NULL);
+      for (squeue = socks_queue_; maxfd && squeue; squeue = squeue->next)
+      {
+         // check write set, this is valid after connect()
+         if (FD_ISSET(squeue->fd, &wset))
+         {
+            maxfd--;
+            if (squeue->state == SOCKS_CONNECTING)
+            {
+               // test if connect() worked
+               log_debug("check socket error");
+               err_len = sizeof(so_err);
+               if (getsockopt(squeue->fd, SOL_SOCKET, SO_ERROR, &so_err, &err_len) == -1)
+               {
+                  log_msg(LOG_ERR, "getsockopt failed: \"%s\", rescheduling request", strerror(errno));
+                  socks_reschedule(squeue);
+                  continue;
+               }
+               if (so_err)
+               {
+                  log_msg(LOG_ERR, "getsockopt returned %d (\"%s\")", so_err, strerror(so_err));
+                  socks_reschedule(squeue);
+                  continue;
+               }
+               // everything seems to be ok, now check request status
+               if (socks_send_request(squeue) == -1)
+               {
+                  log_msg(LOG_ERR, "SOCKS request failed");
+                  socks_reschedule(squeue);
+                  continue;
+               }
+               // request successfully sent, advance state machine
+               squeue->state = SOCKS_4AREQ_SENT;
+            }
+            else
+               log_debug("unknown state %d in write set", squeue->state);
+         }
+
+         // check read set, this is valid after write, i.e. receiving SOCKS response
+         if (FD_ISSET(squeue->fd, &rset))
+         {
+            maxfd--;
+            if (squeue->state == SOCKS_4AREQ_SENT)
+            {
+               if (socks_rec_response(squeue) == -1)
+               {
+                  socks_reschedule(squeue);
+                  continue;
+               }
+               // success
+               log_debug("activating peer fd %d", squeue->fd);
+               socks_activate_peer(squeue);
+               squeue->state = SOCKS_DELETE;
+            }
+            else
+               log_debug("unknown state %d in read set", squeue->state);
+         }
+      }
+
+      // delete requests from queue which are marked for deletion
+      for (squeue = socks_queue_; squeue; squeue = squeue->next)
+         if (squeue->state == SOCKS_DELETE)
+         {
+            socks_unqueue(squeue);
+            // restart loop
+            squeue = socks_queue_;
+            if (!squeue)
+            {
+               log_debug("last entry deleted, breaking loop");
+               break;
+            }
+         }
+   }
+}
 

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-privacy/packages/onioncat.git



More information about the Pkg-privacy-commits mailing list