[Pkg-privacy-commits] [obfsproxy] 62/353: Introduce Twisted networking system.

Ximin Luo infinity0 at moszumanska.debian.org
Sat Aug 22 13:01:40 UTC 2015


This is an automated email from the git hooks/post-receive script.

infinity0 pushed a commit to branch master
in repository obfsproxy.

commit 69a13a4f5389c1902616409970fb73bf704e3a7e
Author: George Kadianakis <desnacked at riseup.net>
Date:   Mon Sep 17 04:06:04 2012 +0300

    Introduce Twisted networking system.
---
 src/obfsproxy/framework/network.py | 318 +++++++++++++++++++++++++++++++++++++
 src/obfsproxy/framework/socks.py   | 158 +++++++++---------
 2 files changed, 397 insertions(+), 79 deletions(-)

diff --git a/src/obfsproxy/framework/network.py b/src/obfsproxy/framework/network.py
new file mode 100644
index 0000000..d9e1f79
--- /dev/null
+++ b/src/obfsproxy/framework/network.py
@@ -0,0 +1,318 @@
+# XXX fix imports
+from twisted.python import failure
+from twisted.internet import reactor, error, address, tcp
+from twisted.internet.protocol import Protocol, Factory, ClientFactory
+
+import obfsproxy.common.log as log
+
+"""
+Networking subsystem:
+
+A "Connection" is a bidirectional communications channel, usually
+backed by a network socket. For example, the communication channel
+between tor and obfsproxy is a 'connection'. In the code, it's
+represented by Twisted's twisted.internet.protocol.Protocol.
+
+A 'Circuit' is a pair of connections, referred to as the 'upstream'
+and 'downstream' connections. The upstream connection of a circuit
+communicates in cleartext with the higher-level program that wishes to
+make use of our obfuscation service. The downstream connection
+communicates in an obfuscated fashion with the remote peer that the
+higher-level client wishes to contact. In the code, it's represented
+by the custom Circuit class.
+
+The diagram below might help demonstrate the relationship between
+connections and circuits:
+
+                                   Downstream
+
+       'Circuit C'      'Connection CD'   'Connection SD'     'Circuit S'
+                     +-----------+          +-----------+
+     Upstream    +---|Obfsproxy c|----------|Obfsproxy s|----+   Upstream
+                 |   +-----------+    ^     +-----------+    |
+ 'Connection CU' |                    |                      | 'Connection SU'
+           +------------+           Sent over       +--------------+
+           | Tor Client |           the net         |  Tor Bridge  |
+           +------------+                           +--------------+
+
+In the above diagram, "Obfsproxy c" is the client-side obfsproxy, and
+"Obfsproxy s" is the server-side obfsproxy. "Connection CU" is the
+Client's Upstream connection, the communication channel between tor
+and obfsproxy. "Connection CD" is the Client's Downstream connection,
+the communication channel between obfsproxy and the remote peer. These
+two connections form the client's circuit "Circuit C".
+
+A 'listener' is a listening socket bound to a particular obfuscation
+protocol, represented using Twisted's t.i.p.Factory. Connecting to a
+listener creates one connection of a circuit, and causes this program
+to initiate the other connection (possibly after receiving in-band
+instructions about where to connect to). A listener is said to be a
+'client' listener if connecting to it creates the upstream connection,
+and a 'server' listener if connecting to it creates the downstream
+connection.
+
+There are two kinds of client listeners: a 'simple' client listener
+always connects to the same remote peer every time it needs to
+initiate a downstream connection; a 'socks' client listener can be
+told to connect to an arbitrary remote peer using the SOCKS protocol.
+"""
+
+class Circuit(Protocol):
+    """
+    A Circuit holds a pair of connections. The upstream connection and
+    the downstream. The circuit proxies data from one connection to
+    the other.
+
+    Attributes:
+    transport: the pluggable transport we should use to
+               obfuscate traffic on this circuit.
+
+    downstream: the downstream connection
+    upstream: the upstream connection
+    """
+
+    def __init__(self, transport):
+        self.transport = transport # takes a transport
+
+        self.downstream = None # takes a twisted transport
+        self.upstream = None # takes a twisted transport
+
+        self.name = "circ_%s" % hex(id(self))
+
+    def setDownstreamConnection(self, conn): # XXX merge set{Downstream,Upstream}Connection.
+        """
+        Set the downstream connection of a circuit.
+        """
+
+        log.debug("%s: Setting downstream connection (%s)." % (self.name, conn.name))
+        assert(not self.downstream)
+        self.downstream = conn
+
+        """We just completed the circuit, do a dummy dataReceived on
+        the initiating connection in case it had any buffered data
+        that must be flushed to the network."""
+        if self.circuitIsReady():
+            self.upstream.dataReceived('')
+
+    def setUpstreamConnection(self, conn):
+        """
+        Set the upstream connection of a circuit.
+        """
+
+        log.debug("%s: Setting upstream connection (%s)." % (self.name, conn.name))
+        assert(not self.upstream)
+        self.upstream = conn
+
+        """We just completed the circuit, do a dummy dataReceived on
+        the initiating connection in case it had any buffered data
+        that must be flushed to the network."""
+        if self.circuitIsReady():
+            self.downstream.dataReceived('')
+
+    def circuitIsReady(self):
+        """
+        Return True if the circuit is completed.
+        """
+
+        return self.downstream and self.upstream
+
+    def dataReceived(self, data, conn):
+        """
+        We received 'data' on 'conn'. Pass the data to our transport,
+        and then proxy it to the other side.
+
+        Requires both downstream and upstream connections to be set.
+
+        Returns the bytes that were not used by the transport.
+        """
+        log.debug("%s: Received %d bytes." % (self.name, len(data)))
+
+        assert(self.downstream and self.upstream)
+        assert((conn is self.downstream) or (conn is self.upstream))
+
+        if conn is self.downstream:
+            leftovers = self.transport.receivedDownstream(data, self)
+        else:
+            leftovers = self.transport.receivedUpstream(data, self)
+
+        return leftovers
+
+    def close(self):
+        """
+        Tear down the circuit.
+        """
+
+        log.info("%s: Closing down circuit." % self.name)
+        if self.downstream: self.downstream.close()
+        if self.upstream: self.upstream.close()
+
+class StaticDestinationProtocol(Protocol):
+    """
+    Represents a connection to a static destination (as opposed to a
+    SOCKS connection).
+
+    Attributes:
+    mode: 'server' or 'client'
+    circuit: The circuit this connection belongs to.
+    direction: 'downstream' or 'upstream'
+
+    buffer: Buffer that holds data that can't be proxied right
+            away. This can happen because the circuit is not yet
+            complete, or because the pluggable transport needs more
+            data before deciding what to do.
+    """
+
+    def __init__(self, circuit, mode):
+        self.mode = mode
+        self.circuit = circuit
+        self.direction = None # XXX currently unused
+
+        self.name = "conn_%s" % hex(id(self))
+
+    def connectionMade(self):
+        """
+        Callback for when a connection is successfully established.
+
+        Find the connection's direction in the circuit, and register
+        it in our circuit.
+        """
+
+        # Initialize the buffer for this connection:
+        self.buffer = ''
+
+        # Find the connection's direction and register it in the circuit.
+        if self.mode == 'client' and not self.circuit.upstream:
+            log.info("%s: connectionMade (client): " \
+                     "Setting it as upstream on our circuit." % self.name)
+
+            self.circuit.setUpstreamConnection(self)
+            self.direction = 'upstream'
+        elif self.mode == 'client':
+            log.info("%s: connectionMade (client): " \
+                     "Setting it as downstream on our circuit." % self.name)
+
+            self.circuit.setDownstreamConnection(self)
+            self.direction = 'downstream'
+        elif self.mode == 'server' and not self.circuit.downstream:
+            log.info("%s: connectionMade (server): " \
+                     "Setting it as downstream on our circuit." % self.name)
+
+            self.circuit.setDownstreamConnection(self)
+            self.direction = 'downstream'
+        elif self.mode == 'server':
+            log.info("%s: connectionMade (server): " \
+                     "Setting it as upstream on our circuit." % self.name)
+
+            self.circuit.setUpstreamConnection(self)
+            self.direction = 'upstream'
+
+    def connectionLost(self, reason):
+        log.info("%s: Connection was lost (%s)." % (self.name, reason.getErrorMessage()))
+
+    def connectionFailed(self, reason):
+        log.info("%s: Connection failed to connect (%s)." % (self.name, reason.getErrorMessage()))
+
+    def dataReceived(self, data):
+        """
+        We received some data from the network. See if we have a
+        complete circuit, and pass the data to it so they get proxied.
+
+        XXX: Can also be called with empty 'data' because of
+        Circuit.setDownstreamConnection(). Document or split function.
+        """
+        if (not self.buffer) and (not data):
+            log.info("%s: dataReceived called without a reason.")
+            return
+
+        log.debug("%s: Received %d bytes (and %d cached):\n%s" \
+                  % (self.name, len(data), len(self.buffer), str(data)))
+
+        # Circuit is not fully connected yet, cache what we got.
+        if not self.circuit.circuitIsReady():
+            log.debug("%s: Caching them %d bytes." % (self.name, len(data)))
+            self.buffer += data
+            return
+
+        # Send received and buffered data to the circuit. Buffer the
+        # data that the transport could not use.
+        self.buffer = self.circuit.dataReceived(self.buffer + data, self)
+
+    def write(self, buf):
+        log.debug("%s: Writing %d bytes." % (self.name, len(buf)))
+        self.transport.write(buf)
+
+    def close(self):
+        log.debug("%s: Closing connection." % self.name)
+        self.transport.loseConnection()
+
+class StaticDestinationClientFactory(Factory):
+    """
+    Created when our listener receives a client connection. Makes the
+    connection that connects to the other end of the circuit.
+    """
+
+    def __init__(self, circuit, mode):
+        self.circuit = circuit
+        self.mode = mode
+
+        self.name = "fact_c_%s" % hex(id(self))
+
+    def buildProtocol(self, addr):
+        return StaticDestinationProtocol(self.circuit, self.mode)
+
+    def startedConnecting(self, connector):
+        log.debug("%s: Client factory started connecting." % self.name)
+
+    def clientConnectionLost(self, connector, reason):
+        log.info("%s: Connection was lost (%s)." % (self.name, reason.getErrorMessage()))
+        self.circuit.close()
+
+    def clientConnectionFailed(self, connector, reason):
+        log.debug("%s: Connection failed to connect (%s)." % (self.name, reason.getErrorMessage()))
+        self.circuit.close()
+
+class StaticDestinationServerFactory(Factory):
+    """
+    Represents a listener. Upon receiving a connection, it creates a
+    circuit and tries to establish the other side of the circuit. It
+    then listens for data to obfuscate and proxy.
+
+    Attributes:
+
+    remote_host: The IP/DNS information of the host on the other side
+                 of the circuit.
+    remote_port: The TCP port of the host on the other side of the circuit.
+    mode: 'server' or 'client'
+    transport: the pluggable transport we should use to
+               obfuscate traffic on this connection.
+    circuits: A list with all the circuits this listener has created.
+    """
+
+    def __init__(self, remote_addrport, mode, transport):
+        self.remote_host = remote_addrport[0]
+        self.remote_port = int(remote_addrport[1])
+        self.mode = mode
+        self.transport = transport
+
+        self.name = "fact_s_%s" % hex(id(self))
+
+        # XXX currently unused. Remove to reduce memory fpr?
+        self.circuits = []
+
+        assert(self.mode == 'client' or self.mode == 'server')
+
+    def startFactory(self):
+        log.info("%s: Starting up static destination server factory." % self.name)
+
+    def buildProtocol(self, addr):
+        log.info("%s: New connection." % self.name)
+
+        circuit = Circuit(self.transport)
+        self.circuits.append(circuit) # XXX when do we remove this?
+
+        # XXX instantiates a new factory for each client
+        clientFactory = StaticDestinationClientFactory(circuit, self.mode)
+        reactor.connectTCP(self.remote_host, self.remote_port, clientFactory)
+
+        return StaticDestinationProtocol(circuit, self.mode)
+
diff --git a/src/obfsproxy/framework/socks.py b/src/obfsproxy/framework/socks.py
index 1b3cabc..debd185 100644
--- a/src/obfsproxy/framework/socks.py
+++ b/src/obfsproxy/framework/socks.py
@@ -1,112 +1,112 @@
-#!/usr/bin/python
-# -*- coding: utf-8 -*-
+from twisted.protocols import socks
+from twisted.internet.protocol import Protocol, Factory, ClientFactory
 
