[Pkg-privacy-commits] [onioncat] 177/241: SOCKS connector completely rewritten. Works now without dynamic thread pool. This is more memory efficient.
Intrigeri
intrigeri at moszumanska.debian.org
Wed Aug 26 16:17:02 UTC 2015
This is an automated email from the git hooks/post-receive script.
intrigeri pushed a commit to branch upstream-master
in repository onioncat.
commit 3c7e9c0bb4814c435a639b9d747f7e36fdd84113
Author: eagle <eagle at 58e1ccc2-750e-0410-8d0d-f93ca75ab447>
Date: Thu Mar 26 16:36:44 2009 +0000
SOCKS connector completely rewritten. Works now without dynamic thread pool. This is more memory efficient.
git-svn-id: https://www.cypherpunk.at/svn/onioncat/trunk@497 58e1ccc2-750e-0410-8d0d-f93ca75ab447
---
ChangeLog | 2 +
configure | 22 +-
glob_id.txt | 12 ++
src/ocat.c | 7 +-
src/ocat.h | 15 +-
src/ocatctrl.c | 15 +-
src/ocatsetup.c | 3 +
src/ocatsocks.c | 645 ++++++++++++++++++++++++++++++++------------------------
8 files changed, 425 insertions(+), 296 deletions(-)
diff --git a/ChangeLog b/ChangeLog
index b3ff9a4..5361afa 100644
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,4 +1,6 @@
* version 0.1.12
+ - socks connector completely rewritten. Works now
+ without dynamic thread pool. This is more memory efficient.
- Windows branch merges into trunk
- syslogging implemented
- idle timeout increased to 3 mins
diff --git a/configure b/configure
index 1309a6b..c382d9c 100755
--- a/configure
+++ b/configure
@@ -1,6 +1,6 @@
#! /bin/sh
# Guess values for system-dependent variables and create Makefiles.
-# Generated by GNU Autoconf 2.63 for onioncat 0.1.12.r493.
+# Generated by GNU Autoconf 2.63 for onioncat 0.1.12.r496.
#
# Report bugs to <rahra at cypherpunk.at>.
#
@@ -596,8 +596,8 @@ SHELL=${CONFIG_SHELL-/bin/sh}
# Identity of this package.
PACKAGE_NAME='onioncat'
PACKAGE_TARNAME='onioncat'
-PACKAGE_VERSION='0.1.12.r493'
-PACKAGE_STRING='onioncat 0.1.12.r493'
+PACKAGE_VERSION='0.1.12.r496'
+PACKAGE_STRING='onioncat 0.1.12.r496'
PACKAGE_BUGREPORT='rahra at cypherpunk.at'
ac_subst_vars='LTLIBOBJS
@@ -1249,7 +1249,7 @@ if test "$ac_init_help" = "long"; then
# Omit some internal or obsolete options to make the list less imposing.
# This message is too long to be a string in the A/UX 3.1 sh.
cat <<_ACEOF
-\`configure' configures onioncat 0.1.12.r493 to adapt to many kinds of systems.
+\`configure' configures onioncat 0.1.12.r496 to adapt to many kinds of systems.
Usage: $0 [OPTION]... [VAR=VALUE]...
@@ -1315,7 +1315,7 @@ fi
if test -n "$ac_init_help"; then
case $ac_init_help in
- short | recursive ) echo "Configuration of onioncat 0.1.12.r493:";;
+ short | recursive ) echo "Configuration of onioncat 0.1.12.r496:";;
esac
cat <<\_ACEOF
@@ -1407,7 +1407,7 @@ fi
test -n "$ac_init_help" && exit $ac_status
if $ac_init_version; then
cat <<\_ACEOF
-onioncat configure 0.1.12.r493
+onioncat configure 0.1.12.r496
generated by GNU Autoconf 2.63
Copyright (C) 1992, 1993, 1994, 1995, 1996, 1998, 1999, 2000, 2001,
@@ -1421,7 +1421,7 @@ cat >config.log <<_ACEOF
This file contains any messages produced by compilers while
running configure, to aid debugging if configure makes a mistake.
-It was created by onioncat $as_me 0.1.12.r493, which was
+It was created by onioncat $as_me 0.1.12.r496, which was
generated by GNU Autoconf 2.63. Invocation command line was
$ $0 $@
@@ -2137,7 +2137,7 @@ fi
# Define the identity of the package.
PACKAGE='onioncat'
- VERSION='0.1.12.r493'
+ VERSION='0.1.12.r496'
cat >>confdefs.h <<_ACEOF
@@ -2284,7 +2284,7 @@ ac_config_headers="$ac_config_headers config.h"
cat >>confdefs.h <<\_ACEOF
-#define SVN_REVISION "493"
+#define SVN_REVISION "496"
_ACEOF
@@ -4803,7 +4803,7 @@ exec 6>&1
# report actual input values of CONFIG_FILES etc. instead of their
# values after options handling.
ac_log="
-This file was extended by onioncat $as_me 0.1.12.r493, which was
+This file was extended by onioncat $as_me 0.1.12.r496, which was
generated by GNU Autoconf 2.63. Invocation command line was
CONFIG_FILES = $CONFIG_FILES
@@ -4866,7 +4866,7 @@ Report bugs to <bug-autoconf at gnu.org>."
_ACEOF
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
ac_cs_version="\\
-onioncat config.status 0.1.12.r493
+onioncat config.status 0.1.12.r496
configured by $0, generated by GNU Autoconf 2.63,
with options \\"`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`\\"
diff --git a/glob_id.txt b/glob_id.txt
index bc2e381..7493155 100644
--- a/glob_id.txt
+++ b/glob_id.txt
@@ -7,3 +7,15 @@ EUI-64: 21b:24ff:fe73:cd0e
=> global ID: 87d87eeb43
=> IPv6: FD87:D87E:EB43::/48
+/*
+ DO NOT USE THIS!
+ IT IS JUST A PERSONAL REMARK...
+generation for I2P
+NTP timestamp: 0xcd6cd83f947d805e
+EUI-64: 21b:24ff:fece:d108
+=> key: cd6cd83f947d805e21b24fffeced108
+=> SHA1: 366168b7aa3fd7326df894b907991b60db4dddb5
+=> global ID: 60db4dddb5
+=> IPv6: FD60:DB4D:DDB5::/48
+*/
+
diff --git a/src/ocat.c b/src/ocat.c
index e34096b..cd0f3f8 100644
--- a/src/ocat.c
+++ b/src/ocat.c
@@ -467,8 +467,11 @@ int main(int argc, char *argv[])
log_msg(LOG_ERR, "could not reconnect stdio to /dev/null: \"%s\"", strerror(errno));
}
- // create socks connector thread
- run_ocat_thread("connector", socks_connector, NULL);
+ // create socks connector thread and communication queue
+ if (pipe(CNF(socksfd)) == -1)
+ log_msg(LOG_EMERG, "couldn't create socks connector pipe: \"%s\"", strerror(errno)), exit(1);
+ run_ocat_thread("connector", socks_connector_sel, NULL);
+
#ifdef PACKET_QUEUE
// start packet dequeuer
run_ocat_thread("dequeuer", packet_dequeuer, NULL);
diff --git a/src/ocat.h b/src/ocat.h
index 431c96a..bd444d4 100644
--- a/src/ocat.h
+++ b/src/ocat.h
@@ -186,7 +186,12 @@
//! thread stack size (default stack size on OpenBSD is too small)
#define THREAD_STACK_SIZE 262144
+#define SOCKS_NEW 0
#define SOCKS_CONNECTING 1
+#define SOCKS_4AREQ_SENT 2
+#define SOCKS_4ARESPONSE 3
+#define SOCKS_DELETE 127
+
#define SOCKS_MAX_RETRY 3
#define E_RT_NOMEM -1
@@ -217,6 +222,8 @@
//! RECONN_ATTEMPTS must not be faster than MIN_RECONNECT_TIME
#define MIN_RECONNECT_TIME 30
+#define MFD_SET(f,s,m) {FD_SET(f, s); m = f > m ? f : m;}
+
//! copy an IPv6 address from b to a
#define IN6_ADDR_COPY(a,b) *((struct in6_addr*)a)=*(struct in6_addr*)b
@@ -296,6 +303,8 @@ struct OcatSetup
struct sockaddr **ctrl_listen;
int *ctrl_listen_fd;
int ctrl_listen_cnt;
+ //! communication pipe for socks "selected" connector
+ int socksfd[2];
};
#ifdef PACKET_QUEUE
@@ -368,6 +377,10 @@ typedef struct SocksQueue
struct in6_addr addr;
int state;
int perm;
+ int fd;
+ time_t restart_time;
+ time_t connect_time;
+ int retry;
} SocksQueue_t;
//! IPv4 routing table entry
@@ -506,7 +519,6 @@ void packet_forwarder(void);
void *packet_dequeuer(void *);
#endif
void *socket_acceptor(void *);
-void *socks_connector(void *);
void *socket_cleaner(void *);
void *ocat_controller(void *);
void *ctrl_handler(void *);
@@ -577,6 +589,7 @@ uint16_t *malloc_ckbuf(const struct in6_addr *, const struct in6_addr *, uint16_
void socks_queue(struct in6_addr, int);
void print_socks_queue(FILE *);
void sig_socks_connector(void);
+void *socks_connector_sel(void *);
/* ocatlibe.c */
void oe_close(int);
diff --git a/src/ocatctrl.c b/src/ocatctrl.c
index 737d7ea..2afb46d 100644
--- a/src/ocatctrl.c
+++ b/src/ocatctrl.c
@@ -34,16 +34,20 @@
*/
void *ctrl_handler(void *p)
{
- int fd, c;
+ int fd, c, i;
FILE *ff, *fo;
char buf[FRAME_SIZE], addrstr[INET6_ADDRSTRLEN], onionstr[ONION_NAME_SIZE], timestr[32], *s, *tokbuf;
int rlen, cfd;
struct tm *tm;
OcatPeer_t *peer;
struct in6_addr in6;
+ int pfd[2];
detach_thread();
+ if (pipe(pfd) == -1)
+ log_msg(LOG_EMERG, "couldn't create pipe: \"%s\"", strerror(errno)), exit(1);
+
fd = (int) p;
if (CNF(config_read))
{
@@ -239,7 +243,14 @@ void *ctrl_handler(void *p)
}
else if (!strcmp(buf, "queue"))
{
- print_socks_queue(ff);
+ print_socks_queue((FILE*) pfd[1]);
+ for (;;)
+ {
+ read(pfd[0], buf, 1);
+ if (!buf[0])
+ break;
+ fprintf(ff, "%c", buf[0]);
+ }
}
else if (!strcmp(buf, "setup"))
{
diff --git a/src/ocatsetup.c b/src/ocatsetup.c
index 4534e8d..8e8fd1c 100644
--- a/src/ocatsetup.c
+++ b/src/ocatsetup.c
@@ -97,6 +97,9 @@ struct OcatSetup setup_ =
#else
2
#endif
+ ,
+ // socksfd
+ {-1, -1}
};
diff --git a/src/ocatsocks.c b/src/ocatsocks.c
index 2967a3c..c38c27f 100644
--- a/src/ocatsocks.c
+++ b/src/ocatsocks.c
@@ -29,237 +29,188 @@
// SOCKS connector queue vars
static SocksQueue_t *socks_queue_ = NULL;
-static int socks_connect_cnt_ = 0;
-static int socks_thread_cnt_ = 0;
-static pthread_mutex_t socks_queue_mutex_ = PTHREAD_MUTEX_INITIALIZER;
-static pthread_cond_t socks_queue_cond_ = PTHREAD_COND_INITIALIZER;
+#define SOCKS_BUFLEN (sizeof(SocksHdr_t) + ONION_NAME_SIZE + strlen(CNF(usrname)) + 2)
-int socks_srv_con(void)
+
+int socks_send_request(const SocksQueue_t *sq)
{
- int fd, t, maxfd, so_err;
- socklen_t err_len;
- char addr[INET6_ADDRSTRLEN];
- fd_set cset;
- struct timeval tv;
+ int len, ret;
+ char buf[SOCKS_BUFLEN], onion[ONION_NAME_SIZE];
+ SocksHdr_t *shdr = (SocksHdr_t*) buf;
- if ((fd = socket(CNF(socks_dst)->sin_family == AF_INET ? PF_INET : PF_INET6, SOCK_STREAM, 0)) < 0)
- return E_SOCKS_SOCK;
- set_nonblock(fd);
+ ipv6tonion(&sq->addr, onion);
+ strlcat(onion, ".onion", sizeof(onion));
+ log_msg(LOG_INFO, "trying to connect to \"%s\" [%s]", onion, inet_ntop(AF_INET6, &sq->addr, buf, SOCKS_BUFLEN));
- t = time(NULL);
- if (connect(fd, (struct sockaddr*) CNF(socks_dst), SOCKADDR_SIZE(CNF(socks_dst))) == -1)
+ log_debug("doing SOCKS4a handshake");
+ shdr->ver = 4;
+ shdr->cmd = 1;
+ shdr->port = htons(CNF(ocat_dest_port));
+ shdr->addr.s_addr = htonl(0x00000001);
+ memcpy(buf + sizeof(SocksHdr_t), CNF(usrname), strlen(CNF(usrname)) + 1);
+ memcpy(buf + sizeof(SocksHdr_t) + strlen(CNF(usrname)) + 1, onion, strlen(onion) + 1);
+ len = sizeof(SocksHdr_t) + strlen(CNF(usrname)) + strlen(onion) + 2;
+ if ((ret = write(sq->fd, shdr, len)) == -1)
{
- if (errno != EINPROGRESS)
- {
- log_msg(LOG_ERR, "connect() to SOCKS port %s:%d failed: \"%s\". Sleeping for %d seconds.",
- inet_ntop(CNF(socks_dst)->sin_family,
- CNF(socks_dst)->sin_family == AF_INET ? (char*) &CNF(socks_dst)->sin_addr : (char*) &CNF(socks_dst6)->sin6_addr, addr, sizeof(addr)),
- ntohs(CNF(socks_dst)->sin_port), strerror(errno), TOR_SOCKS_CONN_TIMEOUT);
- oe_close(fd);
- sleep(TOR_SOCKS_CONN_TIMEOUT);
- return E_SOCKS_CONN;
- }
- log_debug("connection in progress");
+ log_msg(LOG_ERR, "error writing %d bytes to fd %d: \"%s\"", len, sq->fd, strerror(errno));
+ return -1;
}
- else
+ if (ret < len)
{
- log_debug("connected");
- return fd;
+ log_msg(LOG_ERR, "SOCKS request truncated to %d of %d bytes", ret, len);
+ return -1;
}
+ log_debug("SOCKS request sent successfully");
+ return 0;
+}
- for (;;)
- {
- if (term_req())
- {
- oe_close(fd);
- return -1;
- }
-
- FD_ZERO(&cset);
- FD_SET(fd, &cset);
- maxfd = fd;
-
- set_select_timeout(&tv);
- log_debug("selecting (maxfd = %d)", maxfd);
- if ((maxfd = select(maxfd + 1, NULL, &cset, NULL, &tv)) == -1)
- {
- log_msg(LOG_EMERG, "select encountered error: \"%s\", restarting", strerror(errno));
- continue;
- }
- log_debug("select returned %d", maxfd);
- if (!maxfd)
- {
- log_debug("select timed out, restarting");
- continue;
- }
+int socks_rec_response(SocksQueue_t *sq)
+{
+ SocksHdr_t shdr;
+ int ret, len;
- if (!FD_ISSET(fd, &cset))
- {
- log_msg(LOG_ERR, "fd %d not in fd_set! Restarting.", fd);
- continue;
- }
+ len = sizeof(SocksHdr_t);
+ if ((ret = read(sq->fd, &shdr, len)) == -1)
+ {
+ log_msg(LOG_ERR, "reading SOCKS response on fd %d failed: \"%s\"", sq->fd, strerror(errno));
+ return -1;
+ }
+ if (ret < len)
+ {
+ log_msg(LOG_ERR, "SOCKS response truncated to %d of %d bytes", ret, len);
+ return -1;
+ }
- // test if connect() worked
- log_debug("check socket error");
- err_len = sizeof(so_err);
- if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_err, &err_len) == -1)
- {
- log_msg(LOG_ERR, "getsockopt failed: \"%s\"", strerror(errno));
- oe_close(fd);
- return -1;
- }
- if (so_err)
- {
- log_msg(LOG_ERR, "getsockopt returned %d (\"%s\")", so_err, strerror(so_err));
- oe_close(fd);
- return -1;
- }
- // everything seems to be ok, break loop
- break;
- } // for (;;)
+ log_debug("SOCKS response received");
+ if (shdr.ver || (shdr.cmd != 90))
+ {
+ log_msg(LOG_ERR, "SOCKS request failed, reason = %d", shdr.cmd);
+ return -1;
+ }
- log_debug("connected");
- return fd;
+ log_msg(LOG_INFO | LOG_FCONN, "SOCKS connection successfully opened on fd %d", sq->fd);
+ return 0;
}
-int socks_connect(const SocksQueue_t *sq)
+int socks_activate_peer(SocksQueue_t *sq)
{
- int fd, t, len;
- char buf[FRAME_SIZE], onion[ONION_NAME_SIZE];
- SocksHdr_t *shdr = (SocksHdr_t*) buf;
OcatPeer_t *peer;
- int maxfd;
- fd_set rset;
- struct timeval tv;
- ipv6tonion(&sq->addr, onion);
- strlcat(onion, ".onion", sizeof(onion));
+ insert_peer(sq->fd, sq, time(NULL) - sq->connect_time);
- log_msg(LOG_INFO, "trying to connect to \"%s\" [%s]", onion, inet_ntop(AF_INET6, &sq->addr, buf, FRAME_SIZE));
- t = time(NULL);
- if ((fd = socks_srv_con()) < 0)
- return fd;
+ // Send first keepalive immediately
+ lock_peers();
+ if ((peer = search_peer(&sq->addr)))
+ lock_peer(peer);
+ else
+ log_msg(LOG_EMERG, "newly inserted peer not found, fd = %d", sq->fd);
+ unlock_peers();
+ if (peer)
+ {
+ send_keepalive(peer);
+ unlock_peer(peer);
+ }
- log_debug("doing SOCKS4a handshake");
+ return 0;
+}
- shdr->ver = 4;
- shdr->cmd = 1;
- shdr->port = htons(CNF(ocat_dest_port));
- shdr->addr.s_addr = htonl(0x00000001);
- /*
- strlcpy(buf + sizeof(SocksHdr_t), usrname_, strlen(usrname_) + 1);
- strlcpy(buf + sizeof(SocksHdr_t) + strlen(usrname_) + 1, onion, sizeof(onion));
- */
- memcpy(buf + sizeof(SocksHdr_t), CNF(usrname), strlen(CNF(usrname)) + 1);
- memcpy(buf + sizeof(SocksHdr_t) + strlen(CNF(usrname)) + 1, onion, strlen(onion) + 1);
- len = sizeof(SocksHdr_t) + strlen(CNF(usrname)) + strlen(onion) + 2;
- if (write(fd, shdr, len) != len)
- // FIXME: there should be some additional error handling
- log_msg(LOG_ERR, "couldn't write %d bytes to SOCKS connection %d", len, fd);
- log_debug("connect request sent");
- for (;;)
- {
- if (term_req())
- {
- oe_close(fd);
- return E_SOCKS_TERMREQ;
- }
+void socks_pipe_request(const SocksQueue_t *sq)
+{
+ fd_set wset;
+ int maxfd;
+ int len = sizeof(*sq), ret;
- FD_ZERO(&rset);
- FD_SET(fd, &rset);
- maxfd = fd;
+ FD_ZERO(&wset);
+ FD_SET(CNF(socksfd[1]), &wset);
+ maxfd = CNF(socksfd[1]);
+
+ log_debug("selecting until socks request pipe gets ready");
- set_select_timeout(&tv);
log_debug("selecting (maxfd = %d)", maxfd);
- if ((maxfd = select(maxfd + 1, &rset, NULL, NULL, &tv)) == -1)
+ if ((maxfd = select(maxfd + 1, NULL, &wset, NULL, NULL)) == -1)
{
log_msg(LOG_EMERG, "select encountered error: \"%s\", restarting", strerror(errno));
- continue;
+ return;
}
-
log_debug("select returned %d", maxfd);
- if (!maxfd)
- {
- log_debug("select timed out, restarting");
- continue;
- }
- if (FD_ISSET(fd, &rset))
- break;
- log_msg(LOG_ERR, "fd %d not in fd_set! Restarting.", fd);
- continue;
- } // for (;;)
+ if (maxfd && FD_ISSET(CNF(socksfd[1]), &wset))
+ {
- if (read(fd, shdr, sizeof(SocksHdr_t)) < sizeof(SocksHdr_t))
+ log_debug("writing %d bytes to fd %d", len, CNF(socksfd[1]));
+ if ((ret = write(CNF(socksfd[1]), sq, len) == -1))
{
- log_msg(LOG_ERR | LOG_FCONN, "short read, closing.");
- oe_close(fd);
- return E_SOCKS_REQ;
+ log_msg(LOG_WARNING, "error writing to SOCKS request pipe fd %d: \"%s\"", CNF(socksfd[1]), strerror(errno));
}
- log_debug("SOCKS response received");
-
- if (shdr->ver || (shdr->cmd != 90))
+ else if (ret < len)
{
- log_msg(LOG_ERR, "request failed, reason = %d", shdr->cmd);
- oe_close(fd);
- return E_SOCKS_RQFAIL;
+ log_msg(LOG_WARNING, "write to SOCKS request pipe fd %d truncated to %d bytes of %d", CNF(socksfd[1]), ret, len);
}
- log_msg(LOG_INFO | LOG_FCONN, "connection to %s successfully opened on fd %d", onion, fd);
-
- insert_peer(fd, sq, time(NULL) - t);
-
- // Send first keepalive immediately
- lock_peers();
- if ((peer = search_peer(&sq->addr)))
- lock_peer(peer);
else
- log_msg(LOG_EMERG, "newly inserted peer not found, fd = %d", fd);
- unlock_peers();
- if (peer)
{
- send_keepalive(peer);
- unlock_peer(peer);
+ log_debug("wrote %d bytes to SOCKS request pipe fd %d", len, CNF(socksfd[1]));
}
-
- // return new file descriptor
- return fd;
+ }
+ else
+ log_msg(LOG_WARNING, "fd %d not in write set", CNF(socksfd[1]));
}
void sig_socks_connector(void)
{
- pthread_cond_signal(&socks_queue_cond_);
+ SocksQueue_t sq;
+
+ memset(&sq, 0, sizeof(sq));
+ socks_pipe_request(&sq);
}
-void socks_queue(struct in6_addr addr, int perm)
+/*! Add and link a SOCKS request to the SOCKS queue.
+ * @param sq Request structure to add.
+ */
+void socks_enqueue(const SocksQueue_t *sq)
{
SocksQueue_t *squeue;
- pthread_mutex_lock(&socks_queue_mutex_);
+ log_debug("queueing new SOCKS connection request");
+ if (!(squeue = malloc(sizeof(SocksQueue_t))))
+ log_msg(LOG_EMERG, "could not get memory for SocksQueue entry: \"%s\"", strerror(errno)), exit(1);
+ memcpy(squeue, sq, sizeof(*squeue));
+
+ squeue->next = socks_queue_;
+ socks_queue_ = squeue;
+}
+
+
+/*! Send a SOCKS request to the request pipe in order to get
+ * added to the SOCKS queue with socks_enqueue()
+ * @param addr IPv6 address to be requested
+ * @param perm 1 if the connection should be kept open indefinitely after a successful request, 0 otherwise.
+ */
+void socks_queue(struct in6_addr addr, int perm)
+{
+ SocksQueue_t *squeue, sq;
+
for (squeue = socks_queue_; squeue; squeue = squeue->next)
if (IN6_ARE_ADDR_EQUAL(&squeue->addr, &addr))
break;
+
if (!squeue)
{
log_debug("queueing new SOCKS connection request");
- if (!(squeue = calloc(1, sizeof(SocksQueue_t))))
- log_msg(LOG_EMERG, "could not get memory for SocksQueue entry: \"%s\"", strerror(errno)), exit(1);
- IN6_ADDR_COPY(&squeue->addr, &addr);
- squeue->perm = perm;
- squeue->next = socks_queue_;
- socks_queue_ = squeue;
+ memset(&sq, 0, sizeof(sq));
+ IN6_ADDR_COPY(&sq.addr, &addr);
+ sq.perm = perm;
log_debug("signalling connector");
- sig_socks_connector();
+ socks_pipe_request(&sq);
}
else
log_debug("connection already exists, not queueing SOCKS connection");
- pthread_mutex_unlock(&socks_queue_mutex_);
}
@@ -281,121 +232,22 @@ void socks_unqueue(SocksQueue_t *squeue)
}
-void *socks_connector(void *p)
+void print_socks_queue(FILE *f)
{
- OcatPeer_t *peer;
- SocksQueue_t *squeue;
- int i, ps, run = 1, t_abs, t_diff;
- char thn[THREAD_NAME_LEN] = "cn:", on[ONION_NAME_LEN];
+ SocksQueue_t sq;
- detach_thread();
-
- pthread_mutex_lock(&socks_queue_mutex_);
- socks_thread_cnt_++;
- log_debug("%d connector threads running", socks_thread_cnt_);
- pthread_mutex_unlock(&socks_queue_mutex_);
-
- while (run)
- {
- pthread_mutex_lock(&socks_queue_mutex_);
- for (;;)
- {
- for (squeue = socks_queue_; squeue; squeue = squeue->next)
- if (!squeue->state)
- {
- log_debug("unhandled queue entry found");
- break;
- }
-
- if (squeue)
- break;
-
- log_debug("waiting for new queue entry");
- pthread_cond_wait(&socks_queue_cond_, &socks_queue_mutex_);
- // check termination request
- if (term_req())
- {
- pthread_mutex_unlock(&socks_queue_mutex_);
- return NULL;
- }
- }
-
- // spawn spare thread if there is no one left
- log_debug("change queue element state to CONNECTING");
- squeue->state = SOCKS_CONNECTING;
- socks_connect_cnt_++;
- if (socks_thread_cnt_ <= socks_connect_cnt_)
- {
- log_debug("spawning new connector threads");
- run_ocat_thread("connector", socks_connector, NULL);
- }
- pthread_mutex_unlock(&socks_queue_mutex_);
-
- // changing thread name
- log_debug("changing thread name");
- ipv6tonion(&squeue->addr, on);
- strlcat(thn, on, THREAD_NAME_LEN);
- set_thread_name(thn);
-
- // search for existing peer
- lock_peers();
- peer = search_peer(&squeue->addr);
- unlock_peers();
-
- // connect via SOCKS if no peer exists
- if (!peer)
- for (i = 0, ps = -1, t_abs = 0; ((i < SOCKS_MAX_RETRY) || squeue->perm) && ps < 0; i++)
- {
- // FIXME: term_req should be checked here
-
- // every third connection attempt
- if (!(i % RECONN_ATTEMPTS))
- {
- // check that it does not reconnect too fast
- t_diff = time(NULL) - t_abs;
- if (t_diff < MIN_RECONNECT_TIME)
- {
- // and sleep if necessary
- log_msg(LOG_WARNING, "reconnecting too fast. sleeping %d seconds", MIN_RECONNECT_TIME - t_diff);
- sleep(MIN_RECONNECT_TIME - t_diff);
- }
- t_abs = time(NULL);
- }
- log_debug("%d. SOCKS connection attempt", i + 1);
- ps = socks_connect(squeue);
- }
- else
- log_msg(LOG_INFO, "peer already exists, ignoring");
-
- // remove request from queue after connect
- log_debug("removing destination from SOCKS queue");
- pthread_mutex_lock(&socks_queue_mutex_);
-
- socks_unqueue(squeue);
- socks_connect_cnt_--;
-
- // if there are more threads then pending connections
- // terminate thread
- if (socks_connect_cnt_ < socks_thread_cnt_ - 1)
- {
- log_debug("going to terminate connector thread");
- socks_thread_cnt_--;
- run = 0;
- }
- pthread_mutex_unlock(&socks_queue_mutex_);
- }
- return NULL;
+ memset(&sq, 0, sizeof(sq));
+ sq.next = (SocksQueue_t*) f;
+ socks_pipe_request(&sq);
}
-void print_socks_queue(FILE *f)
+void socks_output_queue(FILE *f)
{
int i;
- char addrstr[INET6_ADDRSTRLEN], onstr[ONION_NAME_LEN];
+ char addrstr[INET6_ADDRSTRLEN], onstr[ONION_NAME_LEN], buf[SIZE_1K];
SocksQueue_t *squeue;
- pthread_mutex_lock(&socks_queue_mutex_);
-
for (squeue = socks_queue_, i = 0; squeue; squeue = squeue->next, i++)
{
if (!inet_ntop(AF_INET6, &squeue->addr, addrstr, INET6_ADDRSTRLEN))
@@ -404,20 +256,27 @@ void print_socks_queue(FILE *f)
strlcpy(addrstr, "ERROR", INET6_ADDRSTRLEN);
}
- fprintf(f, "%d %39s %s.onion %s(%d) %s(%d)\n",
+ snprintf(buf, SIZE_1K, "%d: %39s, %s.onion, state = %d, %s(%d), retry = %d, connect_time = %ld, restart_time = %ld",
i,
addrstr,
ipv6tonion(&squeue->addr, onstr),
- squeue->state == SOCKS_CONNECTING ? "CONNECTING" : "QUEUED",
squeue->state,
squeue->perm ? "PERMANENT" : "TEMPORARY",
- squeue->perm);
+ squeue->perm,
+ squeue->retry,
+ squeue->connect_time,
+ squeue->restart_time
+ );
+// log_debug("%s", buf);
+ write((int) f, buf, strlen(buf));
+ write((int) f, "\n", 1);
}
-
- pthread_mutex_unlock(&socks_queue_mutex_);
+ write((int) f, "\0", 1);
+ log_debug("socks_output_queue() finished");
}
+#if 0
int socks5_connect(const SocksQueue_t *sq)
{
char buf[256 + sizeof(uint16_t) + sizeof(Socks5Hdr_t)];
@@ -441,4 +300,230 @@ int socks5_connect(const SocksQueue_t *sq)
return fd;
}
+#endif
+
+
+int socks_tcp_connect(int fd, struct sockaddr *addr, int len)
+{
+ char astr[INET6_ADDRSTRLEN];
+ if (connect(fd, addr, len) == -1)
+ {
+ if (errno != EINPROGRESS)
+ {
+ log_msg(LOG_ERR, "connect() to SOCKS port %s:%d failed: \"%s\". Sleeping for %d seconds.",
+ inet_ntop(CNF(socks_dst)->sin_family,
+ CNF(socks_dst)->sin_family == AF_INET ? (char*) &CNF(socks_dst)->sin_addr : (char*) &CNF(socks_dst6)->sin6_addr, astr, sizeof(astr)),
+ ntohs(CNF(socks_dst)->sin_port), strerror(errno), TOR_SOCKS_CONN_TIMEOUT);
+ return -1;
+ }
+ log_debug("connection in progress");
+ }
+ else
+ log_debug("connected");
+
+ return 0;
+}
+
+
+void socks_reschedule(SocksQueue_t *squeue)
+{
+ log_msg(LOG_ERR, "rescheduling SOCKS request");
+ if (squeue->fd > 0)
+ {
+ oe_close(squeue->fd);
+ squeue->fd = 0;
+ }
+ squeue->restart_time = time(NULL) + TOR_SOCKS_CONN_TIMEOUT;
+ squeue->state = SOCKS_NEW;
+}
+
+
+void *socks_connector_sel(void *p)
+{
+ fd_set rset, wset;
+ int maxfd = 0, len, so_err;
+ SocksQueue_t *squeue, sq;
+ time_t t;
+ struct timeval tv;
+ socklen_t err_len;
+
+ for (;;)
+ {
+ if (term_req())
+ return NULL;
+
+ FD_ZERO(&rset);
+ FD_ZERO(&wset);
+ MFD_SET(CNF(socksfd[0]), &rset, maxfd);
+ t = time(NULL);
+
+ for (squeue = socks_queue_; squeue; squeue = squeue->next)
+ {
+ switch (squeue->state)
+ {
+ case SOCKS_NEW:
+ /*if (!squeue->fd)
+ {
+ log_msg(LOG_CRIT, "SOCKS_NEW and fd = %d, but should be 0", squeue->fd);
+ squeue->state = SOCKS_DELETE;
+ continue;
+ }*/
+
+ if (t < squeue->restart_time)
+ {
+ log_debug("SOCKS request is scheduled for connection not before %ds", squeue->restart_time - t);
+ continue;
+ }
+
+ // check and increase retry counter
+ squeue->retry++;
+ if (!squeue->perm && (squeue->retry > SOCKS_MAX_RETRY))
+ {
+ log_msg(LOG_NOTICE, "temporary request failed %d times and will be removed", squeue->retry - 1);
+ squeue->state = SOCKS_DELETE;
+ continue;
+ }
+
+ log_debug("creating socket for unconnected SOCKS request");
+ if ((squeue->fd = socket(CNF(socks_dst)->sin_family == AF_INET ? PF_INET : PF_INET6, SOCK_STREAM, 0)) == -1)
+ {
+ log_msg(LOG_ERR, "cannot create socket for new SOCKS request: \"%s\"", strerror(errno));
+ continue;
+ }
+
+ set_nonblock(squeue->fd);
+ log_debug("queueing fd %d for connect", squeue->fd);
+ squeue->connect_time = t;
+ if (socks_tcp_connect(squeue->fd, (struct sockaddr*) CNF(socks_dst), SOCKADDR_SIZE(CNF(socks_dst))) == -1)
+ {
+ socks_reschedule(squeue);
+ continue;
+ }
+
+ squeue->state = SOCKS_CONNECTING;
+ MFD_SET(squeue->fd, &wset, maxfd);
+
+ break;
+
+ case SOCKS_4AREQ_SENT:
+ MFD_SET(squeue->fd, &rset, maxfd);
+ break;
+ }
+ }
+
+ // select all file descriptors
+ set_select_timeout(&tv);
+ log_debug("selecting (maxfd = %d)", maxfd);
+ if ((maxfd = select(maxfd + 1, &rset, &wset, NULL, &tv)) == -1)
+ {
+ log_msg(LOG_EMERG, "select encountered error: \"%s\", restarting", strerror(errno));
+ continue;
+ }
+ log_debug("select returned %d", maxfd);
+
+ // check socks request pipe
+ if (FD_ISSET(CNF(socksfd[0]), &rset))
+ {
+ maxfd--;
+ if ((len = read(CNF(socksfd[0]), &sq, sizeof(sq))) == -1)
+ log_msg(LOG_ERR, "failed to read from SOCKS request pipe, fd = %d: \"%s\"",
+ CNF(socksfd[0]), strerror(errno));
+ if (len < sizeof(sq))
+ log_msg(LOG_ERR, "read from SOCKS request pipe truncated to %d of %d bytes, ignoring.",
+ len, sizeof(sq));
+ else
+ {
+ log_debug("received %d bytes on SOCKS request pipe fd %d", len, CNF(socksfd[0]));
+ if (sq.next)
+ {
+ log_debug("output of SOCKS request queue triggered");
+ socks_output_queue((FILE*) sq.next);
+ }
+ else if (IN6_IS_ADDR_UNSPECIFIED(&sq.addr))
+ {
+ log_debug("termination request on SOCKS request queue received");
+ }
+ else
+ {
+ log_debug("SOCKS queuing request received");
+ socks_enqueue(&sq);
+ }
+ }
+ }
+
+ // handle all other file descriptors
+ t = time(NULL);
+ for (squeue = socks_queue_; maxfd && squeue; squeue = squeue->next)
+ {
+ // check write set, this is valid after connect()
+ if (FD_ISSET(squeue->fd, &wset))
+ {
+ maxfd--;
+ if (squeue->state == SOCKS_CONNECTING)
+ {
+ // test if connect() worked
+ log_debug("check socket error");
+ err_len = sizeof(so_err);
+ if (getsockopt(squeue->fd, SOL_SOCKET, SO_ERROR, &so_err, &err_len) == -1)
+ {
+ log_msg(LOG_ERR, "getsockopt failed: \"%s\", rescheduling request", strerror(errno));
+ socks_reschedule(squeue);
+ continue;
+ }
+ if (so_err)
+ {
+ log_msg(LOG_ERR, "getsockopt returned %d (\"%s\")", so_err, strerror(so_err));
+ socks_reschedule(squeue);
+ continue;
+ }
+ // everything seems to be ok, now check request status
+ if (socks_send_request(squeue) == -1)
+ {
+ log_msg(LOG_ERR, "SOCKS request failed");
+ socks_reschedule(squeue);
+ continue;
+ }
+ // request successfully sent, advance state machine
+ squeue->state = SOCKS_4AREQ_SENT;
+ }
+ else
+ log_debug("unknown state %d in write set", squeue->state);
+ }
+
+ // check read set, this is valid after write, i.e. receiving SOCKS response
+ if (FD_ISSET(squeue->fd, &rset))
+ {
+ maxfd--;
+ if (squeue->state == SOCKS_4AREQ_SENT)
+ {
+ if (socks_rec_response(squeue) == -1)
+ {
+ socks_reschedule(squeue);
+ continue;
+ }
+ // success
+ log_debug("activating peer fd %d", squeue->fd);
+ socks_activate_peer(squeue);
+ squeue->state = SOCKS_DELETE;
+ }
+ else
+ log_debug("unknown state %d in read set", squeue->state);
+ }
+ }
+
+ // delete requests from queue which are marked for deletion
+ for (squeue = socks_queue_; squeue; squeue = squeue->next)
+ if (squeue->state == SOCKS_DELETE)
+ {
+ socks_unqueue(squeue);
+ // restart loop
+ squeue = socks_queue_;
+ if (!squeue)
+ {
+ log_debug("last entry deleted, breaking loop");
+ break;
+ }
+ }
+ }
+}
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-privacy/packages/onioncat.git
More information about the Pkg-privacy-commits
mailing list