[Python-modules-commits] [python-asyncssh] 07/14: Import python-asyncssh_1.6.2.orig.tar.gz

Vincent Bernat bernat at moszumanska.debian.org
Sat Sep 10 12:10:10 UTC 2016


This is an automated email from the git hooks/post-receive script.

bernat pushed a commit to branch master
in repository python-asyncssh.

commit c29d1479a0acc9e21fd871c7bc0d4121d76da1fe
Author: Vincent Bernat <bernat at debian.org>
Date:   Sat Sep 10 13:54:56 2016 +0200

    Import python-asyncssh_1.6.2.orig.tar.gz
---
 asyncssh/dsa.py          |   2 +-
 asyncssh/ecdsa.py        |   2 +-
 asyncssh/ed25519.py      |   2 +-
 asyncssh/public_key.py   | 534 +++++++++++++++++++++++++++++++++++++++++------
 asyncssh/rsa.py          |   2 +-
 asyncssh/version.py      |   2 +-
 docs/api.rst             |  41 +++-
 docs/changes.rst         |  12 ++
 tests/test_editor.py     |  11 +-
 tests/test_public_key.py | 100 ++++++++-
 tests/util.py            |   9 +-
 11 files changed, 629 insertions(+), 88 deletions(-)

diff --git a/asyncssh/dsa.py b/asyncssh/dsa.py
index c70c916..1f25e39 100644
--- a/asyncssh/dsa.py
+++ b/asyncssh/dsa.py
@@ -225,5 +225,5 @@ class _DSAKey(SSHKey):
 
 register_public_key_alg(b'ssh-dss', _DSAKey)
 