-"""
-The socks module contains the SocksHandler class, which implements the client-side handling of pluggable transports.
-"""
-
-from struct import unpack
-from socket import inet_ntoa
+import obfsproxy.common.log as log
+import obfsproxy.framework.network as network
 
-import monocle
-monocle.init('tornado')
+class MySOCKSv4Outgoing(socks.SOCKSv4Outgoing, object):
+    """
+    Represents a downstream connection from the SOCKS server to the
+    destination.
+
+    It monkey-patches socks.SOCKSv4Outgoing, because we need to pass
+    our data to the pluggable transport before proxying them
+    (Twisted's socks module did not support that).
+
+    Attributes:
+    circuit: The circuit this connection belongs to.
+    buffer: Buffer that holds data that can't be proxied right
+            away. This can happen because the circuit is not yet
+            complete, or because the pluggable transport needs more
+            data before deciding what to do.
+    """
 
-from monocle import _o, Return
-from monocle.stack.network import Client
+    def __init__(self, socksProtocol):
+        """
+        Constructor.
 
-from obfsproxy.util import encode
-from obfsproxy.framework.pump import Pump
+        'socksProtocol' is a 'SOCKSv4Protocol' object.
+        """
 
-import obfsproxy.common.log as log
+        self.circuit = socksProtocol.circuit
+        self.buffer = ''
 
