[Python-modules-commits] [python-asyncssh] 01/06: Import python-asyncssh_1.7.3.orig.tar.gz

Vincent Bernat bernat at moszumanska.debian.org
Sun Dec 25 09:32:41 UTC 2016


This is an automated email from the git hooks/post-receive script.

bernat pushed a commit to branch master
in repository python-asyncssh.

commit 46e25dbe90822ebc8bc2aef1fd21acb0ccce11bd
Author: Vincent Bernat <bernat at debian.org>
Date:   Sun Dec 25 10:25:46 2016 +0100

    Import python-asyncssh_1.7.3.orig.tar.gz
---
 .travis.yml                   |  18 +--
 asyncssh/version.py           |   2 +-
 docs/changes.rst              |  13 ++
 docs/index.rst                |  12 +-
 tests/server.py               |  29 ++--
 tests/test_agent.py           |   9 +-
 tests/test_channel.py         |   3 +
 tests/test_connection_auth.py |  24 ++-
 tests/test_kex.py             |  10 +-
 tests/test_process.py         |   2 +-
 tests/test_public_key.py      | 355 ++++++++++++++++++++++++++----------------
 11 files changed, 310 insertions(+), 167 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index e4227f7..d471d7d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -5,7 +5,7 @@ install:
 
 matrix:
   allow_failures:
-    - python: nightly
+    - python: 3.6-dev
   fast_finish: true
   include:
     - os: linux
@@ -27,40 +27,38 @@ matrix:
     - os: linux
       sudo: required
       dist: trusty
-      python: nightly
+      python: 3.6-dev
       before_install:
         - .travis/build-libsodium.sh
       env:
         - TOXENV=py36
     - os: osx
-      osx_image: xcode7.3
+      osx_image: xcode8.1
       language: generic
       env:
         - CPPFLAGS=-I/usr/local/opt/openssl/include
         - LDFLAGS=-L/usr/local/opt/openssl/lib -L/usr/local/opt/libffi/lib
-        - PATH=$HOME/.pyenv/bin:$PATH
+        - PATH=$HOME/.pyenv/bin:/usr/local/opt/openssl/bin:$PATH
         - TOXENV=py34
       before_install:
         - brew update
-        - brew install libffi libsodium openssl pyenv pyenv-virtualenv
+        - brew install libffi libsodium
         - eval "$(pyenv init -)"
-        - eval "$(pyenv virtualenv-init -)"
         - pyenv install 3.4.5
         - pyenv local 3.4.5
         - pyenv rehash
     - os: osx
-      osx_image: xcode7.3
+      osx_image: xcode8.1
       language: generic
       env:
         - CPPFLAGS=-I/usr/local/opt/openssl/include
         - LDFLAGS=-L/usr/local/opt/openssl/lib -L/usr/local/opt/libffi/lib
-        - PATH=$HOME/.pyenv/bin:$PATH
+        - PATH=$HOME/.pyenv/bin:/usr/local/opt/openssl/bin:$PATH
         - TOXENV=py35
       before_install:
         - brew update
-        - brew install libffi libsodium openssl pyenv pyenv-virtualenv
+        - brew install libffi libsodium
         - eval "$(pyenv init -)"
-        - eval "$(pyenv virtualenv-init -)"
         - pyenv install 3.5.2
         - pyenv local 3.5.2
         - pyenv rehash
diff --git a/asyncssh/version.py b/asyncssh/version.py
index dcb77b0..e46454b 100644
--- a/asyncssh/version.py
+++ b/asyncssh/version.py
@@ -18,4 +18,4 @@ __author_email__ = 'ronf at timeheart.net'
 
 __url__ = 'http://asyncssh.timeheart.net'
 
-__version__ = '1.7.2'
+__version__ = '1.7.3'
diff --git a/docs/changes.rst b/docs/changes.rst
index fffa15f..8593784 100644
--- a/docs/changes.rst
+++ b/docs/changes.rst
@@ -3,6 +3,19 @@
 Change Log
 ==========
 
+Release 1.7.3 (22 Nov 2016)
+---------------------------
+
+* Updated unit tests to run properly in environments where OpenSSH
+  and OpenSSL are not installed.
+
+* Updated a process unit test to not depend on the system's default
+  file encoding being UTF-8.
+
+* Updated Mac TravisCI builds to use Xcode 8.1.
+
+* Cleaned up some wording in the documentation.
+
 Release 1.7.2 (28 Oct 2016)
 ---------------------------
 
diff --git a/docs/index.rst b/docs/index.rst
index 30aa010..bd285fa 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -33,8 +33,8 @@ This example only uses the output on stdout, but output on stderr is also
 collected as another attribute in the returned :class:`SSHCompletedProcess`
 object.
 
-To check against a different set of server host keys, they can be read
-and provided in the known_hosts argument when the connection is opened:
+To check against a different set of server host keys, they can be provided
+in the known_hosts argument when the connection is opened:
 
    .. code::
 
@@ -53,7 +53,7 @@ provided:
      async with asyncssh.connect('localhost', username='user123') as conn:
 
 To use a different set of client keys for authentication, they can be
-read and provided in the client_keys argument:
+provided in the client_keys argument:
 
    .. code::
 
