[Pkg-privacy-commits] [onioncat] 16/340: socket_connector redesigned for parallel queueing

Ximin Luo infinity0 at moszumanska.debian.org
Sat Aug 22 13:04:20 UTC 2015


This is an automated email from the git hooks/post-receive script.

infinity0 pushed a commit to branch debian
in repository onioncat.

commit b2295b9c463cf65d92a2f539ad657bd28d12ca46
Author: eagle <eagle at 58e1ccc2-750e-0410-8d0d-f93ca75ab447>
Date:   Mon Feb 11 01:23:18 2008 +0000

    socket_connector redesigned for parallel queueing
    
    git-svn-id: http://www.cypherpunk.at/svn/onioncat/trunk@132 58e1ccc2-750e-0410-8d0d-f93ca75ab447
---
 ocat.h      |  5 ++++
 ocatroute.c | 93 +++++++++++++++++++++++++++++++++++++++++++++----------------
 2 files changed, 74 insertions(+), 24 deletions(-)

diff --git a/ocat.h b/ocat.h
index c8259a2..4155a6b 100644
--- a/ocat.h
+++ b/ocat.h
@@ -48,6 +48,9 @@
 
 #define THREAD_NAME_LEN 11
 
+#define SOCKS_CONNECTING 1
+#define SOCKS_MAX_RETRY 3
+
 
 typedef struct PacketQueue
 {
@@ -91,6 +94,8 @@ typedef struct SocksQueue
 {
    struct SocksQueue *next;
    struct in6_addr addr;
+   int state;
+//   int retry;
 } SocksQueue_t;
 
 // next header value for ocat internal use (RFC3692)
diff --git a/ocatroute.c b/ocatroute.c
index 7a9acf0..0f24bad 100644
--- a/ocatroute.c
+++ b/ocatroute.c
@@ -29,7 +29,7 @@ static int sockfd_;
 static int lpfd_[2];
 // file descriptors of socks_connector pipe
 // used for internal communication
-static int cpfd_[2];
+//static int cpfd_[2];
 // array of active peers
 static OcatPeer_t peer_[MAXPEERS];
 // mutex for locking array of peers
@@ -42,8 +42,8 @@ static pthread_cond_t queue_cond_ = PTHREAD_COND_INITIALIZER;
 
 // SOCKS connector queue vars
 static SocksQueue_t *socks_queue_ = NULL;
-static int socks_queue_cnt_ = 0;
-static int socks_cth_cnt_ = 0;
+static int socks_connect_cnt_ = 0;
+static int socks_thread_cnt_ = 0;
 static pthread_mutex_t socks_queue_mutex_ = PTHREAD_MUTEX_INITIALIZER;
 static pthread_cond_t socks_queue_cond_ = PTHREAD_COND_INITIALIZER;
 
@@ -647,49 +647,91 @@ int socks_connect(const struct in6_addr *addr)
 }
 
 