+        self.name = "socks_down_%s" % hex(id(self))
 
-def uncompact(x):
-    """ uncompact is a convenience method for unpacking an IPv4 address from its byte representation. """
+        return super(MySOCKSv4Outgoing, self).__init__(socksProtocol)
 
-    (ip, port) = unpack('!4sH', x)
-    return (inet_ntoa(ip), port)
+    def dataReceived(self, data):
+        log.debug("%s: Received %d bytes:\n%s" \
+                  % (self.name, len(data), str(data)))
 
+        assert(self.circuit.circuitIsReady()) # XXX Is this always true?
 
- at _o
-def readHandshake(conn):
-    """ readHandshake reads the SOCKS handshake information to the SOCKS client. """
+        self.buffer = self.circuit.dataReceived(self.buffer + data, self)
 
-    version = (yield conn.read(1))
-    log.info('version: %s' % encode(str(version)))
-    nauth = (yield conn.read(1))
-    nauth = unpack('B', nauth)[0]
-    auths = []
-    for x in range(nauth):
-        auth = (yield conn.read(1))
-        auth = unpack('B', auth)[0]
-        auths.append(auth)
+# Monkey patches socks.SOCKSv4Outgoing with our own class.
+socks.SOCKSv4Outgoing = MySOCKSv4Outgoing
 