-register_certificate_alg(b'ssh-dss-cert-v01@openssh.com',
+register_certificate_alg(1, b'ssh-dss-cert-v01@openssh.com',
                          _DSAKey, SSHCertificateV01)
diff --git a/asyncssh/ecdsa.py b/asyncssh/ecdsa.py
index 45645b7..9fe9011 100644
--- a/asyncssh/ecdsa.py
+++ b/asyncssh/ecdsa.py
@@ -290,4 +290,4 @@ for _curve_id, _oid in {(b'nistp521', '1.3.132.0.35'),
     _alg_oid_map[_oid] = _curve_id
 
     register_public_key_alg(_algorithm, _ECKey)
-    register_certificate_alg(_cert_algorithm, _ECKey, SSHCertificateV01)
+    register_certificate_alg(1, _cert_algorithm, _ECKey, SSHCertificateV01)
diff --git a/asyncssh/ed25519.py b/asyncssh/ed25519.py
index d29cb99..c7ad39f 100644
--- a/asyncssh/ed25519.py
+++ b/asyncssh/ed25519.py
@@ -123,5 +123,5 @@ except (ImportError, OSError): # pragma: no cover
 else:
     register_public_key_alg(b'ssh-ed25519', _Ed25519Key)
 
-    register_certificate_alg(b'ssh-ed25519-cert-v01@openssh.com',
+    register_certificate_alg(1, b'ssh-ed25519-cert-v01@openssh.com',
                              _Ed25519Key, SSHCertificateV01)
diff --git a/asyncssh/public_key.py b/asyncssh/public_key.py
index 8a200f3..691d8ce 100644
--- a/asyncssh/public_key.py
+++ b/asyncssh/public_key.py
@@ -13,7 +13,9 @@
 """SSH asymmetric encryption handlers"""
 
 import binascii
+from datetime import datetime, timedelta
 import os
+import re
 import time
 
 try:
@@ -25,7 +27,8 @@ except ImportError: # pragma: no cover
 from .asn1 import ASN1DecodeError, BitString, der_encode, der_decode
 from .cipher import get_encryption_params, get_cipher
 from .misc import ip_network
-from .packet import String, UInt32, UInt64, PacketDecodeError, SSHPacket
+from .packet import NameList, String, UInt32, UInt64
+from .packet import PacketDecodeError, SSHPacket
 from .pbe import KeyEncryptionError, pkcs1_encrypt, pkcs8_encrypt
 from .pbe import pkcs1_decrypt, pkcs8_decrypt
 
@@ -34,9 +37,18 @@ _public_key_algs = []
 _certificate_algs = []
 _public_key_alg_map = {}
 _certificate_alg_map = {}
+_certificate_version_map = {}
 _pem_map = {}
 _pkcs8_oid_map = {}
 
+_abs_date_pattern = re.compile(r'\d{8}')
+_abs_time_pattern = re.compile(r'\d{14}')
+_rel_time_pattern = re.compile(r'(?:(?P<weeks>[+-]?\d+)[Ww]|'
+                               r'(?P<days>[+-]?\d+)[Dd]|'
+                               r'(?P<hours>[+-]?\d+)[Hh]|'
+                               r'(?P<minutes>[+-]?\d+)[Mm]|'
+                               r'(?P<seconds>[+-]?\d+)[Ss])+')
+
 # SSH certificate types
 CERT_TYPE_USER = 1
 CERT_TYPE_HOST = 2
@@ -46,6 +58,35 @@ _OPENSSH_SALT_LEN = 16
 _OPENSSH_WRAP_LEN = 70
 
 
+def _parse_time(t):
+    """Parse a time value"""
+
+    if isinstance(t, int):
+        return t
+    elif isinstance(t, float):
+        return int(t)
+    elif isinstance(t, datetime):
+        return int(t.timestamp())
+    elif isinstance(t, str):
+        if t == 'now':
+            return int(time.time())
+
+        match = _abs_date_pattern.fullmatch(t)
+        if match:
+            return int(datetime.strptime(t, '%Y%m%d').timestamp())
+
+        match = _abs_time_pattern.fullmatch(t)
+        if match:
+            return int(datetime.strptime(t, '%Y%m%d%H%M%S').timestamp())
+
+        match = _rel_time_pattern.fullmatch(t)
+        if match:
+            delta = {k: int(v) for k, v in match.groupdict(0).items()}
+            return int(time.time() + timedelta(**delta).total_seconds())
+
+    raise ValueError('Unrecognized time value')
+
+
 def _wrap_base64(data, wrap=64):
     """Break a Base64 value into multiple lines."""
 
@@ -57,8 +98,11 @@ def _wrap_base64(data, wrap=64):
 class KeyGenerationError(ValueError):
     """Key generation error
 
-       This exception is raised by :func:`generate_private_key` when
-       the requested algorithm or parameters are unsupported.
+       This exception is raised by :func:`generate_private_key`,
+       :meth:`generate_user_certificate() <SSHKey.generate_user_certificate>`
+       or :meth:`generate_host_certificate()
+       <SSHKey.generate_host_certificate>` when the requested parameters are
+       unsupported.
 
     """
 
@@ -89,6 +133,21 @@ class SSHKey:
     pem_name = None
     pkcs8_oid = None
 
+    def _generate_certificate(self, key, version, serial, cert_type,
+                              key_id, principals, valid_after,
+                              valid_before, cert_options):
+        """Generate a new SSH certificate"""
+
+        try:
+            algorithm, cert_handler = _certificate_version_map[type(key),
+                                                               version]
+        except KeyError:
+            raise KeyGenerationError('Unknown certificate version') from None
+
+        return cert_handler.generate(self, algorithm, key, serial, cert_type,
+                                     key_id, principals, valid_after,
+                                     valid_before, cert_options)
+
     def encode_pkcs1_private(self):
         """Export parameters associated with a PKCS#1 private key"""
 
@@ -130,12 +189,161 @@ class SSHKey:
 
         return String(self.algorithm) + self.encode_ssh_public()
 
+    def convert_to_public(self):
+        """Return public key corresponding to this key
+
+           This method converts an :class:`SSHKey` object which contains
+           a private key into one which contains only the corresponding
+           public key. If it is called on something which is already
+           a public key, it has no effect.
+
+        """
+
+        return decode_ssh_public_key(self.get_ssh_public_key())
+
+    def generate_user_certificate(self, user_key, key_id, version=1,
+                                  serial=0, principals=(), valid_after=0,
+                                  valid_before=0xffffffffffffffff,
+                                  force_command=None, source_address=None,
+                                  permit_x11_forwarding=True,
+                                  permit_agent_forwarding=True,
+                                  permit_port_forwarding=True,
+                                  permit_pty=True, permit_user_rc=True):
+        """Generate a new SSH user certificate
+
+           This method returns an SSH user certificate with the requested
+           attributes signed by this private key.
+
+           :param user_key:
+               The user's public key.
+           :param str key_id:
+               The key identifier associated with this certificate.
+           :param int version: (optional)
+               The version of certificate to create, defaulting to 1.
+           :param int serial: (optional)
+               The serial number of the certificate, defaulting to 0.
+           :param principals: (optional)
+               The user names this certificate is valid for. By default,
+               it can be used with any user name.
+           :param valid_after: (optional)
+               The earliest time the certificate is valid for, defaulting to
+               no restriction on when the certificate starts being valid.
+               See :ref:`SpecifyingTimeValues` for allowed time specifications.
+           :param valid_before: (optional)
+               The latest time the certificate is valid for, defaulting to
+               no restriction on when the certificate stops being valid.
+               See :ref:`SpecifyingTimeValues` for allowed time specifications.
+           :param force_command: (optional)
+               The command (if any) to force a session to run when this
+               certificate is used.
+           :param source_address: (optional)
+               A list of source addresses and networks for which the
+               certificate is valid, defaulting to all addresses.
+           :param bool permit_x11_forwarding: (optional)
+               Whether or not to allow this user to use X11 forwarding,
+               defaulting to ``True``.
+           :param bool permit_agent_forwarding: (optional)
+               Whether or not to allow this user to use agent forwarding,
+               defaulting to ``True``.
+           :param bool permit_port_forwarding: (optional)
+               Whether or not to allow this user to use port forwarding,
+               defaulting to ``True``.
+           :param bool permit_pty: (optional)
+               Whether or not to allow this user to allocate a
+               pseudo-terminal, defaulting to ``True``.
+           :param bool permit_user_rc: (optional)
+               Whether or not to run the user rc file when this certificate
+               is used, defaulting to ``True``.
+           :type user_key: :class:`SSHKey`
+           :type principals: list of strings
+           :type force_command: `str` or ``None``
+           :type source_address: list of ip_address and ip_network values
+
+           :returns: :class:`SSHCertificate`
+
+           :raises: | :exc:`ValueError` if the validity times are invalid
+                    | :exc:`KeyGenerationError` if the requested certificate
+                      parameters are unsupported
+
+        """
+
+        cert_options = {}
+
+        if force_command:
+            cert_options['force-command'] = force_command
+
+        if source_address:
+            cert_options['source-address'] = [ip_network(addr)
+                                              for addr in source_address]
+
+        if permit_x11_forwarding:
+            cert_options['permit-X11-forwarding'] = True
+
+        if permit_agent_forwarding:
+            cert_options['permit-agent-forwarding'] = True
+
+        if permit_port_forwarding:
+            cert_options['permit-port-forwarding'] = True
+
+        if permit_pty:
+            cert_options['permit-pty'] = True
+
+        if permit_user_rc:
+            cert_options['permit-user-rc'] = True
+
+        return self._generate_certificate(user_key, version, serial,
+                                          CERT_TYPE_USER, key_id,
+                                          principals, valid_after,
+                                          valid_before, cert_options)
+
+    def generate_host_certificate(self, host_key, key_id, version=1,
+                                  serial=0, principals=(), valid_after=0,
+                                  valid_before=0xffffffffffffffff):
+        """Generate a new SSH host certificate
+
+           This method returns an SSH host certificate with the requested
+           attributes signed by this private key.
+
+           :param host_key:
+               The host's public key.
+           :param str key_id:
+               The key identifier associated with this certificate.
+           :param int version: (optional)
+               The version of certificate to create, defaulting to 1.
+           :param int serial: (optional)
+               The serial number of the certificate, defaulting to 0.
+           :param principals: (optional)
+               The host names this certificate is valid for. By default,
+               it can be used with any host name.
+           :param valid_after: (optional)
+               The earliest time the certificate is valid for, defaulting to
+               no restriction on when the certificate starts being valid.
+               See :ref:`SpecifyingTimeValues` for allowed time specifications.
+           :param valid_before: (optional)
+               The latest time the certificate is valid for, defaulting to
+               no restriction on when the certificate stops being valid.
+               See :ref:`SpecifyingTimeValues` for allowed time specifications.
+           :type host_key: :class:`SSHKey`
+           :type principals: list of strings
+
+           :returns: :class:`SSHCertificate`
+
+           :raises: | :exc:`ValueError` if the validity times are invalid
+                    | :exc:`KeyGenerationError` if the requested certificate
+                      parameters are unsupported
+        """
+
+        return self._generate_certificate(host_key, version, serial,
+                                          CERT_TYPE_HOST, key_id,
+                                          principals, valid_after,
+                                          valid_before, {})
+
     def export_private_key(self, format_name, passphrase=None,
                            cipher_name='aes256-cbc', hash_name='sha256',
                            pbe_version=2, rounds=16):
         """Export a private key in the requested format
 
-           This function returns this object's private key encoded in the
+           This method returns this object's private key encoded in the
            requested format. If a passphrase is specified, the key will
            be exported in encrypted form.
 
@@ -299,7 +507,7 @@ class SSHKey:
     def export_public_key(self, format_name):
         """Export a public key in the requested format
 
-           This function returns this object's public key encoded in the
+           This method returns this object's public key encoded in the
            requested format. Available formats include:
 
                pkcs1-der, pkcs1-pem, pkcs8-der, pkcs8-pem, openssh, rfc4716
@@ -346,14 +554,14 @@ class SSHKey:
     def write_private_key(self, filename, *args, **kwargs):
         """Write a private key to a file in the requested format
 
-           This function is a simple wrapper around export_private_key
+           This method is a simple wrapper around :meth:`export_private_key`
            which writes the exported key data to a file.
 
            :param str filename:
                The filename to write the private key to.
            :param \\*args,\\ \\*\\*kwargs:
                Additional arguments to pass through to
-               :func:`export_private_key`.
+               :meth:`export_private_key`.
 
         """
 
@@ -363,14 +571,14 @@ class SSHKey:
     def write_public_key(self, filename, *args, **kwargs):
         """Write a public key to a file in the requested format
 
-           This function is a simple wrapper around export_public_key
+           This method is a simple wrapper around :meth:`export_public_key`
            which writes the exported key data to a file.
 
            :param str filename:
                The filename to write the public key to.
            :param \\*args,\\ \\*\\*kwargs:
                Additional arguments to pass through to
-               :func:`export_public_key`.
+               :meth:`export_public_key`.
 
         """
 
@@ -381,61 +589,182 @@ class SSHKey:
 class SSHCertificate:
     """Parent class which holds an SSH certificate"""
 
-    # pylint: disable=bad-whitespace
+    _user_option_encoders = []
+    _user_extension_encoders = []
+    _host_option_encoders = []
+    _host_extension_encoders = []
 
-    _user_critical_options = {}
-    _user_extensions =       {}
-    _host_critical_options = {}
-    _host_extensions =       {}
+    _user_option_decoders = {}
+    _user_extension_decoders = {}
+    _host_option_decoders = {}
+    _host_extension_decoders = {}
 
-    # pylint: enable=bad-whitespace
+    def __init__(self, algorithm, key, data, principals, options, signing_key,
+                 serial, cert_type, key_id, valid_after, valid_before):
+        self.algorithm = algorithm
+        self.key = key
+        self.data = data
+        self.principals = principals
+        self.options = options
+        self.signing_key = signing_key
+
+        self._serial = serial
+        self._cert_type = cert_type
+        self._key_id = key_id
+        self._valid_after = valid_after
+        self._valid_before = valid_before
+
+    @classmethod
+    def generate(cls, signing_key, algorithm, key, serial, cert_type, key_id,
+                 principals, valid_after, valid_before, options):
+        """Generate a new SSH certificate"""
+
+        principals = list(principals)
+        valid_after = _parse_time(valid_after)
+        valid_before = _parse_time(valid_before)
+
+        if valid_before <= valid_after:
+            raise ValueError('Valid before time must be later than '
+                             'valid after time')
+
+        cert_principals = b''.join(String(p) for p in principals)
+
+        if cert_type == CERT_TYPE_USER:
+            cert_options = cls._encode_options(options,
+                                               cls._user_option_encoders)
+            cert_extensions = cls._encode_options(options,
+                                                  cls._user_extension_encoders)
+        else:
+            cert_options = cls._encode_options(options,
+                                               cls._host_option_encoders)
+            cert_extensions = cls._encode_options(options,
+                                                  cls._host_extension_encoders)
+
+        data = b''.join((String(algorithm),
+                         cls._encode(key, serial, cert_type, key_id,
+                                     cert_principals, valid_after,
+                                     valid_before, cert_options,
+                                     cert_extensions),
+                         String(signing_key.get_ssh_public_key())))
+
+        data += String(signing_key.sign(data))
+
+        key = key.convert_to_public()
+        signing_key = signing_key.convert_to_public()
+
+        return cls(algorithm, key, data, principals, options, signing_key,
+                   serial, cert_type, key_id, valid_after, valid_before)
+
+    @classmethod
+    def construct(cls, packet, algorithm, key_handler):
+        """Construct an SSH certificate"""
+
+        key_params, serial, cert_type, key_id, \
+            principals, valid_after, valid_before, \
+            options, extensions = cls._decode(packet, key_handler)
 
-    def __init__(self, packet, algorithm, key_handler, key_params, serial,
-                 cert_type, key_id, valid_principals, valid_after,
-                 valid_before, options, extensions):
         signing_key = decode_ssh_public_key(packet.get_string())
-        msg = packet.get_consumed_payload()
+        data = packet.get_consumed_payload()
         signature = packet.get_string()
         packet.check_end()
 
-        if not signing_key.verify(msg, signature):
+        if not signing_key.verify(data, signature):
             raise KeyImportError('Invalid certificate signature')
 
-        self.algorithm = algorithm
-        self.key = key_handler.make_public(*key_params)
-        self.data = packet.get_consumed_payload()
-        self.options = {}
-        self.principals = []
-        self.signing_key = signing_key
+        key = key_handler.make_public(*key_params)
+        data = packet.get_consumed_payload()
 
-        self._serial = serial
-        self._cert_type = cert_type
-        self._key_id = key_id
+        try:
+            key_id = key_id.decode('utf-8')
+        except UnicodeDecodeError:
+            raise KeyImportError('Invalid characters in key ID')
+
+        packet = SSHPacket(principals)
+        principals = []
 
-        packet = SSHPacket(valid_principals)
         while packet:
             try:
                 principal = packet.get_string().decode('utf-8')
             except UnicodeDecodeError:
                 raise KeyImportError('Invalid characters in principal name')
 
-            self.principals.append(principal)
-
-        self._valid_after = valid_after
-        self._valid_before = valid_before
+            principals.append(principal)
 
         if cert_type == CERT_TYPE_USER:
-            self._decode_options(options, self._user_critical_options, True)
-            self._decode_options(extensions, self._user_extensions, False)
+            options = cls._decode_options(options, cls._user_option_decoders,
+                                          True)
+            options.update(cls._decode_options(extensions,
+                                               cls._user_extension_decoders,
+                                               False))
         elif cert_type == CERT_TYPE_HOST:
-            self._decode_options(options, self._host_critical_options, True)
-            self._decode_options(extensions, self._host_extensions, False)
+            options = cls._decode_options(options, cls._host_option_decoders,
+                                          True)
+            options.update(cls._decode_options(extensions,
+                                               cls._host_extension_decoders,
+                                               False))
         else:
             raise KeyImportError('Unknown certificate type')
 
+        return cls(algorithm, key, data, principals, options, signing_key,
+                   serial, cert_type, key_id, valid_after, valid_before)
+
+    @classmethod
+    def _encode(cls, key, serial, cert_type, key_id, principals,
+                valid_after, valid_before, options, extensions):
+        """Encode an SSH certificate"""
+
+        raise NotImplementedError
+
+    @classmethod
+    def _decode(cls, packet, key_handler):
+        """Decode an SSH certificate"""
+
+        raise NotImplementedError
+
     @staticmethod
-    def _parse_force_command(packet):
-        """Parse a force-command option"""
+    def _encode_options(options, encoders):
+        """Encode options found in this certificate"""
+
+        result = []
+
+        for name, encoder in encoders:
+            value = options.get(name)
+            if value:
+                result.append(String(name) + String(encoder(value)))
+
+        return b''.join(result)
+
+    @staticmethod
+    def _encode_bool(value):
+        """Encode a boolean option value"""
+
+        # pylint: disable=unused-argument
+
+        return b''
+
+    @staticmethod
+    def _encode_force_command(force_command):
+        """Encode a force-command option"""
+
+        return String(force_command)
+
+    @staticmethod
+    def _encode_source_address(source_address):
+        """Encode a source-address option"""
+
+        return NameList(str(addr).encode('ascii') for addr in source_address)
+
+    @staticmethod
+    def _decode_bool(packet):
+        """Decode a boolean option value"""
+
+        # pylint: disable=unused-argument
+
+        return True
+
+    @staticmethod
+    def _decode_force_command(packet):
+        """Decode a force-command option"""
 
         try:
             return packet.get_string().decode('utf-8')
@@ -443,8 +772,8 @@ class SSHCertificate:
             raise KeyImportError('Invalid characters in command') from None
 
     @staticmethod
-    def _parse_source_address(packet):
-        """Parse a source-address option"""
+    def _decode_source_address(packet):
+        """Decode a source-address option"""
 
         try:
             return [ip_network(addr.decode('ascii'))
@@ -452,23 +781,66 @@ class SSHCertificate:
         except (UnicodeDecodeError, ValueError):
             raise KeyImportError('Invalid source address') from None
 
-    def _decode_options(self, options, valid_options, critical=True):
+    @staticmethod
+    def _decode_options(options, decoders, critical=True):
         """Decode options found in this certificate"""
 
         packet = SSHPacket(options)
+        result = {}
+
         while packet:
             name = packet.get_string()
-            data_packet = SSHPacket(packet.get_string())
 
-            decoder = valid_options.get(name)
+            decoder = decoders.get(name)
             if decoder:
-                data = decoder(data_packet) if callable(decoder) else True
+                data_packet = SSHPacket(packet.get_string())
+                result[name.decode('ascii')] = decoder(data_packet)
                 data_packet.check_end()
-                self.options[name.decode('ascii')] = data
             elif critical:
                 raise KeyImportError('Unrecognized critical option: %s' %
                                      name.decode('ascii', errors='replace'))
 
+        return result
+
+    def export_certificate(self, format_name):
+        """Export a certificate in the requested format
+
+           This function returns this certificate encoded in the requested
+           format. Available formats include:
+
+               openssh, rfc4716
+
+           :param str format_name:
+               The format to export the certificate in.
+
+        """
+
+        if format_name == 'openssh':
+            return self.algorithm + b' ' + binascii.b2a_base64(self.data)
+        elif format_name == 'rfc4716':
+            return (b'---- BEGIN SSH2 PUBLIC KEY ----\n' +
+                    _wrap_base64(self.data) +
+                    b'---- END SSH2 PUBLIC KEY ----\n')
+        else:
+            raise KeyExportError('Unknown export format')
+
+    def write_certificate(self, filename, *args, **kwargs):
+        """Write a certificate to a file in the requested format
+
+           This function is a simple wrapper around export_certificate
+           which writes the exported certificate to a file.
+
+           :param str filename:
+               The filename to write the certificate to.
+           :param \\*args,\\ \\*\\*kwargs:
+               Additional arguments to pass through to
+               :meth:`export_certificate`.
+
+        """
+
+        with open(filename, 'wb') as f:
+            f.write(self.export_certificate(*args, **kwargs))
+
     def validate(self, cert_type, principal):
         """Validate the certificate type, validity period, and principal
 
@@ -502,37 +874,67 @@ class SSHCertificate:
 
 
 class SSHCertificateV01(SSHCertificate):
-    """Decoder class for version 01 SSH certificates"""
+    """Encoder/decoder class for version 01 SSH certificates"""
+
+    # pylint: disable=bad-whitespace
 
-    _user_critical_options = {
-        b'force-command':           SSHCertificate._parse_force_command,
-        b'source-address':          SSHCertificate._parse_source_address
+    _user_option_encoders = (
+        ('force-command',           SSHCertificate._encode_force_command),
+        ('source-address',          SSHCertificate._encode_source_address)
+    )
+
+    _user_extension_encoders = (
+        ('permit-X11-forwarding',   SSHCertificate._encode_bool),
+        ('permit-agent-forwarding', SSHCertificate._encode_bool),
+        ('permit-port-forwarding',  SSHCertificate._encode_bool),
+        ('permit-pty',              SSHCertificate._encode_bool),
+        ('permit-user-rc',          SSHCertificate._encode_bool)
+    )
+
+    _user_option_decoders = {
+        b'force-command':           SSHCertificate._decode_force_command,
+        b'source-address':          SSHCertificate._decode_source_address
     }
 
-    _user_extensions = {
-        b'permit-X11-forwarding':   True,
-        b'permit-agent-forwarding': True,
-        b'permit-port-forwarding':  True,
-        b'permit-pty':              True,
-        b'permit-user-rc':          True
+    _user_extension_decoders = {
+        b'permit-X11-forwarding':   SSHCertificate._decode_bool,
+        b'permit-agent-forwarding': SSHCertificate._decode_bool,
+        b'permit-port-forwarding':  SSHCertificate._decode_bool,
+        b'permit-pty':              SSHCertificate._decode_bool,
+        b'permit-user-rc':          SSHCertificate._decode_bool
     }
 
-    def __init__(self, packet, algorithm, key_handler):
+    # pylint: enable=bad-whitespace
+
+    @classmethod
+    def _encode(cls, key, serial, cert_type, key_id, principals,
+                valid_after, valid_before, options, extensions):
+        """Encode a version 01 SSH certificate"""
+
+        return b''.join((String(os.urandom(32)), key.encode_ssh_public(),
+                         UInt64(serial), UInt32(cert_type), String(key_id),
+                         String(principals), UInt64(valid_after),
+                         UInt64(valid_before), String(options),
+                         String(extensions), String('')))
+
+    @classmethod
+    def _decode(cls, packet, key_handler):
+        """Decode a version 01 SSH certificate"""
+
         _ = packet.get_string()                             # nonce
         key_params = key_handler.decode_ssh_public(packet)
         serial = packet.get_uint64()
         cert_type = packet.get_uint32()
         key_id = packet.get_string()
-        valid_principals = packet.get_string()
+        principals = packet.get_string()
         valid_after = packet.get_uint64()
         valid_before = packet.get_uint64()
         options = packet.get_string()
         extensions = packet.get_string()
         _ = packet.get_string()                             # reserved
 
-        super().__init__(packet, algorithm, key_handler, key_params, serial,
-                         cert_type, key_id, valid_principals, valid_after,
-                         valid_before, options, extensions)
+        return (key_params, serial, cert_type, key_id, principals,
+                valid_after, valid_before, options, extensions)
 
 
 class SSHKeyPair:
@@ -1021,12 +1423,14 @@ def register_public_key_alg(algorithm, handler):
         _pkcs8_oid_map[handler.pkcs8_oid] = handler
 
 
-def register_certificate_alg(algorithm, key_handler, cert_handler):
+def register_certificate_alg(version, algorithm, key_handler, cert_handler):
     """Register a new certificate algorithm"""
 
     _certificate_alg_map[algorithm] = (key_handler, cert_handler)
     _certificate_algs.append(algorithm)
 
+    _certificate_version_map[key_handler, version] = (algorithm, cert_handler)
+
 
 def get_public_key_algs():
     """Return supported public key algorithms"""
@@ -1071,7 +1475,7 @@ def decode_ssh_certificate(data):
         key_handler, cert_handler = _certificate_alg_map.get(alg, (None, None))
 
         if cert_handler:
-            return cert_handler(packet, alg, key_handler)
+            return cert_handler.construct(packet, alg, key_handler)
         else:
             raise KeyImportError('Unknown certificate algorithm: %s' %
                                  alg.decode('ascii', errors='replace'))
@@ -1114,6 +1518,8 @@ def generate_private_key(alg_name, **kwargs):
 
        :returns: An :class:`SSHKey` private key
 
+       :raises: :exc:`KeyGenerationError` if the requested key parameters
+                are unsupported
     """
 
     algorithm = alg_name.encode('utf-8')
diff --git a/asyncssh/rsa.py b/asyncssh/rsa.py
index 9f72413..a125e44 100644
--- a/asyncssh/rsa.py
+++ b/asyncssh/rsa.py
@@ -204,5 +204,5 @@ class _RSAKey(SSHKey):
 
 register_public_key_alg(b'ssh-rsa', _RSAKey)
 
-register_certificate_alg(b'ssh-rsa-cert-v01@openssh.com',
+register_certificate_alg(1, b'ssh-rsa-cert-v01@openssh.com',
                          _RSAKey, SSHCertificateV01)
diff --git a/asyncssh/version.py b/asyncssh/version.py
index 3647dd1..a1e3cc9 100644
--- a/asyncssh/version.py
+++ b/asyncssh/version.py
@@ -18,4 +18,4 @@ __author_email__ = 'ronf at timeheart.net'
 
 __url__ = 'http://asyncssh.timeheart.net'
 
-__version__ = '1.6.1'
+__version__ = '1.6.2'
diff --git a/docs/api.rst b/docs/api.rst
index 010d694..6314c0d 100644
--- a/docs/api.rst
+++ b/docs/api.rst
@@ -1021,17 +1021,45 @@ to import it from, or an already loaded :class:`SSHKey` public key.
 See the function :func:`import_public_key` for the list of supported
 public key formats.
 
+.. index:: Specifying time values
+.. _SpecifyingTimeValues:
+
+Specifying time values
+----------------------
+
+When generating certificates, an optional validity interval can be
+specified using the ``valid_after`` and ``valid_before`` parameters
+to the :meth:`generate_user_certificate() <SSHKey.generate_user_certificate>`
+and :meth:`generate_host_certificate() <SSHKey.generate_host_certificate>`
+methods. These values can be specified in any of the following ways:
+
+    * An int or float UNIX epoch time, such as what is returned by
+      :func:`time.time`.
+    * A :class:`datetime.datetime` value.
+    * A string value of ``now`` to request the current time.
+    * A string value in the form ``YYYYMMDD`` to specify an absolute date.
+    * A string value in the form ``YYYYMMDDHHMMSS`` to specify an
+      absolute date and time.
+    * A relative time made up of a mix of positive or negative numbers and
+      the letters 'w', 'd', 'h', 'm', and 's', representing an offset from
+      the current time in weeks, days, hours, minutes, or seconds,
+      respectively. Multiple of these values can be included. For
+      instance, '+1w2d3h' means 1 week, 2 days, and 3 hours in the future.
+
 SSHKey
 ------
 
 .. autoclass:: SSHKey()
 
-   ================================== =
+   ========================================= =
+   .. automethod:: convert_to_public
+   .. automethod:: generate_user_certificate
+   .. automethod:: generate_host_certificate
    .. automethod:: export_private_key
    .. automethod:: export_public_key
    .. automethod:: write_private_key
    .. automethod:: write_public_key
-   ================================== =
+   ========================================= =
 
 SSHKeyPair
 ----------
@@ -1045,7 +1073,11 @@ SSHCertificate
 
 .. autoclass:: SSHCertificate()
 
+   ================================== =
+   .. automethod:: export_certificate
+   .. automethod:: write_certificate
    .. automethod:: validate
+   ================================== =
 
 generate_private_key
 --------------------
@@ -1272,6 +1304,11 @@ KeyEncryptionError
 
 .. autoexception:: KeyEncryptionError
 
+KeyGenerationError
+------------------
+
+.. autoexception:: KeyGenerationError
+
 .. index:: Supported algorithms
 .. _SupportedAlgorithms:
 
diff --git a/docs/changes.rst b/docs/changes.rst
index 9e9b1e5..d18e943 100644
--- a/docs/changes.rst
+++ b/docs/changes.rst
@@ -3,6 +3,18 @@
 Change Log
 ==========
 
+Release 1.6.2 (4 Sep 2016)
+--------------------------
+
+* Added generate_user_certificate() and generate_host_certificate() methods
+  to SSHKey class to generate SSH certificates, and export_certificate()
+  and write_certificate() methods on SSHCertificate class to export
+  certificates for use in other tools.
+
+* Improved editor unit tests to eliminate timing dependency.
+
+* Cleaned up a few minor documentation issues.
+
 Release 1.6.1 (27 Aug 2016)
 ---------------------------
 
diff --git a/tests/test_editor.py b/tests/test_editor.py
index 8e0ccc8..fbd770c 100644
--- a/tests/test_editor.py
+++ b/tests/test_editor.py
@@ -34,6 +34,7 @@ def _handle_session(stdin, stdout, stderr):
             data += yield from stdin.readline()
         except asyncssh.BreakReceived:
             break_count += 1
+            stdout.write('B')
 
             if break_count == 1:
                 # Set twice to get coverage of when echo isn't changing
@@ -170,7 +171,7 @@ class _TestEditor(_CheckEditor):
             process = yield from conn.create_process(term_type='ansi')
 
             process.stdin.write('\x03')
-            yield from asyncio.sleep(0.1)
+            yield from process.stdout.readexactly(1)
 
             process.stdin.write('abcd\x08\n')
             process.stdin.write_eof()
@@ -186,12 +187,12 @@ class _TestEditor(_CheckEditor):
             process = yield from conn.create_process(term_type='ansi')
 
             process.stdin.write('\x03')
-            yield from asyncio.sleep(0.1)
+            yield from process.stdout.readexactly(1)
 
             process.stdin.write('abc')
 
             process.stdin.write('\x03')
-            yield from asyncio.sleep(0.1)
+            yield from process.stdout.readexactly(1)
 
             process.stdin.write('\n')
             process.stdin.write_eof()
@@ -207,10 +208,10 @@ class _TestEditor(_CheckEditor):
             process = yield from conn.create_process(term_type='ansi')
 
             process.stdin.write('\x03\x03')
-            yield from asyncio.sleep(0.1)
+            yield from process.stdout.readexactly(2)
 
             process.stdin.write('abc\x03')
-            yield from asyncio.sleep(0.1)
+            yield from process.stdout.readexactly(15)
 
             process.stdin.write('\n')
             process.stdin.write_eof()
diff --git a/tests/test_public_key.py b/tests/test_public_key.py
index 2cdc2e3..f5be853 100644
--- a/tests/test_public_key.py
+++ b/tests/test_public_key.py
@@ -18,6 +18,7 @@
 """
 
 import binascii
+from datetime import datetime
 import os
 
 from .util import bcrypt_available, libnacl_available
@@ -35,7 +36,6 @@ from asyncssh.asn1 import TaggedDERObject
 from asyncssh.packet import MPInt, String, UInt32
 from asyncssh.pbe import pkcs1_decrypt
 from asyncssh.public_key import CERT_TYPE_USER, CERT_TYPE_HOST, SSHKey
-from asyncssh.public_key import decode_ssh_public_key
 from asyncssh.public_key import get_public_key_algs, get_certificate_algs
 
 
@@ -876,6 +876,11 @@ class _TestPublicKey(TempDirTestCase):
             cert = read_certificate('cert')
             self.assertEqual(cert.key, self.pubkey)
 
+        with self.subTest('Export certificate'):
+            cert.write_certificate('cert2', fmt)
+            cert2 = read_certificate('cert2')
+            self.assertEqual(cert.key, cert2.key)
+
         with self.subTest('Validate certificate'):
             self.assertIsNone(cert.validate(cert_type, 'name'))
 
@@ -908,23 +913,30 @@ class _TestPublicKey(TempDirTestCase):
         with self.subTest('Invalid certificate critical option'):
             with self.assertRaises(KeyImportError):
                 cert = self.make_certificate(cert_type, self.pubkey,
-                                             self.privca, 'name',
+                                             self.privca, ('name',),
                                              options={b'xxx': b''})
                 import_certificate(cert)
 
         with self.subTest('Ignored certificate extension'):
             cert = self.make_certificate(cert_type, self.pubkey,
-                                         self.privca, 'name',
+                                         self.privca, ('name',),
... 153 lines suppressed ...

-- 
Alioth's /usr/local/bin/git-commit-notice on /srv/git.debian.org/git/python-modules/packages/python-asyncssh.git



More information about the Python-modules-commits mailing list