[Pkg-privacy-commits] [onioncat] 38/241: peers now do have one mutexes, bug in socket_cleaner fixed
Intrigeri
intrigeri at moszumanska.debian.org
Wed Aug 26 16:16:22 UTC 2015
This is an automated email from the git hooks/post-receive script.
intrigeri pushed a commit to branch upstream-master
in repository onioncat.
commit dcd5a1645cd0ea6bb55fa66e8b155d00414b9326
Author: eagle <eagle at 58e1ccc2-750e-0410-8d0d-f93ca75ab447>
Date: Sun Mar 23 19:53:01 2008 +0000
peers now do have one mutexes, bug in socket_cleaner fixed
git-svn-id: https://www.cypherpunk.at/svn/onioncat/trunk@179 58e1ccc2-750e-0410-8d0d-f93ca75ab447
---
ocat.h | 41 ++++------
ocatroute.c | 260 ++++++++++++++++++++++++++++++++++++------------------------
2 files changed, 172 insertions(+), 129 deletions(-)
diff --git a/ocat.h b/ocat.h
index 2d503e6..7c7575f 100644
--- a/ocat.h
+++ b/ocat.h
@@ -79,19 +79,20 @@ typedef struct SocksHdr
typedef struct OcatPeer
{
- struct OcatPeer *next;
- struct in6_addr addr; //<! remote address of peer
- int tcpfd; //<! remote file descriptor
- time_t time; //<! timestamp of latest packet
- time_t sdelay; //<! connection setup delay
- time_t otime; //<! opening time
- int state; //<! status of peer
- int dir;
- unsigned long out;
- unsigned long in;
- uint32_t fraghdr;
- char fragbuf[FRAME_SIZE - 4];
- int fraglen;
+ struct OcatPeer *next; //!< pointer to next peer in list
+ struct in6_addr addr; //!< remote address of peer
+ int tcpfd; //!< remote file descriptor
+ time_t time; //!< timestamp of latest packet
+ time_t sdelay; //!< connection setup delay
+ time_t otime; //!< opening time
+ int state; //!< status of peer
+ int dir; //!< direction this session was opened
+ unsigned long out; //!< bytes output
+ unsigned long in; //!< bytes input
+ uint32_t fraghdr; //!< local tun frame header
+ char fragbuf[FRAME_SIZE - 4]; //!< (de)frag buffer
+ int fraglen; //!< current frag buffer size
+ pthread_mutex_t mutex; //!< mutex for thread locking
} OcatPeer_t;
typedef struct OcatThread
@@ -195,30 +196,16 @@ void test_tun_hdr(void);
#endif
/* ocatroute.c */
-//OnionPeer_t *search_peer(const struct in6_addr *);
-//OnionPeer_t *establish_peer(int fd, const struct in6_addr *);
void init_peers(void);
-//void init_socket_acceptor(void);
-//void init_socket_receiver(void);
-//void init_socks_connector(void);
-//void push_socks_connector(const struct in6_addr *);
-//int socks_connect(const char *);
void *socket_receiver(void *);
-//void update_peer_time(const OnionPeer_t *);
-//const OnionPeer_t *forward_packet(const struct in6_addr *, const char *, int);
-//void queue_packet(const struct in6_addr *, const char *, int);
-//void init_packet_dequeuer(void);
void packet_forwarder(void);
-//void init_socket_cleaner(void);
void *packet_dequeuer(void *);
void *socket_acceptor(void *);
void *socks_connector(void *);
void *socket_cleaner(void *);
void *ocat_controller(void *);
-
/* ocatthread.c */
-//void init_threads(void);
const OcatThread_t *init_ocat_thread(const char *);
int run_ocat_thread(const char *, void *(*)(void*));
const OcatThread_t *get_thread(void);
diff --git a/ocatroute.c b/ocatroute.c
index 4aae2af..455a6d6 100644
--- a/ocatroute.c
+++ b/ocatroute.c
@@ -57,6 +57,7 @@ uint16_t ocat_dest_port_ = OCAT_DEST_PORT;
int vrec_ = 0;
+//k
OcatPeer_t *search_peer(const struct in6_addr *addr)
{
OcatPeer_t *peer;
@@ -68,92 +69,76 @@ OcatPeer_t *search_peer(const struct in6_addr *addr)
}
+//k
OcatPeer_t *get_empty_peer(void)
{
+ int rc;
OcatPeer_t *peer;
if (!(peer = calloc(1, sizeof(OcatPeer_t))))
- log_msg(L_ERROR, "cannot get memory for new peer: %s", strerror(errno));
- else
{
- peer->fraghdr = fhd_key_;
- peer->next = peer_;
- peer_ = peer;
+ log_msg(L_ERROR, "cannot get memory for new peer: \"%s\"", strerror(errno));
+ return NULL;
+ }
+
+ peer->fraghdr = fhd_key_;
+ if ((rc = pthread_mutex_init(&peer->mutex, NULL)))
+ {
+ log_msg(L_FATAL, "cannot init new peer mutex: \"%s\"", strerror(rc));
+ free(peer);
+ return NULL;
}
+ peer->next = peer_;
+ peer_ = peer;
+
return peer;
}
+//k
void delete_peer(OcatPeer_t *peer)
{
+ int rc;
OcatPeer_t **p;
+
for (p = &peer_; *p; p = &(*p)->next)
if (*p == peer)
{
+ pthread_mutex_lock(&peer->mutex);
*p = peer->next;
+ pthread_mutex_unlock(&peer->mutex);
+ if ((rc = pthread_mutex_destroy(&peer->mutex)))
+ log_msg(L_FATAL, "cannot destroy mutex: \"%s\"", strerror(rc));
free(peer);
return;
}
}
-/*
-void rewrite_framehdr(char *buf, int len)
-{
- uint32_t *fhd = (uint32_t*) buf;
- struct ip6_hdr *ihd;
- int ofs;
-
- if (*fhd == fhd_key_)
- {
- log_msg(L_DEBUG, "[rewrite_framehdr] frame header already of correct type");
- return;
- }
-
- while(len > 4)
- {
- if (*fhd != htonl(0x1c) && *fhd != htonl(0x86dd))
- {
- log_msg(L_DEBUG, "[rewrite_framehdr] frame seems to be fragment");
- return;
- }
- // replace header type
- log_msg(L_DEBUG, "[rewrite_framehdr] rewriting");
- *fhd = fhd_key_;
- // finding next header
- if (len < 4 + sizeof(struct ip6_hdr))
- {
- log_msg(L_DEBUG, "[rewrite_framehdr] short frag");
- return;
- }
- ihd = (struct ip6_hdr*) (fhd + 1);
- ofs = 4 + sizeof(struct ip6_hdr) + ihd->ip6_plen;
- len -= ofs;
- fhd = (uint32_t*) (buf + ofs);
- }
-}
-*/
-
-/*const*/ OcatPeer_t *forward_packet(const struct in6_addr *addr, const char *buf, int buflen)
+//k
+int forward_packet(const struct in6_addr *addr, const char *buf, int buflen)
{
OcatPeer_t *peer;
pthread_mutex_lock(&peer_mutex_);
if ((peer = search_peer(addr)))
{
+ pthread_mutex_lock(&peer->mutex);
log_msg(L_DEBUG, "forwarding %d bytes to TCP fd %d", buflen, peer->tcpfd);
if (write(peer->tcpfd, buf, buflen) != buflen)
log_msg(L_ERROR, "could not write %d bytes to peer %d", buflen, peer->tcpfd);
peer->time = time(NULL);
peer->out += buflen;
+ pthread_mutex_unlock(&peer->mutex);
}
pthread_mutex_unlock(&peer_mutex_);
- return peer;
+ return peer != NULL;
}
+//k
void queue_packet(const struct in6_addr *addr, const char *buf, int buflen)
{
PacketQueue_t *queue;
@@ -181,10 +166,11 @@ void queue_packet(const struct in6_addr *addr, const char *buf, int buflen)
}
+//k
void *packet_dequeuer(void *p)
{
PacketQueue_t **queue, *fqueue;
- OcatPeer_t *peer;
+// OcatPeer_t *peer;
struct timespec ts;
int rc, timed = 0;
time_t delay;
@@ -211,11 +197,11 @@ void *packet_dequeuer(void *p)
log_msg(L_DEBUG, "starting dequeuing");
for (queue = &queue_; *queue; /*queue = &(*queue)->next*/)
{
- peer = forward_packet(&(*queue)->addr, (*queue)->data, (*queue)->psize);
+ rc = forward_packet(&(*queue)->addr, (*queue)->data, (*queue)->psize);
// delete packet from queue if it was sent or is too old
delay = time(NULL) - (*queue)->time;
- if (peer || (delay > MAX_QUEUE_DELAY))
+ if (rc || (delay > MAX_QUEUE_DELAY))
{
fqueue = *queue;
*queue = (*queue)->next;
@@ -248,28 +234,33 @@ void hex_code_header(const char *frame, int len, char *buf)
// do some packet validation
+//k
int validate_frame(const struct ip6_hdr *ihd, int len)
{
char buf[INET6_ADDRSTRLEN];
char hexbuf[IP6HLEN * 3 + 1];
- hex_code_header((char*) ihd, len > IP6HLEN ? IP6HLEN : len, hexbuf);
- log_msg(L_DEBUG, "[validate_frame] header \"%s\"", hexbuf);
+ if ((ihd->ip6_vfc & 0xf0) != 0x60)
+ {
+ hex_code_header((char*) ihd, len > IP6HLEN ? IP6HLEN : len, hexbuf);
+ log_msg(L_DEBUG, "header \"%s\"", hexbuf);
+ return 0;
+ }
if (!has_tor_prefix(&ihd->ip6_dst))
{
- log_msg(L_ERROR, "[validate_frame] destination %s unreachable", inet_ntop(AF_INET6, &ihd->ip6_dst, buf, INET6_ADDRSTRLEN));
+ log_msg(L_ERROR, "destination %s unreachable", inet_ntop(AF_INET6, &ihd->ip6_dst, buf, INET6_ADDRSTRLEN));
return 0;
}
if (!has_tor_prefix(&ihd->ip6_src))
{
- log_msg(L_ERROR, "[validate_frame] source address invalid. Remote ocat could not reply");
+ log_msg(L_ERROR, "source address invalid. Remote ocat could not reply");
return 0;
}
#ifdef TEST_TUN_HDR
if (is_testping(&ihd->ip6_dst))
{
- log_msg(L_DEBUG, "[validate_frame] test ping detected");
+ log_msg(L_DEBUG, "test ping detected");
return 0;
}
#endif
@@ -277,6 +268,7 @@ int validate_frame(const struct ip6_hdr *ihd, int len)
}
+//k
void cleanup_socket(int fd, OcatPeer_t *peer)
{
log_msg(L_NOTICE, "fd %d reached EOF, closing.", fd);
@@ -287,9 +279,10 @@ void cleanup_socket(int fd, OcatPeer_t *peer)
}
+//k
void *socket_receiver(void *p)
{
- int fd, maxfd, len, state, plen;
+ int maxfd, len, plen;
char buf[FRAME_SIZE];
char addr[INET6_ADDRSTRLEN];
fd_set rset;
@@ -312,19 +305,26 @@ void *socket_receiver(void *p)
pthread_mutex_lock(&peer_mutex_);
for (peer = peer_; peer; peer = peer->next)
{
+ pthread_mutex_lock(&peer->mutex);
// only select active peers
if (peer->state != PEER_ACTIVE)
+ {
+ pthread_mutex_unlock(&peer->mutex);
continue;
- if ((fd = peer->tcpfd) >= FD_SETSIZE)
- log_msg(L_FATAL, "%d >= FD_SETIZE(%d)", fd, FD_SETSIZE), exit(1);
- FD_SET(fd, &rset);
- if (fd > maxfd)
- maxfd = fd;
+ }
+
+ if (peer->tcpfd >= FD_SETSIZE)
+ log_msg(L_FATAL, "%d >= FD_SETIZE(%d)", peer->tcpfd, FD_SETSIZE), exit(1);
+
+ FD_SET(peer->tcpfd, &rset);
+ if (peer->tcpfd > maxfd)
+ maxfd = peer->tcpfd;
+ pthread_mutex_unlock(&peer->mutex);
}
pthread_mutex_unlock(&peer_mutex_);
log_msg(L_DEBUG, "selecting...");
- if (select(maxfd + 1, &rset, NULL, NULL, NULL) == -1)
+ if ((maxfd = select(maxfd + 1, &rset, NULL, NULL, NULL)) == -1)
{
log_msg(L_FATAL, "select encountered error: \"%s\", restarting", strerror(errno));
continue;
@@ -334,54 +334,94 @@ void *socket_receiver(void *p)
if (FD_ISSET(lpfd_[0], &rset))
{
read(lpfd_[0], buf, FRAME_SIZE - 4);
- continue;
+ maxfd--;
}
//FIXME: should only run until num select returned
- for (peer = peer_; peer; peer = peer->next)
+ //for (peer = peer_; peer; peer = peer->next)
+
+ peer = NULL;
+ while (maxfd)
{
+ // The following ~10 lines may look strange, and it is tempting to rewrite
+ // them as a for loop, but this structure is necessary for thread locking!
pthread_mutex_lock(&peer_mutex_);
- state = peer->state;
- fd = peer->tcpfd;
+ if (!peer)
+ peer = peer_;
+ else if (!(peer = peer->next))
+ {
+ log_msg(L_FATAL, "fd %d ready but no peer found");
+ pthread_mutex_unlock(&peer_mutex_);
+ break;
+ }
+ pthread_mutex_lock(&peer->mutex);
pthread_mutex_unlock(&peer_mutex_);
- if (state != PEER_ACTIVE)
+ //state = peer->state;
+ //fd = peer->tcpfd;
+
+ if (peer->state != PEER_ACTIVE)
+ {
+ pthread_mutex_unlock(&peer->mutex);
continue;
+ }
- if (FD_ISSET(fd, &rset))
+ if (!FD_ISSET(peer->tcpfd, &rset))
{
- log_msg(L_DEBUG, "reading from %d", fd);
+ pthread_mutex_unlock(&peer->mutex);
+ continue;
+ }
+
+ maxfd--;
+
+ //if (FD_ISSET(fd, &rset))
+ //{
+ log_msg(L_DEBUG, "reading from %d", peer->tcpfd);
// read/append data to peer's fragment buffer
- if ((len = read(fd, peer->fragbuf + peer->fraglen, FRAME_SIZE - 4 - peer->fraglen)) == -1)
+ if ((len = read(peer->tcpfd, peer->fragbuf + peer->fraglen, FRAME_SIZE - 4 - peer->fraglen)) == -1)
{
// this might happen on linux, see SELECT(2)
- log_msg(L_DEBUG, "spurious wakup of %d: \"%s\"", fd, strerror(errno));
+ log_msg(L_DEBUG, "spurious wakup of %d: \"%s\"", peer->tcpfd, strerror(errno));
+ pthread_mutex_unlock(&peer->mutex);
continue;
}
- log_msg(L_DEBUG, "received %d bytes on %d", len, fd);
+ log_msg(L_DEBUG, "received %d bytes on %d", len, peer->tcpfd);
// if len == 0 EOF reached => close session
if (!len)
{
- log_msg(L_NOTICE, "fd %d reached EOF, closing.", fd);
- close(fd);
+ log_msg(L_NOTICE, "fd %d reached EOF, closing.", peer->tcpfd);
+ close(peer->tcpfd);
+ pthread_mutex_unlock(&peer->mutex);
pthread_mutex_lock(&peer_mutex_);
delete_peer(peer);
pthread_mutex_unlock(&peer_mutex_);
continue;
}
- pthread_mutex_lock(&peer_mutex_);
+ //pthread_mutex_lock(&peer_mutex_);
peer->fraglen += len;
// update timestamp
peer->time = time(NULL);
peer->in += len;
- pthread_mutex_unlock(&peer_mutex_);
+ //pthread_mutex_unlock(&peer_mutex_);
while (peer->fraglen >= IP6HLEN)
{
// check frame
plen = validate_frame((struct ip6_hdr*) peer->fragbuf, peer->fraglen);
+
+ // <FIXME> sometimes defragmentation loses sync due to a currently unknown bug!
+ if (!plen)
+ {
+ log_msg(L_DEBUG, "FRAGBUF RESET!");
+ //pthread_mutex_lock(&peer_mutex_);
+ peer->fraglen = 0;
+ //pthread_mutex_unlock(&peer_mutex_);
+ break;
+ }
+ // </FIXME>
+
if (vrec_ && !plen)
{
log_msg(L_ERROR, "dropping frame");
@@ -395,24 +435,24 @@ void *socket_receiver(void *p)
break;
}
- pthread_mutex_lock(&peer_mutex_);
+ //pthread_mutex_lock(&peer_mutex_);
// set IP address if it is not set yet and frame is valid
if (plen && !memcmp(&peer->addr, &in6addr_any, sizeof(struct in6_addr)))
{
memcpy(&peer->addr, &((struct ip6_hdr*)peer->fragbuf)->ip6_src, sizeof(struct in6_addr));
- log_msg(L_NOTICE, "incoming connection on %d from %s is now identified", fd,
+ log_msg(L_NOTICE, "incoming connection on %d from %s is now identified", peer->tcpfd,
inet_ntop(AF_INET6, &peer->addr, addr, INET6_ADDRSTRLEN));
}
- pthread_mutex_unlock(&peer_mutex_);
+ //pthread_mutex_unlock(&peer_mutex_);
log_msg(L_DEBUG, "writing to tun %d framesize %d + 4", tunfd_[1], len);
if (write(tunfd_[1], &peer->fraghdr, len + 4) != (len + 4))
log_msg(L_ERROR, "could not write %d bytes to tunnel %d", len + 4, tunfd_[1]);
- pthread_mutex_lock(&peer_mutex_);
+ //pthread_mutex_lock(&peer_mutex_);
peer->fraglen -= len;
- pthread_mutex_unlock(&peer_mutex_);
+ //pthread_mutex_unlock(&peer_mutex_);
if (peer->fraglen)
{
@@ -421,13 +461,15 @@ void *socket_receiver(void *p)
}
else
log_msg(L_DEBUG, "fragbuf empty");
- }
- }
- }
- }
+ } // while (peer->fraglen >= IP6HLEN)
+ //}
+ pthread_mutex_unlock(&peer->mutex);
+ } // while (maxfd)
+ } // for (;;)
}
+//k
void set_nonblock(int fd)
{
long flags;
@@ -444,7 +486,8 @@ void set_nonblock(int fd)
}
-OcatPeer_t *insert_peer(int fd, const struct in6_addr *addr)
+//k
+int insert_peer(int fd, const struct in6_addr *addr, time_t dly)
{
OcatPeer_t *peer;
@@ -457,11 +500,15 @@ OcatPeer_t *insert_peer(int fd, const struct in6_addr *addr)
{
pthread_mutex_unlock(&peer_mutex_);
log_msg(L_ERROR, "could not get new empty peer");
- return NULL;
- }
+ return 0;
+ }
+ pthread_mutex_lock(&peer->mutex);
+ pthread_mutex_unlock(&peer_mutex_);
+
peer->tcpfd = fd;
peer->state = PEER_ACTIVE;
peer->otime = peer->time = time(NULL);
+ peer->sdelay = dly;
if (addr)
{
memcpy(&peer->addr, addr, sizeof(struct in6_addr));
@@ -469,17 +516,18 @@ OcatPeer_t *insert_peer(int fd, const struct in6_addr *addr)
}
else
peer->dir = PEER_INCOMING;
- pthread_mutex_unlock(&peer_mutex_);
+ pthread_mutex_unlock(&peer->mutex);
// wake up socket_receiver
log_msg(L_DEBUG, "waking up socket_receiver");
if (write(lpfd_[1], &fd, 1) != 1)
log_msg(L_FATAL, "couldn't write to socket_receiver pipe: \"%s\"", strerror(errno));
- return peer;
+ return 1;
}
+//k
void *socket_acceptor(void *p)
{
int fd;
@@ -511,20 +559,21 @@ void *socket_acceptor(void *p)
perror("onion_receiver:accept"), exit(1);
log_msg(L_NOTICE, "connection %d accepted on listener %d", fd, sockfd_);
- insert_peer(fd, NULL);
+ insert_peer(fd, NULL, 0);
}
return NULL;
}
+//k
int socks_connect(const struct in6_addr *addr)
{
- struct sockaddr_in in /* = {AF_INET, htons(tor_socks_port_), {htonl(INADDR_LOOPBACK)}}*/;
+ struct sockaddr_in in;
int fd, t;
char buf[FRAME_SIZE], onion[ONION_NAME_SIZE];
SocksHdr_t *shdr = (SocksHdr_t*) buf;
- OcatPeer_t *ohd;
+// OcatPeer_t *ohd;
log_msg(L_DEBUG, "[socks_connect] called");
@@ -547,13 +596,12 @@ int socks_connect(const struct in6_addr *addr)
t = time(NULL);
if (connect(fd, (struct sockaddr*) &in, sizeof(in)) < 0)
{
- log_msg(L_ERROR, "connect() failed");
+ log_msg(L_ERROR, "connect() to TOR failed");
close(fd);
return E_SOCKS_CONN;
}
- t = time(NULL) - t;
- log_msg(L_DEBUG, "connect()");
+ log_msg(L_DEBUG, "connected to TOR, doing SOCKS handshake");
shdr->ver = 4;
shdr->cmd = 1;
@@ -563,6 +611,7 @@ int socks_connect(const struct in6_addr *addr)
strcpy(buf + sizeof(SocksHdr_t) + 5, onion);
if (write(fd, shdr, sizeof(SocksHdr_t) + strlen(onion) + 6) != sizeof(SocksHdr_t) + strlen(onion) + 6)
+ // FIXME: there should be some additional error handling
log_msg(L_ERROR, "couldn't write %d bytes to SOCKS connection %d", sizeof(SocksHdr_t) + strlen(onion) + 6, fd);
log_msg(L_DEBUG, "connect request sent");
@@ -572,7 +621,7 @@ int socks_connect(const struct in6_addr *addr)
close(fd);
return E_SOCKS_REQ;
}
- log_msg(L_DEBUG, "socks response received");
+ log_msg(L_DEBUG, "SOCKS response received");
if (shdr->ver || (shdr->cmd != 90))
{
@@ -582,15 +631,13 @@ int socks_connect(const struct in6_addr *addr)
}
log_msg(L_NOTICE, "connection to %s successfully opened on fd %d", onion, fd);
- ohd = insert_peer(fd, addr);
- pthread_mutex_lock(&peer_mutex_);
- ohd->sdelay = t;
- pthread_mutex_unlock(&peer_mutex_);
+ insert_peer(fd, addr, time(NULL) - t);
return fd;
}
+//k
void socks_queue(const struct in6_addr *addr)
{
SocksQueue_t *squeue;
@@ -616,6 +663,7 @@ void socks_queue(const struct in6_addr *addr)
}
+//k
void *socks_connector(void *p)
{
OcatPeer_t *peer;
@@ -681,6 +729,7 @@ void *socks_connector(void *p)
}
+//k
void packet_forwarder(void)
{
char buf[FRAME_SIZE];
@@ -724,23 +773,30 @@ void packet_forwarder(void)
}
-void *socket_cleaner(void *p)
+//k
+void *socket_cleaner(void *ptr)
{
- OcatPeer_t *peer;
+ OcatPeer_t *peer, **p;
for (;;)
{
sleep(CLEANER_WAKEUP);
log_msg(L_DEBUG, "wakeup");
pthread_mutex_lock(&peer_mutex_);
- for (peer = peer_; peer; peer = peer->next)
+ for (p = &peer_; *p; p = &(*p)->next)
{
- if (peer->state && peer->time + MAX_IDLE_TIME < time(NULL))
+ pthread_mutex_lock(&(*p)->mutex);
+ if ((*p)->state && (*p)->time + MAX_IDLE_TIME < time(NULL))
{
+ peer = *p;
+ *p = peer->next;
log_msg(L_NOTICE, "peer %d timed out, closing.", peer->tcpfd);
close(peer->tcpfd);
+ pthread_mutex_unlock(&peer->mutex);
delete_peer(peer);
+ continue;
}
+ pthread_mutex_unlock(&(*p)->mutex);
}
pthread_mutex_unlock(&peer_mutex_);
}
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-privacy/packages/onioncat.git
More information about the Pkg-privacy-commits
mailing list