+class SOCKSv4Protocol(socks.SOCKSv4):
+    """
+    Represents an upstream connection from a SOCKS client to our SOCKS
+    server.
 
- at _o
-def sendHandshake(output):
-    """ sendHandshake sends the SOCKS handshake information to the SOCKS client. """
+    It overrides socks.SOCKSv4 because py-obfsproxy's connections need
+    to have a circuit and obfuscate traffic before proxying it.
+    """
 
-    yield output.write('\x05\x00')
+    def __init__(self, circuit):
+        self.circuit = circuit
+        self.buffer = ''
 
+        self.name = "socks_up_%s" % hex(id(self))
 
- at _o
-def readRequest(conn):
-    """ readRequest reads the SOCKS request information from the client and returns the bytes represneting the IPv4 destination. """
+        return socks.SOCKSv4.__init__(self)
 
-    version = (yield conn.read(1))
-    command = (yield conn.read(1))
-    reserved = (yield conn.read(1))
-    addrtype = (yield conn.read(1))
-    dest = (yield conn.read(6))
+    def dataReceived(self, data):
+        """
+        Received some 'data'. They might be SOCKS handshake data, or
+        actual upstream traffic. Figure out what it is and either
+        complete the SOCKS handshake or proxy the traffic.
+        """
 
-    yield Return(dest)
+        # SOCKS handshake not completed yet: let the overridden socks
+        # module complete the handshake.
+        if not self.otherConn:
+            log.debug("%s: Received SOCKS handshake data." % self.name)
+            return socks.SOCKSv4.dataReceived(self, data)
 
