[Pkg-freeipa-devel] [Git][freeipa-team/python-jwcrypto][upstream] 74 commits: Add P-256K alias for secp256k1
Timo Aaltonen (@tjaalton)
gitlab at salsa.debian.org
Thu Feb 15 08:36:09 GMT 2024
Timo Aaltonen pushed to branch upstream at FreeIPA packaging / python-jwcrypto
Commits:
3524c722 by Simo Sorce at 2021-12-02T12:36:44-05:00
Add P-256K alias for secp256k1
Resolves #241
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
8613bae0 by Simo Sorce at 2021-12-02T12:36:44-05:00
Refactor how EC curves are fetched
Deprecates the get_curve() function which shouldn't really be exposed
to users as it is an internal detail.
Change tests and jwa.py to stop using get_curve()
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
172af589 by Simo Sorce at 2021-12-03T16:29:48-05:00
Add a sphinx doctest target
Reformat example code as necessary to make it pass tests.
Specifically some of the randomly generated output has been replaced
with '...' which is the only way to make doctest match/skip a part of
the ouput while still running the command for testing.
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
48f62347 by Simo Sorce at 2022-01-07T15:09:22-05:00
Add way to verify detached payloads
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
c6dcae9e by Justin Frahm at 2022-04-13T14:40:53-04:00
Handle multiple keys with same kid
Signed-off-by: Justin Frahm <justin.frahm at maxar.com>
- - - - -
38b427c3 by Simo Sorce at 2022-04-26T18:48:14-04:00
Fix test to use correct parameters
Otherwise false negatives may happen
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
d3f6226b by Simo Sorce at 2022-04-26T18:51:15-04:00
Version 1.2
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
d83b316b by Simo Sorce at 2022-04-28T14:13:45-04:00
Add auto-publishing to pypi on release
This triggers when a release tag is pushed
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
7a181049 by Simo Sorce at 2022-04-28T14:30:52-04:00
Source README.md in setup to be used for PyPI
When Release scripts publish to PyPI they will set the README.md as
the long decription there.
- - - - -
fd2f49e0 by Simo Sorce at 2022-05-10T09:03:02-04:00
Allow passing p2s/p2c params on encrypting
This was not really obvious but clearly it should be possible to pass in
parameters (p2s/p2c) in input when using PBES, and set defaults if they
are not provided.
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
7dcc162b by Simo Sorce at 2022-05-10T09:03:02-04:00
Fix error message
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
5dc1aee6 by Simo Sorce at 2022-05-10T09:03:28-04:00
Allow audience claims to be arrays
This means that if a server checking a token provides an array as the
check_claims argument it intends that the server is handling multiple
identities and the token checks will result valid as long as one of the
audiences matches one of the identities the server impersonates.
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
8f2ddb7f by Simo Sorce at 2022-05-11T10:59:51-04:00
Test that JWS argument is not modified
See Issue #281
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
4d375fc6 by Simo Sorce at 2022-05-11T10:59:51-04:00
JWS: Ensure arguments are not modified
In some cases a dictionary passsed in to JWS would be stored directly as
a reference and later modified due to internal header processing.
Ensure all disctionaries are properly copied, generally by simply
re-encoding them on assignment.
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
a33cbb7d by Simo Sorce at 2022-05-11T10:59:51-04:00
JWT: Ensure claims setter does not modify argument
The processing of setting claims modifies the passed in dictionary if
there are deveult_claims set on the token. Always use a copy of the
dictionary to avoid changes to the passed in argument.
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
9b3b4aa1 by Simo Sorce at 2022-05-11T15:29:23-04:00
Add class method to deserialize JWE token
One shot api to get a JWE token from a serialized json token
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
1a0d24dd by Simo Sorce at 2022-05-11T15:29:23-04:00
Add __str__ and __repr_ for JWE
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
5654d4a1 by Simo Sorce at 2022-05-11T15:29:23-04:00
Add class method to deserialize JWS token
One shot api to get a JWS token from a serialized json token
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
4d618002 by Simo Sorce at 2022-05-11T15:29:23-04:00
Add __str__ and __repr_ for JWS
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
481be4b1 by Simo Sorce at 2022-05-11T15:29:23-04:00
Add class method to deserialize JWT token
One shot api to get a JWT token from a serialized json token
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
56e3642a by Simo Sorce at 2022-05-11T15:29:23-04:00
Add __str__ and __repr_ for JWT
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
9f72813c by Simo Sorce at 2022-05-11T15:29:23-04:00
Add __eq__ function for JWS, JWE, JWT
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
bc1fd83e by Simo Sorce at 2022-05-11T15:29:23-04:00
Add tests for overloaded operators
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
737361f1 by Simo Sorce at 2022-05-11T17:40:09-04:00
JWT: Add validate() method
This allows callers to deserialize() without a key and later validate
the parsed token without having to deserialize() again from scratch.
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
1b834581 by Simo Sorce at 2022-05-11T18:34:41-04:00
Add docstrings for return types of public methods
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
81797b9e by Simo Sorce at 2022-05-11T18:56:21-04:00
Add ReadTheDocs badge
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
c26c4a21 by Simo Sorce at 2022-05-11T22:30:59-04:00
JWE: Allow JWKSet as key for decryption
Adds support to use a JWKSet directly in JWE so that callers do not need
to loop on their own.
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
e795a137 by Simo Sorce at 2022-05-11T22:30:59-04:00
JWS: Allow JWKSet as key for verification
Adds support to use a JWKSet directly in JWS so that callers do not need
to loop on their own.
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
659fe723 by Simo Sorce at 2022-05-11T22:30:59-04:00
JWT: Move JWKSet processing to JWE,JWS
Instead of handling key sets in the JWT, push the work down into JWS and
JWE tokens now that they directly support it.
This has the side effect that the JWTMissingKey error cannot be easily
reported anymore, so deprecate it and stop using it.
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
aad7da1f by Simo Sorce at 2022-05-12T15:07:53-04:00
Move version information into a single place
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
bd2a741b by Simo Sorce at 2022-05-12T15:09:41-04:00
Version 1.3
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
f2941876 by Simo Sorce at 2022-05-12T23:08:07-04:00
Add more nitpicking sphinx testing
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
793fb433 by Simo Sorce at 2022-05-12T23:08:07-04:00
Add documentation for jwcrypto.common
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
1e5c385a by Simo Sorce at 2022-05-12T23:08:07-04:00
Fix sphinx nitpicked errors
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
60fc7ee1 by Simo Sorce at 2022-05-13T13:11:02-04:00
Disable HW optimizations on ppc64le
Apparently since openssl 3.0.2 some assembly HW optimizations are
triggeringering QEMU emulation errors. Disabling accelaration at
runtime seem to make tests pass again.
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
997b900d by Simo Sorce at 2022-05-21T11:37:25-04:00
Introduce a new JWKeyNotFound exception
This new Exception is returned only for the newly introduced support for
using JWKset.
This patch also includes a bugfix for jwe to be able to successfully
decrypt using a JWKSet, which was non-functional, and a direct test for
both JWE and JWS to insure no regressions in JWKSet support.
Also restores use of JWTMissingKey for backwards compatibility.
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
e1b4c36e by Simo Sorce at 2022-05-21T11:39:08-04:00
Version 1.3.1
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
e5c1e421 by Christian Clauss at 2022-08-11T06:23:17-04:00
Upgrade GitHub Actions
- - - - -
5a13cfc6 by Simo Sorce at 2022-08-13T10:02:57-04:00
Add support for RFC 9278: JWK Thumbprint URI
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
f4e912f8 by Simo Sorce at 2022-09-13T12:14:23-04:00
Make JWT require to know what to expect
This is needed to address CVE-2022-3102.
Thanks to Tom tervoort from Secura for finding and reporting this issue.
Also test that "unepxected" token types are not validated
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
34b65252 by Simo Sorce at 2022-09-13T12:14:23-04:00
Add global workaround for applications
Because the previous patch changes the behavoir of jwcrypto, this knob
is a quick way for application developers to get back the old behavior
temporarily without having to change the code immediately as it may
require some significant refactoring, depending on how the application
was written.
This is not intended to be used in the long term and will be eventually
deleted. Unfortunately I cannot decorate a simply global variable with
the @deprecated decoration to make it clearer.
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
84f121f5 by Simo Sorce at 2022-09-13T12:15:49-04:00
Version 1.4
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
c4e0beee by Simo Sorce at 2022-09-14T18:16:14-04:00
Add more heuristics for backwards JWT compat
These additional heuristics help in case the calling application was
correctly calssifying key usage, as this is another valid hitn of what
the application intended.
Invalid key usage would already cause failure, so this does not affect
the countermeasures introduced but can avoid issues in older
applications.
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
54618cc2 by Simo Sorce at 2022-09-14T18:17:32-04:00
Version 1.4.1
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
69954476 by Jan Christian Grünhage at 2022-09-15T08:55:51-04:00
Fix typo in new backwards JWT compat heuristics
- - - - -
a7b2136b by Simo Sorce at 2022-09-15T08:55:51-04:00
Add tests for algos heuristics
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
83ab6cb4 by Simo Sorce at 2022-09-15T08:56:36-04:00
Version 1.4.2
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
3a8fb807 by Simo Sorce at 2022-09-28T13:29:21-04:00
Raising the bar for minimum pyca/cryptography
Fixes #305
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
28d71891 by Christian Clauss at 2022-10-04T12:45:56-04:00
Fix typos with codespell
- - - - -
14d1f81a by Simo Sorce at 2022-10-04T13:09:56-04:00
Add codespell checks in CI
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
fcdc7d76 by spilikin at 2022-10-10T16:35:40-04:00
Add Brainpool EC-curves support
This commit adds the support of Brainpool curves to jwcrypto. The Brainpool curves defined in RFC 5639 are mandatory for use in german e-health systems as defined by the Federal Office of Information Security (BSI) and National Digital Health Agency (gematik GmbH).
In order to use the public E-Health APIs clients are required to:
* Load and use the Brainpool keys using JWK
* Sign and verify the signatures using the Brainpool elliptic curves using JWS
* Encrypt and decrypt the data using the Brainpool elliptic curves and AES using JWE
At the time of this commit there is no official standardization of these algorithms for JOSE/JWK/JWS/JWE. The use of these algorithms is specified solely by the gematik GmbH – National Digital Health Agency - for use in german e-health applications.
Signed-off-by: Sergej Suskov <git at spilikin.dev>
- - - - -
652afd92 by Christoph Zwerschke at 2023-03-15T10:19:30-04:00
Fix error message
- - - - -
4f6cf303 by Simo Sorce at 2023-03-15T10:57:35-04:00
Python 3.6 is not available anymore in CI
With the configured Ubuntu image it is not available, and it is not
worth doing a lot of work for it as 3.6 is on the way out generally
anyway.
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
0c31ee00 by Simo Sorce at 2023-03-15T10:57:35-04:00
Make linter happier about dummy exception
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
381c5d12 by Simo Sorce at 2023-03-15T10:57:35-04:00
Fix codespell issue
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
dc21a783 by Simo Sorce at 2023-03-15T10:57:35-04:00
Fix test to actually do what it should
This one was odd, glad CI picked it.
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
9f2cd3ab by Simo Sorce at 2023-03-15T11:05:07-04:00
Fix CI to run on the correct branch for main
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
2fec7038 by Simo Sorce at 2023-05-30T13:14:22-04:00
Misc fixes and docstring corrections
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
e08cbf16 by Simo Sorce at 2023-05-30T13:14:22-04:00
Use separate input_keysize property
This allows to propery compute an octect key for algorithms like
A256CBC-HS512 ha sa different input keysize than the putput key size.
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
db1f9f45 by Simo Sorce at 2023-05-30T13:14:22-04:00
Add test to generate key with algorithm
This uses an algorithm that has different input_keysize and output
keysize.
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
41fb08a0 by Simo Sorce at 2023-05-30T13:43:41-04:00
Version 1.5
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
4c900198 by Amaury Chamayou at 2023-11-28T16:31:42-05:00
Fix X25519 import/export from PEM
Signed-off-by: Amaury Chamayou <amaury at xargs.fr>
- - - - -
ac40895d by Simo Sorce at 2023-11-28T16:46:19-05:00
Read the Docs now requires a config file
Let's give it one.
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
d64536b1 by peppelinux at 2023-12-07T09:16:13-05:00
chore: refactor for removing pdb symbols
- - - - -
6ee0e891 by Giuseppe De Marco at 2023-12-07T09:16:13-05:00
chore: arg renamed
- - - - -
d2655d37 by Simo Sorce at 2023-12-26T14:43:14-05:00
Fix potential DoS issue with p2c header
Unbounded p2c headers may be used to cause an application that accept
PBES algorithms to spend alot of resources running PBKDF2 with a very
high number of iterations.
Clamp the default maximum to 16384 (double the default of 8192).
An application that wants to use more iterations will have to chenge the
jwa default max.
Fixes CVE-2023-6681
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
066d13f2 by Simo Sorce at 2023-12-26T14:46:36-05:00
Update Security Policy
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
8ae0df65 by Simo Sorce at 2023-12-26T14:46:36-05:00
Version 1.5.1
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
70c0782d by David Diamant at 2024-01-08T12:29:27-05:00
replace deprecated package with typing_extensions
Signed-off-by: David Diamant <david at homelend.com>
- - - - -
6c61f42e by Simo Sorce at 2024-02-07T11:53:15-05:00
Version 1.5.2
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
c659e385 by Simo Sorce at 2024-02-07T15:13:41-05:00
Drop python 3.6 and 3.7 and add 3.11 support
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
a06b84a1 by Simo Sorce at 2024-02-07T15:13:41-05:00
Version 1.5.3
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
e7ef80f2 by Simo Sorce at 2024-02-13T10:50:12-05:00
Set a minimum version for typing_extensions
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
b9432ef4 by Simo Sorce at 2024-02-13T10:50:34-05:00
Version 1.5.4
Signed-off-by: Simo Sorce <simo at redhat.com>
- - - - -
28 changed files:
- .github/workflows/build.yml
- .github/workflows/codeql-analysis.yml
- .github/workflows/ppc64le.yml
- + .github/workflows/publish-to-pypi.yml
- .gitignore
- + .readthedocs.yaml
- MANIFEST.in
- Makefile
- README.md
- SECURITY.md
- + docs/source/common.rst
- docs/source/conf.py
- docs/source/index.rst
- docs/source/jwe.rst
- docs/source/jwk.rst
- docs/source/jws.rst
- docs/source/jwt.rst
- + jwcrypto/VERSION
- jwcrypto/common.py
- jwcrypto/jwa.py
- jwcrypto/jwe.py
- jwcrypto/jwk.py
- jwcrypto/jws.py
- jwcrypto/jwt.py
- jwcrypto/tests-cookbook.py
- jwcrypto/tests.py
- setup.py
- tox.ini
Changes:
=====================================
.github/workflows/build.yml
=====================================
@@ -2,7 +2,7 @@
"name": "Build",
"on": {
"push": {
- "branches": [ "master" ]
+ "branches": [ "main" ]
},
"pull_request": null,
},
@@ -14,29 +14,18 @@
"fail-fast": false,
"matrix": {
"name": [
- "python-36",
- "python-37",
"python-38",
"python-39",
"python-310",
+ "python-311",
"doc",
+ "doctest",
"sphinx",
"lint",
"pep8",
+ "codespell",
],
"include": [
- {
- "name": "python-36",
- "python": "3.6",
- "toxenv": "py36",
- "arch": "x64",
- },
- {
- "name": "python-37",
- "python": "3.7",
- "toxenv": "py37",
- "arch": "x64",
- },
{
"name": "python-38",
"python": "3.8",
@@ -55,37 +44,55 @@
"toxenv": "py310",
"arch": "x64",
},
+ {
+ "name": "python-311",
+ "python": "3.11",
+ "toxenv": "py311",
+ "arch": "x64",
+ },
{
"name": "doc",
- "python": "3.10",
+ "python": "3.11",
"toxenv": "doc",
"arch": "x64",
},
+ {
+ "name": "doctest",
+ "python": "3.11",
+ "toxenv": "doctest",
+ "arch": "x64",
+ },
{
"name": "sphinx",
- "python": "3.10",
+ "python": "3.11",
"toxenv": "sphinx",
"arch": "x64",
},
{
"name": "lint",
- "python": "3.10",
+ "python": "3.11",
"toxenv": "lint",
"arch": "x64",
},
{
"name": "pep8",
- "python": "3.10",
+ "python": "3.11",
"toxenv": "pep8",
"arch": "x64",
},
+ {
+ "name": "codespell",
+ "python": "3.11",
+ "toxenv": "codespell",
+ "arch": "x64",
+ },
],
},
},
"steps": [
- { "uses": "actions/checkout at v2" },
+ { "uses": "actions/checkout at v3" },
{
- "uses": "actions/setup-python at v2",
+ "uses": "actions/setup-python at v4",
"with": {
"python-version": "${{ matrix.python }}",
"architecture": "${{ matrix.arch }}"
=====================================
.github/workflows/codeql-analysis.yml
=====================================
@@ -13,10 +13,10 @@ name: "CodeQL"
on:
push:
- branches: [ master ]
+ branches: [ main ]
pull_request:
# The branches below must be a subset of the branches above
- branches: [ master ]
+ branches: [ main ]
paths-ignore:
- '**/*.md'
- '**/*.txt'
=====================================
.github/workflows/ppc64le.yml
=====================================
@@ -2,7 +2,7 @@
"name": "ppc64le CI",
"on": {
"push": {
- "branches": [ "master" ]
+ "branches": [ "main" ]
},
},
"jobs": {
=====================================
.github/workflows/publish-to-pypi.yml
=====================================
@@ -0,0 +1,34 @@
+{
+ "name": "Release to PyPI",
+ "on": {
+ "push": {
+ "tags": [ "v*.*" ]
+ },
+ },
+ "jobs": {
+ "pypi": {
+ "name": "Publish Release",
+ "runs-on": "ubuntu-latest",
+ "steps": [
+ { "uses": "actions/checkout at v2" },
+ {
+ "uses": "actions/setup-python at v2",
+ "with": {
+ "python-version": "3.10"
+ },
+ },
+ { "run": "sudo apt-get update" },
+ { "run": "sudo apt-get install cargo" },
+ { "run": "pip --version" },
+ { "run": "python setup.py sdist" },
+ {
+ "uses": "pypa/gh-action-pypi-publish at release/v1",
+ "with": {
+ "user": "__token__",
+ "password": "${{secrets.PYPI_API_TOKEN}}",
+ },
+ },
+ ],
+ },
+ },
+}
=====================================
.gitignore
=====================================
@@ -6,3 +6,4 @@ cscope.out
.tox
.coverage
*.egg-info
+env
=====================================
.readthedocs.yaml
=====================================
@@ -0,0 +1,35 @@
+# Read the Docs configuration file for Sphinx projects
+# See https://docs.readthedocs.io/en/stable/config-file/v2.html for details
+
+# Required
+version: 2
+
+# Set the OS, Python version and other tools you might need
+build:
+ os: ubuntu-22.04
+ tools:
+ python: "3.12"
+ # You can also specify other tool versions:
+ # nodejs: "20"
+ # rust: "1.70"
+ # golang: "1.20"
+
+# Build documentation in the "docs/" directory with Sphinx
+sphinx:
+ configuration: docs/source/conf.py
+ # You can configure Sphinx to use a different builder, for instance use the dirhtml builder for simpler URLs
+ # builder: "dirhtml"
+ # Fail on all warnings to avoid broken references
+ # fail_on_warning: true
+
+# Optionally build your docs in additional formats such as PDF and ePub
+formats:
+ - pdf
+ - epub
+
+# Optional but recommended, declare the Python requirements required
+# to build your documentation
+# See https://docs.readthedocs.io/en/stable/guides/reproducible-builds.html
+# python:
+# install:
+# - requirements: docs/requirements.txt
=====================================
MANIFEST.in
=====================================
@@ -1,2 +1,3 @@
include LICENSE README.md
include tox.ini setup.cfg
+include jwcrypto/VERSION
=====================================
Makefile
=====================================
@@ -20,17 +20,17 @@ testlong: export JWCRYPTO_TESTS_ENABLE_MMA=True
testlong: export TOX_TESTENV_PASSENV=JWCRYPTO_TESTS_ENABLE_MMA
testlong:
rm -f .coverage
- tox -e py36
+ tox -e py311
test:
rm -f .coverage
- tox -e py36 --skip-missing-interpreter
- tox -e py37 --skip-missing-interpreter
tox -e py38 --skip-missing-interpreter
tox -e py39 --skip-missing-interpreter
+ tox -e py310 --skip-missing-interpreter
+ tox -e py311 --skip-missing-interpreter
DOCS_DIR = docs
.PHONY: docs
docs:
- $(MAKE) -C $(DOCS_DIR) html
+ $(MAKE) -C $(DOCS_DIR) html doctest
=====================================
README.md
=====================================
@@ -3,6 +3,7 @@
[![Build Status](https://github.com/latchset/jwcrypto/actions/workflows/build.yml/badge.svg)](https://github.com/latchset/jwcrypto/actions/workflows/build.yml)
[![ppc64le Build](https://github.com/latchset/jwcrypto/actions/workflows/ppc64le.yml/badge.svg)](https://github.com/latchset/jwcrypto/actions/workflows/ppc64le.yml)
[![Code Scan](https://github.com/latchset/jwcrypto/actions/workflows/codeql-analysis.yml/badge.svg)](https://github.com/latchset/jwcrypto/actions/workflows/codeql-analysis.yml)
+[![Documentation Status](https://readthedocs.org/projects/jwcrypto/badge/?version=latest)](https://jwcrypto.readthedocs.io/en/latest/?badge=latest)
JWCrypto
========
=====================================
SECURITY.md
=====================================
@@ -4,13 +4,13 @@
| Version | Supported |
| ------- | ------------------ |
-| 0.8 + | :white_check_mark: |
-| < 0.8 | :x: |
+| 1.5.1+ | :white_check_mark: |
+| < 1.5.1 | :x: |
## Reporting a Vulnerability
-Please contact simo at redhat.com if you have found a security vulnerability
+Please use the GitHub feature to report security vulnerabilities.
Expect a response within 2 business days (not on week ends or holidays).
-If the vulnerbaility is confirmed and accepted you will be given instruction on any embargo or disclosure timeline via email.
+If the vulnerbaility is confirmed and accepted you will be given instruction on any embargo or disclosure timeline.
=====================================
docs/source/common.rst
=====================================
@@ -0,0 +1,41 @@
+Common
+======
+
+Functions
+---------
+.. autofunction:: jwcrypto.common.base64url_encode
+.. autofunction:: jwcrypto.common.base64url_decode
+.. autofunction:: jwcrypto.common.json_encode
+.. autofunction:: jwcrypto.common.json_decode
+
+Classes
+-------
+
+.. autoclass:: jwcrypto.common.JWSEHeaderRegistry
+ :members:
+
+Exceptions
+----------
+
+.. autoclass:: jwcrypto.common.JWException
+
+.. autoclass:: jwcrypto.common.InvalidJWAAlgorithm
+ :show-inheritance:
+
+.. autoclass:: jwcrypto.common.InvalidCEKeyLength
+ :show-inheritance:
+
+.. autoclass:: jwcrypto.common.InvalidJWEOperation
+ :show-inheritance:
+
+.. autoclass:: jwcrypto.common.InvalidJWEKeyType
+ :show-inheritance:
+
+.. autoclass:: jwcrypto.common.InvalidJWEKeyLength
+ :show-inheritance:
+
+.. autoclass:: jwcrypto.common.InvalidJWSERegOperation
+ :show-inheritance:
+
+.. autoclass:: jwcrypto.common.JWKeyNotFound
+ :show-inheritance:
=====================================
docs/source/conf.py
=====================================
@@ -30,7 +30,11 @@ sys.path.insert(0, os.path.abspath('../..'))
# Add any Sphinx extension module names here, as strings. They can be
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
-extensions = ['sphinx.ext.autodoc', 'sphinx.ext.intersphinx']
+extensions = [
+ 'sphinx.ext.autodoc',
+ 'sphinx.ext.intersphinx',
+ 'sphinx.ext.doctest',
+]
# Add any paths that contain templates here, relative to this directory.
templates_path = ['.templates']
@@ -53,9 +57,10 @@ copyright = u'2016-2021, JWCrypto Contributors'
# built documents.
#
# The short X.Y version.
-version = '1.1'
+with open(os.path.join('../../jwcrypto', 'VERSION')) as verfile:
+ version = verfile.read().strip()
# The full version, including alpha/beta/rc tags.
-release = '1.1'
+release = version
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
=====================================
docs/source/index.rst
=====================================
@@ -24,8 +24,13 @@ Contents:
jws
jwe
jwt
+ common
+Note: In the examples, random or generated output values are replaced
+with '...' to allow for doctesting. Where possible the immutable part of
+a token has been preserved, and only the variable part replaced with '...'
+
Indices and tables
==================
=====================================
docs/source/jwe.rst
=====================================
@@ -60,8 +60,8 @@ Encrypt a JWE token::
>>> key = jwk.JWK.generate(kty='oct', size=256)
>>> payload = "My Encrypted message"
>>> jwetoken = jwe.JWE(payload.encode('utf-8'),
- json_encode({"alg": "A256KW",
- "enc": "A256CBC-HS512"}))
+ ... json_encode({"alg": "A256KW",
+ ... "enc": "A256CBC-HS512"}))
>>> jwetoken.add_recipient(key)
>>> enc = jwetoken.serialize()
@@ -82,14 +82,14 @@ Encrypt a JWE token::
>>> public_key.import_key(**json_decode(private_key.export_public()))
>>> payload = "My Encrypted message"
>>> protected_header = {
- "alg": "RSA-OAEP-256",
- "enc": "A256CBC-HS512",
- "typ": "JWE",
- "kid": public_key.thumbprint(),
- }
+ ... "alg": "RSA-OAEP-256",
+ ... "enc": "A256CBC-HS512",
+ ... "typ": "JWE",
+ ... "kid": public_key.thumbprint(),
+ ... }
>>> jwetoken = jwe.JWE(payload.encode('utf-8'),
- recipient=public_key,
- protected=protected_header)
+ ... recipient=public_key,
+ ... protected=protected_header)
>>> enc = jwetoken.serialize()
Decrypt a JWE token::
=====================================
docs/source/jwk.rst
=====================================
@@ -66,25 +66,24 @@ Create a 256bit symmetric key::
>>> key = jwk.JWK.generate(kty='oct', size=256)
Export the key with::
- >>> key.export()
- '{"k":"X6TBlwY2so8EwKZ2TFXM7XHSgWBKQJhcspzYydp5Y-o","kty":"oct"}'
+ >>> key.export() #doctest: +ELLIPSIS
+ '{"k":"...","kty":"oct"}'
Create a 2048bit RSA key pair::
- >>> jwk.JWK.generate(kty='RSA', size=2048)
+ >>> jwk.JWK.generate(kty='RSA', size=2048) #doctest: +ELLIPSIS
+ {"kid":"Missing Key ID","thumbprint":"..."}
Create a P-256 EC key pair and export the public key::
>>> key = jwk.JWK.generate(kty='EC', crv='P-256')
- >>> key.export(private_key=False)
- '{"y":"VYlYwBfOTIICojCPfdUjnmkpN-g-lzZKxzjAoFmDRm8",
- "x":"3mdE0rODWRju6qqU01Kw5oPYdNxBOMisFvJFH1vEu9Q",
- "crv":"P-256","kty":"EC"}'
+ >>> key.export(private_key=False) #doctest: +ELLIPSIS
+ '{"crv":"P-256","kty":"EC","x":"...","y":"..."}'
Import a P-256 Public Key::
>>> expkey = {"y":"VYlYwBfOTIICojCPfdUjnmkpN-g-lzZKxzjAoFmDRm8",
- "x":"3mdE0rODWRju6qqU01Kw5oPYdNxBOMisFvJFH1vEu9Q",
- "crv":"P-256","kty":"EC"}
+ ... "x":"3mdE0rODWRju6qqU01Kw5oPYdNxBOMisFvJFH1vEu9Q",
+ ... "crv":"P-256","kty":"EC"}
>>> key = jwk.JWK(**expkey)
Import a Key from a PEM file::
- >>> with open("public.pem", "rb") as pemfile:
- >>> key = jwk.JWK.from_pem(pemfile.read())
+ >>> with open("public.pem", "rb") as pemfile: #doctest: +SKIP
+ ... key = jwk.JWK.from_pem(pemfile.read())
=====================================
docs/source/jws.rst
=====================================
@@ -54,8 +54,8 @@ Sign a JWS token::
>>> payload = "My Integrity protected message"
>>> jwstoken = jws.JWS(payload.encode('utf-8'))
>>> jwstoken.add_signature(key, None,
- json_encode({"alg": "HS256"}),
- json_encode({"kid": key.thumbprint()}))
+ ... json_encode({"alg": "HS256"}),
+ ... json_encode({"kid": key.thumbprint()}))
>>> sig = jwstoken.serialize()
Verify a JWS token::
=====================================
docs/source/jwt.rst
=====================================
@@ -14,35 +14,42 @@ Classes
:members:
:show-inheritance:
+Variables
+---------
+
+.. autodata:: jwcrypto.jwt.JWTClaimsRegistry
+
+.. autodata:: jwcrypto.jwt.JWT_expect_type
+
Examples
--------
Create a symmetric key::
>>> from jwcrypto import jwt, jwk
>>> key = jwk.JWK(generate='oct', size=256)
- >>> key.export()
- '{"k":"Wal4ZHCBsml0Al_Y8faoNTKsXCkw8eefKXYFuwTBOpA","kty":"oct"}'
+ >>> key.export() # doctest: +ELLIPSIS
+ '{"k":"...","kty":"oct"}'
Create a signed token with the generated key::
>>> Token = jwt.JWT(header={"alg": "HS256"},
- claims={"info": "I'm a signed token"})
+ ... claims={"info": "I'm a signed token"})
>>> Token.make_signed_token(key)
- >>> Token.serialize()
- u'eyJhbGciOiJIUzI1NiJ9.eyJpbmZvIjoiSSdtIGEgc2lnbmVkIHRva2VuIn0.rjnRMAKcaRamEHnENhg0_Fqv7Obo-30U4bcI_v-nfEM'
+ >>> Token.serialize() #doctest: +ELLIPSIS
+ 'eyJhbGciOiJIUzI1NiJ9.eyJpbmZvIjoiSSdtIGEgc2lnbmVkIHRva2VuIn0...'
Further encrypt the token with the same key::
>>> Etoken = jwt.JWT(header={"alg": "A256KW", "enc": "A256CBC-HS512"},
- claims=Token.serialize())
+ ... claims=Token.serialize())
>>> Etoken.make_encrypted_token(key)
>>> Etoken.serialize()
- u'eyJhbGciOiJBMjU2S1ciLCJlbmMiOiJBMjU2Q0JDLUhTNTEyIn0.ST5RmjqDLj696xo7YFTFuKUhcd3naCrm6yMjBM3cqWiFD6U8j2JIsbclsF7ryNg8Ktmt1kQJRKavV6DaTl1T840tP3sIs1qz.wSxVhZH5GyzbJnPBAUMdzQ.6uiVYwrRBzAm7Uge9rEUjExPWGbgerF177A7tMuQurJAqBhgk3_5vee5DRH84kHSapFOxcEuDdMBEQLI7V2E0F57-d01TFStHzwtgtSmeZRQ6JSIL5XlgJouwHfSxn9Z_TGl5xxq4TksORHED1vnRA.5jPyPWanJVqlOohApEbHmxi3JHp1MXbmvQe2_dVd8FI'
+ 'eyJhbGciOiJBMjU2S1ciLCJlbmMiOiJBMjU2Q0JDLUhTNTEyIn0...'
Now decrypt and verify::
>>> from jwcrypto import jwt, jwk
>>> k = {"k": "Wal4ZHCBsml0Al_Y8faoNTKsXCkw8eefKXYFuwTBOpA", "kty": "oct"}
>>> key = jwk.JWK(**k)
- >>> e = u'eyJhbGciOiJBMjU2S1ciLCJlbmMiOiJBMjU2Q0JDLUhTNTEyIn0.ST5RmjqDLj696xo7YFTFuKUhcd3naCrm6yMjBM3cqWiFD6U8j2JIsbclsF7ryNg8Ktmt1kQJRKavV6DaTl1T840tP3sIs1qz.wSxVhZH5GyzbJnPBAUMdzQ.6uiVYwrRBzAm7Uge9rEUjExPWGbgerF177A7tMuQurJAqBhgk3_5vee5DRH84kHSapFOxcEuDdMBEQLI7V2E0F57-d01TFStHzwtgtSmeZRQ6JSIL5XlgJouwHfSxn9Z_TGl5xxq4TksORHED1vnRA.5jPyPWanJVqlOohApEbHmxi3JHp1MXbmvQe2_dVd8FI'
- >>> ET = jwt.JWT(key=key, jwt=e)
+ >>> e = 'eyJhbGciOiJBMjU2S1ciLCJlbmMiOiJBMjU2Q0JDLUhTNTEyIn0.ST5RmjqDLj696xo7YFTFuKUhcd3naCrm6yMjBM3cqWiFD6U8j2JIsbclsF7ryNg8Ktmt1kQJRKavV6DaTl1T840tP3sIs1qz.wSxVhZH5GyzbJnPBAUMdzQ.6uiVYwrRBzAm7Uge9rEUjExPWGbgerF177A7tMuQurJAqBhgk3_5vee5DRH84kHSapFOxcEuDdMBEQLI7V2E0F57-d01TFStHzwtgtSmeZRQ6JSIL5XlgJouwHfSxn9Z_TGl5xxq4TksORHED1vnRA.5jPyPWanJVqlOohApEbHmxi3JHp1MXbmvQe2_dVd8FI'
+ >>> ET = jwt.JWT(key=key, jwt=e, expected_type="JWE")
>>> ST = jwt.JWT(key=key, jwt=ET.claims)
>>> ST.claims
- u'{"info":"I\'m a signed token"}'
+ '{"info":"I\'m a signed token"}'
=====================================
jwcrypto/VERSION
=====================================
@@ -0,0 +1 @@
+1.5.4
=====================================
jwcrypto/common.py
=====================================
@@ -126,6 +126,22 @@ class InvalidJWSERegOperation(JWException):
super(InvalidJWSERegOperation, self).__init__(msg)
+class JWKeyNotFound(JWException):
+ """The key needed to complete the operation was not found.
+
+ This exception is raised when a JWKSet is used to perform
+ some operation and the key required to successfully complete
+ the operation is not found.
+ """
+
+ def __init__(self, message=None):
+ if message:
+ msg = message
+ else:
+ msg = 'Key Not Found'
+ super(JWKeyNotFound, self).__init__(msg)
+
+
# JWSE Header Registry definitions
# RFC 7515 - 9.1: JSON Web Signature and Encryption Header Parameters Registry
=====================================
jwcrypto/jwa.py
=====================================
@@ -28,6 +28,8 @@ from jwcrypto.jwk import JWK
# Implements RFC 7518 - JSON Web Algorithms (JWA)
+default_max_pbkdf2_iterations = 16384
+
class JWAAlgorithm(metaclass=ABCMeta):
@@ -44,7 +46,7 @@ class JWAAlgorithm(metaclass=ABCMeta):
@property
@abstractmethod
def keysize(self):
- """The actual/recommended/minimum key size"""
+ """The algorithm key size"""
@property
@abstractmethod
@@ -56,6 +58,14 @@ class JWAAlgorithm(metaclass=ABCMeta):
def algorithm_use(self):
"""One of 'sig', 'kex', 'enc'"""
+ @property
+ def input_keysize(self):
+ """The input key size"""
+ try:
+ return self.wrap_key_size
+ except AttributeError:
+ return self.keysize
+
def _bitsize(x):
return len(x) * 8
@@ -138,9 +148,9 @@ class _RawEC(_RawJWS):
def sign(self, key, payload):
skey = key.get_op_key('sign', self._curve)
+ size = skey.key_size
signature = skey.sign(payload, ec.ECDSA(self.hashfn))
r, s = ec_utils.decode_dss_signature(signature)
- size = key.get_curve(self._curve).key_size
return _encode_int(r, size) + _encode_int(s, size)
def verify(self, key, payload, signature):
@@ -395,7 +405,7 @@ class _Rsa15(_RSA, JWAAlgorithm):
cek = super(_Rsa15, self).unwrap(key, bitsize, ek, headers)
# always raise so we always run through the exception handling
# code in all cases
- raise Exception('Dummy')
+ raise ValueError('Dummy')
except Exception: # pylint: disable=broad-except
return cek
@@ -580,6 +590,9 @@ class _Pbes2HsAesKw(_RawKeyMgmt):
self.aeskwmap = {128: _A128KW, 192: _A192KW, 256: _A256KW}
def _get_key(self, alg, key, p2s, p2c):
+ if p2c > default_max_pbkdf2_iterations:
+ raise ValueError('Invalid p2c value, too large')
+
if not isinstance(key, JWK):
# backwards compatibility for old interface
if isinstance(key, bytes):
@@ -608,13 +621,25 @@ class _Pbes2HsAesKw(_RawKeyMgmt):
return JWK(kty="oct", use="enc", k=base64url_encode(rk))
def wrap(self, key, bitsize, cek, headers):
- p2s = _randombits(128)
- p2c = 8192
+ ret_header = {}
+ if 'p2s' in headers:
+ p2s = base64url_decode(headers['p2s'])
+ if len(p2s) < 8:
+ raise ValueError('Invalid Salt, must be 8 or more octects')
+ else:
+ p2s = _randombits(128)
+ ret_header['p2s'] = base64url_encode(p2s)
+ if 'p2c' in headers:
+ p2c = headers['p2c']
+ else:
+ p2c = 8192
+ ret_header['p2c'] = p2c
kek = self._get_key(headers['alg'], key, p2s, p2c)
aeskw = self.aeskwmap[self.keysize]()
ret = aeskw.wrap(kek, bitsize, cek, headers)
- ret['header'] = {'p2s': base64url_encode(p2s), 'p2c': p2c}
+ if len(ret_header) > 0:
+ ret['header'] = ret_header
return ret
def unwrap(self, key, bitsize, ek, headers):
@@ -850,10 +875,10 @@ class _EdDsa(_RawJWS, JWAAlgorithm):
class _RawJWE:
- def encrypt(self, k, a, m):
+ def encrypt(self, k, aad, m):
raise NotImplementedError
- def decrypt(self, k, a, iv, e, t):
+ def decrypt(self, k, aad, iv, e, t):
raise NotImplementedError
@@ -867,10 +892,10 @@ class _AesCbcHmacSha2(_RawJWE):
self.blocksize = algorithms.AES.block_size
self.wrap_key_size = self.keysize * 2
- def _mac(self, k, a, iv, e):
- al = _encode_int(_bitsize(a), 64)
+ def _mac(self, k, aad, iv, e):
+ al = _encode_int(_bitsize(aad), 64)
h = hmac.HMAC(k, self.hashfn, backend=self.backend)
- h.update(a)
+ h.update(aad)
h.update(iv)
h.update(e)
h.update(al)
@@ -878,16 +903,19 @@ class _AesCbcHmacSha2(_RawJWE):
return m[:_inbytes(self.keysize)]
# RFC 7518 - 5.2.2
- def encrypt(self, k, a, m):
+ def encrypt(self, k, aad, m):
""" Encrypt according to the selected encryption and hashing
functions.
- :param k: Encryption key (optional)
- :param a: Additional Authentication Data
+ :param k: Encryption key
+ :param aad: Additional Authentication Data
:param m: Plaintext
Returns a dictionary with the computed data.
"""
+ if len(k) != _inbytes(self.wrap_key_size):
+ raise ValueError("Invalid input key size")
+
hkey = k[:_inbytes(self.keysize)]
ekey = k[_inbytes(self.keysize):]
@@ -901,26 +929,29 @@ class _AesCbcHmacSha2(_RawJWE):
e = encryptor.update(padded_data) + encryptor.finalize()
# mac
- t = self._mac(hkey, a, iv, e)
+ t = self._mac(hkey, aad, iv, e)
return (iv, e, t)
- def decrypt(self, k, a, iv, e, t):
+ def decrypt(self, k, aad, iv, e, t):
""" Decrypt according to the selected encryption and hashing
functions.
- :param k: Encryption key (optional)
- :param a: Additional Authenticated Data
+ :param k: Encryption key
+ :param aad: Additional Authenticated Data
:param iv: Initialization Vector
:param e: Ciphertext
:param t: Authentication Tag
Returns plaintext or raises an error
"""
+ if len(k) != _inbytes(self.wrap_key_size):
+ raise ValueError("Invalid input key size")
+
hkey = k[:_inbytes(self.keysize)]
dkey = k[_inbytes(self.keysize):]
# verify mac
- if not constant_time.bytes_eq(t, self._mac(hkey, a, iv, e)):
+ if not constant_time.bytes_eq(t, self._mac(hkey, aad, iv, e)):
raise InvalidSignature('Failed to verify MAC')
# decrypt
@@ -977,12 +1008,12 @@ class _AesGcm(_RawJWE):
self.wrap_key_size = self.keysize
# RFC 7518 - 5.3
- def encrypt(self, k, a, m):
- """ Encrypt accoriding to the selected encryption and hashing
+ def encrypt(self, k, aad, m):
+ """ Encrypt according to the selected encryption and hashing
functions.
- :param k: Encryption key (optional)
- :param a: Additional Authentication Data
+ :param k: Encryption key
+ :param aad: Additional Authentication Data
:param m: Plaintext
Returns a dictionary with the computed data.
@@ -991,16 +1022,16 @@ class _AesGcm(_RawJWE):
cipher = Cipher(algorithms.AES(k), modes.GCM(iv),
backend=self.backend)
encryptor = cipher.encryptor()
- encryptor.authenticate_additional_data(a)
+ encryptor.authenticate_additional_data(aad)
e = encryptor.update(m) + encryptor.finalize()
return (iv, e, encryptor.tag)
- def decrypt(self, k, a, iv, e, t):
- """ Decrypt accoriding to the selected encryption and hashing
+ def decrypt(self, k, aad, iv, e, t):
+ """ Decrypt according to the selected encryption and hashing
functions.
- :param k: Encryption key (optional)
- :param a: Additional Authenticated Data
+ :param k: Encryption key
+ :param aad: Additional Authenticated Data
:param iv: Initialization Vector
:param e: Ciphertext
:param t: Authentication Tag
@@ -1010,7 +1041,7 @@ class _AesGcm(_RawJWE):
cipher = Cipher(algorithms.AES(k), modes.GCM(iv, t),
backend=self.backend)
decryptor = cipher.decryptor()
- decryptor.authenticate_additional_data(a)
+ decryptor.authenticate_additional_data(aad)
return decryptor.update(e) + decryptor.finalize()
@@ -1041,6 +1072,54 @@ class _A256Gcm(_AesGcm, JWAAlgorithm):
algorithm_use = 'enc'
+class _BP256R1(_RawEC, JWAAlgorithm):
+
+ name = "BP256R1"
+ description = (
+ "ECDSA using Brainpool256R1 curve and SHA-256"
+ " (unregistered, custom-defined in breach"
+ " of IETF rules by gematik GmbH)"
+ )
+ keysize = 256
+ algorithm_usage_location = 'alg'
+ algorithm_use = 'sig'
+
+ def __init__(self):
+ super(_BP256R1, self).__init__('BP-256', hashes.SHA256())
+
+
+class _BP384R1(_RawEC, JWAAlgorithm):
+
+ name = "BP384R1"
+ description = (
+ "ECDSA using Brainpool384R1 curve and SHA-384"
+ " (unregistered, custom-defined in breach"
+ " of IETF rules by gematik GmbH)"
+ )
+ keysize = 384
+ algorithm_usage_location = 'alg'
+ algorithm_use = 'sig'
+
+ def __init__(self):
+ super(_BP384R1, self).__init__('BP-384', hashes.SHA384())
+
+
+class _BP512R1(_RawEC, JWAAlgorithm):
+
+ name = "BP512R1"
+ description = (
+ "ECDSA using Brainpool512R1 curve and SHA-512"
+ " (unregistered, custom-defined in breach"
+ " of IETF rules by gematik GmbH)"
+ )
+ keysize = 512
+ algorithm_usage_location = 'alg'
+ algorithm_use = 'sig'
+
+ def __init__(self):
+ super(_BP512R1, self).__init__('BP-512', hashes.SHA512())
+
+
class JWA:
"""JWA Signing Algorithms.
@@ -1085,7 +1164,10 @@ class JWA:
'A256CBC-HS512': _A256CbcHs512,
'A128GCM': _A128Gcm,
'A192GCM': _A192Gcm,
- 'A256GCM': _A256Gcm
+ 'A256GCM': _A256Gcm,
+ 'BP256R1': _BP256R1,
+ 'BP384R1': _BP384R1,
+ 'BP512R1': _BP512R1
}
@classmethod
=====================================
jwcrypto/jwe.py
=====================================
@@ -3,11 +3,12 @@
import zlib
from jwcrypto import common
-from jwcrypto.common import JWException
+from jwcrypto.common import JWException, JWKeyNotFound
from jwcrypto.common import JWSEHeaderParameter, JWSEHeaderRegistry
from jwcrypto.common import base64url_decode, base64url_encode
from jwcrypto.common import json_decode, json_encode
from jwcrypto.jwa import JWA
+from jwcrypto.jwk import JWKSet
# RFC 7516 - 4.1
@@ -271,6 +272,9 @@ class JWE:
with the compact representation and `compact` is True.
:raises InvalidJWEOperation: if no recipients have been added
to the object.
+
+ :return: A json formatted string or a compact representation string
+ :rtype: `str`
"""
if 'ciphertext' not in self.objects:
@@ -280,7 +284,7 @@ class JWE:
for invalid in 'aad', 'unprotected':
if invalid in self.objects:
raise InvalidJWEOperation(
- "Can't use compact encoding when the '%s' parameter"
+ "Can't use compact encoding when the '%s' parameter "
"is set" % invalid)
if 'protected' not in self.objects:
raise InvalidJWEOperation(
@@ -355,6 +359,14 @@ class JWE:
raise InvalidJWEData('Unsupported critical header: '
'"%s"' % k)
+ def _unwrap_decrypt(self, alg, enc, key, enckey, header,
+ aad, iv, ciphertext, tag):
+ cek = alg.unwrap(key, enc.wrap_key_size, enckey, header)
+ data = enc.decrypt(cek, aad, iv, ciphertext, tag)
+ self.decryptlog.append('Success')
+ self.cek = cek
+ return data
+
# FIXME: allow to specify which algorithms to accept as valid
def _decrypt(self, key, ppe):
@@ -374,16 +386,39 @@ class JWE:
aad = base64url_encode(self.objects.get('protected', ''))
if 'aad' in self.objects:
aad += '.' + base64url_encode(self.objects['aad'])
+ aad = aad.encode('utf-8')
- cek = alg.unwrap(key, enc.wrap_key_size,
- ppe.get('encrypted_key', b''), jh)
- data = enc.decrypt(cek, aad.encode('utf-8'),
- self.objects['iv'],
- self.objects['ciphertext'],
- self.objects['tag'])
+ if isinstance(key, JWKSet):
+ keys = key
+ if 'kid' in self.jose_header:
+ kid_keys = key.get_keys(self.jose_header['kid'])
+ if not kid_keys:
+ raise JWKeyNotFound('Key ID {} not in key set'.format(
+ self.jose_header['kid']))
+ keys = kid_keys
- self.decryptlog.append('Success')
- self.cek = cek
+ for k in keys:
+ try:
+ data = self._unwrap_decrypt(alg, enc, k,
+ ppe.get('encrypted_key', b''),
+ jh, aad, self.objects['iv'],
+ self.objects['ciphertext'],
+ self.objects['tag'])
+ self.decryptlog.append("Success")
+ break
+ except Exception as e: # pylint: disable=broad-except
+ keyid = k.get('kid', k.thumbprint())
+ self.decryptlog.append('Key [{}] failed: [{}]'.format(
+ keyid, repr(e)))
+
+ if "Success" not in self.decryptlog:
+ raise JWKeyNotFound('No working key found in key set')
+ else:
+ data = self._unwrap_decrypt(alg, enc, key,
+ ppe.get('encrypted_key', b''),
+ jh, aad, self.objects['iv'],
+ self.objects['ciphertext'],
+ self.objects['tag'])
compress = jh.get('zip', None)
if compress == 'DEF':
@@ -397,31 +432,40 @@ class JWE:
"""Decrypt a JWE token.
:param key: The (:class:`jwcrypto.jwk.JWK`) decryption key.
- :param key: A (:class:`jwcrypto.jwk.JWK`) decryption key or a password
- string (optional).
+ :param key: A (:class:`jwcrypto.jwk.JWK`) decryption key,
+ or a (:class:`jwcrypto.jwk.JWKSet`) that contains a key indexed
+ by the 'kid' header or (deprecated) a string containing a password.
:raises InvalidJWEOperation: if the key is not a JWK object.
:raises InvalidJWEData: if the ciphertext can't be decrypted or
the object is otherwise malformed.
+ :raises JWKeyNotFound: if key is a JWKSet and the key is not found.
"""
if 'ciphertext' not in self.objects:
raise InvalidJWEOperation("No available ciphertext")
self.decryptlog = []
+ missingkey = False
if 'recipients' in self.objects:
for rec in self.objects['recipients']:
try:
self._decrypt(key, rec)
except Exception as e: # pylint: disable=broad-except
+ if isinstance(e, JWKeyNotFound):
+ missingkey = True
self.decryptlog.append('Failed: [%s]' % repr(e))
else:
try:
self._decrypt(key, self.objects)
except Exception as e: # pylint: disable=broad-except
+ if isinstance(e, JWKeyNotFound):
+ missingkey = True
self.decryptlog.append('Failed: [%s]' % repr(e))
if not self.plaintext:
+ if missingkey:
+ raise JWKeyNotFound("Key Not found in JWKSet")
raise InvalidJWEData('No recipient matched the provided '
'key' + repr(self.decryptlog))
@@ -431,12 +475,15 @@ class JWE:
NOTE: Destroys any current status and tries to import the raw
JWE provided.
+ If a key is provided a decryption step will be attempted after
+ the object is successfully deserialized.
+
:param raw_jwe: a 'raw' JWE token (JSON Encoded or Compact
notation) string.
- :param key: A (:class:`jwcrypto.jwk.JWK`) decryption key or a password
- string (optional).
- If a key is provided a decryption step will be attempted after
- the object is successfully deserialized.
+ :param key: A (:class:`jwcrypto.jwk.JWK`) decryption key,
+ or a (:class:`jwcrypto.jwk.JWKSet`) that contains a key indexed
+ by the 'kid' header or (deprecated) a string containing a password
+ (optional).
:raises InvalidJWEData: if the raw object is an invalid JWE token.
:raises InvalidJWEOperation: if the decryption fails.
@@ -478,17 +525,17 @@ class JWE:
o['header'] = json_encode(djwe['header'])
except ValueError as e:
- c = raw_jwe.split('.')
- if len(c) != 5:
+ data = raw_jwe.split('.')
+ if len(data) != 5:
raise InvalidJWEData() from e
- p = base64url_decode(c[0])
+ p = base64url_decode(data[0])
o['protected'] = p.decode('utf-8')
- ekey = base64url_decode(c[1])
+ ekey = base64url_decode(data[1])
if ekey != b'':
- o['encrypted_key'] = base64url_decode(c[1])
- o['iv'] = base64url_decode(c[2])
- o['ciphertext'] = base64url_decode(c[3])
- o['tag'] = base64url_decode(c[4])
+ o['encrypted_key'] = base64url_decode(data[1])
+ o['iv'] = base64url_decode(data[2])
+ o['ciphertext'] = base64url_decode(data[3])
+ o['tag'] = base64url_decode(data[4])
self.objects = o
@@ -510,3 +557,52 @@ class JWE:
if len(jh) == 0:
raise InvalidJWEOperation("JOSE Header not available")
return jh
+
+ @classmethod
+ def from_jose_token(cls, token):
+ """Creates a JWE object from a serialized JWE token.
+
+ :param token: A string with the json or compat representation
+ of the token.
+
+ :raises InvalidJWEData: if the raw object is an invalid JWE token.
+
+ :return: A JWE token
+ :rtype: JWE
+ """
+
+ obj = cls()
+ obj.deserialize(token)
+ return obj
+
+ def __eq__(self, other):
+ if not isinstance(other, JWE):
+ return False
+ try:
+ return self.serialize() == other.serialize()
+ except Exception: # pylint: disable=broad-except
+ data1 = {'plaintext': self.plaintext}
+ data1.update(self.objects)
+ data2 = {'plaintext': other.plaintext}
+ data2.update(other.objects)
+ return data1 == data2
+
+ def __str__(self):
+ try:
+ return self.serialize()
+ except Exception: # pylint: disable=broad-except
+ return self.__repr__()
+
+ def __repr__(self):
+ try:
+ return f'JWE.from_json_token("{self.serialize()}")'
+ except Exception: # pylint: disable=broad-except
+ plaintext = repr(self.plaintext)
+ protected = self.objects.get('protected')
+ unprotected = self.objects.get('unprotected')
+ aad = self.objects.get('aad')
+ algs = self._allowed_algs
+ return f'JWE(plaintext={plaintext}, ' + \
+ f'protected={protected}, ' + \
+ f'unprotected={unprotected}, ' + \
+ f'aad={aad}, algs={algs})'
=====================================
jwcrypto/jwk.py
=====================================
@@ -11,7 +11,7 @@ from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric import rsa
-from deprecated import deprecated
+from typing_extensions import deprecated
from jwcrypto.common import JWException
from jwcrypto.common import base64url_decode, base64url_encode
@@ -74,12 +74,15 @@ except ImportError:
X448PrivateKey = UnimplementedOKPCurveKey
-_OKP_CURVE = namedtuple('Name', 'pubkey privkey')
+_Ed25519_CURVE = namedtuple('Ed25519', 'pubkey privkey')
+_Ed448_CURVE = namedtuple('Ed448', 'pubkey privkey')
+_X25519_CURVE = namedtuple('X25519', 'pubkey privkey')
+_X448_CURVE = namedtuple('X448', 'pubkey privkey')
_OKP_CURVES_TABLE = {
- 'Ed25519': _OKP_CURVE(Ed25519PublicKey, Ed25519PrivateKey),
- 'Ed448': _OKP_CURVE(Ed448PublicKey, Ed448PrivateKey),
- 'X25519': _OKP_CURVE(X25519PublicKey, X25519PrivateKey),
- 'X448': _OKP_CURVE(X448PublicKey, X448PrivateKey)
+ 'Ed25519': _Ed25519_CURVE(Ed25519PublicKey, Ed25519PrivateKey),
+ 'Ed448': _Ed448_CURVE(Ed448PublicKey, Ed448PrivateKey),
+ 'X25519': _X25519_CURVE(X25519PublicKey, X25519PrivateKey),
+ 'X448': _X448_CURVE(X448PublicKey, X448PrivateKey)
}
@@ -159,7 +162,17 @@ JWKEllipticCurveRegistry = {'P-256': 'P-256 curve',
'Ed25519': 'Ed25519 signature algorithm key pairs',
'Ed448': 'Ed448 signature algorithm key pairs',
'X25519': 'X25519 function key pairs',
- 'X448': 'X448 function key pairs'}
+ 'X448': 'X448 function key pairs',
+ 'BP-256': 'BrainpoolP256R1 curve'
+ ' (unregistered, custom-defined in breach'
+ ' of IETF rules by gematik GmbH)',
+ 'BP-384': 'BrainpoolP384R1 curve'
+ ' (unregistered, custom-defined in breach'
+ ' of IETF rules by gematik GmbH)',
+ 'BP-512': 'BrainpoolP512R1 curve'
+ ' (unregistered, custom-defined in breach'
+ ' of IETF rules by gematik GmbH)'
+ }
"""Registry of allowed Elliptic Curves"""
# RFC 7517 - 8.2
@@ -183,7 +196,28 @@ JWKOperationsRegistry = {'sign': 'Compute digital Signature or MAC',
JWKpycaCurveMap = {'secp256r1': 'P-256',
'secp384r1': 'P-384',
'secp521r1': 'P-521',
- 'secp256k1': 'secp256k1'}
+ 'secp256k1': 'secp256k1',
+ 'brainpoolP256r1': 'BP-256',
+ 'brainpoolP384r1': 'BP-384',
+ 'brainpoolP512r1': 'BP-512'}
+
+IANANamedInformationHashAlgorithmRegistry = {
+ 'sha-256': hashes.SHA256(),
+ 'sha-256-128': None,
+ 'sha-256-120': None,
+ 'sha-256-96': None,
+ 'sha-256-64': None,
+ 'sha-256-32': None,
+ 'sha-384': hashes.SHA384(),
+ 'sha-512': hashes.SHA512(),
+ 'sha3-224': hashes.SHA3_224(),
+ 'sha3-256': hashes.SHA3_256(),
+ 'sha3-384': hashes.SHA3_384(),
+ 'sha3-512': hashes.SHA3_512(),
+ 'blake2s-256': hashes.BLAKE2s(32),
+ 'blake2b-256': None, # pyca supports only 64 bytes for BLAKEb
+ 'blake2b-512': hashes.BLAKE2b(64),
+}
class InvalidJWKType(JWException):
@@ -271,7 +305,7 @@ class JWK(dict):
"""
def __init__(self, **kwargs):
- """Creates a new JWK object.
+ r"""Creates a new JWK object.
The function arguments must be valid parameters as defined in the
'IANA JSON Web Key Set Parameters registry' and specified in
@@ -296,6 +330,8 @@ class JWK(dict):
valid key type as value then a new key will be generated according
to the defaults or provided key strength options (type specific).
+ :param \**kwargs: parameters (optional).
+
:raises InvalidJWKType: if the key type is invalid
:raises InvalidJWKValue: if incorrect or inconsistent parameters
are provided.
@@ -341,7 +377,7 @@ class JWK(dict):
alg = JWA.instantiate_alg(params['alg'])
except KeyError as e:
raise ValueError("Invalid 'alg' parameter") from e
- size = alg.keysize
+ size = alg.input_keysize
return size
def _generate_oct(self, params):
@@ -395,19 +431,51 @@ class JWK(dict):
)
self.import_key(**params)
- def _get_curve_by_name(self, name):
- if name == 'P-256':
+ def _get_curve_by_name(self, name, ctype=None):
+ crv = self.get('crv')
+
+ if name is None:
+ cname = crv
+ elif name == 'P-256K':
+ # P-256K is an alias for 'secp256k1' to handle compatibility
+ # with some implementation using this old drafting name
+ cname = 'secp256k1'
+ else:
+ cname = name
+
+ # Check we are asking for the correct curve unless this is being
+ # requested for generation on a blank JWK object
+ if crv:
+ ccrv = crv
+ if ccrv == 'P-256K':
+ ccrv = 'secp256k1'
+ if ccrv != cname:
+ raise InvalidJWKValue('Curve requested is "%s", but '
+ 'key curve is "%s"' % (name, crv))
+ kty = self.get('kty')
+ if kty is not None and ctype is not None and kty != ctype:
+ raise InvalidJWKType('Curve Requested is of type "%s", but '
+ 'key curve is of type "%s"' % (ctype, kty))
+
+ # Return a curve object
+ if cname == 'P-256':
return ec.SECP256R1()
- elif name == 'P-384':
+ elif cname == 'P-384':
return ec.SECP384R1()
- elif name == 'P-521':
+ elif cname == 'P-521':
return ec.SECP521R1()
- elif name == 'secp256k1':
+ elif cname == 'secp256k1':
return ec.SECP256K1()
- elif name in _OKP_CURVES_TABLE:
- return name
+ elif cname == 'BP-256':
+ return ec.BrainpoolP256R1()
+ elif cname == 'BP-384':
+ return ec.BrainpoolP384R1()
+ elif cname == 'BP-512':
+ return ec.BrainpoolP512R1()
+ elif cname in _OKP_CURVES_TABLE:
+ return _OKP_CURVES_TABLE[cname]
else:
- raise InvalidJWKValue('Unknown Elliptic Curve Type')
+ raise InvalidJWKValue('Unknown Curve Name [%s]' % (name))
def _generate_EC(self, params):
curve = 'P-256'
@@ -417,8 +485,8 @@ class JWK(dict):
# precedence
if 'crv' in params:
curve = params.pop('crv')
- curve_name = self._get_curve_by_name(curve)
- key = ec.generate_private_key(curve_name, default_backend())
+ curve_fn = self._get_curve_by_name(curve, 'EC')
+ key = ec.generate_private_key(curve_fn, default_backend())
self._import_pyca_pri_ec(key, **params)
def _import_pyca_pri_ec(self, key, **params):
@@ -447,11 +515,8 @@ class JWK(dict):
def _generate_OKP(self, params):
if 'crv' not in params:
raise InvalidJWKValue('Must specify "crv" for OKP key generation')
- try:
- key = _OKP_CURVES_TABLE[params['crv']].privkey.generate()
- except KeyError as e:
- raise InvalidJWKValue('"%s" is not a supported curve for the '
- 'OKP key type' % params['crv']) from e
+ curve_fn = self._get_curve_by_name(params['crv'], 'OKP')
+ key = curve_fn.privkey.generate()
self._import_pyca_pri_okp(key, **params)
def _okp_curve_from_pyca_key(self, key):
@@ -541,11 +606,11 @@ class JWK(dict):
# check key_ops
if 'key_ops' in newkey:
for ko in newkey['key_ops']:
- c = 0
+ cnt = 0
for cko in newkey['key_ops']:
if ko == cko:
- c += 1
- if c != 1:
+ cnt += 1
+ if cnt != 1:
raise InvalidJWKValue('Duplicate values in "key_ops"')
# check use/key_ops consistency
@@ -576,6 +641,9 @@ class JWK(dict):
"""Creates a RFC 7517 JWK from the standard JSON format.
:param key: The RFC 7517 representation of a JWK.
+
+ :return: A JWK object that holds the json key.
+ :rtype: JWK
"""
obj = cls()
try:
@@ -592,6 +660,11 @@ class JWK(dict):
:param private_key(bool): Whether to export the private key.
Defaults to True.
+
+ :return: A portable representation of the key.
+ If as_dict is True then a dictionary is returned.
+ By default a json string
+ :rtype: `str` or `dict`
"""
if private_key is True:
# Use _export_all for backwards compatibility, as this
@@ -606,6 +679,11 @@ class JWK(dict):
is called on a symmetric key.
:param as_dict(bool): If set to True export as python dict not JSON
+
+ :return: A portable representation of the public key only.
+ If as_dict is True then a dictionary is returned.
+ By default a json string
+ :rtype: `str` or `dict`
"""
pub = self._public_params()
if as_dict is True:
@@ -639,6 +717,11 @@ class JWK(dict):
It fails for a JWK that has only a public key or is symmetric.
:param as_dict(bool): If set to True export as python dict not JSON
+
+ :return: A portable representation of a private key.
+ If as_dict is True then a dictionary is returned.
+ By default a json string
+ :rtype: `str` or `dict`
"""
if self.has_private:
return self._export_all(as_dict)
@@ -681,13 +764,13 @@ class JWK(dict):
return self.get('kty') == 'oct'
@property
- @deprecated
+ @deprecated('')
def key_type(self):
"""The Key type"""
return self.get('kty')
@property
- @deprecated
+ @deprecated('')
def key_id(self):
"""The Key ID.
Provided by the kid parameter if present, otherwise returns None.
@@ -695,13 +778,14 @@ class JWK(dict):
return self.get('kid')
@property
- @deprecated
+ @deprecated('')
def key_curve(self):
"""The Curve Name."""
if self.get('kty') not in ['EC', 'OKP']:
raise InvalidJWKType('Not an EC or OKP key')
return self.get('crv')
+ @deprecated('')
def get_curve(self, arg):
"""Gets the Elliptic Curve associated with the key.
@@ -709,15 +793,11 @@ class JWK(dict):
:raises InvalidJWKType: the key is not an EC or OKP key.
:raises InvalidJWKValue: if the curve name is invalid.
- """
- crv = self.get('crv')
- if self.get('kty') not in ['EC', 'OKP']:
- raise InvalidJWKType('Not an EC or OKP key')
- if arg and crv != arg:
- raise InvalidJWKValue('Curve requested is "%s", but '
- 'key curve is "%s"' % (arg, crv))
- return self._get_curve_by_name(crv)
+ :return: An EllipticCurve object
+ :rtype: `EllipticCurve`
+ """
+ return self._get_curve_by_name(arg)
def _check_constraints(self, usage, operation):
use = self.get('use')
@@ -765,7 +845,8 @@ class JWK(dict):
def _ec_pub_n(self, curve):
x = self._decode_int(self.get('x'))
y = self._decode_int(self.get('y'))
- return ec.EllipticCurvePublicNumbers(x, y, self.get_curve(curve))
+ curve_fn = self._get_curve_by_name(curve, ctype='EC')
+ return ec.EllipticCurvePublicNumbers(x, y, curve_fn)
def _ec_pri_n(self, curve):
d = self._decode_int(self.get('d'))
@@ -854,6 +935,9 @@ class JWK(dict):
not permitted with this key.
:raises InvalidJWKUsage: if the use constraints do not permit
the operation.
+
+ :return: A Python Cryptography key object for asymmetric keys
+ or a baseurl64_encoded octet string for symmetric keys
"""
validops = self.get('key_ops',
list(JWKOperationsRegistry.keys()))
@@ -887,9 +971,13 @@ class JWK(dict):
self._import_pyca_pri_ec(key)
elif isinstance(key, ec.EllipticCurvePublicKey):
self._import_pyca_pub_ec(key)
- elif isinstance(key, (Ed25519PrivateKey, Ed448PrivateKey)):
+ elif isinstance(key, (Ed25519PrivateKey,
+ Ed448PrivateKey,
+ X25519PrivateKey)):
self._import_pyca_pri_okp(key)
- elif isinstance(key, (Ed25519PublicKey, Ed448PublicKey)):
+ elif isinstance(key, (Ed25519PublicKey,
+ Ed448PublicKey,
+ X25519PublicKey)):
self._import_pyca_pub_okp(key)
else:
raise InvalidJWKValue('Unknown key object %r' % key)
@@ -940,27 +1028,30 @@ class JWK(dict):
Defaults to False which will cause the operation to fail. To avoid
encryption the user must explicitly pass None, otherwise the user
needs to provide a password in a bytes buffer.
+
+ :return: A serialized bytes buffer containing a PEM formatted key.
+ :rtype: `bytes`
"""
- e = serialization.Encoding.PEM
+ enc = serialization.Encoding.PEM
if private_key:
if not self.has_private:
raise InvalidJWKType("No private key available")
f = serialization.PrivateFormat.PKCS8
if password is None:
- a = serialization.NoEncryption()
+ enc_alg = serialization.NoEncryption()
elif isinstance(password, bytes):
- a = serialization.BestAvailableEncryption(password)
+ enc_alg = serialization.BestAvailableEncryption(password)
elif password is False:
raise ValueError("The password must be None or a bytes string")
else:
raise TypeError("The password string must be bytes")
return self._get_private_key().private_bytes(
- encoding=e, format=f, encryption_algorithm=a)
+ encoding=enc, format=f, encryption_algorithm=enc_alg)
else:
if not self.has_public:
raise InvalidJWKType("No public key available")
f = serialization.PublicFormat.SubjectPublicKeyInfo
- return self._get_public_key().public_bytes(encoding=e, format=f)
+ return self._get_public_key().public_bytes(encoding=enc, format=f)
@classmethod
def from_pyca(cls, key):
@@ -975,6 +1066,9 @@ class JWK(dict):
:param data(bytes): The data contained in a PEM file.
:param password(bytes): An optional password to unwrap the key.
+
+ :return: A JWK object.
+ :rtype: JWK
"""
obj = cls()
obj.import_from_pem(data, password)
@@ -984,6 +1078,9 @@ class JWK(dict):
"""Returns the key thumbprint as specified by RFC 7638.
:param hashalg: A hash function (defaults to SHA256)
+
+ :return: A base64url encoded digest of the key
+ :rtype: `str`
"""
t = {'kty': self.get('kty')}
@@ -994,6 +1091,28 @@ class JWK(dict):
digest.update(bytes(json_encode(t).encode('utf8')))
return base64url_encode(digest.finalize())
+ def thumbprint_uri(self, hname='sha-256'):
+ """Returns the key thumbprint URI as specified by RFC 9278.
+
+ :param hname: A hash function name as specified in IANA's
+ Named Information registry:
+ https://www.iana.org/assignments/named-information/
+ Values from `IANANamedInformationHashAlgorithmRegistry`
+
+ :return: A JWK Thumbprint URI
+ :rtype: `str`
+ """
+
+ try:
+ h = IANANamedInformationHashAlgorithmRegistry[hname]
+ except KeyError as e:
+ raise InvalidJWKValue('Unknown hash "{}"'.format(hname)) from e
+ if h is None:
+ raise InvalidJWKValue('Unsupported hash "{}"'.format(hname))
+
+ t = self.thumbprint(h)
+ return "urn:ietf:params:oauth:jwk-thumbprint:{}:{}".format(hname, t)
+
# Methods to constrain what this dict allows
def __setitem__(self, item, value):
kty = self.get('kty')
@@ -1053,6 +1172,10 @@ class JWK(dict):
super(JWK, self).__setitem__(item, value)
def update(self, *args, **kwargs):
+ r"""
+ :param \*args: arguments
+ :param \**kwargs: keyword arguments
+ """
for k, v in dict(*args, **kwargs).items():
self.__setitem__(k, v)
@@ -1118,7 +1241,10 @@ class JWK(dict):
def from_password(cls, password):
"""Creates a symmetric JWK key from a user password.
- :param key: A password in utf8 format.
+ :param password: A password in utf8 format.
+
+ :return: a JWK object
+ :rtype: JWK
"""
obj = cls()
params = {'kty': 'oct'}
@@ -1158,6 +1284,7 @@ class JWKSet(dict):
Creates a special key 'keys' that is of a type derived from 'set'
The 'keys' attribute accepts only :class:`jwcrypto.jwk.JWK` elements.
"""
+
def __init__(self, *args, **kwargs):
super(JWKSet, self).__init__()
super(JWKSet, self).__setitem__('keys', _JWKkeys())
@@ -1176,6 +1303,10 @@ class JWKSet(dict):
super(JWKSet, self).__setitem__(key, val)
def update(self, *args, **kwargs):
+ r"""
+ :param \*args: arguments
+ :param \**kwargs: keyword arguments
+ """
for k, v in dict(*args, **kwargs).items():
self.__setitem__(k, v)
@@ -1195,6 +1326,11 @@ class JWKSet(dict):
Defaults to True.
:param as_dict(bool): Whether to return a dict instead of
a JSON object
+
+ :return: A portable representation of the key set.
+ If as_dict is True then a dictionary is returned.
+ By default a json string
+ :rtype: `str` or `dict`
"""
exp_dict = {}
for k, v in self.items():
@@ -1233,6 +1369,9 @@ class JWKSet(dict):
"""Creates a RFC 7517 key set from the standard JSON format.
:param keyset: The RFC 7517 representation of a JOSE key set.
+
+ :return: A JWKSet object.
+ :rtype: JWKSet
"""
obj = cls()
obj.import_keyset(keyset)
@@ -1241,11 +1380,27 @@ class JWKSet(dict):
def get_key(self, kid):
"""Gets a key from the set.
:param kid: the 'kid' key identifier.
+
+ :return: A JWK from the set
+ :rtype: JWK
+ """
+ keys = self.get_keys(kid)
+ if len(keys) > 1:
+ raise InvalidJWKValue(
+ 'Duplicate keys found with requested kid: 1 expected')
+ try:
+ return tuple(keys)[0]
+ except IndexError:
+ return None
+
+ def get_keys(self, kid):
+ """Gets keys from the set with matching kid.
+ :param kid: the 'kid' key identifier.
+
+ :return: a List of keys
+ :rtype: `list`
"""
- for jwk in self['keys']:
- if jwk.get('kid') == kid:
- return jwk
- return None
+ return {key for key in self['keys'] if key.get('kid') == kid}
def __repr__(self):
repr_dict = {}
=====================================
jwcrypto/jws.py
=====================================
@@ -1,11 +1,11 @@
# Copyright (C) 2015 JWCrypto Project Contributors - see LICENSE file
-from jwcrypto.common import JWException
+from jwcrypto.common import JWException, JWKeyNotFound
from jwcrypto.common import JWSEHeaderParameter, JWSEHeaderRegistry
from jwcrypto.common import base64url_decode, base64url_encode
from jwcrypto.common import json_decode, json_encode
from jwcrypto.jwa import JWA
-from jwcrypto.jwk import JWK
+from jwcrypto.jwk import JWK, JWKSet
JWSHeaderRegistry = {
'alg': JWSEHeaderParameter('Algorithm', False, True, None),
@@ -99,35 +99,33 @@ class JWSCore:
:param alg: The algorithm used to produce the signature.
See RFC 7518
- :param key: A (:class:`jwcrypto.jwk.JWK`) key of appropriate
- type for the "alg" provided in the 'protected' json string.
+ :param key: A (:class:`jwcrypto.jwk.JWK`) verification or
+ a (:class:`jwcrypto.jwk.JWKSet`) that contains a key indexed by the
+ 'kid' header. A JWKSet is allowed only for verification operations.
:param header: A JSON string representing the protected header.
:param payload(bytes): An arbitrary value
:param algs: An optional list of allowed algorithms
- :raises ValueError: if the key is not a :class:`JWK` object
+ :raises ValueError: if the key is not a (:class:`jwcrypto.jwk.JWK`)
:raises InvalidJWAAlgorithm: if the algorithm is not valid, is
unknown or otherwise not yet implemented.
:raises InvalidJWSOperation: if the algorithm is not allowed.
"""
self.alg = alg
self.engine = self._jwa(alg, algs)
- if not isinstance(key, JWK):
- raise ValueError('key is not a JWK object')
self.key = key
if header is not None:
if isinstance(header, dict):
- self.header = header
header = json_encode(header)
- else:
- self.header = json_decode(header)
+ # Make sure this is always a deep copy of the dict
+ self.header = json_decode(header)
self.protected = base64url_encode(header.encode('utf-8'))
else:
self.header = {}
self.protected = ''
- self.payload = payload
+ self.payload = self._payload(payload)
def _jwa(self, name, allowed):
if allowed is None:
@@ -136,32 +134,37 @@ class JWSCore:
raise InvalidJWSOperation('Algorithm not allowed')
return JWA.signing_alg(name)
- def _payload(self):
+ def _payload(self, payload):
if self.header.get('b64', True):
- return base64url_encode(self.payload).encode('utf-8')
+ return base64url_encode(payload).encode('utf-8')
else:
- if isinstance(self.payload, bytes):
- return self.payload
+ if isinstance(payload, bytes):
+ return payload
else:
- return self.payload.encode('utf-8')
+ return payload.encode('utf-8')
def sign(self):
"""Generates a signature"""
- payload = self._payload()
- sigin = b'.'.join([self.protected.encode('utf-8'), payload])
+ if not isinstance(self.key, JWK):
+ raise ValueError('key is not a JWK object')
+ sigin = b'.'.join([self.protected.encode('utf-8'),
+ self.payload])
signature = self.engine.sign(self.key, sigin)
return {'protected': self.protected,
- 'payload': payload,
+ 'payload': self.payload,
'signature': base64url_encode(signature)}
def verify(self, signature):
"""Verifies a signature
:raises InvalidJWSSignature: if the verification fails.
+
+ :return: Returns True or an Exception
+ :rtype: `bool`
"""
try:
- payload = self._payload()
- sigin = b'.'.join([self.protected.encode('utf-8'), payload])
+ sigin = b'.'.join([self.protected.encode('utf-8'),
+ self.payload])
self.engine.verify(self.key, sigin, signature)
except Exception as e: # pylint: disable=broad-except
raise InvalidJWSSignature('Verification failed') from e
@@ -252,7 +255,6 @@ class JWS:
return header
- # TODO: support selecting key with 'kid' and passing in multiple keys
def _verify(self, alg, key, payload, signature, protected, header=None):
p = {}
# verify it is a valid JSON object and decode
@@ -260,7 +262,7 @@ class JWS:
p = json_decode(protected)
if not isinstance(p, dict):
raise InvalidJWSSignature('Invalid Protected header')
- # merge heders, and verify there are no duplicates
+ # merge headers, and verify there are no duplicates
if header:
if not isinstance(header, dict):
raise InvalidJWSSignature('Invalid Unprotected header')
@@ -277,58 +279,115 @@ class JWS:
raise InvalidJWSSignature('No "alg" in headers')
if alg:
if 'alg' in p and alg != p['alg']:
- raise InvalidJWSSignature('"alg" mismatch, requested '
- '"%s", found "%s"' % (alg,
- p['alg']))
- a = alg
+ raise InvalidJWSSignature(
+ '"alg" mismatch, requested'
+ f''' "{alg}", found "{p['alg']}"'''
+ )
+ resulting_alg = alg
else:
- a = p['alg']
+ resulting_alg = p['alg']
# the following will verify the "alg" is supported and the signature
# verifies
- c = JWSCore(a, key, protected, payload, self._allowed_algs)
- c.verify(signature)
+ if isinstance(key, JWK):
+ signer = JWSCore(resulting_alg, key, protected,
+ payload, self._allowed_algs)
+ signer.verify(signature)
+ self.verifylog.append("Success")
+ elif isinstance(key, JWKSet):
+ keys = key
+ if 'kid' in self.jose_header:
+ kid_keys = key.get_keys(self.jose_header['kid'])
+ if not kid_keys:
+ raise JWKeyNotFound('Key ID {} not in key set'.format(
+ self.jose_header['kid']))
+ keys = kid_keys
+
+ for k in keys:
+ try:
+ signer2 = JWSCore(
+ resulting_alg, k, protected,
+ payload, self._allowed_algs
+ )
+ signer2.verify(signature)
+ self.verifylog.append("Success")
+ break
+ except Exception as e: # pylint: disable=broad-except
+ keyid = k.get('kid', k.thumbprint())
+ self.verifylog.append('Key [{}] failed: [{}]'.format(
+ keyid, repr(e)))
+ if "Success" not in self.verifylog:
+ raise JWKeyNotFound('No working key found in key set')
+ else:
+ raise ValueError("Unrecognized key type")
+
+ # Helper to deal with detached payloads in verification
+ def _get_obj_payload(self, obj, dp):
+ op = obj.get('payload')
+ if dp is not None:
+ if op is None or len(op) == 0:
+ return dp
+ else:
+ raise InvalidJWSOperation('Object Payload present but'
+ ' Detached Payload provided')
+ return op
- def verify(self, key, alg=None):
+ def verify(self, key, alg=None, detached_payload=None):
"""Verifies a JWS token.
- :param key: The (:class:`jwcrypto.jwk.JWK`) verification key.
+ :param key: A (:class:`jwcrypto.jwk.JWK`) verification or
+ a (:class:`jwcrypto.jwk.JWKSet`) that contains a key indexed by the
+ 'kid' header.
:param alg: The signing algorithm (optional). Usually the algorithm
is known as it is provided with the JOSE Headers of the token.
+ :param detached_payload: A detached payload to verify the signature
+ against. Only valid for tokens that are not carrying a payload.
:raises InvalidJWSSignature: if the verification fails.
+ :raises InvalidJWSOperation: if a detached_payload is provided but
+ an object payload exists
+ :raises JWKeyNotFound: if key is a JWKSet and the key is not found.
"""
self.verifylog = []
self.objects['valid'] = False
obj = self.objects
+ missingkey = False
if 'signature' in obj:
+ payload = self._get_obj_payload(obj, detached_payload)
try:
self._verify(alg, key,
- obj['payload'],
+ payload,
obj['signature'],
obj.get('protected', None),
obj.get('header', None))
obj['valid'] = True
except Exception as e: # pylint: disable=broad-except
+ if isinstance(e, JWKeyNotFound):
+ missingkey = True
self.verifylog.append('Failed: [%s]' % repr(e))
elif 'signatures' in obj:
+ payload = self._get_obj_payload(obj, detached_payload)
for o in obj['signatures']:
try:
self._verify(alg, key,
- obj['payload'],
+ payload,
o['signature'],
o.get('protected', None),
o.get('header', None))
# Ok if at least one verifies
obj['valid'] = True
except Exception as e: # pylint: disable=broad-except
+ if isinstance(e, JWKeyNotFound):
+ missingkey = True
self.verifylog.append('Failed: [%s]' % repr(e))
else:
raise InvalidJWSSignature('No signatures available')
if not self.is_valid:
+ if missingkey:
+ raise JWKeyNotFound('No working key found in key set')
raise InvalidJWSSignature('Verification failed for all '
'signatures' + repr(self.verifylog))
@@ -364,16 +423,20 @@ class JWS:
NOTE: Destroys any current status and tries to import the raw
JWS provided.
+ If a key is provided a verification step will be attempted after
+ the object is successfully deserialized.
+
:param raw_jws: a 'raw' JWS token (JSON Encoded or Compact
notation) string.
- :param key: A (:class:`jwcrypto.jwk.JWK`) verification key (optional).
- If a key is provided a verification step will be attempted after
- the object is successfully deserialized.
+ :param key: A (:class:`jwcrypto.jwk.JWK`) verification or
+ a (:class:`jwcrypto.jwk.JWKSet`) that contains a key indexed by the
+ 'kid' header (optional).
:param alg: The signing algorithm (optional). Usually the algorithm
is known as it is provided with the JOSE Headers of the token.
:raises InvalidJWSObject: if the raw object is an invalid JWS token.
:raises InvalidJWSSignature: if the verification fails.
+ :raises JWKeyNotFound: if key is a JWKSet and the key is not found.
"""
self.objects = {}
o = {}
@@ -397,16 +460,16 @@ class JWS:
o['payload'] = djws['payload']
except ValueError:
- c = raw_jws.split('.')
- if len(c) != 3:
+ data = raw_jws.split('.')
+ if len(data) != 3:
raise InvalidJWSObject('Unrecognized'
' representation') from None
- p = base64url_decode(str(c[0]))
+ p = base64url_decode(str(data[0]))
if len(p) > 0:
o['protected'] = p.decode('utf-8')
self._deserialize_b64(o, o['protected'])
- o['payload'] = base64url_decode(str(c[1]))
- o['signature'] = base64url_decode(str(c[2]))
+ o['payload'] = base64url_decode(str(data[1]))
+ o['signature'] = base64url_decode(str(data[2]))
self.objects = o
@@ -428,7 +491,7 @@ class JWS:
:param header: The Unprotected Header (optional)
:raises InvalidJWSObject: if invalid headers are provided.
- :raises ValueError: if the key is not a :class:`JWK` object.
+ :raises ValueError: if the key is not a (:class:`jwcrypto.jwk.JWK`)
:raises ValueError: if the algorithm is missing or is not provided
by one of the headers.
:raises InvalidJWAAlgorithm: if the algorithm is not valid, is
@@ -437,13 +500,13 @@ class JWS:
b64 = True
- p = {}
if protected:
if isinstance(protected, dict):
- p = protected
- protected = json_encode(p)
- else:
- p = json_decode(protected)
+ protected = json_encode(protected)
+ # Make sure p is always a deep copy of the dict
+ p = json_decode(protected)
+ else:
+ p = dict()
# If b64 is present we must enforce criticality
if 'b64' in list(p.keys()):
@@ -459,10 +522,9 @@ class JWS:
h = None
if header:
if isinstance(header, dict):
- h = header
header = json_encode(header)
- else:
- h = json_decode(header)
+ # Make sure h is always a deep copy of the dict
+ h = json_decode(header)
p = self._merge_check_headers(p, h)
@@ -518,6 +580,9 @@ class JWS:
with the compact representation and `compact` is True.
:raises InvalidJWSSignature: if no signature has been added
to the object, or no valid signature can be found.
+
+ :return: A json formatted string or a compact representation string
+ :rtype: `str`
"""
if compact:
if 'signatures' in self.objects:
@@ -616,3 +681,41 @@ class JWS:
return jhl
else:
raise InvalidJWSOperation("JOSE Header(s) not available")
+
+ @classmethod
+ def from_jose_token(cls, token):
+ """Creates a JWS object from a serialized JWS token.
+
+ :param token: A string with the json or compat representation
+ of the token.
+
+ :raises InvalidJWSObject: if the raw object is an invalid JWS token.
+
+ :return: A JWS token
+ :rtype: JWS
+ """
+
+ obj = cls()
+ obj.deserialize(token)
+ return obj
+
+ def __eq__(self, other):
+ if not isinstance(other, JWS):
+ return False
+ try:
+ return self.serialize() == other.serialize()
+ except Exception: # pylint: disable=broad-except
+ return self.objects == other.objects
+
+ def __str__(self):
+ try:
+ return self.serialize()
+ except Exception: # pylint: disable=broad-except
+ return self.__repr__()
+
+ def __repr__(self):
+ try:
+ return f'JWS.from_json_token("{self.serialize()}")'
+ except Exception: # pylint: disable=broad-except
+ payload = self.objects['payload'].decode('utf-8')
+ return f'JWS(payload={payload})'
=====================================
jwcrypto/jwt.py
=====================================
@@ -1,12 +1,18 @@
# Copyright (C) 2015 JWCrypto Project Contributors - see LICENSE file
+import copy
import time
import uuid
-from jwcrypto.common import JWException, json_decode, json_encode
+from typing_extensions import deprecated
+
+from jwcrypto.common import JWException, JWKeyNotFound
+from jwcrypto.common import json_decode, json_encode
from jwcrypto.jwe import JWE
+from jwcrypto.jwe import default_allowed_algs as jwe_algs
from jwcrypto.jwk import JWK, JWKSet
from jwcrypto.jws import JWS
+from jwcrypto.jws import default_allowed_algs as jws_algs
# RFC 7519 - 4.1
@@ -18,6 +24,17 @@ JWTClaimsRegistry = {'iss': 'Issuer',
'nbf': 'Not Before',
'iat': 'Issued At',
'jti': 'JWT ID'}
+"""Registry of RFC 7519 defined claims"""
+
+
+# do not use this unless you know about CVE-2022-3102
+JWT_expect_type = True
+"""This module parameter can disable the use of the expectation
+ feature that has been introduced to fix CVE-2022-3102. This knob
+ has been added as a workaround for applications that can't be
+ immediately refactored to deal with the change in behavior but it
+ is considered deprecated and will be removed in a future release.
+"""
class JWTExpired(JWException):
@@ -106,7 +123,7 @@ class JWTInvalidClaimFormat(JWException):
super(JWTInvalidClaimFormat, self).__init__(msg)
-# deprecated and not used anymore
+ at deprecated('')
class JWTMissingKeyID(JWException):
"""JSON Web Token is missing key id.
@@ -125,7 +142,7 @@ class JWTMissingKeyID(JWException):
super(JWTMissingKeyID, self).__init__(msg)
-class JWTMissingKey(JWException):
+class JWTMissingKey(JWKeyNotFound):
"""JSON Web Token is using a key not in the key set.
This exception is raised if the key that was used is not available
@@ -150,7 +167,8 @@ class JWT:
"""
def __init__(self, header=None, claims=None, jwt=None, key=None,
- algs=None, default_claims=None, check_claims=None):
+ algs=None, default_claims=None, check_claims=None,
+ expected_type=None):
"""Creates a JWT object.
:param header: A dict or a JSON string with the JWT Header data.
@@ -166,6 +184,12 @@ class JWT:
:param check_claims: An optional dict of claims that must be
present in the token, if the value is not None the claim must
match exactly.
+ :param expected_type: An optional string that defines what kind
+ of token to expect when validating a deserialized token.
+ Supported values: "JWS" or "JWE"
+ If left to None the code will try to detect what the expected
+ type is based on other parameters like 'algs' and will default
+ to JWS if no hints are found. It has no effect on token creation.
Note: either the header,claims or jwt,key parameters should be
provided as a deserialization operation (which occurs if the jwt
@@ -187,6 +211,7 @@ class JWT:
self._leeway = 60 # 1 minute clock skew allowed
self._validity = 600 # 10 minutes validity (up to 11 with leeway)
self.deserializelog = None
+ self._expected_type = expected_type
if header:
self.header = header
@@ -231,16 +256,20 @@ class JWT:
return self._claims
@claims.setter
- def claims(self, c):
- if self._reg_claims and not isinstance(c, dict):
- # decode c so we can set default claims
- c = json_decode(c)
-
- if isinstance(c, dict):
- self._add_default_claims(c)
- self._claims = json_encode(c)
+ def claims(self, data):
+ if not isinstance(data, dict):
+ if not self._reg_claims:
+ # no default_claims, can return immediately
+ self._claims = data
+ return
+ data = json_decode(data)
else:
- self._claims = c
+ # _add_default_claims modifies its argument
+ # so we must always copy it.
+ data = copy.deepcopy(data)
+
+ self._add_default_claims(data)
+ self._claims = json_encode(data)
@property
def token(self):
@@ -269,6 +298,96 @@ class JWT:
def validity(self, v):
self._validity = int(v)
+ def _expected_type_heuristics(self, key=None):
+ if self._expected_type is None and self._algs:
+ if set(self._algs).issubset(jwe_algs + ['RSA1_5']):
+ self._expected_type = "JWE"
+ elif set(self._algs).issubset(jws_algs):
+ self._expected_type = "JWS"
+ if self._expected_type is None and self._header:
+ if "enc" in json_decode(self._header):
+ self._expected_type = "JWE"
+ if self._expected_type is None and key is not None:
+ if isinstance(key, JWK):
+ use = key.get('use')
+ if use == 'sig':
+ self._expected_type = "JWS"
+ elif use == 'enc':
+ self._expected_type = "JWE"
+ elif isinstance(key, JWKSet):
+ all_use = None
+ # we can infer only if all keys are of the same type
+ for k in key:
+ use = k.get('use')
+ if all_use is None:
+ all_use = use
+ elif use != all_use:
+ all_use = None
+ break
+ if all_use == 'sig':
+ self._expected_type = "JWS"
+ elif all_use == 'enc':
+ self._expected_type = "JWE"
+ if self._expected_type is None and key is not None:
+ if isinstance(key, JWK):
+ ops = key.get('key_ops')
+ if ops:
+ if not isinstance(ops, list):
+ ops = [ops]
+ if set(ops).issubset(['sign', 'verify']):
+ self._expected_type = "JWS"
+ elif set(ops).issubset(['encrypt', 'decrypt']):
+ self._expected_type = "JWE"
+ elif isinstance(key, JWKSet):
+ all_ops = None
+ ttype = None
+ # we can infer only if all keys are of the same type
+ for k in key:
+ ops = k.get('key_ops')
+ if ops:
+ if not isinstance(ops, list):
+ ops = [ops]
+ if all_ops is None:
+ if set(ops).issubset(['sign', 'verify']):
+ all_ops = set(['sign', 'verify'])
+ ttype = "JWS"
+ elif set(ops).issubset(['encrypt', 'decrypt']):
+ all_ops = set(['encrypt', 'decrypt'])
+ ttype = "JWE"
+ else:
+ ttype = None
+ break
+ else:
+ if not set(ops).issubset(all_ops):
+ ttype = None
+ break
+ elif all_ops:
+ ttype = None
+ break
+ if ttype:
+ self._expected_type = ttype
+ if self._expected_type is None:
+ self._expected_type = "JWS"
+ return self._expected_type
+
+ @property
+ def expected_type(self):
+ if self._expected_type is not None:
+ return self._expected_type
+
+ # If no expected type is set we default to accept only JWSs,
+ # however to improve backwards compatibility we try some
+ # heuristic to see if there has been strong indication of
+ # what the expected token type is.
+ return self._expected_type_heuristics()
+
+ @expected_type.setter
+ def expected_type(self, v):
+ if v in ["JWS", "JWE"]:
+ self._expected_type = v
+ else:
+ raise ValueError("Invalid value, must be 'JWS' or 'JWE'")
+
def _add_optional_claim(self, name, claims):
if name in claims:
return
@@ -307,7 +426,8 @@ class JWT:
if name not in claims or claims[name] is None:
return
if not isinstance(claims[name], str):
- raise JWTInvalidClaimFormat("Claim %s is not a StringOrURI type")
+ raise JWTInvalidClaimFormat(
+ "Claim %s is not a StringOrURI type" % (name, ))
def _check_array_or_string_claim(self, name, claims):
if name not in claims or claims[name] is None:
@@ -358,7 +478,7 @@ class JWT:
def _check_check_claims(self, check_claims):
self._check_string_claim('iss', check_claims)
self._check_string_claim('sub', check_claims)
- self._check_string_claim('aud', check_claims)
+ self._check_array_or_string_claim('aud', check_claims)
self._check_integer_claim('exp', check_claims)
self._check_integer_claim('nbf', check_claims)
self._check_integer_claim('iat', check_claims)
@@ -398,14 +518,23 @@ class JWT:
elif name == 'aud':
if value is not None:
- if value == claims[name]:
- continue
if isinstance(claims[name], list):
- if value in claims[name]:
- continue
- raise JWTInvalidClaimValue(
- "Invalid '%s' value. Expected '%s' to be in '%s'" % (
- name, claims[name], value))
+ tclaims = claims[name]
+ else:
+ tclaims = [claims[name]]
+ if isinstance(value, list):
+ cclaims = value
+ else:
+ cclaims = [value]
+ found = False
+ for v in cclaims:
+ if v in tclaims:
+ found = True
+ break
+ if not found:
+ raise JWTInvalidClaimValue(
+ "Invalid '{}' value. Expected '{}' in '{}'".format(
+ name, claims[name], value))
elif name == 'exp':
if value is not None:
@@ -456,6 +585,7 @@ class JWT:
t.allowed_algs = self._algs
t.add_signature(key, protected=self.header)
self.token = t
+ self._expected_type = "JWS"
def make_encrypted_token(self, key):
"""Encrypts the payload.
@@ -472,6 +602,53 @@ class JWT:
t.allowed_algs = self._algs
t.add_recipient(key)
self.token = t
+ self._expected_type = "JWE"
+
+ def validate(self, key):
+ """Validate a JWT token that was deserialized w/o providing a key
+
+ :param key: A (:class:`jwcrypto.jwk.JWK`) verification or
+ decryption key, or a (:class:`jwcrypto.jwk.JWKSet`) that
+ contains a key indexed by the 'kid' header.
+ """
+ self.deserializelog = []
+ if self.token is None:
+ raise ValueError("Token empty")
+
+ et = self._expected_type_heuristics(key)
+ validate_fn = None
+
+ if isinstance(self.token, JWS):
+ if et != "JWS" and JWT_expect_type:
+ raise TypeError("Expected {}, got JWS".format(et))
+ validate_fn = self.token.verify
+ elif isinstance(self.token, JWE):
+ if et != "JWE" and JWT_expect_type:
+ raise TypeError("Expected {}, got JWE".format(et))
+ validate_fn = self.token.decrypt
+ else:
+ raise ValueError("Token format unrecognized")
+
+ try:
+ validate_fn(key)
+ self.deserializelog.append("Success")
+ except Exception as e: # pylint: disable=broad-except
+ if isinstance(self.token, JWS):
+ self.deserializelog = self.token.verifylog
+ elif isinstance(self.token, JWE):
+ self.deserializelog = self.token.decryptlog
+ self.deserializelog.append(
+ 'Validation failed: [{}]'.format(repr(e)))
+ if isinstance(e, JWKeyNotFound):
+ raise JWTMissingKey() from e
+ raise
+
+ self.header = self.token.jose_header
+ payload = self.token.payload
+ if isinstance(payload, bytes):
+ payload = payload.decode('utf-8')
+ self.claims = payload
+ self._check_provided_claims()
def deserialize(self, jwt, key=None):
"""Deserialize a JWT token.
@@ -484,10 +661,10 @@ class JWT:
decryption key, or a (:class:`jwcrypto.jwk.JWKSet`) that
contains a key indexed by the 'kid' header.
"""
- c = jwt.count('.')
- if c == 2:
+ data = jwt.count('.')
+ if data == 2:
self.token = JWS()
- elif c == 4:
+ elif data == 4:
self.token = JWE()
else:
raise ValueError("Token format unrecognized")
@@ -496,44 +673,12 @@ class JWT:
if self._algs:
self.token.allowed_algs = self._algs
- self.deserializelog = []
+ self.deserializelog = None
# now deserialize and also decrypt/verify (or raise) if we
# have a key
- if key is None:
- self.token.deserialize(jwt, None)
- elif isinstance(key, JWK):
- self.token.deserialize(jwt, key)
- self.deserializelog.append("Success")
- elif isinstance(key, JWKSet):
- self.token.deserialize(jwt, None)
- if 'kid' in self.token.jose_header:
- kid_key = key.get_key(self.token.jose_header['kid'])
- if not kid_key:
- raise JWTMissingKey('Key ID %s not in key set'
- % self.token.jose_header['kid'])
- self.token.deserialize(jwt, kid_key)
- else:
- for k in key:
- try:
- self.token.deserialize(jwt, k)
- self.deserializelog.append("Success")
- break
- except Exception as e: # pylint: disable=broad-except
- keyid = k.get('kid')
- if keyid is None:
- keyid = k.thumbprint()
- self.deserializelog.append('Key [%s] failed: [%s]' % (
- keyid, repr(e)))
- continue
- if "Success" not in self.deserializelog:
- raise JWTMissingKey('No working key found in key set')
- else:
- raise ValueError("Unrecognized Key Type")
-
- if key is not None:
- self.header = self.token.jose_header
- self.claims = self.token.payload.decode('utf-8')
- self._check_provided_claims()
+ self.token.deserialize(jwt, None)
+ if key:
+ self.validate(key)
def serialize(self, compact=True):
"""Serializes the object into a JWS token.
@@ -545,5 +690,51 @@ class JWT:
:class:`jwcrypto.jwe.JWE` so that these objects can all be used
interchangeably. However the only valid JWT representation is the
compact representation.
+
+ :return: A json formatted string or a compact representation string
+ :rtype: `str`
"""
+ if not compact:
+ raise ValueError("Only the compact serialization is allowed")
+
return self.token.serialize(compact)
+
+ @classmethod
+ def from_jose_token(cls, token):
+ """Creates a JWT object from a serialized JWT token.
+
+ :param token: A string with the json or compat representation
+ of the token.
+
+ :raises InvalidJWEData or InvalidJWSObject: if the raw object is an
+ invalid JWT token.
+
+ :return: A JWT token
+ :rtype: JWT
+ """
+
+ obj = cls()
+ obj.deserialize(token)
+ return obj
+
+ def __eq__(self, other):
+ if not isinstance(other, JWT):
+ return False
+ return self._claims == other._claims and \
+ self._header == other._header and \
+ self.token == other.token
+
+ def __str__(self):
+ try:
+ return self.serialize()
+ except Exception: # pylint: disable=broad-except
+ return self.__repr__()
+
+ def __repr__(self):
+ jwt = repr(self.token)
+ return f'JWT(header={self._header}, ' + \
+ f'claims={self._claims}, ' + \
+ f'jwt={jwt}, ' + \
+ f'key=None, algs={self._algs}, ' + \
+ f'default_claims={self._reg_claims}, ' + \
+ f'check_claims={self._check_claims})'
=====================================
jwcrypto/tests-cookbook.py
=====================================
@@ -1101,7 +1101,7 @@ JWE_flattened_5_12_5 = {
# In general we can't compare ciphertexts with the reference because
# either the algorithms use random nonces to authenticate the ciphertext
-# or we randomly genrate the nonce when we create the JWe.
+# or we randomly generate the nonce when we create the JWe.
# To double check implementation we encrypt/decrypt our own input and then
# decrypt the reference and check it against the given plaintext
class Cookbook08JWETests(unittest.TestCase):
=====================================
jwcrypto/tests.py
=====================================
@@ -15,12 +15,12 @@ from jwcrypto import jwk
from jwcrypto import jws
from jwcrypto import jwt
from jwcrypto.common import InvalidJWSERegOperation
+from jwcrypto.common import JWKeyNotFound
from jwcrypto.common import JWSEHeaderParameter
from jwcrypto.common import base64url_decode, base64url_encode
from jwcrypto.common import json_decode, json_encode
jwe_algs_and_rsa1_5 = jwe.default_allowed_algs + ['RSA1_5']
-jws_algs_and_rsa1_5 = jws.default_allowed_algs + ['RSA1_5']
# RFC 7517 - A.1
PublicKeys = {"keys": [
@@ -269,7 +269,13 @@ PublicKeys_secp256k1 = {
"crv": "secp256k1",
"x": "Ss6na3mcci8Ud4lQrjaB_T40sfKApEcl2RLIWOJdjow",
"y": "7l9qIKtKPW6oEiOYBt7r22Sm0mtFJU-yBkkvMvpscd8"
- }
+ },
+ {
+ "kty": "EC",
+ "crv": "P-256K",
+ "x": "Ss6na3mcci8Ud4lQrjaB_T40sfKApEcl2RLIWOJdjow",
+ "y": "7l9qIKtKPW6oEiOYBt7r22Sm0mtFJU-yBkkvMvpscd8"
+ },
]
}
@@ -282,6 +288,72 @@ PrivateKeys_secp256k1 = {
"y": "7l9qIKtKPW6oEiOYBt7r22Sm0mtFJU-yBkkvMvpscd8",
"d": "GYhU2vrYGZrjLZn71Xniqm54Mi53xiYtaTLawzaf9dA"
},
+ {
+ "kty": "EC",
+ "crv": "P-256K",
+ "x": "Ss6na3mcci8Ud4lQrjaB_T40sfKApEcl2RLIWOJdjow",
+ "y": "7l9qIKtKPW6oEiOYBt7r22Sm0mtFJU-yBkkvMvpscd8",
+ "d": "GYhU2vrYGZrjLZn71Xniqm54Mi53xiYtaTLawzaf9dA"
+ }
+ ]
+}
+
+PublicKeys_brainpool = {
+ "keys": [
+ {
+ "kty": "EC",
+ "crv": "BP-256",
+ "x": "mpkJ29_CYAD0mzQ_MsrbjFMFYtcc9Oxpro37Fa4cLfI",
+ "y": "iBfhNHk0cI73agNpjbKW62dvuVxn7kxp1Sm8oDnzHl8",
+ },
+ {
+ "kty": "EC",
+ "crv": "BP-384",
+ "x": ("WZanneaC2Hi3xslA4znJv7otyEdV5dTPzNUvBjBXPM"
+ "ytf4mRY9JaAITdItjvUTAh"),
+ "y": ("KNLRTNdvUg66aB_TVW4POZkE3q8S0YoQrCzYUrExRDe"
+ "_BXikkqIama-GYQ3UBOQL"),
+ },
+ {
+ "kty": "EC",
+ "crv": "BP-512",
+ "x": ("aQXpvz7DH9OK5eFNO9dY3BdPY1v0-8Rg9KC322PY1Jy"
+ "BJq3EhT0uR_-tgbL2E_aGP6k56lF1xIOOtQxo8zziGA"),
+ "y": ("l9XLHHncigOPr5Tvnj_mVzBFv6i7rdBQrLTq3RXZlCC"
+ "_f_q6L2o79K9IrN_J2wWxAfS8ekuGPGlHZUzK-3D9sA"),
+ }
+ ]
+}
+
+PrivateKeys_brainpool = {
+ "keys": [
+ {
+ "kty": "EC",
+ "crv": "BP-256",
+ "x": "mpkJ29_CYAD0mzQ_MsrbjFMFYtcc9Oxpro37Fa4cLfI",
+ "y": "iBfhNHk0cI73agNpjbKW62dvuVxn7kxp1Sm8oDnzHl8",
+ "d": "KdKRgq0WEM97BQw3jpW_fTOep6fn-Samv4DfDNb-4s4"
+ },
+ {
+ "kty": "EC",
+ "crv": "BP-384",
+ "x": ("WZanneaC2Hi3xslA4znJv7otyEdV5dTPzNUvBjBXPM"
+ "ytf4mRY9JaAITdItjvUTAh"),
+ "y": ("KNLRTNdvUg66aB_TVW4POZkE3q8S0YoQrCzYUrExRDe"
+ "_BXikkqIama-GYQ3UBOQL"),
+ "d": ("B5WeRV0-RztAPAhRbphSAUrsIzy-eSfWGSM5FxOQGlJ"
+ "cq-ECLA_-SIlH7NdWIEJY")
+ },
+ {
+ "kty": "EC",
+ "crv": "BP-512",
+ "x": ("aQXpvz7DH9OK5eFNO9dY3BdPY1v0-8Rg9KC322PY1Jy"
+ "BJq3EhT0uR_-tgbL2E_aGP6k56lF1xIOOtQxo8zziGA"),
+ "y": ("l9XLHHncigOPr5Tvnj_mVzBFv6i7rdBQrLTq3RXZlCC"
+ "_f_q6L2o79K9IrN_J2wWxAfS8ekuGPGlHZUzK-3D9sA"),
+ "d": ("F_LJ9rebAjOtxoMUfngIywYsnJlZNjy3gxNAEvHjSkL"
+ "m6RUUdLXDwc50EMp0LeTh1ku039D5kldK3S9Xi0yKZA")
+ }
]
}
@@ -295,6 +367,16 @@ MCowBQYDK2VwAyEAlsRcb1mVVIUcDjNqZU27N+iPXihH1EQDa/O3utHLtqc=
-----END PUBLIC KEY-----
"""
+X25519PrivatePEM = b"""-----BEGIN PRIVATE KEY-----
+MC4CAQAwBQYDK2VuBCIEIBjAbPTtNY6CUuR5FG1+xb1u5nSRokrNaQYEsgu9O+hP
+-----END PRIVATE KEY-----
+"""
+
+X25519PublicPEM = b"""-----BEGIN PUBLIC KEY-----
+MCowBQYDK2VuAyEAW+m9ugi1psQFx6dtTl6J/XZ4JFP019S+oq4wyAoWPnQ=
+-----END PUBLIC KEY-----
+"""
+
ECPublicPEM = b"""-----BEGIN PUBLIC KEY-----
MFkwEwYHKoZIzj0CAQYIKoZIzj0DAQcDQgAEhvGzt82WMJxqTuXCZxnvwrx4enQj
6xc+erlhbTq8gTMAJBzNRPbpuj4NOwTCwjohrtY0TAkthwTuixuojpGKmw==
@@ -309,6 +391,13 @@ ECPublicJWK = {
"y": "ACQczUT26bo-DTsEwsI6Ia7WNEwJLYcE7osbqI6Rips"
}
+X25519PublicJWK = {
+ 'crv': 'X25519',
+ 'kid': '9cgLEZD5VsaV9dUPNehs2pOwxtmH-EWHJY-pC74Wjak',
+ 'kty': 'OKP',
+ 'x': 'W-m9ugi1psQFx6dtTl6J_XZ4JFP019S-oq4wyAoWPnQ'
+}
+
class TestJWK(unittest.TestCase):
def test_create_pubKeys(self):
@@ -359,24 +448,42 @@ class TestJWK(unittest.TestCase):
e.deserialize(enc, key)
self.assertEqual(e.payload.decode('utf-8'), 'test')
+ # also test key generation with input_keysize != keysize
+ key = jwk.JWK.generate(kty='oct', alg="A128CBC-HS256")
+ self.assertEqual(len(base64url_decode(key['k'])), 32)
+ e = jwe.JWE('test', '{"alg":"A256KW","enc":"A128CBC-HS256"}')
+ e.add_recipient(key)
+ enc = e.serialize()
+ e.deserialize(enc, key)
+ self.assertEqual(e.payload.decode('utf-8'), 'test')
+
def test_generate_EC_key(self):
# Backwards compat curve
key = jwk.JWK.generate(kty='EC', curve='P-256')
- key.get_curve('P-256')
+ key.get_op_key('verify', 'P-256')
# New param
key = jwk.JWK.generate(kty='EC', crv='P-521')
- key.get_curve('P-521')
+ key.get_op_key('verify', 'P-521')
# New param prevails
key = jwk.JWK.generate(kty='EC', curve='P-256', crv='P-521')
- key.get_curve('P-521')
+ key.get_op_key('verify', 'P-521')
# New secp256k curve
key = jwk.JWK.generate(kty='EC', curve='secp256k1')
- key.get_curve('secp256k1')
+ key.get_op_key('verify', 'secp256k1')
+ # Brainpool256R1 curve
+ key = jwk.JWK.generate(kty='EC', crv='BP-256')
+ key.get_op_key('verify', 'BP-256')
+ # Brainpool384R1 curve
+ key = jwk.JWK.generate(kty='EC', crv='BP-384')
+ key.get_op_key('verify', 'BP-384')
+ # Brainpool256R1 curve
+ key = jwk.JWK.generate(kty='EC', crv='BP-512')
+ key.get_op_key('verify', 'BP-512')
def test_generate_OKP_keys(self):
for crv in jwk.ImplementedOkpCurves:
key = jwk.JWK.generate(kty='OKP', crv=crv)
- self.assertEqual(key.get_curve(crv), crv)
+ self.assertEqual(key['crv'], crv)
def test_import_pyca_keys(self):
rsa1 = rsa.generate_private_key(65537, 1024, default_backend())
@@ -415,7 +522,7 @@ class TestJWK(unittest.TestCase):
ks3 = jwk.JWKSet.from_json(ks.export())
self.assertEqual(len(ks), len(ks3))
- # Test key set with mutiple keys
+ # Test key set with multiple keys
ksm = jwk.JWKSet.from_json(json_encode(PrivateKeys))
num = 0
for item in ksm:
@@ -424,6 +531,24 @@ class TestJWK(unittest.TestCase):
num += 1
self.assertEqual(num, len(PrivateKeys['keys']))
+ def test_jwkset_get_keys(self):
+ # Test key set with multiple keys
+ ksm = jwk.JWKSet.from_json(json_encode(PrivateKeys))
+ k1 = jwk.JWK.from_json(json_encode(PrivateKeys['keys'][0]))
+ kwargs = RSAPrivateKey.copy()
+ kwargs['kid'] = '1'
+ k2 = jwk.JWK(**kwargs)
+ self.assertEqual(k1, ksm.get_key('1'))
+ self.assertIsNone(ksm.get_key('not-there'))
+
+ ksm.add(k2)
+ self.assertEqual({k1, k2}, ksm.get_keys('1'))
+ self.assertEqual(3, len(ksm['keys']))
+ # Expect that duplicate kids will
+ # raise an exception when we use get_key
+ with self.assertRaises(jwk.InvalidJWKValue):
+ ksm.get_key('1')
+
def test_jwkset_issue_208(self):
ks = jwk.JWKSet()
key1 = RSAPrivateKey.copy()
@@ -462,6 +587,11 @@ class TestJWK(unittest.TestCase):
self.assertEqual(pub_ec.export_to_pem(), ECPublicPEM)
self.assertEqual(json_decode(pub_ec.export()), ECPublicJWK)
+ def test_import_x25519_from_pem(self):
+ pub_x25519 = jwk.JWK.from_pem(X25519PublicPEM)
+ self.assertEqual(pub_x25519.export_to_pem(), X25519PublicPEM)
+ self.assertEqual(json_decode(pub_x25519.export()), X25519PublicJWK)
+
def test_export_symmetric(self):
key = jwk.JWK(**SymmetricKeys['keys'][0])
self.assertTrue(key.is_symmetric)
@@ -545,6 +675,16 @@ class TestJWK(unittest.TestCase):
for key in keylist:
jwk.JWK(**key)
+ def test_create_pubKeys_brainpool(self):
+ keylist = PublicKeys_brainpool['keys']
+ for key in keylist:
+ jwk.JWK(**key)
+
+ def test_create_priKeys_brainpool(self):
+ keylist = PrivateKeys_brainpool['keys']
+ for key in keylist:
+ jwk.JWK(**key)
+
def test_thumbprint_eddsa(self):
for i in range(0, len(PublicKeys_EdDsa['keys'])):
k = jwk.JWK(**PublicKeys_EdDsa['keys'][i])
@@ -563,10 +703,10 @@ class TestJWK(unittest.TestCase):
pubkey = jwk.JWK.from_pem(Ed25519PublicPEM)
self.assertTrue(pubkey.has_public)
self.assertFalse(pubkey.has_private)
- c = jws.JWS()
- c.deserialize(sig, pubkey, alg="EdDSA")
- self.assertTrue(c.objects['valid'])
- self.assertEqual(c.payload, payload)
+ jws_token = jws.JWS()
+ jws_token.deserialize(sig, pubkey, alg="EdDSA")
+ self.assertTrue(jws_token.objects['valid'])
+ self.assertEqual(jws_token.payload, payload)
def test_jwk_as_dict(self):
key = jwk.JWK(**PublicKeys['keys'][0])
@@ -595,6 +735,28 @@ class TestJWK(unittest.TestCase):
self.assertEqual(key['kty'], 'oct')
self.assertEqual(key['k'], 'dGVzdCBwYXNzd29yZA')
+ def test_p256k_alias(self):
+ key = jwk.JWK.generate(kty='EC', curve='P-256K')
+ key.get_op_key('verify', 'secp256k1')
+
+ pub_k = jwk.JWK(**PrivateKeys_secp256k1['keys'][0])
+ pri_k = jwk.JWK(**PrivateKeys_secp256k1['keys'][1])
+ payload = bytes(bytearray(A1_payload))
+ test = jws.JWS(payload)
+ test.add_signature(pri_k, None, json_encode({"alg": "ES256K"}), None)
+ test_serialization_compact = test.serialize(compact=True)
+ verify = jws.JWS()
+ verify.deserialize(test_serialization_compact)
+ verify.verify(pub_k.public())
+ self.assertEqual(verify.payload, payload)
+
+ def test_thumbprint_uri(self):
+ k = jwk.JWK(**PublicKeys['keys'][1])
+ self.assertEqual(
+ k.thumbprint_uri(),
+ "urn:ietf:params:oauth:jwk-thumbprint:sha-256:{}".format(
+ PublicKeys['thumbprints'][1]))
+
# RFC 7515 - A.1
A1_protected = \
@@ -828,7 +990,7 @@ class TestJWS(unittest.TestCase):
self.assertEqual(decsig, test['signature'])
else:
# Check we can verify the test signature independently
- # this is so taht we can test the ECDSA agaist a known
+ # this is so that we can test the ECDSA against a known
# good signature
s.verify(test['signature'])
@@ -873,7 +1035,7 @@ class TestJWS(unittest.TestCase):
def test_E(self):
s = jws.JWS(A6_example['payload'])
with self.assertRaises(jws.InvalidJWSSignature):
- jws.InvalidJWSSignature(s.deserialize, E_negative)
+ s.deserialize(E_negative)
s.verify(None)
def test_customhdr_jws(self):
@@ -934,6 +1096,31 @@ class TestJWS(unittest.TestCase):
jws_verify.verify(key.public())
self.assertEqual(jws_verify.payload, payload)
+ def test_brainpool_signing_and_verification(self):
+ for key_data in PrivateKeys_brainpool['keys']:
+ key = jwk.JWK(**key_data)
+ payload = bytes(bytearray(A1_payload))
+ jws_test = jws.JWS(payload)
+
+ curve_name = key.get('crv')
+ if curve_name == "BP-256":
+ alg = "BP256R1"
+ elif curve_name == "BP-384":
+ alg = "BP384R1"
+ else:
+ alg = "BP512R1"
+
+ jws_test.allowed_algs = [alg]
+ jws_test.add_signature(key, None, json_encode({"alg": alg}), None)
+ jws_test_serialization_compact = jws_test.serialize(compact=True)
+
+ jws_verify = jws.JWS()
+ jws_verify.allowed_algs = [alg]
+ jws_verify.deserialize(jws_test_serialization_compact)
+ jws_verify.verify(key.public())
+
+ self.assertEqual(jws_verify.payload, payload)
+
def test_jws_issue_224(self):
key = jwk.JWK().generate(kty='oct')
@@ -949,6 +1136,37 @@ class TestJWS(unittest.TestCase):
t.deserialize(o1)
t.verify(key)
+ def test_jws_issue_281(self):
+ header = {"alg": "HS256"}
+ header_copy = copy.deepcopy(header)
+
+ key = jwk.JWK().generate(kty='oct')
+
+ s = jws.JWS(payload='test')
+ s.add_signature(key, protected=header,
+ header={"kid": key.thumbprint()})
+
+ self.assertEqual(header, header_copy)
+
+ def test_decrypt_keyset(self):
+ ks = jwk.JWKSet()
+ key1 = jwk.JWK.generate(kty='oct', alg='HS256', kid='key1')
+ key2 = jwk.JWK.generate(kty='oct', alg='HS384', kid='key2')
+ key3 = jwk.JWK.generate(kty='oct', alg='HS512', kid='key3')
+ ks.add(key1)
+ ks.add(key2)
+ s1 = jws.JWS(payload=b'secret')
+ s1.add_signature(key1, protected='{"alg":"HS256"}')
+ s2 = jws.JWS()
+ s2.deserialize(s1.serialize(), ks)
+ self.assertEqual(s2.payload, b'secret')
+
+ s3 = jws.JWS(payload=b'secret')
+ s3.add_signature(key3, protected='{"alg":"HS256"}')
+ s4 = jws.JWS()
+ with self.assertRaises(JWKeyNotFound):
+ s4.deserialize(s3.serialize(), ks)
+
E_A1_plaintext = \
[84, 104, 101, 32, 116, 114, 117, 101, 32, 115, 105, 103, 110, 32,
@@ -1272,6 +1490,25 @@ class TestJWE(unittest.TestCase):
e2.deserialize(enc, x25519key)
self.assertEqual(e2.payload, plaintext)
+ def test_decrypt_keyset(self):
+ ks = jwk.JWKSet()
+ key1 = jwk.JWK.generate(kty='oct', alg='A128KW', kid='key1')
+ key2 = jwk.JWK.generate(kty='oct', alg='A192KW', kid='key2')
+ key3 = jwk.JWK.generate(kty='oct', alg='A256KW', kid='key3')
+ ks.add(key1)
+ ks.add(key2)
+ e1 = jwe.JWE(plaintext=b'secret')
+ e1.add_recipient(key1, '{"alg":"A128KW","enc":"A128GCM"}')
+ e2 = jwe.JWE()
+ e2.deserialize(e1.serialize(), ks)
+ self.assertEqual(e2.payload, b'secret')
+
+ e3 = jwe.JWE(plaintext=b'secret')
+ e3.add_recipient(key3, '{"alg":"A256KW","enc":"A256GCM"}')
+ e4 = jwe.JWE()
+ with self.assertRaises(JWKeyNotFound):
+ e4.deserialize(e3.serialize(), ks)
+
MMA_vector_key = jwk.JWK(**E_A2_key)
MMA_vector_ok_cek = \
@@ -1427,9 +1664,11 @@ class TestJWT(unittest.TestCase):
tinner = jwt.JWT(jwt=touter.claims, key=sigkey, check_claims=False)
self.assertEqual(A1_claims, json_decode(tinner.claims))
+ # Test Exception throwing when token is encrypted with
+ # algorithms not in the allowed set
with self.assertRaises(jwe.InvalidJWEData):
jwt.JWT(jwt=A2_token, key=E_A2_ex['key'],
- algs=jws_algs_and_rsa1_5)
+ algs=['A192KW', 'A192CBC-HS384', 'RSA1_5'])
def test_decrypt_keyset(self):
key = jwk.JWK(kid='testkey', **E_A2_key)
@@ -1442,7 +1681,9 @@ class TestJWT(unittest.TestCase):
t.make_encrypted_token(key)
token = t.serialize()
# try to decrypt without a matching key
- self.assertRaises(jwt.JWTMissingKey, jwt.JWT, jwt=token, key=keyset)
+ self.assertRaises(jwt.JWTMissingKey, jwt.JWT, jwt=token, key=keyset,
+ algs=jwe_algs_and_rsa1_5,
+ check_claims={'exp': 1300819380})
# now decrypt with key
keyset.add(key)
jwt.JWT(jwt=token, key=keyset, algs=jwe_algs_and_rsa1_5,
@@ -1454,7 +1695,8 @@ class TestJWT(unittest.TestCase):
t = jwt.JWT(header, A1_claims, algs=jwe_algs_and_rsa1_5)
t.make_encrypted_token(key)
token = t.serialize()
- self.assertRaises(jwe.InvalidJWEData, jwt.JWT, jwt=token, key=keyset)
+ self.assertRaises(jwt.JWTMissingKey, jwt.JWT, jwt=token, key=keyset,
+ algs=jwe_algs_and_rsa1_5)
keyset = jwk.JWKSet.from_json(json_encode(PrivateKeys))
# encrypt a new JWT with no kid
@@ -1463,12 +1705,40 @@ class TestJWT(unittest.TestCase):
t.make_encrypted_token(key)
token = t.serialize()
# try to decrypt without a matching key
- self.assertRaises(jwt.JWTMissingKey, jwt.JWT, jwt=token, key=keyset)
+ self.assertRaises(jwt.JWTMissingKey, jwt.JWT, jwt=token, key=keyset,
+ algs=jwe_algs_and_rsa1_5,
+ check_claims={'exp': 1300819380})
# now decrypt with key
keyset.add(key)
jwt.JWT(jwt=token, key=keyset, algs=jwe_algs_and_rsa1_5,
check_claims={'exp': 1300819380})
+ def test_decrypt_keyset_dup_kid(self):
+ keyset = jwk.JWKSet.from_json(json_encode(PrivateKeys))
+ # add wrong key with duplicate kid
+ key = jwk.JWK(kid='testkey', **E_A3_key)
+ keyset.add(key)
+
+ # encrypt a new JWT with kid
+ key = jwk.JWK(kid='testkey', **E_A2_key)
+ header = copy.copy(A1_header)
+ header['kid'] = 'testkey'
+ t = jwt.JWT(header, A1_claims, algs=jwe_algs_and_rsa1_5)
+ t.make_encrypted_token(key)
+ token = t.serialize()
+
+ # try to decrypt without a matching key
+ with self.assertRaises(jwt.JWTMissingKey):
+ jwt.JWT(jwt=token, key=keyset, algs=jwe_algs_and_rsa1_5,
+ check_claims={'exp': 1300819380})
+
+ # add right key
+ keyset.add(key)
+
+ # now decrypt with key
+ jwt.JWT(jwt=token, key=keyset, algs=jwe_algs_and_rsa1_5,
+ check_claims={'exp': 1300819380})
+
def test_invalid_claim_type(self):
key = jwk.JWK(**E_A2_key)
claims = {"testclaim": "test"}
@@ -1548,9 +1818,9 @@ class TestJWT(unittest.TestCase):
t.make_signed_token(key)
token = t.serialize()
- c = jwt.JWT()
- c.deserialize(token, key)
- self.assertEqual('{}', c.claims)
+ _jwt = jwt.JWT()
+ _jwt.deserialize(token, key)
+ self.assertEqual('{}', _jwt.claims)
# empty string is also valid
t = jwt.JWT('{"alg":"HS256"}', '')
@@ -1563,9 +1833,9 @@ class TestJWT(unittest.TestCase):
t.make_signed_token(key)
token = t.serialize()
- c = jwt.JWT()
- c.deserialize(token, key)
- self.assertEqual(' ', c.claims)
+ _jwt = jwt.JWT()
+ _jwt.deserialize(token, key)
+ self.assertEqual(' ', _jwt.claims)
def test_Issue_209(self):
key = jwk.JWK(**A3_key)
@@ -1581,14 +1851,109 @@ class TestJWT(unittest.TestCase):
# the oct key before hitting the ES one
jwt.JWT(jwt=token, key=ks)
- def test_Issue_239(self):
- claims = {"aud": "www.example.com"}
- check_claims = {"aud": ["www.example.com", "account"]}
+ def test_Issue_277(self):
+ claims = {"aud": ["www.example.com", "www.test.net"]}
key = jwk.JWK(generate='oct', size=256)
token = jwt.JWT(header={"alg": "HS256"}, claims=claims)
token.make_signed_token(key)
- self.assertRaises(jwt.JWTInvalidClaimFormat, jwt.JWT, key=key,
- jwt=token.serialize(), check_claims=check_claims)
+ sertok = token.serialize()
+ jwt.JWT(key=key, jwt=sertok, check_claims={"aud": "www.example.com"})
+ jwt.JWT(key=key, jwt=sertok, check_claims={"aud": "www.test.net"})
+ jwt.JWT(key=key, jwt=sertok, check_claims={"aud": ["www.example.com"]})
+ jwt.JWT(key=key, jwt=sertok, check_claims={"aud": ["www.test.net"]})
+ jwt.JWT(key=key, jwt=sertok, check_claims={"aud": ["www.example.com",
+ "www.test.net"]})
+ jwt.JWT(key=key, jwt=sertok, check_claims={"aud": ["www.example.com",
+ "nomatch"]})
+ self.assertRaises(jwt.JWTInvalidClaimValue, jwt.JWT, key=key,
+ jwt=sertok, check_claims={"aud": "nomatch"})
+ self.assertRaises(jwt.JWTInvalidClaimValue, jwt.JWT, key=key,
+ jwt=sertok, check_claims={"aud": ["nomatch"]})
+ self.assertRaises(jwt.JWTInvalidClaimValue, jwt.JWT, key=key,
+ jwt=sertok, check_claims={"aud": ["nomatch",
+ "failmatch"]})
+
+ def test_unexpected(self):
+ key = jwk.JWK(generate='oct', size=256)
+ claims = {"testclaim": "test"}
+ token = jwt.JWT(header={"alg": "HS256"}, claims=claims)
+ token.make_signed_token(key)
+ sertok = token.serialize()
+
+ token.validate(key)
+ token.expected_type = "JWS"
+ token.validate(key)
+ token.expected_type = "JWE"
+ with self.assertRaises(TypeError):
+ token.validate(key)
+
+ jwt.JWT(jwt=sertok, key=key)
+ jwt.JWT(jwt=sertok, key=key, expected_type='JWS')
+ with self.assertRaises(TypeError):
+ jwt.JWT(jwt=sertok, key=key, expected_type='JWE')
+
+ jwt.JWT(jwt=sertok, algs=['HS256'], key=key)
+
+ key.use = 'sig'
+ jwt.JWT(jwt=sertok, key=key)
+ key.use = 'enc'
+ with self.assertRaises(TypeError):
+ jwt.JWT(jwt=sertok, key=key)
+ key.use = None
+ key.key_ops = 'verify'
+ jwt.JWT(jwt=sertok, key=key)
+ key.key_ops = ['sign', 'verify']
+ jwt.JWT(jwt=sertok, key=key)
+ key.key_ops = 'decrypt'
+ with self.assertRaises(TypeError):
+ jwt.JWT(jwt=sertok, key=key)
+ key.key_ops = ['encrypt', 'decrypt']
+ with self.assertRaises(TypeError):
+ jwt.JWT(jwt=sertok, key=key)
+ key.key_ops = None
+
+ token = jwt.JWT(header={"alg": "A256KW", "enc": "A256GCM"},
+ claims=claims)
+ token.make_encrypted_token(key)
+ enctok = token.serialize()
+
+ # test workaround for older applications
+ jwt.JWT_expect_type = False
+ jwt.JWT(jwt=enctok, key=key)
+ jwt.JWT_expect_type = True
+
+ token.validate(key)
+ token.expected_type = "JWE"
+ token.validate(key)
+ token.expected_type = "JWS"
+ with self.assertRaises(TypeError):
+ token.validate(key)
+
+ jwt.JWT(jwt=enctok, key=key, expected_type='JWE')
+ with self.assertRaises(TypeError):
+ jwt.JWT(jwt=enctok, key=key)
+ with self.assertRaises(TypeError):
+ jwt.JWT(jwt=enctok, key=key, expected_type='JWS')
+
+ jwt.JWT(jwt=enctok, algs=['A256KW', 'A256GCM'], key=key)
+
+ key.use = 'enc'
+ jwt.JWT(jwt=enctok, key=key)
+ key.use = 'sig'
+ with self.assertRaises(TypeError):
+ jwt.JWT(jwt=enctok, key=key)
+ key.use = None
+ key.key_ops = 'verify'
+ with self.assertRaises(TypeError):
+ jwt.JWT(jwt=enctok, key=key)
+ key.key_ops = ['sign', 'verify']
+ with self.assertRaises(TypeError):
+ jwt.JWT(jwt=enctok, key=key)
+ key.key_ops = 'decrypt'
+ jwt.JWT(jwt=enctok, key=key)
+ key.key_ops = ['encrypt', 'decrypt']
+ jwt.JWT(jwt=enctok, key=key)
+ key.key_ops = None
class ConformanceTests(unittest.TestCase):
@@ -1712,6 +2077,40 @@ class ConformanceTests(unittest.TestCase):
check.decrypt(key)
self.assertEqual(check.payload, b'plain')
+ def test_pbes2_hs256_aeskw_custom_params(self):
+ enc = jwe.JWE(plaintext='plain',
+ protected={"alg": "PBES2-HS256+A128KW",
+ "enc": "A256CBC-HS512",
+ "p2c": 4096,
+ "p2s": base64url_encode("A" * 16)})
+ key = jwk.JWK.from_password('password')
+ enc.add_recipient(key)
+ o = enc.serialize()
+ check = jwe.JWE()
+ check.deserialize(o)
+ check.decrypt(key)
+ self.assertEqual(check.payload, b'plain')
+
+ enc = jwe.JWE(plaintext='plain',
+ protected={"alg": "PBES2-HS256+A128KW",
+ "enc": "A256CBC-HS512",
+ "p2c": 4096,
+ "p2s": base64url_encode("A" * 7)})
+ key = jwk.JWK.from_password('password')
+ self.assertRaises(ValueError, enc.add_recipient, key)
+
+ # Test p2c iteration checks
+ maxiter = jwa.default_max_pbkdf2_iterations
+ p2cenc = jwe.JWE(plaintext='plain',
+ protected={"alg": "PBES2-HS256+A128KW",
+ "enc": "A256CBC-HS512",
+ "p2c": maxiter + 1,
+ "p2s": base64url_encode("A" * 16)})
+ with self.assertRaisesRegex(ValueError, 'too large'):
+ p2cenc.add_recipient(key)
+ jwa.default_max_pbkdf2_iterations += 2
+ p2cenc.add_recipient(key)
+
class JWATests(unittest.TestCase):
def test_jwa_create(self):
@@ -1793,6 +2192,17 @@ class TestUnencodedPayload(unittest.TestCase):
sig = s.serialize(compact=True)
self.assertEqual(sig, result)
+ def test_detached_payload_verification(self):
+ token = \
+ 'eyJhbGciOiJIUzI1NiIsImI2NCI6ZmFsc2UsImNyaXQiOlsiYjY0Il19..' + \
+ 'A5dxf2s96_n5FLueVuW1Z_vh161FwXZC4YLPff6dmDY'
+
+ s = jws.JWS()
+ s.deserialize(token)
+ s.verify(jwk.JWK(**SymmetricKeys['keys'][1]),
+ detached_payload=rfc7797_payload)
+ self.assertTrue(s.is_valid)
+
def test_misses_crit(self):
s = jws.JWS(rfc7797_payload)
with self.assertRaises(jws.InvalidJWSObject):
@@ -1806,3 +2216,147 @@ class TestUnencodedPayload(unittest.TestCase):
with self.assertRaises(jws.InvalidJWSObject):
s.add_signature(jwk.JWK(**SymmetricKeys['keys'][1]),
protected=rfc7797_u_header)
+
+
+class TestOverloadedOperators(unittest.TestCase):
+
+ def test_jws_equality(self):
+ key = jwk.JWK.generate(kty='oct', size=256)
+ payload = "My Integrity protected message"
+ signer_a = jws.JWS(payload.encode('utf-8'))
+ signer_b = jws.JWS(payload.encode('utf-8'))
+ self.assertEqual(signer_a, signer_b)
+
+ signer_a.add_signature(key, None,
+ json_encode({"alg": "HS256"}),
+ json_encode({"kid": key.thumbprint()}))
+ # One is signed, the other is not
+ self.assertNotEqual(signer_a, signer_b)
+
+ signer_b.add_signature(key, None,
+ json_encode({"alg": "HS256"}),
+ json_encode({"kid": key.thumbprint()}))
+ # This kind of signature is deterministic so they should be equal
+ self.assertEqual(signer_a, signer_b)
+
+ signer_c = jws.JWS.from_jose_token(signer_a.serialize())
+ self.assertNotEqual(signer_a, signer_c)
+ signer_c.verify(key)
+ self.assertEqual(signer_a, signer_c)
+
+ def test_jws_representations(self):
+ key = jwk.JWK.generate(kty='oct', size=256)
+ payload = "My Integrity protected message"
+ token = jws.JWS(payload.encode('utf-8'))
+ self.assertEqual(str(token),
+ "JWS(payload=My Integrity protected message)")
+ self.assertEqual(repr(token),
+ "JWS(payload=My Integrity protected message)")
+ token.add_signature(key, None,
+ json_encode({"alg": "HS256"}),
+ json_encode({"kid": key.thumbprint()}))
+ ser = token.serialize()
+ self.assertEqual(str(token), ser)
+ self.assertEqual(repr(token), f'JWS.from_json_token("{ser}")')
+
+ def test_jwe_equality(self):
+ key = jwk.JWK.generate(kty='oct', size=256)
+ payload = "My Encrypted message"
+ signer_a = jwe.JWE(payload.encode('utf-8'),
+ json_encode({"alg": "A256KW",
+ "enc": "A256CBC-HS512"}))
+ signer_b = jwe.JWE(payload.encode('utf-8'),
+ json_encode({"alg": "A256KW",
+ "enc": "A256CBC-HS512"}))
+ self.assertEqual(signer_a, signer_b)
+
+ signer_a.add_recipient(key)
+ # One is encrypted, the other is not
+ self.assertNotEqual(signer_a, signer_b)
+
+ signer_b.add_recipient(key)
+ # Encryption generates a random CEK so tokens will always differ
+ self.assertNotEqual(signer_a, signer_b)
+
+ signer_c = jwe.JWE.from_jose_token(signer_a.serialize())
+ self.assertEqual(signer_a, signer_c)
+
+ def test_jwe_representations(self):
+ key = jwk.JWK.generate(kty='oct', size=256)
+ payload = "My Encrypted message"
+ token = jwe.JWE(payload.encode('utf-8'),
+ json_encode({"alg": "A256KW",
+ "enc": "A256CBC-HS512"}))
+ strrep = "JWE(plaintext=b\'My Encrypted message\', " + \
+ "protected={\"alg\":\"A256KW\"," + \
+ "\"enc\":\"A256CBC-HS512\"}, " + \
+ "unprotected=None, aad=None, algs=None)"
+ self.assertEqual(str(token), strrep)
+ self.assertEqual(repr(token), strrep)
+
+ token.add_recipient(key)
+ ser = token.serialize()
+ self.assertEqual(str(token), ser)
+ self.assertEqual(repr(token), f'JWE.from_json_token("{ser}")')
+
+ def test_jwt_equality(self):
+ key = jwk.JWK.generate(kty='oct', size=256)
+ signer_a = jwt.JWT(header={"alg": "HS256"},
+ claims={"info": "I'm a signed token"})
+ signer_b = jwt.JWT(header={"alg": "HS256"},
+ claims={"info": "I'm a signed token"})
+ self.assertEqual(signer_a, signer_b)
+
+ signer_a.make_signed_token(key)
+ # One is signed, the other is not
+ self.assertNotEqual(signer_a, signer_b)
+
+ signer_b.make_signed_token(key)
+ # This kind of signature is deterministic so they should be equal
+ self.assertEqual(signer_a, signer_b)
+
+ signer_c = jwt.JWT.from_jose_token(signer_a.serialize())
+ self.assertNotEqual(signer_a, signer_c)
+ signer_c.validate(key)
+ self.assertEqual(signer_a, signer_c)
+
+ ea = jwt.JWT(header={"alg": "A256KW", "enc": "A256CBC-HS512"},
+ claims=signer_a.serialize())
+ eb = jwt.JWT(header={"alg": "A256KW", "enc": "A256CBC-HS512"},
+ claims=signer_b.serialize())
+ self.assertEqual(ea, eb)
+
+ ea.make_encrypted_token(key)
+ # One is encrypted, the other is not
+ self.assertNotEqual(ea, eb)
+
+ eb.make_encrypted_token(key)
+ # Encryption generates a random CEK so tokens will always differ
+ self.assertNotEqual(ea, eb)
+
+ ect = jwt.JWT.from_jose_token(ea.serialize())
+ self.assertNotEqual(ea, ect)
+ ect.expected_type = "JWE"
+ ect.validate(key)
+ self.assertEqual(ea, ect)
+
+ def test_jwt_representations(self):
+ key = jwk.JWK.generate(kty='oct', size=256)
+ token = jwt.JWT(header={"alg": "HS256"},
+ claims={"info": "I'm a signed token"})
+ strrep = 'JWT(header={"alg":"HS256"}, claims={"info":"I\'m a ' + \
+ 'signed token"}, jwt=None, key=None, algs=None, ' + \
+ 'default_claims=None, check_claims=None)'
+ self.assertEqual(str(token), strrep)
+ self.assertEqual(repr(token), strrep)
+ token.make_signed_token(key)
+
+ ser = token.serialize()
+ self.assertEqual(str(token), ser)
+ ser2 = token.token.serialize()
+
+ reprrep = 'JWT(header={"alg":"HS256"}, ' + \
+ 'claims={"info":"I\'m a signed token"}, ' + \
+ f'jwt=JWS.from_json_token("{ser2}"), key=None, ' + \
+ 'algs=None, default_claims=None, check_claims=None)'
+ self.assertEqual(repr(token), reprrep)
=====================================
setup.py
=====================================
@@ -2,31 +2,42 @@
#
# Copyright (C) 2015 JWCrypto Project Contributors, see LICENSE file
+import os
from setuptools import setup
+# read the contents of your README file
+from pathlib import Path
+this_directory = Path(__file__).parent
+long_description = (this_directory / "README.md").read_text()
+
+version = None
+with open(os.path.join('jwcrypto', 'VERSION')) as verfile:
+ version = verfile.read().strip()
+
setup(
name = 'jwcrypto',
- version = '1.1',
+ version = version,
license = 'LGPLv3+',
maintainer = 'JWCrypto Project Contributors',
maintainer_email = 'simo at redhat.com',
url='https://github.com/latchset/jwcrypto',
packages = ['jwcrypto'],
description = 'Implementation of JOSE Web standards',
+ long_description=long_description,
+ long_description_content_type='text/markdown',
classifiers = [
- 'Programming Language :: Python :: 3.6',
- 'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
+ 'Programming Language :: Python :: 3.11',
'Intended Audience :: Developers',
'Topic :: Security',
'Topic :: Software Development :: Libraries :: Python Modules'
],
data_files = [('share/doc/jwcrypto', ['LICENSE', 'README.md'])],
install_requires = [
- 'cryptography >= 2.3',
- 'deprecated',
+ 'cryptography >= 3.4',
+ 'typing_extensions >= 4.5.0',
],
- python_requires = '>= 3.6',
+ python_requires = '>= 3.8',
)
=====================================
tox.ini
=====================================
@@ -1,10 +1,11 @@
[tox]
-envlist = lint,py36,py37,py38,py39,py310,pep8,doc,sphinx
+envlist = lint,py38,py39,py310,py311,pep8,doc,sphinx,doctest
skip_missing_interpreters = true
[testenv]
setenv =
PYTHONPATH = {envsitepackagesdir}
+ OPENSSL_ppccap = 0x0
deps =
pytest
coverage
@@ -15,7 +16,7 @@ commands =
{envpython} -m coverage report -m
[testenv:lint]
-basepython = python3.10
+basepython = python3.11
deps =
pylint
#sitepackages = True
@@ -23,7 +24,7 @@ commands =
{envpython} -m pylint -d c,r,i,W0613 -r n -f colorized --notes= --disable=star-args ./jwcrypto
[testenv:pep8]
-basepython = python3.10
+basepython = python3.11
deps =
flake8
flake8-import-order
@@ -36,18 +37,33 @@ deps =
doc8
docutils
markdown
-basepython = python3.10
+basepython = python3.11
commands =
doc8 --allow-long-titles README.md
markdown_py README.md -f {toxworkdir}/README.md.html
[testenv:sphinx]
-basepython = python3.10
+basepython = python3.11
changedir = docs/source
deps =
sphinx
commands =
- sphinx-build -v -W -b html -d {envtmpdir}/doctrees . {envtmpdir}/html
+ sphinx-build -n -v -W -b html -d {envtmpdir}/doctrees . {envtmpdir}/html
+
+[testenv:doctest]
+basepython = python3.11
+changedir = docs/source
+deps =
+ sphinx
+commands =
+ sphinx-build -v -W -b doctest -d {envtmpdir}/doctrees . {envtmpdir}/doctest
+
+[testenv:codespell]
+basepython = python3.11
+deps =
+ codespell
+commands =
+ codespell --ignore-words-list="ser,ect" jwcrypto
[pytest]
python_files = jwcrypto/test*.py
View it on GitLab: https://salsa.debian.org/freeipa-team/python-jwcrypto/-/compare/ce1646c449c7b0f1c2dc6f170c12b529f2926450...b9432ef46fc8ee90c813469440ea86b049916e52
--
View it on GitLab: https://salsa.debian.org/freeipa-team/python-jwcrypto/-/compare/ce1646c449c7b0f1c2dc6f170c12b529f2926450...b9432ef46fc8ee90c813469440ea86b049916e52
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/pkg-freeipa-devel/attachments/20240215/56d43aad/attachment-0001.htm>
More information about the Pkg-freeipa-devel
mailing list