@@ -132,9 +132,9 @@ and the output written to '/tmp/stdout' should contain the reversed lines
 
 The ``stdin``, ``stdout``, and ``stderr`` arguments support redirecting
 to a variety of locations include local files, pipes, and sockets as
-well as an :class:`SSHReader` or :class:`SSHWriter` objects associated
-with other remote SSH processes. Here's an example of piping stdout from
-a local process to a remote process:
+well as :class:`SSHReader` or :class:`SSHWriter` objects associated with
+other remote SSH processes. Here's an example of piping stdout from a
+local process to a remote process:
 
    .. include:: ../examples/redirect_local_pipe.py
       :literal:
diff --git a/tests/server.py b/tests/server.py
index 8c198b7..f657d17 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -15,6 +15,7 @@
 import asyncio
 import os
 import signal
+import subprocess
 
 import asyncssh
 from asyncssh.misc import async_context_manager
@@ -131,17 +132,21 @@ class ServerTestCase(AsyncTestCase):
                                                       cls._server_port))
         run('cat skey.pub >> .ssh/known_hosts')
 
-        output = run('ssh-agent -a agent 2>/dev/null')
-        cls._agent_pid = int(output.splitlines()[2].split()[3][:-1])
+        os.environ['LOGNAME'] = 'guest'
+        os.environ['HOME'] = '.'
 
-        os.environ['SSH_AUTH_SOCK'] = 'agent'
+        try:
+            output = run('ssh-agent -a agent 2>/dev/null')
+        except subprocess.CalledProcessError: # pragma: no cover
+            cls._agent_pid = None
+        else:
+            cls._agent_pid = int(output.splitlines()[2].split()[3][:-1])
 
-        agent = yield from asyncssh.connect_agent()
-        yield from agent.add_keys([ckey_dsa, (ckey, ckey_cert)])
-        agent.close()
+            os.environ['SSH_AUTH_SOCK'] = 'agent'
 
-        os.environ['LOGNAME'] = 'guest'
-        os.environ['HOME'] = '.'
+            agent = yield from asyncssh.connect_agent()
+            yield from agent.add_keys([ckey_dsa, (ckey, ckey_cert)])
+            agent.close()
 
     @classmethod
     @asyncio.coroutine
@@ -154,10 +159,16 @@ class ServerTestCase(AsyncTestCase):
         cls._server.close()
         yield from cls._server.wait_closed()
 
-        os.kill(cls._agent_pid, signal.SIGTERM)
+        if cls._agent_pid: # pragma: no branch
+            os.kill(cls._agent_pid, signal.SIGTERM)
 
     # pylint: enable=invalid-name
 
+    def agent_available(self):
+        """Return whether SSH agent is available"""
+
+        return bool(self._agent_pid)
+
     @asyncio.coroutine
     def create_connection(self, client_factory, loop=(), **kwargs):
         """Create a connection to the test server"""
diff --git a/tests/test_agent.py b/tests/test_agent.py
index eb06911..966d839 100644
--- a/tests/test_agent.py
+++ b/tests/test_agent.py
@@ -16,6 +16,8 @@ import asyncio
 import functools
 import os
 import signal
+import subprocess
+import unittest
 
 import asyncssh
 
@@ -105,7 +107,12 @@ class _TestAPI(AsyncTestCase):
         os.environ['DISPLAY'] = ''
         os.environ['HOME'] = '.'
         os.environ['SSH_ASKPASS'] = os.path.join(os.getcwd(), 'ssh-askpass')
-        output = run('ssh-agent -a agent 2>/dev/null')
+
+        try:
+            output = run('ssh-agent -a agent 2>/dev/null')
+        except subprocess.CalledProcessError: # pragma: no cover
+            raise unittest.SkipTest('ssh-agent not available')
+
         cls._agent_pid = int(output.splitlines()[2].split()[3][:-1])
         os.environ['SSH_AUTH_SOCK'] = 'agent'
 
diff --git a/tests/test_channel.py b/tests/test_channel.py
index 487464a..46d0b3b 100644
--- a/tests/test_channel.py
+++ b/tests/test_channel.py
@@ -734,6 +734,9 @@ class _TestChannel(ServerTestCase):
     def test_agent_forwarding(self):
         """Test SSH agent forwarding"""
 
+        if not self.agent_available(): # pragma: no cover
+            self.skipTest('ssh-agent not available')
+
         with (yield from self.connect(username='ckey',
                                       agent_forwarding=True)) as conn:
             chan, session = yield from _create_session(conn, 'agent')
diff --git a/tests/test_connection_auth.py b/tests/test_connection_auth.py
index ab93faf..46e51e4 100644
--- a/tests/test_connection_auth.py
+++ b/tests/test_connection_auth.py
@@ -320,6 +320,9 @@ class _TestPublicKeyAuth(ServerTestCase):
     def test_agent_auth(self):
         """Test connecting with ssh-agent authentication"""
 
+        if not self.agent_available(): # pragma: no cover
+            self.skipTest('ssh-agent not available')
+
         with (yield from self.connect(username='ckey')) as conn:
             pass
 
@@ -329,6 +332,9 @@ class _TestPublicKeyAuth(ServerTestCase):
     def test_agent_signature_algs(self):
         """Test ssh-agent keys with specific signature algorithms"""
 
