[Pkg-privacy-commits] [onioncat] 16/241: socket_connector redesigned for parallel queueing
Intrigeri
intrigeri at moszumanska.debian.org
Wed Aug 26 16:16:15 UTC 2015
This is an automated email from the git hooks/post-receive script.
intrigeri pushed a commit to branch upstream-master
in repository onioncat.
commit bfbd1032955efafb9a1bbf8d9d766890cec89c40
Author: eagle <eagle at 58e1ccc2-750e-0410-8d0d-f93ca75ab447>
Date: Mon Feb 11 01:23:18 2008 +0000
socket_connector redesigned for parallel queueing
git-svn-id: https://www.cypherpunk.at/svn/onioncat/trunk@132 58e1ccc2-750e-0410-8d0d-f93ca75ab447
---
ocat.h | 5 ++++
ocatroute.c | 93 +++++++++++++++++++++++++++++++++++++++++++++----------------
2 files changed, 74 insertions(+), 24 deletions(-)
diff --git a/ocat.h b/ocat.h
index c8259a2..4155a6b 100644
--- a/ocat.h
+++ b/ocat.h
@@ -48,6 +48,9 @@
#define THREAD_NAME_LEN 11
+#define SOCKS_CONNECTING 1
+#define SOCKS_MAX_RETRY 3
+
typedef struct PacketQueue
{
@@ -91,6 +94,8 @@ typedef struct SocksQueue
{
struct SocksQueue *next;
struct in6_addr addr;
+ int state;
+// int retry;
} SocksQueue_t;
// next header value for ocat internal use (RFC3692)
diff --git a/ocatroute.c b/ocatroute.c
index 7a9acf0..0f24bad 100644
--- a/ocatroute.c
+++ b/ocatroute.c
@@ -29,7 +29,7 @@ static int sockfd_;
static int lpfd_[2];
// file descriptors of socks_connector pipe
// used for internal communication
-static int cpfd_[2];
+//static int cpfd_[2];
// array of active peers
static OcatPeer_t peer_[MAXPEERS];
// mutex for locking array of peers
@@ -42,8 +42,8 @@ static pthread_cond_t queue_cond_ = PTHREAD_COND_INITIALIZER;
// SOCKS connector queue vars
static SocksQueue_t *socks_queue_ = NULL;
-static int socks_queue_cnt_ = 0;
-static int socks_cth_cnt_ = 0;
+static int socks_connect_cnt_ = 0;
+static int socks_thread_cnt_ = 0;
static pthread_mutex_t socks_queue_mutex_ = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t socks_queue_cond_ = PTHREAD_COND_INITIALIZER;
@@ -647,49 +647,91 @@ int socks_connect(const struct in6_addr *addr)
}
+void socks_queue(const struct in6_addr *addr)
+{
+ SocksQueue_t *squeue;
+
+ pthread_mutex_lock(&socks_queue_mutex_);
+ for (squeue = socks_queue_; squeue; squeue = squeue->next)
+ if (!memcmp(&squeue->addr, addr, sizeof(struct in6_addr)))
+ break;
+ if (!squeue)
+ {
+ log_msg(L_DEBUG, "queueing new SOCKS connection request");
+ if (!(squeue = calloc(1, sizeof(SocksQueue_t))))
+ log_msg(L_FATAL, "could not get memory for SocksQueue entry: \"%s\"", strerror(errno)), exit(1);
+ memcpy(&squeue->addr, addr, sizeof(struct in6_addr));
+ squeue->next = socks_queue_;
+ socks_queue_ = squeue;
+ log_msg(L_DEBUG, "signalling connector");
+ pthread_cond_signal(&socks_queue_cond_);
+ }
+ else
+ log_msg(L_DEBUG, "connection already exists, not queueing SOCKS connection");
+ pthread_mutex_unlock(&socks_queue_mutex_);
+}
+
+
void *socks_connector(void *p)
{
OcatPeer_t *peer;
- struct in6_addr addr;
- int len;
+ SocksQueue_t **squeue, *sq;
+ int i, rc, run = 1;
(void) init_ocat_thread(p);
- if (pipe(cpfd_) < 0)
- log_msg(L_FATAL, "[init_socks_connector] could not create pipe for socks_connector: \"%s\"", strerror(errno)), exit(1);
+ if ((rc = pthread_detach(pthread_self())))
+ log_msg(L_ERROR, "couldn't detach: \"%s\"", rc);
- log_msg(L_NOTICE, "[socks_connector] running");
+ pthread_mutex_lock(&socks_queue_mutex_);
+ socks_thread_cnt_++;
+ pthread_mutex_unlock(&socks_queue_mutex_);
- for (;;)
+ while (run)
{
- log_msg(L_DEBUG, "[socks_connector] reading from connector pipe %d", cpfd_[0]);
- if ((len = read(cpfd_[0], &addr, sizeof(addr))) == -1)
- log_msg(L_FATAL, "[socks_connector] error reading from connector pipe %d: %s", cpfd_[0], strerror(errno)), exit(1);
- if (len != sizeof(addr))
+ pthread_mutex_lock(&socks_queue_mutex_);
+ do
{
- log_msg(L_ERROR, "[socks_connector] illegal read on connector pipe %d: %d bytes", cpfd_[0], len);
- continue;
+ pthread_cond_wait(&socks_queue_cond_, &socks_queue_mutex_);
+ for (squeue = &socks_queue_; *squeue; squeue = &(*squeue)->next)
+ if (!(*squeue)->state)
+ break;
}
+ while (!(*squeue));
+ (*squeue)->state = SOCKS_CONNECTING;
+ socks_connect_cnt_++;
+ if (socks_thread_cnt_ <= socks_connect_cnt_)
+ run_ocat_thread("connector", socks_connector);
+ pthread_mutex_unlock(&socks_queue_mutex_);
pthread_mutex_lock(&peer_mutex_);
- peer = search_peer(&addr);
+ peer = search_peer(&(*squeue)->addr);
pthread_mutex_unlock(&peer_mutex_);
- if (peer)
- {
- log_msg(L_NOTICE, "[socks_connector] peer already exists, ignoring");
- continue;
- }
-
- socks_connect(&addr);
+ if (!peer)
+ for (i = 0; i < SOCKS_MAX_RETRY; i++)
+ socks_connect(&(*squeue)->addr);
+ else
+ log_msg(L_NOTICE, "peer already exists, ignoring");
+
+ log_msg(L_NOTICE, "removing from SOCKS queue");
+ pthread_mutex_lock(&socks_queue_mutex_);
+ sq = *squeue;
+ *squeue = (*squeue)->next;
+ free(sq);
+ socks_connect_cnt_--;
+ if (socks_connect_cnt_ < socks_thread_cnt_ - 1)
+ run = 0;
+ pthread_mutex_unlock(&socks_queue_mutex_);
}
+ return NULL;
}
void packet_forwarder(void)
{
char buf[FRAME_SIZE];
- char addr[INET6_ADDRSTRLEN];
+ //char addr[INET6_ADDRSTRLEN];
struct ip6_hdr *ihd;
int rlen;
@@ -712,9 +754,12 @@ void packet_forwarder(void)
{
log_msg(L_NOTICE, "[packet_forwarder] establishing new socks peer");
//push_socks_connector(&ihd->ip6_dst);
+ /*
log_msg(L_DEBUG, "[packet_forwarder] writing %s to socks connector pipe %d", inet_ntop(AF_INET6, &ihd->ip6_dst, addr, INET6_ADDRSTRLEN), cpfd_[1]);
if (write(cpfd_[1], &ihd->ip6_dst, sizeof(struct in6_addr)) != sizeof(struct in6_addr))
log_msg(L_ERROR, "couldn't write %d bytes to SOCKS connector pipe %d", sizeof(struct in6_addr), cpfd_[1]);
+ */
+ socks_queue(&ihd->ip6_dst);
log_msg(L_DEBUG, "[packet_forwarder] queuing packet");
queue_packet(&ihd->ip6_dst, buf, rlen);
}
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-privacy/packages/onioncat.git
More information about the Pkg-privacy-commits mailing list