[Pkg-privacy-commits] [obfsproxy] 246/353: Import Philipp's scramblesuit unittests.

Ximin Luo infinity0 at moszumanska.debian.org
Sat Aug 22 13:02:05 UTC 2015


This is an automated email from the git hooks/post-receive script.

infinity0 pushed a commit to branch master
in repository obfsproxy.

commit e4913faf8af5888b9d0834abd5032d7393599d69
Author: George Kadianakis <desnacked at riseup.net>
Date:   Sun Mar 2 14:44:57 2014 +0000

    Import Philipp's scramblesuit unittests.
---
 obfsproxy/test/transports/test_scramblesuit.py | 318 +++++++++++++++++++++++++
 1 file changed, 318 insertions(+)

diff --git a/obfsproxy/test/transports/test_scramblesuit.py b/obfsproxy/test/transports/test_scramblesuit.py
new file mode 100644
index 0000000..24feb7c
--- /dev/null
+++ b/obfsproxy/test/transports/test_scramblesuit.py
@@ -0,0 +1,318 @@
+import unittest
+
+import os
+import util
+import const
+import mycrypto
+import uniformdh
+import scramblesuit
+import base64
+import shutil
+import tempfile
+
+import message
+
+import Crypto.Hash.SHA256
+import Crypto.Hash.HMAC
+
+import obfsproxy.common.log as logging
+import obfsproxy.network.buffer as obfs_buf
+import obfsproxy.common.transport_config as transport_config
+import obfsproxy.transports.base as base
+
+class CryptoTest( unittest.TestCase ):
+
+    """
+    The HKDF test cases are taken from the appendix of RFC 5869:
+    https://tools.ietf.org/html/rfc5869
+    """
+
+    def setUp( self ):
+        pass
+
+    def extract( self, salt, ikm ):
+        return Crypto.Hash.HMAC.new(salt, ikm, Crypto.Hash.SHA256).digest()
+
+    def runHKDF( self, ikm, salt, info, prk, okm ):
+        myprk = self.extract(salt, ikm)
+        self.failIf(myprk != prk)
+        myokm = mycrypto.HKDF_SHA256(myprk, info).expand()
+        self.failUnless(myokm in okm)
+
+    def test1_HKDF_TestCase1( self ):
+
+        ikm = "0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b".decode('hex')
+        salt = "000102030405060708090a0b0c".decode('hex')
+        info = "f0f1f2f3f4f5f6f7f8f9".decode('hex')
+        prk = ("077709362c2e32df0ddc3f0dc47bba6390b6c73bb50f9c3122e" + \
+              "c844ad7c2b3e5").decode('hex')
+        okm = ("3cb25f25faacd57a90434f64d0362f2a2d2d0a90cf1a5a4c5db" + \
+              "02d56ecc4c5bf34007208d5b887185865").decode('hex')
+
+        self.runHKDF(ikm, salt, info, prk, okm)
+
+    def test2_HKDF_TestCase2( self ):
+
+        ikm = ("000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c" + \
+               "1d1e1f202122232425262728292a2b2c2d2e2f30313233343536373839" + \
+               "3a3b3c3d3e3f404142434445464748494a4b4c4d4e4f").decode('hex')
+        salt =("606162636465666768696a6b6c6d6e6f707172737475767778797a7b7c" + \
+               "7d7e7f808182838485868788898a8b8c8d8e8f90919293949596979899" + \
+               "9a9b9c9d9e9fa0a1a2a3a4a5a6a7a8a9aaabacadaeaf").decode('hex')
+        info =("b0b1b2b3b4b5b6b7b8b9babbbcbdbebfc0c1c2c3c4c5c6c7c8c9cacbcc" + \
+               "cdcecfd0d1d2d3d4d5d6d7d8d9dadbdcdddedfe0e1e2e3e4e5e6e7e8e9" + \
+               "eaebecedeeeff0f1f2f3f4f5f6f7f8f9fafbfcfdfeff").decode('hex')
+        prk = ("06a6b88c5853361a06104c9ceb35b45cef760014904671014a193f40c1" + \
+               "5fc244").decode('hex')
+        okm = ("b11e398dc80327a1c8e7f78c596a49344f012eda2d4efad8a050cc4c19" + \
+               "afa97c59045a99cac7827271cb41c65e590e09da3275600c2f09b83677" + \
+               "93a9aca3db71cc30c58179ec3e87c14c01d5c1" + \
+               "f3434f1d87").decode('hex')
+
+        self.runHKDF(ikm, salt, info, prk, okm)
+
+    def test3_HKDF_TestCase3( self ):
+        ikm = "0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b0b".decode('hex')
+        salt = ""
+        info = ""
+        prk = ("19ef24a32c717b167f33a91d6f648bdf96596776afdb6377a" + \
+               "c434c1c293ccb04").decode('hex')
+        okm = ("8da4e775a563c18f715f802a063c5a31b8a11f5c5ee1879ec" + \
+               "3454e5f3c738d2d9d201395faa4b61a96c8").decode('hex')
+
+        self.runHKDF(ikm, salt, info, prk, okm)
+
+    def test4_HKDF_TestCase4( self ):
+
+        self.assertRaises(ValueError,
+                          mycrypto.HKDF_SHA256, "x" * 40, length=(32*255)+1)
+
+        self.assertRaises(ValueError,
+                          mycrypto.HKDF_SHA256, "tooShort")
+
+        # Accidental re-use should raise an exception.
+        hkdf = mycrypto.HKDF_SHA256("x" * 40)
+        hkdf.expand()
+        self.assertRaises(base.PluggableTransportError, hkdf.expand)
+
+    def test4_CSPRNG( self ):
+        self.failIf(mycrypto.strongRandom(10) == mycrypto.strongRandom(10))
+        self.failIf(len(mycrypto.strongRandom(100)) != 100)
+
+    def test5_AES( self ):
+        plain = "this is a test"
+        key = os.urandom(16)
+        iv = os.urandom(8)
+
+        crypter1 = mycrypto.PayloadCrypter()
+        crypter1.setSessionKey(key, iv)
+        crypter2 = mycrypto.PayloadCrypter()
+        crypter2.setSessionKey(key, iv)
+
+        cipher = crypter1.encrypt(plain)
+
+        self.failIf(cipher == plain)
+        self.failUnless(crypter2.decrypt(cipher) == plain)
+
+    def test6_HMAC_SHA256_128( self ):
+        self.assertRaises(AssertionError, mycrypto.HMAC_SHA256_128,
+                          "x" * (const.SHARED_SECRET_LENGTH - 1), "test")
+
+        self.failUnless(len(mycrypto.HMAC_SHA256_128("x" * \
+                        const.SHARED_SECRET_LENGTH, "test")) == 16)
+
+
+class UniformDHTest( unittest.TestCase ):
+
+    def setUp( self ):
+        weAreServer = True
+        self.udh = uniformdh.new("A" * const.SHARED_SECRET_LENGTH, weAreServer)
+
+    def test1_createHandshake( self ):
+        handshake = self.udh.createHandshake()
+        self.failUnless((const.PUBLIC_KEY_LENGTH +
+                         const.MARK_LENGTH +
+                         const.HMAC_SHA256_128_LENGTH) <= len(handshake) <=
+                        (const.MARK_LENGTH +
+                         const.HMAC_SHA256_128_LENGTH +
+                         const.MAX_PADDING_LENGTH))
+
+    def test2_receivePublicKey( self ):
+        buf = obfs_buf.Buffer(self.udh.createHandshake())
+
+        def callback( masterKey ):
+            self.failUnless(len(masterKey) == const.MASTER_KEY_LENGTH)
+
+        self.failUnless(self.udh.receivePublicKey(buf, callback) == True)
+
+        publicKey = self.udh.getRemotePublicKey()
+        self.failUnless(len(publicKey) == const.PUBLIC_KEY_LENGTH)
+
+    def test3_invalidHMAC( self ):
+        # Make the HMAC invalid.
+        handshake = self.udh.createHandshake()
+        if handshake[-1] != 'a':
+            handshake = handshake[:-1] + 'a'
+        else:
+            handshake = handshake[:-1] + 'b'
+
+        buf = obfs_buf.Buffer(handshake)
+
+        self.failIf(self.udh.receivePublicKey(buf, lambda x: x) == True)
+
+class UtilTest( unittest.TestCase ):
+
+    def test1_isValidHMAC( self ):
+        self.failIf(util.isValidHMAC("A" * const.HMAC_SHA256_128_LENGTH,
+                                     "B" * const.HMAC_SHA256_128_LENGTH,
+                                     "X" * const.SHA256_LENGTH) == True)
+        self.failIf(util.isValidHMAC("A" * const.HMAC_SHA256_128_LENGTH,
+                                     "A" * const.HMAC_SHA256_128_LENGTH,
+                                     "X" * const.SHA256_LENGTH) == False)
+
+    def test2_locateMark( self ):
+        self.failIf(util.locateMark("D", "ABC") != None)
+
+        hmac = "X" * const.HMAC_SHA256_128_LENGTH
+        mark = "A" * const.MARK_LENGTH
+        payload = mark + hmac
+
+        self.failIf(util.locateMark(mark, payload) == None)
+        self.failIf(util.locateMark(mark, payload[:-1]) != None)
+
+    def test3_sanitiseBase32( self ):
+        self.failUnless(util.sanitiseBase32("abc") == "ABC")
+        self.failUnless(util.sanitiseBase32("ABC1XYZ") == "ABCIXYZ")
+        self.failUnless(util.sanitiseBase32("ABC1XYZ0") == "ABCIXYZO")
+
+    def test4_setStateLocation( self ):
+        name = (const.TRANSPORT_NAME).lower()
+
+        util.setStateLocation("/tmp")
+        self.failUnless(const.STATE_LOCATION == "/tmp/%s/" % name)
+
+        # Nothing should change if we pass "None".
+        util.setStateLocation(None)
+        self.failUnless(const.STATE_LOCATION == "/tmp/%s/" % name)
+
+        # Check if function creates non-existant directories.
+        d = tempfile.mkdtemp()
+        util.setStateLocation(d)
+        self.failUnless(const.STATE_LOCATION == "%s/%s/" % (d, name))
+        self.failUnless(os.path.exists("%s/%s/" % (d, name)))
+        shutil.rmtree(d)
+
+    def test5_getEpoch( self ):
+        e = util.getEpoch()
+        self.failUnless(isinstance(e, basestring))
+
+    def test6_writeToFile( self ):
+        f = tempfile.mktemp()
+        content = "ThisIsATest\n"
+        util.writeToFile(content, f)
+        self.failUnless(util.readFromFile(f) == content)
+        os.unlink(f)
+
+    def test7_readFromFile( self ):
+
+        # Read from non-existant file.
+        self.failUnless(util.readFromFile(tempfile.mktemp()) == None)
+
+        # Read file where we (hopefully) don't have permissions.
+        self.failUnless(util.readFromFile("/etc/shadow") == None)
+
+
+class MockArgs( object ):
+    uniformDHSecret = sharedSecret = ext_cookie_file = dest = None
+    mode = 'socks'
+
+
+class ScrambleSuitTransportTest( unittest.TestCase ):
+
+    def setUp( self ):
+        config = transport_config.TransportConfig( )
+        config.state_location = const.STATE_LOCATION
+        args = MockArgs( )
+        suit = scramblesuit.ScrambleSuitTransport
+        suit.weAreServer = False
+
+        self.suit = suit
+        self.args = args
+        self.config = config
+
+        self.validSecret = base64.b32encode( 'A' * const.SHARED_SECRET_LENGTH )
+        self.invalidSecret = 'a' * const.SHARED_SECRET_LENGTH
+
+    def test1_validateExternalModeCli( self ):
+        """Test with valid scramblesuit args and valid obfsproxy args."""
+        self.args.uniformDHSecret = self.validSecret
+
+        self.assertTrue(
+            super( scramblesuit.ScrambleSuitTransport,
+                   self.suit ).validate_external_mode_cli( self.args ))
+
+        self.assertIsNone( self.suit.validate_external_mode_cli( self.args ) )
+
+    def test2_validateExternalModeCli( self ):
+        """Test with invalid scramblesuit args and valid obfsproxy args."""
+        self.args.uniformDHSecret = self.invalidSecret
+
+        with self.assertRaises( base.PluggableTransportError ):
+            self.suit.validate_external_mode_cli( self.args )
+
+    def test3_get_public_server_options( self ):
+        scramblesuit.ScrambleSuitTransport.setup(transport_config.TransportConfig())
+        options = scramblesuit.ScrambleSuitTransport.get_public_server_options("")
+        self.failUnless("password" in options)
+
+        d = { "password": "3X5BIA2MIHLZ55UV4VAEGKZIQPPZ4QT3" }
+        options = scramblesuit.ScrambleSuitTransport.get_public_server_options(d)
+        self.failUnless("password" in options)
+        self.failUnless(options["password"] == "3X5BIA2MIHLZ55UV4VAEGKZIQPPZ4QT3")
+
+class MessageTest( unittest.TestCase ):
+
+    def test1_createProtocolMessages( self ):
+        # An empty message consists only of a header.
+        self.failUnless(len(message.createProtocolMessages("")[0]) == \
+                        const.HDR_LENGTH)
+
+        msg = message.createProtocolMessages('X' * const.MPU)
+        self.failUnless((len(msg) == 1) and (len(msg[0]) == const.MTU))
+
+        msg = message.createProtocolMessages('X' * (const.MPU + 1))
+        self.failUnless((len(msg) == 2) and \
+                        (len(msg[0]) == const.MTU) and \
+                        (len(msg[1]) == (const.HDR_LENGTH + 1)))
+
+    def test2_getFlagNames( self ):
+        self.failUnless(message.getFlagNames(0) == "Undefined")
+        self.failUnless(message.getFlagNames(1) == "PAYLOAD")
+        self.failUnless(message.getFlagNames(2) == "NEW_TICKET")
+        self.failUnless(message.getFlagNames(4) == "PRNG_SEED")
+
+    def test3_isSane( self ):
+        self.failUnless(message.isSane(0, 0, const.FLAG_NEW_TICKET) == True)
+        self.failUnless(message.isSane(const.MPU, const.MPU,
+                                       const.FLAG_PRNG_SEED) == True)
+        self.failUnless(message.isSane(const.MPU + 1, 0,
+                                       const.FLAG_PAYLOAD) == False)
+        self.failUnless(message.isSane(0, 0, 1234) == False)
+        self.failUnless(message.isSane(0, 1, const.FLAG_PAYLOAD) == False)
+
+    def test4_ProtocolMessage( self ):
+        flags = [const.FLAG_NEW_TICKET,
+                 const.FLAG_PAYLOAD,
+                 const.FLAG_PRNG_SEED]
+
+        self.assertRaises(base.PluggableTransportError,
+                          message.ProtocolMessage, "1", paddingLen=const.MPU)
+
+
+if __name__ == '__main__':
+    # Disable all logging as it would yield plenty of warning and error
+    # messages.
+    log = logging.get_obfslogger()
+    log.disable_logs()
+
+    unittest.main()

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/pkg-privacy/packages/obfsproxy.git



More information about the Pkg-privacy-commits mailing list