+void socks_queue(const struct in6_addr *addr)
+{
+   SocksQueue_t *squeue;
+
+   pthread_mutex_lock(&socks_queue_mutex_);
+   for (squeue = socks_queue_; squeue; squeue = squeue->next)
+      if (!memcmp(&squeue->addr, addr, sizeof(struct in6_addr)))
+         break;
+   if (!squeue)
+   {
+      log_msg(L_DEBUG, "queueing new SOCKS connection request");
+      if (!(squeue = calloc(1, sizeof(SocksQueue_t))))
+         log_msg(L_FATAL, "could not get memory for SocksQueue entry: \"%s\"", strerror(errno)), exit(1);
+      memcpy(&squeue->addr, addr, sizeof(struct in6_addr));
+      squeue->next = socks_queue_;
+      socks_queue_ = squeue;
+      log_msg(L_DEBUG, "signalling connector");
+      pthread_cond_signal(&socks_queue_cond_);
+   }
+   else
+      log_msg(L_DEBUG, "connection already exists, not queueing SOCKS connection");
+   pthread_mutex_unlock(&socks_queue_mutex_);
+}
+
+
 void *socks_connector(void *p)
 {
    OcatPeer_t *peer;
-   struct in6_addr addr;
-   int len;
+   SocksQueue_t **squeue, *sq;
+   int i, rc, run = 1;
 
    (void) init_ocat_thread(p);
 
-   if (pipe(cpfd_) < 0)
-      log_msg(L_FATAL, "[init_socks_connector] could not create pipe for socks_connector: \"%s\"", strerror(errno)), exit(1);
+   if ((rc = pthread_detach(pthread_self())))
+      log_msg(L_ERROR, "couldn't detach: \"%s\"", rc);
 
-   log_msg(L_NOTICE, "[socks_connector] running");
+   pthread_mutex_lock(&socks_queue_mutex_);
+   socks_thread_cnt_++;
+   pthread_mutex_unlock(&socks_queue_mutex_);
 
-   for (;;)
+   while (run)
    {
-      log_msg(L_DEBUG, "[socks_connector] reading from connector pipe %d", cpfd_[0]);
-      if ((len = read(cpfd_[0], &addr, sizeof(addr))) == -1)
-         log_msg(L_FATAL, "[socks_connector] error reading from connector pipe %d: %s", cpfd_[0], strerror(errno)), exit(1);
-      if (len != sizeof(addr))
+      pthread_mutex_lock(&socks_queue_mutex_);
+      do
       {
-         log_msg(L_ERROR, "[socks_connector] illegal read on connector pipe %d: %d bytes", cpfd_[0], len);
-         continue;
+         pthread_cond_wait(&socks_queue_cond_, &socks_queue_mutex_);
+         for (squeue = &socks_queue_; *squeue; squeue = &(*squeue)->next)
+            if (!(*squeue)->state)
+               break;
       }
+      while (!(*squeue));
+      (*squeue)->state = SOCKS_CONNECTING;
+      socks_connect_cnt_++;
+      if (socks_thread_cnt_ <= socks_connect_cnt_)
+         run_ocat_thread("connector", socks_connector);
+      pthread_mutex_unlock(&socks_queue_mutex_);
 
       pthread_mutex_lock(&peer_mutex_);
-      peer = search_peer(&addr);
+      peer = search_peer(&(*squeue)->addr);
       pthread_mutex_unlock(&peer_mutex_);
 
-      if (peer)
-      {
-         log_msg(L_NOTICE, "[socks_connector] peer already exists, ignoring");
-         continue;
-      }
-
-      socks_connect(&addr);
+      if (!peer)
+         for (i = 0; i < SOCKS_MAX_RETRY; i++)
+            socks_connect(&(*squeue)->addr);
+      else
+         log_msg(L_NOTICE, "peer already exists, ignoring");
+
+      log_msg(L_NOTICE, "removing from SOCKS queue");
+      pthread_mutex_lock(&socks_queue_mutex_);
+      sq = *squeue;
+      *squeue = (*squeue)->next;
+      free(sq);
+      socks_connect_cnt_--;
+      if (socks_connect_cnt_ < socks_thread_cnt_ - 1)
+         run = 0;
+      pthread_mutex_unlock(&socks_queue_mutex_);
    }
+   return NULL;
 }
 
 
 void packet_forwarder(void)
 {
    char buf[FRAME_SIZE];
-   char addr[INET6_ADDRSTRLEN];
+   //char addr[INET6_ADDRSTRLEN];
    struct ip6_hdr *ihd;
    int rlen;
 
@@ -712,9 +754,12 @@ void packet_forwarder(void)
       {
          log_msg(L_NOTICE, "[packet_forwarder] establishing new socks peer");
          //push_socks_connector(&ihd->ip6_dst);
+         /*
          log_msg(L_DEBUG, "[packet_forwarder] writing %s to socks connector pipe %d", inet_ntop(AF_INET6, &ihd->ip6_dst, addr, INET6_ADDRSTRLEN), cpfd_[1]);
          if (write(cpfd_[1], &ihd->ip6_dst, sizeof(struct in6_addr)) != sizeof(struct in6_addr))
             log_msg(L_ERROR, "couldn't write %d bytes to SOCKS connector pipe %d", sizeof(struct in6_addr), cpfd_[1]);
+            */
+         socks_queue(&ihd->ip6_dst);
          log_msg(L_DEBUG, "[packet_forwarder] queuing packet");
          queue_packet(&ihd->ip6_dst, buf, rlen);
       }

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-privacy/packages/onioncat.git



More information about the Pkg-privacy-commits mailing list