+        if not self.agent_available(): # pragma: no cover
+            self.skipTest('ssh-agent not available')
+
         for alg in ('ssh-rsa', 'rsa-sha2-256', 'rsa-sha2-512'):
             with (yield from self.connect(username='ckey',
                                           signature_algs=[alg])) as conn:
@@ -340,6 +346,9 @@ class _TestPublicKeyAuth(ServerTestCase):
     def test_agent_auth_failure(self):
         """Test failure connecting with ssh-agent authentication"""
 
+        if not self.agent_available(): # pragma: no cover
+            self.skipTest('ssh-agent not available')
+
         with patch.dict(os.environ, HOME='xxx'):
             with self.assertRaises(asyncssh.DisconnectError):
                 yield from self.connect(username='ckey', agent_path='xxx',
@@ -388,7 +397,8 @@ class _TestPublicKeyAuth(ServerTestCase):
 
         with patch('asyncssh.connection.SSHConnection._get_ext_info_kex_alg',
                    skip_ext_info):
-            with (yield from self.connect(username='ckey')) as conn:
+            with (yield from self.connect(username='ckey', client_keys='ckey',
+                                          agent_path=None)) as conn:
                 pass
 
         yield from conn.wait_closed()
@@ -444,6 +454,9 @@ class _TestPublicKeyAuth(ServerTestCase):
     def test_client_key_agent_keypairs(self):
         """Test client keys passed in as a list of SSHAgentKeyPairs"""
 
+        if not self.agent_available(): # pragma: no cover
+            self.skipTest('ssh-agent not available')
+
         agent = yield from asyncssh.connect_agent()
 
         for key in (yield from agent.get_keys()):
@@ -545,6 +558,9 @@ class _TestPublicKeyAuth(ServerTestCase):
     def test_callback_sshkeypair(self):
         """Test client key passed in as an SSHKeyPair by callback"""
 
+        if not self.agent_available(): # pragma: no cover
+            self.skipTest('ssh-agent not available')
+
         agent = yield from asyncssh.connect_agent()
         keylist = yield from agent.get_keys()
 
@@ -568,7 +584,8 @@ class _TestPublicKeyAuth(ServerTestCase):
 
         with patch('asyncssh.connection.SSHClientConnection',
                    _UnknownAuthClientConnection):
-            with (yield from self.connect(username='ckey')) as conn:
+            with (yield from self.connect(username='ckey', client_keys='ckey',
+                                          agent_path=None)) as conn:
                 pass
 
         yield from conn.wait_closed()
@@ -938,7 +955,8 @@ class _TestLoginTimeoutDisabled(ServerTestCase):
     def test_login_timeout_disabled(self):
         """Test with login timeout disabled"""
 
-        with (yield from self.connect()) as conn:
+        with (yield from self.connect(username='ckey',
+                                      client_keys='ckey')) as conn:
             pass
 
         yield from conn.wait_closed()
diff --git a/tests/test_kex.py b/tests/test_kex.py
index 9311eca..985670e 100644
--- a/tests/test_kex.py
+++ b/tests/test_kex.py
@@ -16,7 +16,7 @@ import asyncio
 
 from hashlib import sha1
 
-from .util import asynctest, run, ConnectionStub, AsyncTestCase
+import asyncssh
 
 from asyncssh.dh import MSG_KEXDH_INIT, MSG_KEXDH_REPLY
 from asyncssh.dh import _KexDHGex, MSG_KEX_DH_GEX_GROUP
@@ -25,8 +25,9 @@ from asyncssh.ecdh import MSG_KEX_ECDH_INIT, MSG_KEX_ECDH_REPLY
 from asyncssh.kex import register_kex_alg, get_kex_algs, get_kex
 from asyncssh.misc import DisconnectError
 from asyncssh.packet import SSHPacket, Byte, MPInt, String
-from asyncssh.public_key import decode_ssh_public_key, read_private_key
-from asyncssh.public_key import SSHLocalKeyPair
+from asyncssh.public_key import SSHLocalKeyPair, decode_ssh_public_key
+
+from .util import asynctest, ConnectionStub, AsyncTestCase
 
 # Short variable names are used here, matching names in the specs
 # pylint: disable=invalid-name
@@ -136,8 +137,7 @@ class _KexServerStub(_KexConnectionStub):
     def __init__(self, alg, peer):
         super().__init__(alg, peer, True)
 
-        run('openssl genrsa -out priv 2048')
-        priv_key = read_private_key('priv')
+        priv_key = asyncssh.generate_private_key('ssh-rsa')
         self._server_host_key = SSHLocalKeyPair(priv_key)
 
     def get_server_host_key(self):
diff --git a/tests/test_process.py b/tests/test_process.py
index 269f3d1..05f6674 100644
--- a/tests/test_process.py
+++ b/tests/test_process.py
@@ -782,7 +782,7 @@ class _TestProcess(ServerTestCase):
 
         data = '\u2000test\u2000'
 
-        with open('stdin', 'w') as file:
+        with open('stdin', 'w', encoding='utf-8') as file:
             file.write(data)
 
         with (yield from self.connect()) as conn:
diff --git a/tests/test_public_key.py b/tests/test_public_key.py
index 223483a..b1d932d 100644
--- a/tests/test_public_key.py
+++ b/tests/test_public_key.py
@@ -12,14 +12,17 @@
 
 """Unit tests for reading and writing public and private keys
 
-   Note: These tests assume that the openssl and ssh-keygen commands are
-         available on the system and in the user's path.
+   Note: These tests look for the openssl and ssh-keygen commands in
+         the user's path and will whenever possible use them to perform
+         interoperability tests. Otherwise, these tests will only test
+         AsyncSSH against itself.
 
 """
 
 import binascii
 from datetime import datetime
 import os
+import subprocess
 
 import asyncssh
 
@@ -41,6 +44,26 @@ _ES2_PBKDF2 = ObjectIdentifier('1.2.840.113549.1.5.12')
 _ES2_AES128 = ObjectIdentifier('2.16.840.1.101.3.4.1.2')
 _ES2_DES3 = ObjectIdentifier('1.2.840.113549.3.7')
 
+try:
+    _openssl_version = run('openssl version')
+except subprocess.CalledProcessError: # pragma: no cover
+    _openssl_version = b''
+
+_openssl_available = _openssl_version != b''
+
+# The openssl "-v2prf" option is only available in OpenSSL 1.0.2 or later
+_openssl_supports_v2prf = _openssl_version >= b'OpenSSL 1.0.2'
+
+try:
+    _openssh_version = run('ssh -V')
+except subprocess.CalledProcessError: # pragma: no cover
+    _openssh_version = b''
+
+_openssh_available = _openssh_version != b''
+
+# GCM & Chacha tests require OpenSSH 6.9 due to a bug in earlier versions:
+#     https://bugzilla.mindrot.org/show_bug.cgi?id=2366
+_openssh_supports_gcm_chacha = _openssh_version >= b'OpenSSH_6.9'
 
 # pylint: disable=bad-whitespace
 
@@ -50,54 +73,52 @@ pkcs1_ciphers = (('aes128-cbc', '-aes128'),
                  ('des-cbc',    '-des'),
                  ('des3-cbc',   '-des3'))
 
-pkcs8_ciphers = (('des-cbc',      'md5',    1, '-v1 PBE-MD5-DES'),
-                 ('des-cbc',      'sha1',   1, '-v1 PBE-SHA1-DES'),
-                 ('des2-cbc',     'sha1',   1, '-v1 PBE-SHA1-2DES'),
-                 ('des3-cbc',     'sha1',   1, '-v1 PBE-SHA1-3DES'),
-                 ('rc4-40',       'sha1',   1, '-v1 PBE-SHA1-RC4-40'),
-                 ('rc4-128',      'sha1',   1, '-v1 PBE-SHA1-RC4-128'),
-                 ('aes128-cbc',   'sha1',   2, '-v2 aes-128-cbc'),
-                 ('aes192-cbc',   'sha1',   2, '-v2 aes-192-cbc'),
-                 ('aes256-cbc',   'sha1',   2, '-v2 aes-256-cbc'),
-                 ('blowfish-cbc', 'sha1',   2, '-v2 bf-cbc'),
-                 ('cast128-cbc',  'sha1',   2, '-v2 cast-cbc'),
-                 ('des-cbc',      'sha1',   2, '-v2 des-cbc'),
-                 ('des3-cbc',     'sha1',   2, '-v2 des-ede3-cbc'))
-
-openssh_ciphers = ('aes128-cbc', 'aes192-cbc', 'aes256-cbc',
-                   'aes128-ctr', 'aes192-ctr', 'aes256-ctr',
-                   'arcfour', 'arcfour128', 'arcfour256',
-                   'blowfish-cbc', 'cast128-cbc', '3des-cbc')
+pkcs8_ciphers = (
+    ('aes128-cbc',   'sha224', 2, '-v2 aes-128-cbc '
+     '-v2prf hmacWithSHA224', _openssl_supports_v2prf),
+    ('aes128-cbc',   'sha256', 2, '-v2 aes-128-cbc '
+     '-v2prf hmacWithSHA256', _openssl_supports_v2prf),
+    ('aes128-cbc',   'sha384', 2, '-v2 aes-128-cbc '
+     '-v2prf hmacWithSHA384', _openssl_supports_v2prf),
+    ('aes128-cbc',   'sha512', 2, '-v2 aes-128-cbc '
+     '-v2prf hmacWithSHA512', _openssl_supports_v2prf),
+    ('des-cbc',      'md5',    1, '-v1 PBE-MD5-DES',       _openssl_available),
+    ('des-cbc',      'sha1',   1, '-v1 PBE-SHA1-DES',      _openssl_available),
+    ('des2-cbc',     'sha1',   1, '-v1 PBE-SHA1-2DES',     _openssl_available),
+    ('des3-cbc',     'sha1',   1, '-v1 PBE-SHA1-3DES',     _openssl_available),
+    ('rc4-40',       'sha1',   1, '-v1 PBE-SHA1-RC4-40',   _openssl_available),
+    ('rc4-128',      'sha1',   1, '-v1 PBE-SHA1-RC4-128',  _openssl_available),
+    ('aes128-cbc',   'sha1',   2, '-v2 aes-128-cbc',       _openssl_available),
+    ('aes192-cbc',   'sha1',   2, '-v2 aes-192-cbc',       _openssl_available),
+    ('aes256-cbc',   'sha1',   2, '-v2 aes-256-cbc',       _openssl_available),
+    ('blowfish-cbc', 'sha1',   2, '-v2 bf-cbc',            _openssl_available),
+    ('cast128-cbc',  'sha1',   2, '-v2 cast-cbc',          _openssl_available),
+    ('des-cbc',      'sha1',   2, '-v2 des-cbc',           _openssl_available),
+    ('des3-cbc',     'sha1',   2, '-v2 des-ede3-cbc',      _openssl_available))
+
+openssh_ciphers = (
+    ('aes128-gcm at openssh.com',  _openssh_supports_gcm_chacha),
+    ('aes256-gcm at openssh.com',  _openssh_supports_gcm_chacha),
+    ('aes128-cbc',              _openssh_available),
+    ('aes192-cbc',              _openssh_available),
+    ('aes256-cbc',              _openssh_available),
+    ('aes128-ctr',              _openssh_available),
+    ('aes192-ctr',              _openssh_available),
+    ('aes256-ctr',              _openssh_available),
+    ('arcfour',                 _openssh_available),
+    ('arcfour128',              _openssh_available),
+    ('arcfour256',              _openssh_available),
+    ('blowfish-cbc',            _openssh_available),
+    ('cast128-cbc',             _openssh_available),
+    ('3des-cbc',                _openssh_available)
+)
 
 # pylint: enable=bad-whitespace
 
-_openssl_version = run('openssl version')
-
-_pkcs1_public_supported = _openssl_version >= b'OpenSSL 1.0.0'
-
-if _openssl_version >= b'OpenSSL 1.0.2': # pragma: no branch
-    # pylint: disable=bad-whitespace
-
-    pkcs8_ciphers += (
-        ('aes128-cbc',   'sha224', 2, '-v2 aes-128-cbc '
-                                      '-v2prf hmacWithSHA224'),
-        ('aes128-cbc',   'sha256', 2, '-v2 aes-128-cbc '
-                                      '-v2prf hmacWithSHA256'),
-        ('aes128-cbc',   'sha384', 2, '-v2 aes-128-cbc '
-                                      '-v2prf hmacWithSHA384'),
-        ('aes128-cbc',   'sha512', 2, '-v2 aes-128-cbc '
-                                      '-v2prf hmacWithSHA512')
-    )
-
-if run('ssh -V') >= b'OpenSSH_6.9': # pragma: no branch
-    # GCM & Chacha tests require OpenSSH 6.9 due to a bug in earlier versions:
-    #     https://bugzilla.mindrot.org/show_bug.cgi?id=2366
-
-    openssh_ciphers += ('aes128-gcm at openssh.com', 'aes256-gcm at openssh.com')
-
-    # Only test Chacha if libnacl is installed
-    if libnacl_available: # pragma: no branch
-        openssh_ciphers += ('chacha20-poly1305 at openssh.com',)
+# Only test Chacha if libnacl is installed
+if libnacl_available: # pragma: no branch
+    openssh_ciphers += (('chacha20-poly1305 at openssh.com',
+                         _openssh_supports_gcm_chacha),)
 
 
 def select_passphrase(cipher, pbe_version=0):
@@ -201,12 +222,16 @@ class _TestPublicKey(TempDirTestCase):
     def import_pkcs1_private(self, fmt, cipher=None, args=None):
         """Check import of a PKCS#1 private key"""
 
-        if cipher:
-            run('openssl %s %s -in priv -inform pem -out new -outform %s '
-                '-passout pass:passphrase' % (self.keyclass, args, fmt))
-        else:
-            run('openssl %s -in priv -inform pem -out new -outform %s' %
-                (self.keyclass, fmt))
+        if _openssl_available: # pragma: no branch
+            if cipher:
+                run('openssl %s %s -in priv -inform pem -out new -outform %s '
+                    '-passout pass:passphrase' % (self.keyclass, args, fmt))
+            else:
+                run('openssl %s -in priv -inform pem -out new -outform %s' %
+                    (self.keyclass, fmt))
+        else: # pragma: no cover
+            self.privkey.write_private_key('new', 'pkcs1-%s' % fmt,
+                                           select_passphrase(cipher), cipher)
 
         self.check_private(select_passphrase(cipher))
 
@@ -216,21 +241,29 @@ class _TestPublicKey(TempDirTestCase):
         self.privkey.write_private_key('privout', 'pkcs1-%s' % fmt,
                                        select_passphrase(cipher), cipher)
 
-        if cipher:
-            run('openssl %s -in privout -inform %s -out new -outform pem '
-                '-passin pass:passphrase' % (self.keyclass, fmt))
-        else:
-            run('openssl %s -in privout -inform %s -out new -outform pem' %
-                (self.keyclass, fmt))
+        if _openssl_available: # pragma: no branch
+            if cipher:
+                run('openssl %s -in privout -inform %s -out new -outform pem '
+                    '-passin pass:passphrase' % (self.keyclass, fmt))
+            else:
+                run('openssl %s -in privout -inform %s -out new -outform pem' %
+                    (self.keyclass, fmt))
+        else: # pragma: no cover
+            priv = asyncssh.read_private_key('privout',
+                                             select_passphrase(cipher))
+            priv.write_private_key('new', 'pkcs1-%s' % fmt)
 
         self.check_private()
 
     def import_pkcs1_public(self, fmt):
         """Check import of a PKCS#1 public key"""
 
-        if self.keyclass == 'dsa':
-            # OpenSSL no longer has support for PKCS#1 DSA, so we can
-            # only test against ourselves.
+        if (not _openssl_available or self.keyclass == 'dsa' or
+                _openssl_version < b'OpenSSL 1.0.0'): # pragma: no cover
+            # OpenSSL no longer has support for PKCS#1 DSA, and PKCS#1
+            # RSA is not supported before OpenSSL 1.0.0, so we only test
+            # against ourselves in these cases.
+
             self.pubkey.write_public_key('new', 'pkcs1-%s' % fmt)
         else:
             run('openssl %s -pubin -in pub -inform pem -RSAPublicKey_out '
@@ -243,52 +276,68 @@ class _TestPublicKey(TempDirTestCase):
 
         self.privkey.write_public_key('pubout', 'pkcs1-%s' % fmt)
 
-        if self.keyclass == 'dsa':
+        if not _openssl_available or self.keyclass == 'dsa': # pragma: no cover
             # OpenSSL no longer has support for PKCS#1 DSA, so we can
             # only test against ourselves.
-            asyncssh.read_public_key('pubout').write_public_key(
-                'new', 'pkcs1-%s' % fmt)
+
+            pub = asyncssh.read_public_key('pubout')
+            pub.write_public_key('new', 'pkcs1-%s' % fmt)
         else:
             run('openssl %s -RSAPublicKey_in -in pubout -inform %s -out new '
                 '-outform pem' % (self.keyclass, fmt))
 
         self.check_public()
 
-    def import_pkcs8_private(self, fmt, cipher=None, pbe_version=None,
-                             args=None):
+    def import_pkcs8_private(self, fmt, use_openssl, cipher=None,
+                             hash_alg=None, pbe_version=None, args=None):
         """Check import of a PKCS#8 private key"""
 
-        if cipher:
-            run('openssl pkcs8 -topk8 %s -in priv -inform pem -out new '
-                '-outform %s -passout pass:passphrase' % (args, fmt))
-        else:
-            run('openssl pkcs8 -topk8 -nocrypt -in priv -inform pem -out new '
-                '-outform %s' % fmt)
+        if use_openssl: # pragma: no branch
+            if cipher:
+                run('openssl pkcs8 -topk8 %s -in priv -inform pem -out new '
+                    '-outform %s -passout pass:passphrase' % (args, fmt))
+            else:
+                run('openssl pkcs8 -topk8 -nocrypt -in priv -inform pem '
+                    '-out new -outform %s' % fmt)
+        else: # pragma: no cover
+            self.privkey.write_private_key('new', 'pkcs8-%s' % fmt,
+                                           select_passphrase(cipher,
+                                                             pbe_version),
+                                           cipher, hash_alg, pbe_version)
 
         self.check_private(select_passphrase(cipher, pbe_version))
 
-    def export_pkcs8_private(self, fmt, cipher=None, hash_alg=None,
-                             pbe_version=None):
+    def export_pkcs8_private(self, fmt, use_openssl, cipher=None,
+                             hash_alg=None, pbe_version=None):
         """Check export of a PKCS#8 private key"""
 
         self.privkey.write_private_key('privout', 'pkcs8-%s' % fmt,
                                        select_passphrase(cipher, pbe_version),
                                        cipher, hash_alg, pbe_version)
 
-        if cipher:
-            run('openssl pkcs8 -in privout -inform %s -out new '
-                '-outform pem -passin pass:passphrase' % fmt)
-        else:
-            run('openssl pkcs8 -nocrypt -in privout -inform %s -out new '
-                '-outform pem' % fmt)
+        if use_openssl: # pragma: no branch
+            if cipher:
+                run('openssl pkcs8 -in privout -inform %s -out new '
+                    '-outform pem -passin pass:passphrase' % fmt)
+            else:
+                run('openssl pkcs8 -nocrypt -in privout -inform %s -out new '
+                    '-outform pem' % fmt)
+        else: # pragma: no cover
+            priv = asyncssh.read_private_key('privout',
+                                             select_passphrase(cipher,
+                                                               pbe_version))
+            priv.write_private_key('new', 'pkcs8-%s' % fmt)
 
         self.check_private()
 
     def import_pkcs8_public(self, fmt):
         """Check import of a PKCS#8 public key"""
 
-        run('openssl %s -pubin -in pub -inform pem -out new -outform %s' %
-            (self.keyclass, fmt))
+        if _openssl_available: # pragma: no branch
+            run('openssl %s -pubin -in pub -inform pem -out new -outform %s' %
+                (self.keyclass, fmt))
+        else: # pragma: no cover
+            self.pubkey.write_public_key('new', 'pkcs8-%s' % fmt)
 
         self.check_public()
 
@@ -297,35 +346,47 @@ class _TestPublicKey(TempDirTestCase):
 
         self.privkey.write_public_key('pubout', 'pkcs8-%s' % fmt)
 
-        run('openssl %s -pubin -in pubout -inform %s -out new -outform pem' %
-            (self.keyclass, fmt))
+        if _openssl_available: # pragma: no branch
+            run('openssl %s -pubin -in pubout -inform %s -out new '
+                '-outform pem' % (self.keyclass, fmt))
+        else: # pragma: no cover
+            pub = asyncssh.read_public_key('pubout')
+            pub.write_public_key('new', 'pkcs8-%s' % fmt)
 
         self.check_public()
 
-    def import_openssh_private(self, cipher=None):
+    def import_openssh_private(self, use_openssh, cipher=None):
         """Check import of an OpenSSH private key"""
 
-        run('cp -p priv new')
+        if use_openssh: # pragma: no branch
+            run('cp -p priv new')
 
-        if cipher:
-            run('ssh-keygen -p -N passphrase -Z %s -o -f new' % cipher)
-        else:
-            run('ssh-keygen -p -N "" -o -f new')
+            if cipher:
+                run('ssh-keygen -p -N passphrase -Z %s -o -f new' % cipher)
+            else:
+                run('ssh-keygen -p -N "" -o -f new')
+        else: # pragma: no cover
+            self.privkey.write_private_key('new', 'openssh',
+                                           select_passphrase(cipher), cipher)
 
         self.check_private(select_passphrase(cipher))
 
-    def export_openssh_private(self, cipher=None):
+    def export_openssh_private(self, use_openssh, cipher=None):
         """Check export of an OpenSSH private key"""
 
         self.privkey.write_private_key('new', 'openssh',
                                        select_passphrase(cipher), cipher)
 
-        run('chmod 600 new')
+        if use_openssh: # pragma: no branch
+            run('chmod 600 new')
 
-        if cipher:
-            run('ssh-keygen -p -P passphrase -N "" -o -f new')
-        else:
-            run('ssh-keygen -p -N "" -o -f new')
+            if cipher:
+                run('ssh-keygen -p -P passphrase -N "" -o -f new')
+            else:
+                run('ssh-keygen -p -N "" -o -f new')
+        else: # pragma: no cover
+            priv = asyncssh.read_private_key('new', select_passphrase(cipher))
+            priv.write_private_key('new', 'openssh')
 
         self.check_private()
 
@@ -341,7 +402,11 @@ class _TestPublicKey(TempDirTestCase):
 
         self.privkey.write_public_key('pubout', 'openssh')
 
-        run('ssh-keygen -e -f pubout -m rfc4716 > new')
+        if _openssh_available: # pragma: no branch
+            run('ssh-keygen -e -f pubout -m rfc4716 > new')
+        else: # pragma: no cover
+            pub = asyncssh.read_public_key('pubout')
+            pub.write_public_key('new', 'rfc4716')
 
         self.check_public()
 
@@ -357,30 +422,49 @@ class _TestPublicKey(TempDirTestCase):
 
         cert.write_certificate('certout', 'openssh')
 
-        run('ssh-keygen -e -f certout -m rfc4716 > cert')
+        if _openssh_available: # pragma: no branch
+            run('ssh-keygen -e -f certout -m rfc4716 > cert')
+        else: # pragma: no cover
+            cert = asyncssh.read_certificate('certout')
+            cert.write_certificate('cert', 'rfc4716')
 
         self.check_certificate(cert_type)
 
     def import_rfc4716_public(self):
         """Check import of an RFC4716 public key"""
 
-        run('ssh-keygen -e -f sshpub -m rfc4716 > new')
+        if _openssh_available: # pragma: no branch
+            run('ssh-keygen -e -f sshpub -m rfc4716 > new')
+        else: # pragma: no cover
+            self.pubkey.write_public_key('new', 'rfc4716')
 
         self.check_public()
 
     def export_rfc4716_public(self):
         """Check export of an RFC4716 public key"""
 
-        self.privkey.write_public_key('pubout', 'rfc4716')
+        self.pubkey.write_public_key('pubout', 'rfc4716')
 
-        run('ssh-keygen -i -f pubout -m rfc4716 > new')
+        if _openssh_available: # pragma: no branch
+            run('ssh-keygen -i -f pubout -m rfc4716 > new')
+        else: # pragma: no cover
+            pub = asyncssh.read_public_key('pubout')
+            pub.write_public_key('new', 'openssh')
 
         self.check_public()
 
     def import_rfc4716_certificate(self, cert_type, cert):
         """Check import of an RFC4716 certificate"""
 
-        run('ssh-keygen -e -f %s -m rfc4716 > cert' % cert)
+        if _openssh_available: # pragma: no branch
+            run('ssh-keygen -e -f %s -m rfc4716 > cert' % cert)
+        else: # pragma: no cover
+            if cert_type == CERT_TYPE_USER:
+                cert = self.usercert
+            else:
+                cert = self.hostcert
+
+            cert.write_certificate('cert', 'rfc4716')
 
         self.check_certificate(cert_type)
 
@@ -389,7 +473,11 @@ class _TestPublicKey(TempDirTestCase):
 
         cert.write_certificate('certout', 'rfc4716')
 
-        run('ssh-keygen -i -f certout -m rfc4716 > cert')
+        if _openssh_available: # pragma: no branch
+            run('ssh-keygen -i -f certout -m rfc4716 > cert')
+        else: # pragma: no cover
+            cert = asyncssh.read_certificate('certout')
+            cert.write_certificate('cert', 'openssh')
 
         self.check_certificate(cert_type)
 
@@ -902,33 +990,37 @@ class _TestPublicKey(TempDirTestCase):
         """Check PKCS#8 private key format"""
 
         with self.subTest('Import PKCS#8 PEM private'):
-            self.import_pkcs8_private('pem')
+            self.import_pkcs8_private('pem', _openssl_available)
 
         with self.subTest('Export PKCS#8 PEM private'):
-            self.export_pkcs8_private('pem')
+            self.export_pkcs8_private('pem', _openssl_available)
 
         with self.subTest('Import PKCS#8 DER private'):
-            self.import_pkcs8_private('der')
+            self.import_pkcs8_private('der', _openssl_available)
 
         with self.subTest('Export PKCS#8 DER private'):
-            self.export_pkcs8_private('der')
+            self.export_pkcs8_private('der', _openssl_available)
 
-        for cipher, hash_alg, pbe_version, args in pkcs8_ciphers:
+        for cipher, hash_alg, pbe_version, args, use_openssl in pkcs8_ciphers:
             with self.subTest('Import PKCS#8 PEM private (%s-%s-v%s)' %
                               (cipher, hash_alg, pbe_version)):
-                self.import_pkcs8_private('pem', cipher, pbe_version, args)
+                self.import_pkcs8_private('pem', use_openssl, cipher,
+                                          hash_alg, pbe_version, args)
 
             with self.subTest('Export PKCS#8 PEM private (%s-%s-v%s)' %
                               (cipher, hash_alg, pbe_version)):
-                self.export_pkcs8_private('pem', cipher, hash_alg, pbe_version)
+                self.export_pkcs8_private('pem', use_openssl, cipher,
+                                          hash_alg, pbe_version)
 
             with self.subTest('Import PKCS#8 DER private (%s-%s-v%s)' %
                               (cipher, hash_alg, pbe_version)):
-                self.import_pkcs8_private('der', cipher, pbe_version, args)
+                self.import_pkcs8_private('der', use_openssl, cipher,
+                                          hash_alg, pbe_version, args)
 
             with self.subTest('Export PKCS#8 DER private (%s-%s-v%s)' %
                               (cipher, hash_alg, pbe_version)):
-                self.export_pkcs8_private('der', cipher, hash_alg, pbe_version)
+                self.export_pkcs8_private('der', use_openssl, cipher,
+                                          hash_alg, pbe_version)
 
     def check_pkcs8_public(self):
         """Check PKCS#8 public key format"""
@@ -949,18 +1041,18 @@ class _TestPublicKey(TempDirTestCase):
         """Check OpenSSH private key format"""
 
         with self.subTest('Import OpenSSH private'):
-            self.import_openssh_private()
+            self.import_openssh_private(_openssh_available)
 
         with self.subTest('Export OpenSSH private'):
-            self.export_openssh_private()
+            self.export_openssh_private(_openssh_available)
 
         if bcrypt_available: # pragma: no branch
-            for cipher in openssh_ciphers:
+            for cipher, use_openssh in openssh_ciphers:
                 with self.subTest('Import OpenSSH private (%s)' % cipher):
-                    self.import_openssh_private(cipher)
+                    self.import_openssh_private(use_openssh, cipher)
 
                 with self.subTest('Export OpenSSH private (%s)' % cipher):
-                    self.export_openssh_private(cipher)
+                    self.export_openssh_private(use_openssh, cipher)
 
     def check_openssh_public(self):
         """Check OpenSSH public key format"""
@@ -1204,7 +1296,7 @@ class _TestPublicKey(TempDirTestCase):
                 if 'pkcs1' in self.private_formats:
                     self.check_pkcs1_private()
 
-                if 'pkcs1' in self.public_formats and _pkcs1_public_supported:
+                if 'pkcs1' in self.public_formats:
                     self.check_pkcs1_public()
 
                 if 'pkcs8' in self.private_formats:
@@ -1306,18 +1398,19 @@ class _TestPublicKeyTopLevel(TempDirTestCase):
     def test_ec_explicit(self):
         """Test EC certificate with explcit parameters"""
 
-        for curve in ('secp256r1', 'secp384r1', 'secp521r1'):
-            with self.subTest('Import EC key with explicit parameters',
-                              curve=curve):
-                run('openssl ecparam -out priv -noout -genkey -name %s '
-                    '-param_enc explicit' % curve)
-                asyncssh.read_private_key('priv')
-
-        with self.subTest('Import EC key with unknown explicit parameters'):
-            run('openssl ecparam -out priv -noout -genkey -name secp112r1 '
-                '-param_enc explicit')
-            with self.assertRaises(asyncssh.KeyImportError):
-                asyncssh.read_private_key('priv')
+        if _openssl_available: # pragma: no branch
+            for curve in ('secp256r1', 'secp384r1', 'secp521r1'):
+                with self.subTest('Import EC key with explicit parameters',
+                                  curve=curve):
+                    run('openssl ecparam -out priv -noout -genkey -name %s '
+                        '-param_enc explicit' % curve)
+                    asyncssh.read_private_key('priv')
+
+            with self.subTest('Import EC key with unknown explicit parameters'):
+                run('openssl ecparam -out priv -noout -genkey -name secp112r1 '
+                    '-param_enc explicit')
+                with self.assertRaises(asyncssh.KeyImportError):
+                    asyncssh.read_private_key('priv')
 
     def test_generate_errors(self):
         """Test errors in private key and certificate generation"""

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-asyncssh.git



More information about the Python-modules-commits mailing list