[Python-modules-commits] [python-asyncssh] 07/14: Import python-asyncssh_1.6.2.orig.tar.gz
Vincent Bernat
bernat at moszumanska.debian.org
Sat Sep 10 12:10:10 UTC 2016
This is an automated email from the git hooks/post-receive script.
bernat pushed a commit to branch master
in repository python-asyncssh.
commit c29d1479a0acc9e21fd871c7bc0d4121d76da1fe
Author: Vincent Bernat <bernat at debian.org>
Date: Sat Sep 10 13:54:56 2016 +0200
Import python-asyncssh_1.6.2.orig.tar.gz
---
asyncssh/dsa.py | 2 +-
asyncssh/ecdsa.py | 2 +-
asyncssh/ed25519.py | 2 +-
asyncssh/public_key.py | 534 +++++++++++++++++++++++++++++++++++++++++------
asyncssh/rsa.py | 2 +-
asyncssh/version.py | 2 +-
docs/api.rst | 41 +++-
docs/changes.rst | 12 ++
tests/test_editor.py | 11 +-
tests/test_public_key.py | 100 ++++++++-
tests/util.py | 9 +-
11 files changed, 629 insertions(+), 88 deletions(-)
diff --git a/asyncssh/dsa.py b/asyncssh/dsa.py
index c70c916..1f25e39 100644
--- a/asyncssh/dsa.py
+++ b/asyncssh/dsa.py
@@ -225,5 +225,5 @@ class _DSAKey(SSHKey):
register_public_key_alg(b'ssh-dss', _DSAKey)
-register_certificate_alg(b'ssh-dss-cert-v01 at openssh.com',
+register_certificate_alg(1, b'ssh-dss-cert-v01 at openssh.com',
_DSAKey, SSHCertificateV01)
diff --git a/asyncssh/ecdsa.py b/asyncssh/ecdsa.py
index 45645b7..9fe9011 100644
--- a/asyncssh/ecdsa.py
+++ b/asyncssh/ecdsa.py
@@ -290,4 +290,4 @@ for _curve_id, _oid in {(b'nistp521', '1.3.132.0.35'),
_alg_oid_map[_oid] = _curve_id
register_public_key_alg(_algorithm, _ECKey)
- register_certificate_alg(_cert_algorithm, _ECKey, SSHCertificateV01)
+ register_certificate_alg(1, _cert_algorithm, _ECKey, SSHCertificateV01)
diff --git a/asyncssh/ed25519.py b/asyncssh/ed25519.py
index d29cb99..c7ad39f 100644
--- a/asyncssh/ed25519.py
+++ b/asyncssh/ed25519.py
@@ -123,5 +123,5 @@ except (ImportError, OSError): # pragma: no cover
else:
register_public_key_alg(b'ssh-ed25519', _Ed25519Key)
- register_certificate_alg(b'ssh-ed25519-cert-v01 at openssh.com',
+ register_certificate_alg(1, b'ssh-ed25519-cert-v01 at openssh.com',
_Ed25519Key, SSHCertificateV01)
diff --git a/asyncssh/public_key.py b/asyncssh/public_key.py
index 8a200f3..691d8ce 100644
--- a/asyncssh/public_key.py
+++ b/asyncssh/public_key.py
@@ -13,7 +13,9 @@
"""SSH asymmetric encryption handlers"""
import binascii
+from datetime import datetime, timedelta
import os
+import re
import time
try:
@@ -25,7 +27,8 @@ except ImportError: # pragma: no cover
from .asn1 import ASN1DecodeError, BitString, der_encode, der_decode
from .cipher import get_encryption_params, get_cipher
from .misc import ip_network
-from .packet import String, UInt32, UInt64, PacketDecodeError, SSHPacket
+from .packet import NameList, String, UInt32, UInt64
+from .packet import PacketDecodeError, SSHPacket
from .pbe import KeyEncryptionError, pkcs1_encrypt, pkcs8_encrypt
from .pbe import pkcs1_decrypt, pkcs8_decrypt
@@ -34,9 +37,18 @@ _public_key_algs = []
_certificate_algs = []
_public_key_alg_map = {}
_certificate_alg_map = {}
+_certificate_version_map = {}
_pem_map = {}
_pkcs8_oid_map = {}
+_abs_date_pattern = re.compile(r'\d{8}')
+_abs_time_pattern = re.compile(r'\d{14}')
+_rel_time_pattern = re.compile(r'(?:(?P<weeks>[+-]?\d+)[Ww]|'
+ r'(?P<days>[+-]?\d+)[Dd]|'
+ r'(?P<hours>[+-]?\d+)[Hh]|'
+ r'(?P<minutes>[+-]?\d+)[Mm]|'
+ r'(?P<seconds>[+-]?\d+)[Ss])+')
+
# SSH certificate types
CERT_TYPE_USER = 1
CERT_TYPE_HOST = 2
@@ -46,6 +58,35 @@ _OPENSSH_SALT_LEN = 16
_OPENSSH_WRAP_LEN = 70
+def _parse_time(t):
+ """Parse a time value"""
+
+ if isinstance(t, int):
+ return t
+ elif isinstance(t, float):
+ return int(t)
+ elif isinstance(t, datetime):
+ return int(t.timestamp())
+ elif isinstance(t, str):
+ if t == 'now':
+ return int(time.time())
+
+ match = _abs_date_pattern.fullmatch(t)
+ if match:
+ return int(datetime.strptime(t, '%Y%m%d').timestamp())
+
+ match = _abs_time_pattern.fullmatch(t)
+ if match:
+ return int(datetime.strptime(t, '%Y%m%d%H%M%S').timestamp())
+
+ match = _rel_time_pattern.fullmatch(t)
+ if match:
+ delta = {k: int(v) for k, v in match.groupdict(0).items()}
+ return int(time.time() + timedelta(**delta).total_seconds())
+
+ raise ValueError('Unrecognized time value')
+
+
def _wrap_base64(data, wrap=64):
"""Break a Base64 value into multiple lines."""
@@ -57,8 +98,11 @@ def _wrap_base64(data, wrap=64):
class KeyGenerationError(ValueError):
"""Key generation error
- This exception is raised by :func:`generate_private_key` when
- the requested algorithm or parameters are unsupported.
+ This exception is raised by :func:`generate_private_key`,
+ :meth:`generate_user_certificate() <SSHKey.generate_user_certificate>`
+ or :meth:`generate_host_certificate()
+ <SSHKey.generate_host_certificate>` when the requested parameters are
+ unsupported.
"""
@@ -89,6 +133,21 @@ class SSHKey:
pem_name = None
pkcs8_oid = None
+ def _generate_certificate(self, key, version, serial, cert_type,
+ key_id, principals, valid_after,
+ valid_before, cert_options):
+ """Generate a new SSH certificate"""
+
+ try:
+ algorithm, cert_handler = _certificate_version_map[type(key),
+ version]
+ except KeyError:
+ raise KeyGenerationError('Unknown certificate version') from None
+
+ return cert_handler.generate(self, algorithm, key, serial, cert_type,
+ key_id, principals, valid_after,
+ valid_before, cert_options)
+
def encode_pkcs1_private(self):
"""Export parameters associated with a PKCS#1 private key"""
@@ -130,12 +189,161 @@ class SSHKey:
return String(self.algorithm) + self.encode_ssh_public()
+ def convert_to_public(self):
+ """Return public key corresponding to this key
+
+ This method converts an :class:`SSHKey` object which contains
+ a private key into one which contains only the corresponding
+ public key. If it is called on something which is already
+ a public key, it has no effect.
+
+ """
+
+ return decode_ssh_public_key(self.get_ssh_public_key())
+
+ def generate_user_certificate(self, user_key, key_id, version=1,
+ serial=0, principals=(), valid_after=0,
+ valid_before=0xffffffffffffffff,
+ force_command=None, source_address=None,
+ permit_x11_forwarding=True,
+ permit_agent_forwarding=True,
+ permit_port_forwarding=True,
+ permit_pty=True, permit_user_rc=True):
+ """Generate a new SSH user certificate
+
+ This method returns a SSH user certifcate with the requested
+ attributes signed by this private key.
+
+ :param user_key:
+ The user's public key.
+ :param str key_id:
+ The key identifier associated with this certificate.
+ :param int version: (optional)
+ The version of certificate to create, defaulting to 1.
+ :param int serial: (optional)
+ The serial number of the certificate, defaulting to 0.
+ :param principals: (optional)
+ The user names this certificate is valid for. By default,
+ it can be used with any user name.
+ :param valid_after: (optional)
+ The earliest time the certificate is valid for, defaulting to
+ no restriction on when the certificate starts being valid.
+ See :ref:`SpecifyingTimeValues` for allowed time specifications.
+ :param valid_before: (optional)
+ The latest time the certificate is valid for, defaulting to
+ no restriction on when the certificate stops being valid.
+ See :ref:`SpecifyingTimeValues` for allowed time specifications.
+ :param force_command: (optional)
+ The command (if any) to force a session to run when this
+ certificate is used.
+ :param source_address: (optional)
+ A list of source addresses and networks for which the
+ certificate is valid, defaulting to all addresses.
+ :param bool permit_x11_forwarding: (optional)
+ Whether or not to allow this user to use X11 forwarding,
+ defaulting to ``True``.
+ :param bool permit_agent_forwarding: (optional)
+ Whether or not to allow this user to use agent forwarding,
+ defaulting to ``True``.
+ :param bool permit_port_forwarding: (optional)
+ Whether or not to allow this user to use port forwarding,
+ defaulting to ``True``.
+ :param bool permit_pty: (optional)
+ Whether or not to allow this user to allocate a
+ pseudo-terminal, defaulting to ``True``.
+ :param bool permit_user_rc: (optional)
+ Whether or not to run the user rc file when this certificate
+ is used, defaulting to ``True``.
+ :type user_key: :class:`SSHKey`
+ :type principals: list of strings
+ :type force_command: `str` or ``None``
+ :type source_address: list of ip_address and ip_network values
+
+ :returns: :class:`SSHCertificate`
+
+ :raises: | :exc:`ValueError` if the validity times are invalid
+ | :exc:`KeyGenerationError` if the requested certificate
+ parameters are unsupported
+
+ """
+
+ cert_options = {}
+
+ if force_command:
+ cert_options['force-command'] = force_command
+
+ if source_address:
+ cert_options['source-address'] = [ip_network(addr)
+ for addr in source_address]
+
+ if permit_x11_forwarding:
+ cert_options['permit-X11-forwarding'] = True
+
+ if permit_agent_forwarding:
+ cert_options['permit-agent-forwarding'] = True
+
+ if permit_port_forwarding:
+ cert_options['permit-port-forwarding'] = True
+
+ if permit_pty:
+ cert_options['permit-pty'] = True
+
+ if permit_user_rc:
+ cert_options['permit-user-rc'] = True
+
+ return self._generate_certificate(user_key, version, serial,
+ CERT_TYPE_USER, key_id,
+ principals, valid_after,
+ valid_before, cert_options)
+
+ def generate_host_certificate(self, host_key, key_id, version=1,
+ serial=0, principals=(), valid_after=0,
+ valid_before=0xffffffffffffffff):
+ """Generate a new SSH host certificate
+
+ This method returns a SSH host certifcate with the requested
+ attributes signed by this private key.
+
+ :param host_key:
+ The host's public key.
+ :param str key_id:
+ The key identifier associated with this certificate.
+ :param int version: (optional)
+ The version of certificate to create, defaulting to 1.
+ :param int serial: (optional)
+ The serial number of the certificate, defaulting to 0.
+ :param principals: (optional)
+ The host names this certificate is valid for. By default,
+ it can be used with any host name.
+ :param valid_after: (optional)
+ The earliest time the certificate is valid for, defaulting to
+ no restriction on when the certificate starts being valid.
+ See :ref:`SpecifyingTimeValues` for allowed time specifications.
+ :param valid_before: (optional)
+ The latest time the certificate is valid for, defaulting to
+ no restriction on when the certificate stops being valid.
+ See :ref:`SpecifyingTimeValues` for allowed time specifications.
+ :type host_key: :class:`SSHKey`
+ :type principals: list of strings
+
+ :returns: :class:`SSHCertificate`
+
+ :raises: | :exc:`ValueError` if the validity times are invalid
+ | :exc:`KeyGenerationError` if the requested certificate
+ parameters are unsupported
+ """
+
+ return self._generate_certificate(host_key, version, serial,
+ CERT_TYPE_HOST, key_id,
+ principals, valid_after,
+ valid_before, {})
+
def export_private_key(self, format_name, passphrase=None,
cipher_name='aes256-cbc', hash_name='sha256',
pbe_version=2, rounds=16):
"""Export a private key in the requested format
- This function returns this object's private key encoded in the
+ This method returns this object's private key encoded in the
requested format. If a passphrase is specified, the key will
be exported in encrypted form.
@@ -299,7 +507,7 @@ class SSHKey:
def export_public_key(self, format_name):
"""Export a public key in the requested format
- This function returns this object's public key encoded in the
+ This method returns this object's public key encoded in the
requested format. Available formats include:
pkcs1-der, pkcs1-pem, pkcs8-der, pkcs8-pem, openssh, rfc4716
@@ -346,14 +554,14 @@ class SSHKey:
def write_private_key(self, filename, *args, **kwargs):
"""Write a private key to a file in the requested format
- This function is a simple wrapper around export_private_key
+ This method is a simple wrapper around :meth:`export_private_key`
which writes the exported key data to a file.
:param str filename:
The filename to write the private key to.
:param \\*args,\\ \\*\\*kwargs:
Additional arguments to pass through to
- :func:`export_private_key`.
+ :meth:`export_private_key`.
"""
@@ -363,14 +571,14 @@ class SSHKey:
def write_public_key(self, filename, *args, **kwargs):
"""Write a public key to a file in the requested format
- This function is a simple wrapper around export_public_key
+ This method is a simple wrapper around :meth:`export_public_key`
which writes the exported key data to a file.
:param str filename:
The filename to write the public key to.
:param \\*args,\\ \\*\\*kwargs:
Additional arguments to pass through to
- :func:`export_public_key`.
+ :meth:`export_public_key`.
"""
@@ -381,61 +589,182 @@ class SSHKey:
class SSHCertificate:
"""Parent class which holds an SSH certificate"""
- # pylint: disable=bad-whitespace
+ _user_option_encoders = []
+ _user_extension_encoders = []
+ _host_option_encoders = []
+ _host_extension_encoders = []
- _user_critical_options = {}
- _user_extensions = {}
- _host_critical_options = {}
- _host_extensions = {}
+ _user_option_decoders = {}
+ _user_extension_decoders = {}
+ _host_option_decoders = {}
+ _host_extension_decoders = {}
- # pylint: enable=bad-whitespace
+ def __init__(self, algorithm, key, data, principals, options, signing_key,
+ serial, cert_type, key_id, valid_after, valid_before):
+ self.algorithm = algorithm
+ self.key = key
+ self.data = data
+ self.principals = principals
+ self.options = options
+ self.signing_key = signing_key
+
+ self._serial = serial
+ self._cert_type = cert_type
+ self._key_id = key_id
+ self._valid_after = valid_after
+ self._valid_before = valid_before
+
+ @classmethod
+ def generate(cls, signing_key, algorithm, key, serial, cert_type, key_id,
+ principals, valid_after, valid_before, options):
+ """Generate a new SSH certificate"""
+
+ principals = list(principals)
+ valid_after = _parse_time(valid_after)
+ valid_before = _parse_time(valid_before)
+
+ if valid_before <= valid_after:
+ raise ValueError('Valid before time must be later than '
+ 'valid after time')
+
+ cert_principals = b''.join(String(p) for p in principals)
+
+ if cert_type == CERT_TYPE_USER:
+ cert_options = cls._encode_options(options,
+ cls._user_option_encoders)
+ cert_extensions = cls._encode_options(options,
+ cls._user_extension_encoders)
+ else:
+ cert_options = cls._encode_options(options,
+ cls._host_option_encoders)
+ cert_extensions = cls._encode_options(options,
+ cls._host_extension_encoders)
+
+ data = b''.join((String(algorithm),
+ cls._encode(key, serial, cert_type, key_id,
+ cert_principals, valid_after,
+ valid_before, cert_options,
+ cert_extensions),
+ String(signing_key.get_ssh_public_key())))
+
+ data += String(signing_key.sign(data))
+
+ key = key.convert_to_public()
+ signing_key = signing_key.convert_to_public()
+
+ return cls(algorithm, key, data, principals, options, signing_key,
+ serial, cert_type, key_id, valid_after, valid_before)
+
+ @classmethod
+ def construct(cls, packet, algorithm, key_handler):
+ """Construct an SSH certificate"""
+
+ key_params, serial, cert_type, key_id, \
+ principals, valid_after, valid_before, \
+ options, extensions = cls._decode(packet, key_handler)
- def __init__(self, packet, algorithm, key_handler, key_params, serial,
- cert_type, key_id, valid_principals, valid_after,
- valid_before, options, extensions):
signing_key = decode_ssh_public_key(packet.get_string())
- msg = packet.get_consumed_payload()
+ data = packet.get_consumed_payload()
signature = packet.get_string()
packet.check_end()
- if not signing_key.verify(msg, signature):
+ if not signing_key.verify(data, signature):
raise KeyImportError('Invalid certificate signature')
- self.algorithm = algorithm
- self.key = key_handler.make_public(*key_params)
- self.data = packet.get_consumed_payload()
- self.options = {}
- self.principals = []
- self.signing_key = signing_key
+ key = key_handler.make_public(*key_params)
+ data = packet.get_consumed_payload()
- self._serial = serial
- self._cert_type = cert_type
- self._key_id = key_id
+ try:
+ key_id = key_id.decode('utf-8')
+ except UnicodeDecodeError:
+ raise KeyImportError('Invalid characters in key ID')
+
+ packet = SSHPacket(principals)
+ principals = []
- packet = SSHPacket(valid_principals)
while packet:
try:
principal = packet.get_string().decode('utf-8')
except UnicodeDecodeError:
raise KeyImportError('Invalid characters in principal name')
- self.principals.append(principal)
-
- self._valid_after = valid_after
- self._valid_before = valid_before
+ principals.append(principal)
if cert_type == CERT_TYPE_USER:
- self._decode_options(options, self._user_critical_options, True)
- self._decode_options(extensions, self._user_extensions, False)
+ options = cls._decode_options(options, cls._user_option_decoders,
+ True)
+ options.update(cls._decode_options(extensions,
+ cls._user_extension_decoders,
+ False))
elif cert_type == CERT_TYPE_HOST:
- self._decode_options(options, self._host_critical_options, True)
- self._decode_options(extensions, self._host_extensions, False)
+ options = cls._decode_options(options, cls._host_option_decoders,
+ True)
+ options.update(cls._decode_options(extensions,
+ cls._host_extension_decoders,
+ False))
else:
raise KeyImportError('Unknown certificate type')
+ return cls(algorithm, key, data, principals, options, signing_key,
+ serial, cert_type, key_id, valid_after, valid_before)
+
+ @classmethod
+ def _encode(cls, key, serial, cert_type, key_id, principals,
+ valid_after, valid_before, options, extensions):
+ """Encode an SSH certificate"""
+
+ raise NotImplementedError
+
+ @classmethod
+ def _decode(cls, packet, key_handler):
+ """Decode an SSH certificate"""
+
+ raise NotImplementedError
+
@staticmethod
- def _parse_force_command(packet):
- """Parse a force-command option"""
+ def _encode_options(options, encoders):
+ """Encode options found in this certificate"""
+
+ result = []
+
+ for name, encoder in encoders:
+ value = options.get(name)
+ if value:
+ result.append(String(name) + String(encoder(value)))
+
+ return b''.join(result)
+
+ @staticmethod
+ def _encode_bool(value):
+ """Encode a boolean option value"""
+
+ # pylint: disable=unused-argument
+
+ return b''
+
+ @staticmethod
+ def _encode_force_command(force_command):
+ """Encode a force-command option"""
+
+ return String(force_command)
+
+ @staticmethod
+ def _encode_source_address(source_address):
+ """Encode a source-address option"""
+
+ return NameList(str(addr).encode('ascii') for addr in source_address)
+
+ @staticmethod
+ def _decode_bool(packet):
+ """Decode a boolean option value"""
+
+ # pylint: disable=unused-argument
+
+ return True
+
+ @staticmethod
+ def _decode_force_command(packet):
+ """Decode a force-command option"""
try:
return packet.get_string().decode('utf-8')
@@ -443,8 +772,8 @@ class SSHCertificate:
raise KeyImportError('Invalid characters in command') from None
@staticmethod
- def _parse_source_address(packet):
- """Parse a source-address option"""
+ def _decode_source_address(packet):
+ """Decode a source-address option"""
try:
return [ip_network(addr.decode('ascii'))
@@ -452,23 +781,66 @@ class SSHCertificate:
except (UnicodeDecodeError, ValueError):
raise KeyImportError('Invalid source address') from None
- def _decode_options(self, options, valid_options, critical=True):
+ @staticmethod
+ def _decode_options(options, decoders, critical=True):
"""Decode options found in this certificate"""
packet = SSHPacket(options)
+ result = {}
+
while packet:
name = packet.get_string()
- data_packet = SSHPacket(packet.get_string())
- decoder = valid_options.get(name)
+ decoder = decoders.get(name)
if decoder:
- data = decoder(data_packet) if callable(decoder) else True
+ data_packet = SSHPacket(packet.get_string())
+ result[name.decode('ascii')] = decoder(data_packet)
data_packet.check_end()
- self.options[name.decode('ascii')] = data
elif critical:
raise KeyImportError('Unrecognized critical option: %s' %
name.decode('ascii', errors='replace'))
+ return result
+
+ def export_certificate(self, format_name):
+ """Export a certificate in the requested format
+
+ This function returns this certificate encoded in the requested
+ format. Available formats include:
+
+ openssh, rfc4716
+
+ :param str format:
+ The format to export the certificate in.
+
+ """
+
+ if format_name == 'openssh':
+ return self.algorithm + b' ' + binascii.b2a_base64(self.data)
+ elif format_name == 'rfc4716':
+ return (b'---- BEGIN SSH2 PUBLIC KEY ----\n' +
+ _wrap_base64(self.data) +
+ b'---- END SSH2 PUBLIC KEY ----\n')
+ else:
+ raise KeyExportError('Unknown export format')
+
+ def write_certificate(self, filename, *args, **kwargs):
+ """Write a certificate to a file in the requested format
+
+ This function is a simple wrapper around export_certificate
+ which writes the exported certificate to a file.
+
+ :param str filename:
+ The filename to write the certificate to.
+ :param \\*args,\\ \\*\\*kwargs:
+ Additional arguments to pass through to
+ :meth:`export_certificate`.
+
+ """
+
+ with open(filename, 'wb') as f:
+ f.write(self.export_certificate(*args, **kwargs))
+
def validate(self, cert_type, principal):
"""Validate the certificate type, validity period, and principal
@@ -502,37 +874,67 @@ class SSHCertificate:
class SSHCertificateV01(SSHCertificate):
- """Decoder class for version 01 SSH certificates"""
+ """Encoder/decoder class for version 01 SSH certificates"""
+
+ # pylint: disable=bad-whitespace
- _user_critical_options = {
- b'force-command': SSHCertificate._parse_force_command,
- b'source-address': SSHCertificate._parse_source_address
+ _user_option_encoders = (
+ ('force-command', SSHCertificate._encode_force_command),
+ ('source-address', SSHCertificate._encode_source_address)
+ )
+
+ _user_extension_encoders = (
+ ('permit-X11-forwarding', SSHCertificate._encode_bool),
+ ('permit-agent-forwarding', SSHCertificate._encode_bool),
+ ('permit-port-forwarding', SSHCertificate._encode_bool),
+ ('permit-pty', SSHCertificate._encode_bool),
+ ('permit-user-rc', SSHCertificate._encode_bool)
+ )
+
+ _user_option_decoders = {
+ b'force-command': SSHCertificate._decode_force_command,
+ b'source-address': SSHCertificate._decode_source_address
}
- _user_extensions = {
- b'permit-X11-forwarding': True,
- b'permit-agent-forwarding': True,
- b'permit-port-forwarding': True,
- b'permit-pty': True,
- b'permit-user-rc': True
+ _user_extension_decoders = {
+ b'permit-X11-forwarding': SSHCertificate._decode_bool,
+ b'permit-agent-forwarding': SSHCertificate._decode_bool,
+ b'permit-port-forwarding': SSHCertificate._decode_bool,
+ b'permit-pty': SSHCertificate._decode_bool,
+ b'permit-user-rc': SSHCertificate._decode_bool
}
- def __init__(self, packet, algorithm, key_handler):
+ # pylint: enable=bad-whitespace
+
+ @classmethod
+ def _encode(cls, key, serial, cert_type, key_id, principals,
+ valid_after, valid_before, options, extensions):
+ """Encode a version 01 SSH certificate"""
+
+ return b''.join((String(os.urandom(32)), key.encode_ssh_public(),
+ UInt64(serial), UInt32(cert_type), String(key_id),
+ String(principals), UInt64(valid_after),
+ UInt64(valid_before), String(options),
+ String(extensions), String('')))
+
+ @classmethod
+ def _decode(cls, packet, key_handler):
+ """Decode a version 01 SSH certificate"""
+
_ = packet.get_string() # nonce
key_params = key_handler.decode_ssh_public(packet)
serial = packet.get_uint64()
cert_type = packet.get_uint32()
key_id = packet.get_string()
- valid_principals = packet.get_string()
+ principals = packet.get_string()
valid_after = packet.get_uint64()
valid_before = packet.get_uint64()
options = packet.get_string()
extensions = packet.get_string()
_ = packet.get_string() # reserved
- super().__init__(packet, algorithm, key_handler, key_params, serial,
- cert_type, key_id, valid_principals, valid_after,
- valid_before, options, extensions)
+ return (key_params, serial, cert_type, key_id, principals,
+ valid_after, valid_before, options, extensions)
class SSHKeyPair:
@@ -1021,12 +1423,14 @@ def register_public_key_alg(algorithm, handler):
_pkcs8_oid_map[handler.pkcs8_oid] = handler
-def register_certificate_alg(algorithm, key_handler, cert_handler):
+def register_certificate_alg(version, algorithm, key_handler, cert_handler):
"""Register a new certificate algorithm"""
_certificate_alg_map[algorithm] = (key_handler, cert_handler)
_certificate_algs.append(algorithm)
+ _certificate_version_map[key_handler, version] = (algorithm, cert_handler)
+
def get_public_key_algs():
"""Return supported public key algorithms"""
@@ -1071,7 +1475,7 @@ def decode_ssh_certificate(data):
key_handler, cert_handler = _certificate_alg_map.get(alg, (None, None))
if cert_handler:
- return cert_handler(packet, alg, key_handler)
+ return cert_handler.construct(packet, alg, key_handler)
else:
raise KeyImportError('Unknown certificate algorithm: %s' %
alg.decode('ascii', errors='replace'))
@@ -1114,6 +1518,8 @@ def generate_private_key(alg_name, **kwargs):
:returns: An :class:`SSHKey` private key
+ :raises: :exc:`KeyGenerationError` if the requested key parameters
+ are unsupported
"""
algorithm = alg_name.encode('utf-8')
diff --git a/asyncssh/rsa.py b/asyncssh/rsa.py
index 9f72413..a125e44 100644
--- a/asyncssh/rsa.py
+++ b/asyncssh/rsa.py
@@ -204,5 +204,5 @@ class _RSAKey(SSHKey):
register_public_key_alg(b'ssh-rsa', _RSAKey)
-register_certificate_alg(b'ssh-rsa-cert-v01 at openssh.com',
+register_certificate_alg(1, b'ssh-rsa-cert-v01 at openssh.com',
_RSAKey, SSHCertificateV01)
diff --git a/asyncssh/version.py b/asyncssh/version.py
index 3647dd1..a1e3cc9 100644
--- a/asyncssh/version.py
+++ b/asyncssh/version.py
@@ -18,4 +18,4 @@ __author_email__ = 'ronf at timeheart.net'
__url__ = 'http://asyncssh.timeheart.net'
-__version__ = '1.6.1'
+__version__ = '1.6.2'
diff --git a/docs/api.rst b/docs/api.rst
index 010d694..6314c0d 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -1021,17 +1021,45 @@ to import it from, or an already loaded :class:`SSHKey` public key.
See the function :func:`import_public_key` for the list of supported
public key formats.
+.. index:: Specifying time values
+.. _SpecifyingTimeValues:
+
+Specifying time values
+----------------------
+
+When generating certificates, an optional validity interval can be
+specified using the ``valid_after`` and ``valid_before`` parameters
+to the :meth:`generate_user_certificate() <SSHKey.generate_user_certificate>`
+and :meth:`generate_host_certificate() <SSHKey.generate_host_certificate>`
+methods. These values can be specified in any of the following ways:
+
+ * An int or float UNIX epoch time, such as what is returned by
+ :func:`time.time`.
+ * A :class:`datetime.datetime` value.
+ * A string value of ``now`` to request the current time.
+ * A string value in the form ``YYYYMMDD`` to specify an absolute date.
+ * A string value in the form ``YYYYMMDDHHMMSS`` to specify an
+ absolute date and time.
+ * A relative time made up of a mix of positive or negative numbers and
+ the letters 'w', 'd', 'h', 'm', and 's', representing an offset from
+ the current time in weeks, days, hours, minutes, or seconds,
+ respectively. Multiple of these values can be included. For
+ instance, '+1w2d3h' means 1 week, 2 days, and 3 hours in the future.
+
SSHKey
------
.. autoclass:: SSHKey()
- ================================== =
+ ========================================= =
+ .. automethod:: convert_to_public
+ .. automethod:: generate_user_certificate
+ .. automethod:: generate_host_certificate
.. automethod:: export_private_key
.. automethod:: export_public_key
.. automethod:: write_private_key
.. automethod:: write_public_key
- ================================== =
+ ========================================= =
SSHKeyPair
----------
@@ -1045,7 +1073,11 @@ SSHCertificate
.. autoclass:: SSHCertificate()
+ ================================== =
+ .. automethod:: export_certificate
+ .. automethod:: write_certificate
.. automethod:: validate
+ ================================== =
generate_private_key
--------------------
@@ -1272,6 +1304,11 @@ KeyEncryptionError
.. autoexception:: KeyEncryptionError
+KeyGenerationError
+------------------
+
+.. autoexception:: KeyGenerationError
+
.. index:: Supported algorithms
.. _SupportedAlgorithms:
diff --git a/docs/changes.rst b/docs/changes.rst
index 9e9b1e5..d18e943 100644
--- a/docs/changes.rst
+++ b/docs/changes.rst
@@ -3,6 +3,18 @@
Change Log
==========
+Release 1.6.2 (4 Sep 2016)
+--------------------------
+
+* Added generate_user_certificate() and generate_host_certificate() methods
+ to SSHKey class to generate SSH certificates, and export_certificate()
+ and write_certificate() methods on SSHCertificate class to export
+ certificates for use in other tools.
+
+* Improved editor unit tests to eliminate timing dependency.
+
+* Cleaned up a few minor documentation issues.
+
Release 1.6.1 (27 Aug 2016)
---------------------------
diff --git a/tests/test_editor.py b/tests/test_editor.py
index 8e0ccc8..fbd770c 100644
--- a/tests/test_editor.py
+++ b/tests/test_editor.py
@@ -34,6 +34,7 @@ def _handle_session(stdin, stdout, stderr):
data += yield from stdin.readline()
except asyncssh.BreakReceived:
break_count += 1
+ stdout.write('B')
if break_count == 1:
# Set twice to get coverage of when echo isn't changing
@@ -170,7 +171,7 @@ class _TestEditor(_CheckEditor):
process = yield from conn.create_process(term_type='ansi')
process.stdin.write('\x03')
- yield from asyncio.sleep(0.1)
+ yield from process.stdout.readexactly(1)
process.stdin.write('abcd\x08\n')
process.stdin.write_eof()
@@ -186,12 +187,12 @@ class _TestEditor(_CheckEditor):
process = yield from conn.create_process(term_type='ansi')
process.stdin.write('\x03')
- yield from asyncio.sleep(0.1)
+ yield from process.stdout.readexactly(1)
process.stdin.write('abc')
process.stdin.write('\x03')
- yield from asyncio.sleep(0.1)
+ yield from process.stdout.readexactly(1)
process.stdin.write('\n')
process.stdin.write_eof()
@@ -207,10 +208,10 @@ class _TestEditor(_CheckEditor):
process = yield from conn.create_process(term_type='ansi')
process.stdin.write('\x03\x03')
- yield from asyncio.sleep(0.1)
+ yield from process.stdout.readexactly(2)
process.stdin.write('abc\x03')
- yield from asyncio.sleep(0.1)
+ yield from process.stdout.readexactly(15)
process.stdin.write('\n')
process.stdin.write_eof()
diff --git a/tests/test_public_key.py b/tests/test_public_key.py
index 2cdc2e3..f5be853 100644
--- a/tests/test_public_key.py
+++ b/tests/test_public_key.py
@@ -18,6 +18,7 @@
"""
import binascii
+from datetime import datetime
import os
from .util import bcrypt_available, libnacl_available
@@ -35,7 +36,6 @@ from asyncssh.asn1 import TaggedDERObject
from asyncssh.packet import MPInt, String, UInt32
from asyncssh.pbe import pkcs1_decrypt
from asyncssh.public_key import CERT_TYPE_USER, CERT_TYPE_HOST, SSHKey
-from asyncssh.public_key import decode_ssh_public_key
from asyncssh.public_key import get_public_key_algs, get_certificate_algs
@@ -876,6 +876,11 @@ class _TestPublicKey(TempDirTestCase):
cert = read_certificate('cert')
self.assertEqual(cert.key, self.pubkey)
+ with self.subTest('Export certificate'):
+ cert.write_certificate('cert2', fmt)
+ cert2 = read_certificate('cert2')
+ self.assertEqual(cert.key, cert2.key)
+
with self.subTest('Validate certificate'):
self.assertIsNone(cert.validate(cert_type, 'name'))
@@ -908,23 +913,30 @@ class _TestPublicKey(TempDirTestCase):
with self.subTest('Invalid certificate critical option'):
with self.assertRaises(KeyImportError):
cert = self.make_certificate(cert_type, self.pubkey,
- self.privca, 'name',
+ self.privca, ('name',),
options={b'xxx': b''})
import_certificate(cert)
with self.subTest('Ignored certificate extension'):
cert = self.make_certificate(cert_type, self.pubkey,
- self.privca, 'name',
+ self.privca, ('name',),
... 153 lines suppressed ...
--
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-asyncssh.git
More information about the Python-modules-commits
mailing list