+        log.debug("%s: Received %d bytes:\n%s" \
+                  % (self.name, len(data), str(data)))
 
- at _o
-def sendResponse(dest, output):
-    """ sendResponse sends the SOCKS response to the request. """
+        assert(self.otherConn)
 
-    yield output.write('\x05\x00\x00\x01' + dest)
+        if not self.circuit.circuitIsReady():
+            self.circuit.setDownstreamConnection(self.otherConn)
+            self.circuit.setUpstreamConnection(self)
 
+        self.buffer = self.circuit.dataReceived(self.buffer + data, self)
 
-class SocksHandler:
 
+class SOCKSv4Factory(Factory):
     """
-    The SocksHandler class implements the client-side handling of pluggable transports.
+    A SOCKSv4 factory.
     """
 
-    transport = None
-
-    def setTransport(self, transport):
-        """ setTransport sets the pluggable transport for this proxy server """
-
+    def __init__(self, transport):
+        # XXX self.logging = log
         self.transport = transport
+        self.circuits = []
 
-    @_o
-    def handle(self, conn):
-        """ handle is called by the framework to establish a new connection to the proxy server and start processing when an incoming SOCKS client connection is established. """
-
-        log.info('handle_socks')
-        yield readHandshake(conn)
-        log.error('read handshake')
-        yield sendHandshake(conn)
-        log.error('send handshake')
-        dest = (yield readRequest(conn))
-#        log.error('read request: %s' % str(dest))
-        yield sendResponse(dest, conn)
-        log.error('sent response')
-
-        (addr, port) = uncompact(dest)
-	log.error('connecting %s:%d' % (addr, port))
-
-        log.info(addr)
-        log.info(port)
+        self.name = "socks_fact_%s" % hex(id(self))
 
-        client = Client()
-        yield client.connect(addr, port)
-        log.info('connected %s:%d' % (addr, port))
+    def startFactory(self):
+        log.info("%s: Starting up SOCKS server factory." % self.name)
 
-        self.pump = Pump(conn, client, self.transport)
-        yield self.pump.run()
+    def buildProtocol(self, addr):
+        log.info("%s: New connection." % self.name)
 
+        circuit = network.Circuit(self.transport)
+        self.circuits.append(circuit)
 
+        return SOCKSv4Protocol(circuit)

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-privacy/packages/obfsproxy.git



More information about the Pkg-privacy-commits mailing list