[Python-modules-commits] [python-asyncssh] 01/06: Import python-asyncssh_1.7.3.orig.tar.gz
Vincent Bernat
bernat at moszumanska.debian.org
Sun Dec 25 09:32:41 UTC 2016
This is an automated email from the git hooks/post-receive script.
bernat pushed a commit to branch master
in repository python-asyncssh.
commit 46e25dbe90822ebc8bc2aef1fd21acb0ccce11bd
Author: Vincent Bernat <bernat at debian.org>
Date: Sun Dec 25 10:25:46 2016 +0100
Import python-asyncssh_1.7.3.orig.tar.gz
---
.travis.yml | 18 +--
asyncssh/version.py | 2 +-
docs/changes.rst | 13 ++
docs/index.rst | 12 +-
tests/server.py | 29 ++--
tests/test_agent.py | 9 +-
tests/test_channel.py | 3 +
tests/test_connection_auth.py | 24 ++-
tests/test_kex.py | 10 +-
tests/test_process.py | 2 +-
tests/test_public_key.py | 355 ++++++++++++++++++++++++++----------------
11 files changed, 310 insertions(+), 167 deletions(-)
diff --git a/.travis.yml b/.travis.yml
index e4227f7..d471d7d 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -5,7 +5,7 @@ install:
matrix:
allow_failures:
- - python: nightly
+ - python: 3.6-dev
fast_finish: true
include:
- os: linux
@@ -27,40 +27,38 @@ matrix:
- os: linux
sudo: required
dist: trusty
- python: nightly
+ python: 3.6-dev
before_install:
- .travis/build-libsodium.sh
env:
- TOXENV=py36
- os: osx
- osx_image: xcode7.3
+ osx_image: xcode8.1
language: generic
env:
- CPPFLAGS=-I/usr/local/opt/openssl/include
- LDFLAGS=-L/usr/local/opt/openssl/lib -L/usr/local/opt/libffi/lib
- - PATH=$HOME/.pyenv/bin:$PATH
+ - PATH=$HOME/.pyenv/bin:/usr/local/opt/openssl/bin:$PATH
- TOXENV=py34
before_install:
- brew update
- - brew install libffi libsodium openssl pyenv pyenv-virtualenv
+ - brew install libffi libsodium
- eval "$(pyenv init -)"
- - eval "$(pyenv virtualenv-init -)"
- pyenv install 3.4.5
- pyenv local 3.4.5
- pyenv rehash
- os: osx
- osx_image: xcode7.3
+ osx_image: xcode8.1
language: generic
env:
- CPPFLAGS=-I/usr/local/opt/openssl/include
- LDFLAGS=-L/usr/local/opt/openssl/lib -L/usr/local/opt/libffi/lib
- - PATH=$HOME/.pyenv/bin:$PATH
+ - PATH=$HOME/.pyenv/bin:/usr/local/opt/openssl/bin:$PATH
- TOXENV=py35
before_install:
- brew update
- - brew install libffi libsodium openssl pyenv pyenv-virtualenv
+ - brew install libffi libsodium
- eval "$(pyenv init -)"
- - eval "$(pyenv virtualenv-init -)"
- pyenv install 3.5.2
- pyenv local 3.5.2
- pyenv rehash
diff --git a/asyncssh/version.py b/asyncssh/version.py
index dcb77b0..e46454b 100644
--- a/asyncssh/version.py
+++ b/asyncssh/version.py
@@ -18,4 +18,4 @@ __author_email__ = 'ronf at timeheart.net'
__url__ = 'http://asyncssh.timeheart.net'
-__version__ = '1.7.2'
+__version__ = '1.7.3'
diff --git a/docs/changes.rst b/docs/changes.rst
index fffa15f..8593784 100644
--- a/docs/changes.rst
+++ b/docs/changes.rst
@@ -3,6 +3,19 @@
Change Log
==========
+Release 1.7.3 (22 Nov 2016)
+---------------------------
+
+* Updated unit tests to run properly in environments where OpenSSH
+ and OpenSSL are not installed.
+
+* Updated a process unit test to not depend on the system's default
+ file encoding being UTF-8.
+
+* Updated Mac TravisCI builds to use Xcode 8.1.
+
+* Cleaned up some wording in the documentation.
+
Release 1.7.2 (28 Oct 2016)
---------------------------
diff --git a/docs/index.rst b/docs/index.rst
index 30aa010..bd285fa 100644
--- a/docs/index.rst
+++ b/docs/index.rst
@@ -33,8 +33,8 @@ This example only uses the output on stdout, but output on stderr is also
collected as another attribute in the returned :class:`SSHCompletedProcess`
object.
-To check against a different set of server host keys, they can be read
-and provided in the known_hosts argument when the connection is opened:
+To check against a different set of server host keys, they can be provided
+in the known_hosts argument when the connection is opened:
.. code::
@@ -53,7 +53,7 @@ provided:
async with asyncssh.connect('localhost', username='user123') as conn:
To use a different set of client keys for authentication, they can be
-read and provided in the client_keys argument:
+provided in the client_keys argument:
.. code::
@@ -132,9 +132,9 @@ and the output written to '/tmp/stdout' should contain the reversed lines
The ``stdin``, ``stdout``, and ``stderr`` arguments support redirecting
to a variety of locations include local files, pipes, and sockets as
-well as an :class:`SSHReader` or :class:`SSHWriter` objects associated
-with other remote SSH processes. Here's an example of piping stdout from
-a local process to a remote process:
+well as :class:`SSHReader` or :class:`SSHWriter` objects associated with
+other remote SSH processes. Here's an example of piping stdout from a
+local process to a remote process:
.. include:: ../examples/redirect_local_pipe.py
:literal:
diff --git a/tests/server.py b/tests/server.py
index 8c198b7..f657d17 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -15,6 +15,7 @@
import asyncio
import os
import signal
+import subprocess
import asyncssh
from asyncssh.misc import async_context_manager
@@ -131,17 +132,21 @@ class ServerTestCase(AsyncTestCase):
cls._server_port))
run('cat skey.pub >> .ssh/known_hosts')
- output = run('ssh-agent -a agent 2>/dev/null')
- cls._agent_pid = int(output.splitlines()[2].split()[3][:-1])
+ os.environ['LOGNAME'] = 'guest'
+ os.environ['HOME'] = '.'
- os.environ['SSH_AUTH_SOCK'] = 'agent'
+ try:
+ output = run('ssh-agent -a agent 2>/dev/null')
+ except subprocess.CalledProcessError: # pragma: no cover
+ cls._agent_pid = None
+ else:
+ cls._agent_pid = int(output.splitlines()[2].split()[3][:-1])
- agent = yield from asyncssh.connect_agent()
- yield from agent.add_keys([ckey_dsa, (ckey, ckey_cert)])
- agent.close()
+ os.environ['SSH_AUTH_SOCK'] = 'agent'
- os.environ['LOGNAME'] = 'guest'
- os.environ['HOME'] = '.'
+ agent = yield from asyncssh.connect_agent()
+ yield from agent.add_keys([ckey_dsa, (ckey, ckey_cert)])
+ agent.close()
@classmethod
@asyncio.coroutine
@@ -154,10 +159,16 @@ class ServerTestCase(AsyncTestCase):
cls._server.close()
yield from cls._server.wait_closed()
- os.kill(cls._agent_pid, signal.SIGTERM)
+ if cls._agent_pid: # pragma: no branch
+ os.kill(cls._agent_pid, signal.SIGTERM)
# pylint: enable=invalid-name
+ def agent_available(self):
+ """Return whether SSH agent is available"""
+
+ return bool(self._agent_pid)
+
@asyncio.coroutine
def create_connection(self, client_factory, loop=(), **kwargs):
"""Create a connection to the test server"""
diff --git a/tests/test_agent.py b/tests/test_agent.py
index eb06911..966d839 100644
--- a/tests/test_agent.py
+++ b/tests/test_agent.py
@@ -16,6 +16,8 @@ import asyncio
import functools
import os
import signal
+import subprocess
+import unittest
import asyncssh
@@ -105,7 +107,12 @@ class _TestAPI(AsyncTestCase):
os.environ['DISPLAY'] = ''
os.environ['HOME'] = '.'
os.environ['SSH_ASKPASS'] = os.path.join(os.getcwd(), 'ssh-askpass')
- output = run('ssh-agent -a agent 2>/dev/null')
+
+ try:
+ output = run('ssh-agent -a agent 2>/dev/null')
+ except subprocess.CalledProcessError: # pragma: no cover
+ raise unittest.SkipTest('ssh-agent not available')
+
cls._agent_pid = int(output.splitlines()[2].split()[3][:-1])
os.environ['SSH_AUTH_SOCK'] = 'agent'
diff --git a/tests/test_channel.py b/tests/test_channel.py
index 487464a..46d0b3b 100644
--- a/tests/test_channel.py
+++ b/tests/test_channel.py
@@ -734,6 +734,9 @@ class _TestChannel(ServerTestCase):
def test_agent_forwarding(self):
"""Test SSH agent forwarding"""
+ if not self.agent_available(): # pragma: no cover
+ self.skipTest('ssh-agent not available')
+
with (yield from self.connect(username='ckey',
agent_forwarding=True)) as conn:
chan, session = yield from _create_session(conn, 'agent')
diff --git a/tests/test_connection_auth.py b/tests/test_connection_auth.py
index ab93faf..46e51e4 100644
--- a/tests/test_connection_auth.py
+++ b/tests/test_connection_auth.py
@@ -320,6 +320,9 @@ class _TestPublicKeyAuth(ServerTestCase):
def test_agent_auth(self):
"""Test connecting with ssh-agent authentication"""
+ if not self.agent_available(): # pragma: no cover
+ self.skipTest('ssh-agent not available')
+
with (yield from self.connect(username='ckey')) as conn:
pass
@@ -329,6 +332,9 @@ class _TestPublicKeyAuth(ServerTestCase):
def test_agent_signature_algs(self):
"""Test ssh-agent keys with specific signature algorithms"""
+ if not self.agent_available(): # pragma: no cover
+ self.skipTest('ssh-agent not available')
+
for alg in ('ssh-rsa', 'rsa-sha2-256', 'rsa-sha2-512'):
with (yield from self.connect(username='ckey',
signature_algs=[alg])) as conn:
@@ -340,6 +346,9 @@ class _TestPublicKeyAuth(ServerTestCase):
def test_agent_auth_failure(self):
"""Test failure connecting with ssh-agent authentication"""
+ if not self.agent_available(): # pragma: no cover
+ self.skipTest('ssh-agent not available')
+
with patch.dict(os.environ, HOME='xxx'):
with self.assertRaises(asyncssh.DisconnectError):
yield from self.connect(username='ckey', agent_path='xxx',
@@ -388,7 +397,8 @@ class _TestPublicKeyAuth(ServerTestCase):
with patch('asyncssh.connection.SSHConnection._get_ext_info_kex_alg',
skip_ext_info):
- with (yield from self.connect(username='ckey')) as conn:
+ with (yield from self.connect(username='ckey', client_keys='ckey',
+ agent_path=None)) as conn:
pass
yield from conn.wait_closed()
@@ -444,6 +454,9 @@ class _TestPublicKeyAuth(ServerTestCase):
def test_client_key_agent_keypairs(self):
"""Test client keys passed in as a list of SSHAgentKeyPairs"""
+ if not self.agent_available(): # pragma: no cover
+ self.skipTest('ssh-agent not available')
+
agent = yield from asyncssh.connect_agent()
for key in (yield from agent.get_keys()):
@@ -545,6 +558,9 @@ class _TestPublicKeyAuth(ServerTestCase):
def test_callback_sshkeypair(self):
"""Test client key passed in as an SSHKeyPair by callback"""
+ if not self.agent_available(): # pragma: no cover
+ self.skipTest('ssh-agent not available')
+
agent = yield from asyncssh.connect_agent()
keylist = yield from agent.get_keys()
@@ -568,7 +584,8 @@ class _TestPublicKeyAuth(ServerTestCase):
with patch('asyncssh.connection.SSHClientConnection',
_UnknownAuthClientConnection):
- with (yield from self.connect(username='ckey')) as conn:
+ with (yield from self.connect(username='ckey', client_keys='ckey',
+ agent_path=None)) as conn:
pass
yield from conn.wait_closed()
@@ -938,7 +955,8 @@ class _TestLoginTimeoutDisabled(ServerTestCase):
def test_login_timeout_disabled(self):
"""Test with login timeout disabled"""
- with (yield from self.connect()) as conn:
+ with (yield from self.connect(username='ckey',
+ client_keys='ckey')) as conn:
pass
yield from conn.wait_closed()
diff --git a/tests/test_kex.py b/tests/test_kex.py
index 9311eca..985670e 100644
--- a/tests/test_kex.py
+++ b/tests/test_kex.py
@@ -16,7 +16,7 @@ import asyncio
from hashlib import sha1
-from .util import asynctest, run, ConnectionStub, AsyncTestCase
+import asyncssh
from asyncssh.dh import MSG_KEXDH_INIT, MSG_KEXDH_REPLY
from asyncssh.dh import _KexDHGex, MSG_KEX_DH_GEX_GROUP
@@ -25,8 +25,9 @@ from asyncssh.ecdh import MSG_KEX_ECDH_INIT, MSG_KEX_ECDH_REPLY
from asyncssh.kex import register_kex_alg, get_kex_algs, get_kex
from asyncssh.misc import DisconnectError
from asyncssh.packet import SSHPacket, Byte, MPInt, String
-from asyncssh.public_key import decode_ssh_public_key, read_private_key
-from asyncssh.public_key import SSHLocalKeyPair
+from asyncssh.public_key import SSHLocalKeyPair, decode_ssh_public_key
+
+from .util import asynctest, ConnectionStub, AsyncTestCase
# Short variable names are used here, matching names in the specs
# pylint: disable=invalid-name
@@ -136,8 +137,7 @@ class _KexServerStub(_KexConnectionStub):
def __init__(self, alg, peer):
super().__init__(alg, peer, True)
- run('openssl genrsa -out priv 2048')
- priv_key = read_private_key('priv')
+ priv_key = asyncssh.generate_private_key('ssh-rsa')
self._server_host_key = SSHLocalKeyPair(priv_key)
def get_server_host_key(self):
diff --git a/tests/test_process.py b/tests/test_process.py
index 269f3d1..05f6674 100644
--- a/tests/test_process.py
+++ b/tests/test_process.py
@@ -782,7 +782,7 @@ class _TestProcess(ServerTestCase):
data = '\u2000test\u2000'
- with open('stdin', 'w') as file:
+ with open('stdin', 'w', encoding='utf-8') as file:
file.write(data)
with (yield from self.connect()) as conn:
diff --git a/tests/test_public_key.py b/tests/test_public_key.py
index 223483a..b1d932d 100644
--- a/tests/test_public_key.py
+++ b/tests/test_public_key.py
@@ -12,14 +12,17 @@
"""Unit tests for reading and writing public and private keys
- Note: These tests assume that the openssl and ssh-keygen commands are
- available on the system and in the user's path.
+ Note: These tests look for the openssl and ssh-keygen commands in
+ the user's path and will whenever possible use them to perform
+ interoperability tests. Otherwise, these tests will only test
+ AsyncSSH against itself.
"""
import binascii
from datetime import datetime
import os
+import subprocess
import asyncssh
@@ -41,6 +44,26 @@ _ES2_PBKDF2 = ObjectIdentifier('1.2.840.113549.1.5.12')
_ES2_AES128 = ObjectIdentifier('2.16.840.1.101.3.4.1.2')
_ES2_DES3 = ObjectIdentifier('1.2.840.113549.3.7')
+try:
+ _openssl_version = run('openssl version')
+except subprocess.CalledProcessError: # pragma: no cover
+ _openssl_version = b''
+
+_openssl_available = _openssl_version != b''
+
+# The openssl "-v2prf" option is only available in OpenSSL 1.0.2 or later
+_openssl_supports_v2prf = _openssl_version >= b'OpenSSL 1.0.2'
+
+try:
+ _openssh_version = run('ssh -V')
+except subprocess.CalledProcessError: # pragma: no cover
+ _openssh_version = b''
+
+_openssh_available = _openssh_version != b''
+
+# GCM & Chacha tests require OpenSSH 6.9 due to a bug in earlier versions:
+# https://bugzilla.mindrot.org/show_bug.cgi?id=2366
+_openssh_supports_gcm_chacha = _openssh_version >= b'OpenSSH_6.9'
# pylint: disable=bad-whitespace
@@ -50,54 +73,52 @@ pkcs1_ciphers = (('aes128-cbc', '-aes128'),
('des-cbc', '-des'),
('des3-cbc', '-des3'))
-pkcs8_ciphers = (('des-cbc', 'md5', 1, '-v1 PBE-MD5-DES'),
- ('des-cbc', 'sha1', 1, '-v1 PBE-SHA1-DES'),
- ('des2-cbc', 'sha1', 1, '-v1 PBE-SHA1-2DES'),
- ('des3-cbc', 'sha1', 1, '-v1 PBE-SHA1-3DES'),
- ('rc4-40', 'sha1', 1, '-v1 PBE-SHA1-RC4-40'),
- ('rc4-128', 'sha1', 1, '-v1 PBE-SHA1-RC4-128'),
- ('aes128-cbc', 'sha1', 2, '-v2 aes-128-cbc'),
- ('aes192-cbc', 'sha1', 2, '-v2 aes-192-cbc'),
- ('aes256-cbc', 'sha1', 2, '-v2 aes-256-cbc'),
- ('blowfish-cbc', 'sha1', 2, '-v2 bf-cbc'),
- ('cast128-cbc', 'sha1', 2, '-v2 cast-cbc'),
- ('des-cbc', 'sha1', 2, '-v2 des-cbc'),
- ('des3-cbc', 'sha1', 2, '-v2 des-ede3-cbc'))
-
-openssh_ciphers = ('aes128-cbc', 'aes192-cbc', 'aes256-cbc',
- 'aes128-ctr', 'aes192-ctr', 'aes256-ctr',
- 'arcfour', 'arcfour128', 'arcfour256',
- 'blowfish-cbc', 'cast128-cbc', '3des-cbc')
+pkcs8_ciphers = (
+ ('aes128-cbc', 'sha224', 2, '-v2 aes-128-cbc '
+ '-v2prf hmacWithSHA224', _openssl_supports_v2prf),
+ ('aes128-cbc', 'sha256', 2, '-v2 aes-128-cbc '
+ '-v2prf hmacWithSHA256', _openssl_supports_v2prf),
+ ('aes128-cbc', 'sha384', 2, '-v2 aes-128-cbc '
+ '-v2prf hmacWithSHA384', _openssl_supports_v2prf),
+ ('aes128-cbc', 'sha512', 2, '-v2 aes-128-cbc '
+ '-v2prf hmacWithSHA512', _openssl_supports_v2prf),
+ ('des-cbc', 'md5', 1, '-v1 PBE-MD5-DES', _openssl_available),
+ ('des-cbc', 'sha1', 1, '-v1 PBE-SHA1-DES', _openssl_available),
+ ('des2-cbc', 'sha1', 1, '-v1 PBE-SHA1-2DES', _openssl_available),
+ ('des3-cbc', 'sha1', 1, '-v1 PBE-SHA1-3DES', _openssl_available),
+ ('rc4-40', 'sha1', 1, '-v1 PBE-SHA1-RC4-40', _openssl_available),
+ ('rc4-128', 'sha1', 1, '-v1 PBE-SHA1-RC4-128', _openssl_available),
+ ('aes128-cbc', 'sha1', 2, '-v2 aes-128-cbc', _openssl_available),
+ ('aes192-cbc', 'sha1', 2, '-v2 aes-192-cbc', _openssl_available),
+ ('aes256-cbc', 'sha1', 2, '-v2 aes-256-cbc', _openssl_available),
+ ('blowfish-cbc', 'sha1', 2, '-v2 bf-cbc', _openssl_available),
+ ('cast128-cbc', 'sha1', 2, '-v2 cast-cbc', _openssl_available),
+ ('des-cbc', 'sha1', 2, '-v2 des-cbc', _openssl_available),
+ ('des3-cbc', 'sha1', 2, '-v2 des-ede3-cbc', _openssl_available))
+
+openssh_ciphers = (
+ ('aes128-gcm@openssh.com', _openssh_supports_gcm_chacha),
+ ('aes256-gcm@openssh.com', _openssh_supports_gcm_chacha),
+ ('aes128-cbc', _openssh_available),
+ ('aes192-cbc', _openssh_available),
+ ('aes256-cbc', _openssh_available),
+ ('aes128-ctr', _openssh_available),
+ ('aes192-ctr', _openssh_available),
+ ('aes256-ctr', _openssh_available),
+ ('arcfour', _openssh_available),
+ ('arcfour128', _openssh_available),
+ ('arcfour256', _openssh_available),
+ ('blowfish-cbc', _openssh_available),
+ ('cast128-cbc', _openssh_available),
+ ('3des-cbc', _openssh_available)
+)
# pylint: enable=bad-whitespace
-_openssl_version = run('openssl version')
-
-_pkcs1_public_supported = _openssl_version >= b'OpenSSL 1.0.0'
-
-if _openssl_version >= b'OpenSSL 1.0.2': # pragma: no branch
- # pylint: disable=bad-whitespace
-
- pkcs8_ciphers += (
- ('aes128-cbc', 'sha224', 2, '-v2 aes-128-cbc '
- '-v2prf hmacWithSHA224'),
- ('aes128-cbc', 'sha256', 2, '-v2 aes-128-cbc '
- '-v2prf hmacWithSHA256'),
- ('aes128-cbc', 'sha384', 2, '-v2 aes-128-cbc '
- '-v2prf hmacWithSHA384'),
- ('aes128-cbc', 'sha512', 2, '-v2 aes-128-cbc '
- '-v2prf hmacWithSHA512')
- )
-
-if run('ssh -V') >= b'OpenSSH_6.9': # pragma: no branch
- # GCM & Chacha tests require OpenSSH 6.9 due to a bug in earlier versions:
- # https://bugzilla.mindrot.org/show_bug.cgi?id=2366
-
- openssh_ciphers += ('aes128-gcm@openssh.com', 'aes256-gcm@openssh.com')
-
- # Only test Chacha if libnacl is installed
- if libnacl_available: # pragma: no branch
- openssh_ciphers += ('chacha20-poly1305@openssh.com',)
+# Only test Chacha if libnacl is installed
+if libnacl_available: # pragma: no branch
+ openssh_ciphers += (('chacha20-poly1305@openssh.com',
+ _openssh_supports_gcm_chacha),)
def select_passphrase(cipher, pbe_version=0):
@@ -201,12 +222,16 @@ class _TestPublicKey(TempDirTestCase):
def import_pkcs1_private(self, fmt, cipher=None, args=None):
"""Check import of a PKCS#1 private key"""
- if cipher:
- run('openssl %s %s -in priv -inform pem -out new -outform %s '
- '-passout pass:passphrase' % (self.keyclass, args, fmt))
- else:
- run('openssl %s -in priv -inform pem -out new -outform %s' %
- (self.keyclass, fmt))
+ if _openssl_available: # pragma: no branch
+ if cipher:
+ run('openssl %s %s -in priv -inform pem -out new -outform %s '
+ '-passout pass:passphrase' % (self.keyclass, args, fmt))
+ else:
+ run('openssl %s -in priv -inform pem -out new -outform %s' %
+ (self.keyclass, fmt))
+ else: # pragma: no cover
+ self.privkey.write_private_key('new', 'pkcs1-%s' % fmt,
+ select_passphrase(cipher), cipher)
self.check_private(select_passphrase(cipher))
@@ -216,21 +241,29 @@ class _TestPublicKey(TempDirTestCase):
self.privkey.write_private_key('privout', 'pkcs1-%s' % fmt,
select_passphrase(cipher), cipher)
- if cipher:
- run('openssl %s -in privout -inform %s -out new -outform pem '
- '-passin pass:passphrase' % (self.keyclass, fmt))
- else:
- run('openssl %s -in privout -inform %s -out new -outform pem' %
- (self.keyclass, fmt))
+ if _openssl_available: # pragma: no branch
+ if cipher:
+ run('openssl %s -in privout -inform %s -out new -outform pem '
+ '-passin pass:passphrase' % (self.keyclass, fmt))
+ else:
+ run('openssl %s -in privout -inform %s -out new -outform pem' %
+ (self.keyclass, fmt))
+ else: # pragma: no cover
+ priv = asyncssh.read_private_key('privout',
+ select_passphrase(cipher))
+ priv.write_private_key('new', 'pkcs1-%s' % fmt)
self.check_private()
def import_pkcs1_public(self, fmt):
"""Check import of a PKCS#1 public key"""
- if self.keyclass == 'dsa':
- # OpenSSL no longer has support for PKCS#1 DSA, so we can
- # only test against ourselves.
+ if (not _openssl_available or self.keyclass == 'dsa' or
+ _openssl_version < b'OpenSSL 1.0.0'): # pragma: no cover
+ # OpenSSL no longer has support for PKCS#1 DSA, and PKCS#1
+ # RSA is not supported before OpenSSL 1.0.0, so we only test
+ # against ourselves in these cases.
+
self.pubkey.write_public_key('new', 'pkcs1-%s' % fmt)
else:
run('openssl %s -pubin -in pub -inform pem -RSAPublicKey_out '
@@ -243,52 +276,68 @@ class _TestPublicKey(TempDirTestCase):
self.privkey.write_public_key('pubout', 'pkcs1-%s' % fmt)
- if self.keyclass == 'dsa':
+ if not _openssl_available or self.keyclass == 'dsa': # pragma: no cover
# OpenSSL no longer has support for PKCS#1 DSA, so we can
# only test against ourselves.
- asyncssh.read_public_key('pubout').write_public_key(
- 'new', 'pkcs1-%s' % fmt)
+
+ pub = asyncssh.read_public_key('pubout')
+ pub.write_public_key('new', 'pkcs1-%s' % fmt)
else:
run('openssl %s -RSAPublicKey_in -in pubout -inform %s -out new '
'-outform pem' % (self.keyclass, fmt))
self.check_public()
- def import_pkcs8_private(self, fmt, cipher=None, pbe_version=None,
- args=None):
+ def import_pkcs8_private(self, fmt, use_openssl, cipher=None,
+ hash_alg=None, pbe_version=None, args=None):
"""Check import of a PKCS#8 private key"""
- if cipher:
- run('openssl pkcs8 -topk8 %s -in priv -inform pem -out new '
- '-outform %s -passout pass:passphrase' % (args, fmt))
- else:
- run('openssl pkcs8 -topk8 -nocrypt -in priv -inform pem -out new '
- '-outform %s' % fmt)
+ if use_openssl: # pragma: no branch
+ if cipher:
+ run('openssl pkcs8 -topk8 %s -in priv -inform pem -out new '
+ '-outform %s -passout pass:passphrase' % (args, fmt))
+ else:
+ run('openssl pkcs8 -topk8 -nocrypt -in priv -inform pem '
+ '-out new -outform %s' % fmt)
+ else: # pragma: no cover
+ self.privkey.write_private_key('new', 'pkcs8-%s' % fmt,
+ select_passphrase(cipher,
+ pbe_version),
+ cipher, hash_alg, pbe_version)
self.check_private(select_passphrase(cipher, pbe_version))
- def export_pkcs8_private(self, fmt, cipher=None, hash_alg=None,
- pbe_version=None):
+ def export_pkcs8_private(self, fmt, use_openssl, cipher=None,
+ hash_alg=None, pbe_version=None):
"""Check export of a PKCS#8 private key"""
self.privkey.write_private_key('privout', 'pkcs8-%s' % fmt,
select_passphrase(cipher, pbe_version),
cipher, hash_alg, pbe_version)
- if cipher:
- run('openssl pkcs8 -in privout -inform %s -out new '
- '-outform pem -passin pass:passphrase' % fmt)
- else:
- run('openssl pkcs8 -nocrypt -in privout -inform %s -out new '
- '-outform pem' % fmt)
+ if use_openssl: # pragma: no branch
+ if cipher:
+ run('openssl pkcs8 -in privout -inform %s -out new '
+ '-outform pem -passin pass:passphrase' % fmt)
+ else:
+ run('openssl pkcs8 -nocrypt -in privout -inform %s -out new '
+ '-outform pem' % fmt)
+ else: # pragma: no cover
+ priv = asyncssh.read_private_key('privout',
+ select_passphrase(cipher,
+ pbe_version))
+ priv.write_private_key('new', 'pkcs8-%s' % fmt)
self.check_private()
def import_pkcs8_public(self, fmt):
"""Check import of a PKCS#8 public key"""
- run('openssl %s -pubin -in pub -inform pem -out new -outform %s' %
- (self.keyclass, fmt))
+ if _openssl_available: # pragma: no branch
+ run('openssl %s -pubin -in pub -inform pem -out new -outform %s' %
+ (self.keyclass, fmt))
+ else: # pragma: no cover
+ self.pubkey.write_public_key('new', 'pkcs8-%s' % fmt)
self.check_public()
@@ -297,35 +346,47 @@ class _TestPublicKey(TempDirTestCase):
self.privkey.write_public_key('pubout', 'pkcs8-%s' % fmt)
- run('openssl %s -pubin -in pubout -inform %s -out new -outform pem' %
- (self.keyclass, fmt))
+ if _openssl_available: # pragma: no branch
+ run('openssl %s -pubin -in pubout -inform %s -out new '
+ '-outform pem' % (self.keyclass, fmt))
+ else: # pragma: no cover
+ pub = asyncssh.read_public_key('pubout')
+ pub.write_public_key('new', 'pkcs8-%s' % fmt)
self.check_public()
- def import_openssh_private(self, cipher=None):
+ def import_openssh_private(self, use_openssh, cipher=None):
"""Check import of an OpenSSH private key"""
- run('cp -p priv new')
+ if use_openssh: # pragma: no branch
+ run('cp -p priv new')
- if cipher:
- run('ssh-keygen -p -N passphrase -Z %s -o -f new' % cipher)
- else:
- run('ssh-keygen -p -N "" -o -f new')
+ if cipher:
+ run('ssh-keygen -p -N passphrase -Z %s -o -f new' % cipher)
+ else:
+ run('ssh-keygen -p -N "" -o -f new')
+ else: # pragma: no cover
+ self.privkey.write_private_key('new', 'openssh',
+ select_passphrase(cipher), cipher)
self.check_private(select_passphrase(cipher))
- def export_openssh_private(self, cipher=None):
+ def export_openssh_private(self, use_openssh, cipher=None):
"""Check export of an OpenSSH private key"""
self.privkey.write_private_key('new', 'openssh',
select_passphrase(cipher), cipher)
- run('chmod 600 new')
+ if use_openssh: # pragma: no branch
+ run('chmod 600 new')
- if cipher:
- run('ssh-keygen -p -P passphrase -N "" -o -f new')
- else:
- run('ssh-keygen -p -N "" -o -f new')
+ if cipher:
+ run('ssh-keygen -p -P passphrase -N "" -o -f new')
+ else:
+ run('ssh-keygen -p -N "" -o -f new')
+ else: # pragma: no cover
+ priv = asyncssh.read_private_key('new', select_passphrase(cipher))
+ priv.write_private_key('new', 'openssh')
self.check_private()
@@ -341,7 +402,11 @@ class _TestPublicKey(TempDirTestCase):
self.privkey.write_public_key('pubout', 'openssh')
- run('ssh-keygen -e -f pubout -m rfc4716 > new')
+ if _openssh_available: # pragma: no branch
+ run('ssh-keygen -e -f pubout -m rfc4716 > new')
+ else: # pragma: no cover
+ pub = asyncssh.read_public_key('pubout')
+ pub.write_public_key('new', 'rfc4716')
self.check_public()
@@ -357,30 +422,49 @@ class _TestPublicKey(TempDirTestCase):
cert.write_certificate('certout', 'openssh')
- run('ssh-keygen -e -f certout -m rfc4716 > cert')
+ if _openssh_available: # pragma: no branch
+ run('ssh-keygen -e -f certout -m rfc4716 > cert')
+ else: # pragma: no cover
+ cert = asyncssh.read_certificate('certout')
+ cert.write_certificate('cert', 'rfc4716')
self.check_certificate(cert_type)
def import_rfc4716_public(self):
"""Check import of an RFC4716 public key"""
- run('ssh-keygen -e -f sshpub -m rfc4716 > new')
+ if _openssh_available: # pragma: no branch
+ run('ssh-keygen -e -f sshpub -m rfc4716 > new')
+ else: # pragma: no cover
+ self.pubkey.write_public_key('new', 'rfc4716')
self.check_public()
def export_rfc4716_public(self):
"""Check export of an RFC4716 public key"""
- self.privkey.write_public_key('pubout', 'rfc4716')
+ self.pubkey.write_public_key('pubout', 'rfc4716')
- run('ssh-keygen -i -f pubout -m rfc4716 > new')
+ if _openssh_available: # pragma: no branch
+ run('ssh-keygen -i -f pubout -m rfc4716 > new')
+ else: # pragma: no cover
+ pub = asyncssh.read_public_key('pubout')
+ pub.write_public_key('new', 'openssh')
self.check_public()
def import_rfc4716_certificate(self, cert_type, cert):
"""Check import of an RFC4716 certificate"""
- run('ssh-keygen -e -f %s -m rfc4716 > cert' % cert)
+ if _openssh_available: # pragma: no branch
+ run('ssh-keygen -e -f %s -m rfc4716 > cert' % cert)
+ else: # pragma: no cover
+ if cert_type == CERT_TYPE_USER:
+ cert = self.usercert
+ else:
+ cert = self.hostcert
+
+ cert.write_certificate('cert', 'rfc4716')
self.check_certificate(cert_type)
@@ -389,7 +473,11 @@ class _TestPublicKey(TempDirTestCase):
cert.write_certificate('certout', 'rfc4716')
- run('ssh-keygen -i -f certout -m rfc4716 > cert')
+ if _openssh_available: # pragma: no branch
+ run('ssh-keygen -i -f certout -m rfc4716 > cert')
+ else: # pragma: no cover
+ cert = asyncssh.read_certificate('certout')
+ cert.write_certificate('cert', 'openssh')
self.check_certificate(cert_type)
@@ -902,33 +990,37 @@ class _TestPublicKey(TempDirTestCase):
"""Check PKCS#8 private key format"""
with self.subTest('Import PKCS#8 PEM private'):
- self.import_pkcs8_private('pem')
+ self.import_pkcs8_private('pem', _openssl_available)
with self.subTest('Export PKCS#8 PEM private'):
- self.export_pkcs8_private('pem')
+ self.export_pkcs8_private('pem', _openssl_available)
with self.subTest('Import PKCS#8 DER private'):
- self.import_pkcs8_private('der')
+ self.import_pkcs8_private('der', _openssl_available)
with self.subTest('Export PKCS#8 DER private'):
- self.export_pkcs8_private('der')
+ self.export_pkcs8_private('der', _openssl_available)
- for cipher, hash_alg, pbe_version, args in pkcs8_ciphers:
+ for cipher, hash_alg, pbe_version, args, use_openssl in pkcs8_ciphers:
with self.subTest('Import PKCS#8 PEM private (%s-%s-v%s)' %
(cipher, hash_alg, pbe_version)):
- self.import_pkcs8_private('pem', cipher, pbe_version, args)
+ self.import_pkcs8_private('pem', use_openssl, cipher,
+ hash_alg, pbe_version, args)
with self.subTest('Export PKCS#8 PEM private (%s-%s-v%s)' %
(cipher, hash_alg, pbe_version)):
- self.export_pkcs8_private('pem', cipher, hash_alg, pbe_version)
+ self.export_pkcs8_private('pem', use_openssl, cipher,
+ hash_alg, pbe_version)
with self.subTest('Import PKCS#8 DER private (%s-%s-v%s)' %
(cipher, hash_alg, pbe_version)):
- self.import_pkcs8_private('der', cipher, pbe_version, args)
+ self.import_pkcs8_private('der', use_openssl, cipher,
+ hash_alg, pbe_version, args)
with self.subTest('Export PKCS#8 DER private (%s-%s-v%s)' %
(cipher, hash_alg, pbe_version)):
- self.export_pkcs8_private('der', cipher, hash_alg, pbe_version)
+ self.export_pkcs8_private('der', use_openssl, cipher,
+ hash_alg, pbe_version)
def check_pkcs8_public(self):
"""Check PKCS#8 public key format"""
@@ -949,18 +1041,18 @@ class _TestPublicKey(TempDirTestCase):
"""Check OpenSSH private key format"""
with self.subTest('Import OpenSSH private'):
- self.import_openssh_private()
+ self.import_openssh_private(_openssh_available)
with self.subTest('Export OpenSSH private'):
- self.export_openssh_private()
+ self.export_openssh_private(_openssh_available)
if bcrypt_available: # pragma: no branch
- for cipher in openssh_ciphers:
+ for cipher, use_openssh in openssh_ciphers:
with self.subTest('Import OpenSSH private (%s)' % cipher):
- self.import_openssh_private(cipher)
+ self.import_openssh_private(use_openssh, cipher)
with self.subTest('Export OpenSSH private (%s)' % cipher):
- self.export_openssh_private(cipher)
+ self.export_openssh_private(use_openssh, cipher)
def check_openssh_public(self):
"""Check OpenSSH public key format"""
@@ -1204,7 +1296,7 @@ class _TestPublicKey(TempDirTestCase):
if 'pkcs1' in self.private_formats:
self.check_pkcs1_private()
- if 'pkcs1' in self.public_formats and _pkcs1_public_supported:
+ if 'pkcs1' in self.public_formats:
self.check_pkcs1_public()
if 'pkcs8' in self.private_formats:
@@ -1306,18 +1398,19 @@ class _TestPublicKeyTopLevel(TempDirTestCase):
def test_ec_explicit(self):
"""Test EC certificate with explcit parameters"""
- for curve in ('secp256r1', 'secp384r1', 'secp521r1'):
- with self.subTest('Import EC key with explicit parameters',
- curve=curve):
- run('openssl ecparam -out priv -noout -genkey -name %s '
- '-param_enc explicit' % curve)
- asyncssh.read_private_key('priv')
-
- with self.subTest('Import EC key with unknown explicit parameters'):
- run('openssl ecparam -out priv -noout -genkey -name secp112r1 '
- '-param_enc explicit')
- with self.assertRaises(asyncssh.KeyImportError):
- asyncssh.read_private_key('priv')
+ if _openssl_available: # pragma: no branch
+ for curve in ('secp256r1', 'secp384r1', 'secp521r1'):
+ with self.subTest('Import EC key with explicit parameters',
+ curve=curve):
+ run('openssl ecparam -out priv -noout -genkey -name %s '
+ '-param_enc explicit' % curve)
+ asyncssh.read_private_key('priv')
+
+ with self.subTest('Import EC key with unknown explicit parameters'):
+ run('openssl ecparam -out priv -noout -genkey -name secp112r1 '
+ '-param_enc explicit')
+ with self.assertRaises(asyncssh.KeyImportError):
+ asyncssh.read_private_key('priv')
def test_generate_errors(self):
"""Test errors in private key and certificate generation"""
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-asyncssh.git
More information about the Python-modules-commits
mailing list