[Pkg-privacy-commits] [obfsproxy] 262/353: Add unit tests for socks5.py

Ximin Luo infinity0 at moszumanska.debian.org
Sat Aug 22 13:02:08 UTC 2015


This is an automated email from the git hooks/post-receive script.

infinity0 pushed a commit to branch master
in repository obfsproxy.

commit fe3a0b2820e8e68543ef1c7aa6f3ffa1d572fb9c
Author: Yawning Angel <yawning at schwanenlied.me>
Date:   Mon Mar 10 10:26:42 2014 +0000

    Add unit tests for socks5.py
---
 obfsproxy/test/test_socks5.py | 438 ++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 438 insertions(+)

diff --git a/obfsproxy/test/test_socks5.py b/obfsproxy/test/test_socks5.py
new file mode 100644
index 0000000..8c710c0
--- /dev/null
+++ b/obfsproxy/test/test_socks5.py
@@ -0,0 +1,438 @@
+from twisted.internet import defer, error
+from twisted.trial import unittest
+from twisted.test import proto_helpers
+from twisted.python.failure import Failure
+
+from obfsproxy.network import socks5
+
+import binascii
+import struct
+
+class SOCKSv5Protocol_testMethodSelect(unittest.TestCase):
+
+    proto = None
+    tr = None
+
+    def _sendMsg(self, msg):
+        msg = binascii.unhexlify(msg)
+        self.proto.dataReceived(msg)
+
+    def _recvMsg(self, expected):
+        self.assertEqual(self.tr.value(), binascii.unhexlify(expected))
+        self.tr.clear()
+
+    def setUp(self):
+        factory = socks5.SOCKSv5Factory()
+        self.proto = factory.buildProtocol(('127.0.0.1', 0))
+        self.tr = proto_helpers.StringTransportWithDisconnection()
+        self.tr.protocol = self.proto
+        self.proto.makeConnection(self.tr)
+
+    def test_InvalidVersion(self):
+        """
+        Test Method Select message containing an invalid VER.
+        """
+
+        # VER = 03, NMETHODS = 01, METHODS = ['00']
+        self._sendMsg("030100")
+        self.assertFalse(self.tr.connected)
+
+    def test_InvalidNMethods(self):
+        """
+        Test Method Select message containing no methods.
+        """
+
+        # VER = 05, NMETHODS = 00
+        self._sendMsg("0500")
+        self.assertFalse(self.tr.connected)
+
+    def test_NoAuth(self):
+        """
+        Test Method Select message containing NO AUTHENTICATION REQUIRED.
+        """
+
+        # VER = 05, NMETHODS = 01, METHODS = ['00']
+        self._sendMsg("050100")
+
+        # VER = 05, METHOD = 00
+        self._recvMsg("0500")
+        self.assertEqual(self.proto.authMethod, socks5._SOCKS_AUTH_NO_AUTHENTICATION_REQUIRED)
+        self.assertEqual(self.proto.state, self.proto.ST_AUTHENTICATING)
+        self.assertTrue(self.tr.connected)
+
+        # Send the first byte of the request to prod it into ST_READ_REQUEST
+        self._sendMsg("05")
+        self.assertEqual(self.proto.state, self.proto.ST_READ_REQUEST)
+
+    def test_UsernamePasswd(self):
+        """
+        Test Method Select message containing USERNAME/PASSWORD.
+        """
+
+        # VER = 05, NMETHODS = 01, METHODS = ['02']
+        self._sendMsg("050102")
+
+        # VER = 05, METHOD = 02
+        self._recvMsg("0502")
+        self.assertEqual(self.proto.authMethod, socks5._SOCKS_AUTH_USERNAME_PASSWORD)
+        self.assertEqual(self.proto.state, self.proto.ST_AUTHENTICATING)
+        self.assertTrue(self.tr.connected)
+
+    def test_Both(self):
+        """
+        Test Method Select message containing both NO AUTHENTICATION REQUIRED
+        and USERNAME/PASSWORD.
+        """
+
+        # VER = 05, NMETHODS = 02, METHODS = [00, 02]
+        self._sendMsg("05020002")
+
+        # VER = 05, METHOD = 02
+        self._recvMsg("0502")
+        self.assertEqual(self.proto.authMethod, socks5._SOCKS_AUTH_USERNAME_PASSWORD)
+        self.assertEqual(self.proto.state, self.proto.ST_AUTHENTICATING)
+        self.assertTrue(self.tr.connected)
+
+    def test_Unknown(self):
+        """
+        Test Method Select message containing an unknown auth method.
+        """
+
+        # VER = 05, NMETHODS = 01, METHODS = [01]
+        self._sendMsg("050101")
+
+        # VER = 05, METHOD = ff
+        self._recvMsg("05ff")
+        self.assertEqual(self.proto.authMethod, socks5._SOCKS_AUTH_NO_ACCEPTABLE_METHODS)
+        self.assertFalse(self.tr.connected)
+
+    def test_BothUnknown(self):
+        """
+        Test Method Select message containing supported and unknown methods.
+        """
+
+        # VER = 05, NMETHODS = 03, METHODS = [00, 02, ff]
+        self._sendMsg("05030002ff")
+
+        # VER = 05, METHOD = 02
+        self._recvMsg("0502")
+        self.assertEqual(self.proto.authMethod, socks5._SOCKS_AUTH_USERNAME_PASSWORD)
+        self.assertEqual(self.proto.state, self.proto.ST_AUTHENTICATING)
+        self.assertTrue(self.tr.connected)
+
+    def test_TrailingGarbage(self):
+        """
+        Test Method Select message with an impatient client.
+        """
+
+        # VER = 05, NMETHODS = 01, METHODS = ['00'], Garbage = deadbabe
+        self._sendMsg("050100deadbabe")
+       
+        self.assertFalse(self.tr.connected)
+
+class SOCKSv5Protocol_testRfc1929Auth(unittest.TestCase):
+
+    proto = None
+    tr = None
+
+    def _sendMsg(self, msg):
+        msg = binascii.unhexlify(msg)
+        self.proto.dataReceived(msg)
+
+    def _recvMsg(self, expected):
+        self.assertEqual(self.tr.value(), binascii.unhexlify(expected))
+        self.tr.clear()
+
+    def _processAuthTrue(self, uname, passwd):
+        self.assertEqual(uname, "ABCDE")
+        self.assertEqual(passwd, "abcde")
+        return True
+
+    def _processAuthFalse(self, uname, passwd):
+        self.assertEqual(uname, "ABCDE")
+        self.assertEqual(passwd, "abcde")
+        return False
+
+    def setUp(self):
+        factory = socks5.SOCKSv5Factory()
+        self.proto = factory.buildProtocol(('127.0.0.1', 0))
+        self.tr = proto_helpers.StringTransportWithDisconnection()
+        self.tr.protocol = self.proto
+        self.proto.makeConnection(self.tr)
+
+        # Get things to where the next step is the client sends the auth message
+        self._sendMsg("050102")
+        self._recvMsg("0502")
+
+    def test_InvalidVersion(self):
+        """
+        Test auth request containing an invalid VER.
+        """
+
+        # VER = 03, ULEN = 5, UNAME = "ABCDE", PLEN = 5, PASSWD = "abcde"
+        self._sendMsg("03054142434445056162636465")
+
+        # VER = 01, STATUS = 01
+        self._recvMsg("0101")
+        self.assertFalse(self.tr.connected)
+
+    def test_InvalidUlen(self):
+        """
+        Test auth request with an invalid ULEN.
+        """
+
+        # VER = 01, ULEN = 0, UNAME = "", PLEN = 5, PASSWD = "abcde"
+        self._sendMsg("0100056162636465")
+
+        # VER = 01, STATUS = 01
+        self._recvMsg("0101")
+        self.assertFalse(self.tr.connected)
+
+    def test_InvalidPlen(self):
+        """
+        Test auth request with an invalid PLEN.
+        """
+
+        # VER = 01, ULEN = 5, UNAME = "ABCDE", PLEN = 0, PASSWD = ""
+        self._sendMsg("0105414243444500")
+
+        # VER = 01, STATUS = 01
+        self._recvMsg("0101")
+        self.assertFalse(self.tr.connected)
+
+    def test_ValidAuthSuccess(self):
+        """
+        Test auth request that is valid and successful at authenticating.
+        """
+
+        self.proto.processRfc1929Auth = self._processAuthTrue
+
+        # VER = 01, ULEN = 5, UNAME = "ABCDE", PLEN = 5, PASSWD = "abcde"
+        self._sendMsg("01054142434445056162636465")
+
+        # VER = 01, STATUS = 00
+        self._recvMsg("0100")
+        self.assertEqual(self.proto.state, self.proto.ST_READ_REQUEST)
+        self.assertTrue(self.tr.connected)
+
+    def test_ValidAuthFailure(self):
+        """
+        Test auth request that is valid and failed at authenticating.
+        """
+
+        self.proto.processRfc1929Auth = self._processAuthFalse
+
+        # VER = 01, ULEN = 5, UNAME = "ABCDE", PLEN = 5, PASSWD = "abcde"
+        self._sendMsg("01054142434445056162636465")
+
+        # VER = 01, STATUS = 01
+        self._recvMsg("0101")
+        self.assertFalse(self.tr.connected)
+
+    def test_TrailingGarbage(self):
+        """
+        Test auth request with an impatient client.
+        """
+
+        # VER = 01, ULEN = 5, UNAME = "ABCDE", PLEN = 5, PASSWD = "abcde", Garbage = deadbabe
+        self._sendMsg("01054142434445056162636465deadbabe")
+
+        self.assertFalse(self.tr.connected)
+
+
+class SOCKSv5Protocol_testRequest(unittest.TestCase):
+
+    proto = None
+    tr = None
+    connectDeferred = None
+
+    def _sendMsg(self, msg):
+        msg = binascii.unhexlify(msg)
+        self.proto.dataReceived(msg)
+
+    def _recvMsg(self, expected):
+        self.assertEqual(self.tr.value(), binascii.unhexlify(expected))
+        self.tr.clear()
+
+    def _recvFailureResponse(self, expected):
+        # VER = 05, REP = expected, RSV = 00, ATYPE = 01, BND.ADDR = 0.0.0.0, BND.PORT = 0000
+        fail_msg = "05" + binascii.hexlify(chr(expected)) + "0001000000000000"
+        self._recvMsg(fail_msg)
+
+    def _connectClassIPv4(self, addr, port, klass, *args):
+        self.assertEqual(addr, "127.0.0.1")
+        self.assertEqual(port, 9050)
+        self.connectDeferred = defer.Deferred()
+        self.connectDeferred.addCallback(self._connectIPv4)
+        return self.connectDeferred
+
+    def _connectIPv4(self, unused):
+        self.proto.sendReply(socks5.SOCKSv5Reply.Succeeded, struct.pack("!I", 0x7f000001), 9050)
+
+    def _connectClassIPv6(self, addr, port, klass, *args):
+        self.assertEqual(addr, "102:304:506:708:90a:b0c:d0e:f10")
+        self.assertEqual(port, 9050)
+        self.connectDeferred = defer.Deferred()
+        self.connectDeferred.addCallback(self._connectIPv6)
+        return self.connectDeferred
+
+    def _connectIPv6(self, unused):
+        addr = binascii.unhexlify("0102030405060708090a0b0c0d0e0f10")
+        self.proto.sendReply(socks5.SOCKSv5Reply.Succeeded, addr, 9050, socks5._SOCKS_ATYP_IP_V6)
+
+    def _connectClassDomainname(self, addr, port, klass, *args):
+        self.assertEqual(addr, "example.com")
+        self.assertEqual(port, 9050)
+        self.connectDeferred = defer.Deferred()
+        self.connectDeferred.addCallback(self._connectIPv4)
+        return self.connectDeferred
+
+    def setUp(self):
+        factory = socks5.SOCKSv5Factory()
+        self.proto = factory.buildProtocol(('127.0.0.1', 0))
+        self.tr = proto_helpers.StringTransportWithDisconnection()
+        self.tr.protocol = self.proto
+        self.proto.makeConnection(self.tr)
+        self.connectDeferred = None
+
+        # Get things to where the next step is the client sends the request
+        self._sendMsg("050100")
+        self._recvMsg("0500")
+
+    def test_InvalidVersion(self):
+        """
+        Test Request with an invalid VER.
+        """
+
+        # VER = 03, CMD = 01, RSV = 00, ATYPE = 01, DST.ADDR = 127.0.0.1, DST.PORT = 9050
+        self._sendMsg("030100017f000001235a")
+
+        self._recvFailureResponse(socks5.SOCKSv5Reply.GeneralFailure)
+        self.assertFalse(self.tr.connected)
+
+    def test_InvalidCommand(self):
+        """
+        Test Request with an invalid CMD.
+        """
+
+        # VER = 05, CMD = 05, RSV = 00, ATYPE = 01, DST.ADDR = 127.0.0.1, DST.PORT = 9050
+        self._sendMsg("050500017f000001235a")
+
+        self._recvFailureResponse(socks5.SOCKSv5Reply.CommandNotSupported)
+        self.assertFalse(self.tr.connected)
+
+    def test_InvalidRsv(self):
+        """
+        Test Request with an invalid RSV.
+        """
+
+        # VER = 05, CMD = 01, RSV = 30, ATYPE = 01, DST.ADDR = 127.0.0.1, DST.PORT = 9050
+        self._sendMsg("050130017f000001235a")
+
+        self._recvFailureResponse(socks5.SOCKSv5Reply.GeneralFailure)
+        self.assertFalse(self.tr.connected)
+
+    def test_InvalidAtyp(self):
+        """
+        Test Request with an invalid ATYP.
+        """
+
+        # VER = 05, CMD = 01, RSV = 00, ATYPE = 05, DST.ADDR = 127.0.0.1, DST.PORT = 9050
+        self._sendMsg("050100057f000001235a")
+
+        self._recvFailureResponse(socks5.SOCKSv5Reply.AddressTypeNotSupported)
+        self.assertFalse(self.tr.connected)
+
+    def test_CmdBind(self): 
+        """
+        Test Request with a BIND CMD.
+        """
+
+        # VER = 05, CMD = 02, RSV = 00, ATYPE = 01, DST.ADDR = 127.0.0.1, DST.PORT = 9050
+        self._sendMsg("050200017f000001235a")
+
+        self._recvFailureResponse(socks5.SOCKSv5Reply.CommandNotSupported)
+        self.assertFalse(self.tr.connected)
+
+    def test_CmdUdpAssociate(self): 
+        """
+        Test Request with a UDP ASSOCIATE CMD.
+        """
+
+        # VER = 05, CMD = 03, RSV = 00, ATYPE = 01, DST.ADDR = 127.0.0.1, DST.PORT = 9050
+        self._sendMsg("050300017f000001235a")
+
+        self._recvFailureResponse(socks5.SOCKSv5Reply.CommandNotSupported)
+        self.assertFalse(self.tr.connected)
+
+    def test_CmdConnectIPv4(self):
+        """
+        Test Successful Request with an IPv4 CONNECT.
+        """
+
+        self.proto.connectClass = self._connectClassIPv4
+
+        # VER = 05, CMD = 01, RSV = 00, ATYPE = 01, DST.ADDR = 127.0.0.1, DST.PORT = 9050
+        self._sendMsg("050100017f000001235a")
+        self.connectDeferred.callback(self)
+
+        # VER = 05, REP = 00, RSV = 00, ATYPE = 01, BND.ADDR = 127.0.0.1, BND.PORT = 9050
+        self._recvMsg("050000017f000001235a")
+        self.assertEqual(self.proto.state, self.proto.ST_ESTABLISHED)
+        self.assertTrue(self.tr.connected)
+
+    def test_CmdConnectIPv6(self):
+        """
+        Test Successful Request with an IPv6 CONNECT.
+        """
+
+        self.proto.connectClass = self._connectClassIPv6
+
+        # VER = 05, CMD = 01, RSV = 00, ATYPE = 04, DST.ADDR = 0102:0304:0506:0708:090a:0b0c:0d0e:0f10, DST.PORT = 9050
+        self._sendMsg("050100040102030405060708090a0b0c0d0e0f10235a")
+        self.connectDeferred.callback(self)
+
+        # VER = 05, REP = 00, RSV = 00, ATYPE = 04, BND.ADDR = 0102:0304:0506:0708:090a:0b0c:0d0e:0f10, BND.PORT = 9050
+        self._recvMsg("050000040102030405060708090a0b0c0d0e0f10235a")
+        self.assertEqual(self.proto.state, self.proto.ST_ESTABLISHED)
+        self.assertTrue(self.tr.connected)
+
+    def test_CmdConnectDomainName(self):
+        """
+        Test Successful request with a DOMAINNAME CONNECT.
+        """
+
+        self.proto.connectClass = self._connectClassDomainname
+
+        # VER = 05, CMD = 01, RSV = 00, ATYPE = 03, DST.ADDR = example.com, DST.PORT = 9050
+        self._sendMsg("050100030b6578616d706c652e636f6d235a")
+        self.connectDeferred.callback(self)
+        
+        # VER = 05, REP = 00, RSV = 00, ATYPE = 01, BND.ADDR = 127.0.0.1, BND.PORT = 9050
+        self._recvMsg("050000017f000001235a")
+        self.assertEqual(self.proto.state, self.proto.ST_ESTABLISHED)
+        self.assertTrue(self.tr.connected)
+
+    def test_TrailingGarbage(self):
+        """
+        Test request with an impatient client.
+        """
+
+        # VER = 05, CMD = 01, RSV = 00, ATYPE = 01, DST.ADDR = 127.0.0.1, DST.PORT = 9050, Garbage = deadbabe
+        self._sendMsg("050100017f000001235adeadbabe")
+
+        self.assertFalse(self.tr.connected)
+
+    def test_CmdConnectErrback(self):
+        """
+        Test Unsuccessful Request with an IPv4 CONNECT.
+        """
+
+        self.proto.connectClass = self._connectClassIPv4
+
+        # VER = 05, CMD = 01, RSV = 00, ATYPE = 01, DST.ADDR = 127.0.0.1, DST.PORT = 9050
+        self._sendMsg("050100017f000001235a")
+        self.connectDeferred.errback(Failure(error.ConnectionRefusedError("Foo")))
+
+        self._recvFailureResponse(socks5.SOCKSv5Reply.ConnectionRefused)
+        self.assertFalse(self.tr.connected)

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-privacy/packages/obfsproxy.git



More information about the Pkg-privacy-commits mailing list