[med-svn] [Git][python-team/packages/python-xopen][upstream] New upstream version 1.7.0
Nilesh Patra (@nilesh)
gitlab at salsa.debian.org
Tue Dec 20 22:39:07 GMT 2022
Nilesh Patra pushed to branch upstream at Debian Python Team / packages / python-xopen
Commits:
0cba8325 by Nilesh Patra at 2022-12-21T03:30:29+05:30
New upstream version 1.7.0
- - - - -
21 changed files:
- + .gitattributes
- − .github/workflows/ci.yml
- + .pre-commit-config.yaml
- PKG-INFO
- README.rst
- pyproject.toml
- setup.cfg
- − setup.py
- src/xopen.egg-info/PKG-INFO
- src/xopen.egg-info/SOURCES.txt
- src/xopen.egg-info/requires.txt
- src/xopen/__init__.py
- src/xopen/_version.py
- + tests/conftest.py
- − tests/file.txt.bz2.test
- − tests/file.txt.gz.test
- − tests/file.txt.xz.test
- tests/file.txt.test → tests/file.txt.zst
- + tests/test_piped.py
- tests/test_xopen.py
- tox.ini
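Note on the README changes below: they describe choosing the format from a recognized file extension (.gz, .bz2, .xz or .zst) and, when reading a file without one, from its first few bytes. The following is a minimal sketch of that idea using only the standard library. It is not xopen's actual implementation; detect_format and the MAGIC table are hypothetical names introduced here for illustration, and the byte signatures are the publicly documented magic numbers of each format.

```python
import bz2
import gzip
import lzma
from typing import Optional

# Documented magic numbers found at the start of each container format.
# The table name and layout are assumptions made for this sketch.
MAGIC = {
    "gz": b"\x1f\x8b",
    "bz2": b"BZh",
    "xz": b"\xfd7zXZ\x00",
    "zst": b"\x28\xb5\x2f\xfd",
}


def detect_format(filename: str, head: bytes = b"") -> Optional[str]:
    """Hypothetical helper: trust a recognized extension, else sniff the content."""
    # 1. A recognized file name extension decides the format.
    for fmt in MAGIC:
        if filename.endswith("." + fmt):
            return fmt
    # 2. Otherwise compare the leading bytes against the known signatures.
    for fmt, magic in MAGIC.items():
        if head.startswith(magic):
            return fmt
    # 3. Nothing matched: treat the file as uncompressed.
    return None


# Usage: the extension wins when present, the content is inspected otherwise.
by_name = detect_format("reads.fastq.gz")
by_content = detect_format("reads", gzip.compress(b"ACGT"))
```

Checking the extension first means a file is only opened for inspection when its name gives no hint, which is the order the README text describes.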
Changes:
=====================================
.gitattributes
=====================================
@@ -0,0 +1,2 @@
+tests/file.txt eol=lf
+tests/file.txt.test eol=lf
=====================================
.github/workflows/ci.yml deleted
=====================================
@@ -1,85 +0,0 @@
-name: CI
-
-on: [push, pull_request]
-
-jobs:
- lint:
- timeout-minutes: 10
- runs-on: ubuntu-latest
- strategy:
- matrix:
- python-version: [3.7]
- toxenv: [flake8, mypy]
- steps:
- - uses: actions/checkout@v2
- - name: Set up Python ${{ matrix.python-version }}
- uses: actions/setup-python@v2
- with:
- python-version: ${{ matrix.python-version }}
- - name: Install dependencies
- run: python -m pip install tox
- - name: Run tox ${{ matrix.toxenv }}
- run: tox -e ${{ matrix.toxenv }}
-
- test:
- timeout-minutes: 10
- runs-on: ${{ matrix.os }}
- strategy:
- matrix:
- os: [ubuntu-latest]
- python-version: ["3.6", "3.7", "3.8", "3.9", "pypy-3.7"]
- include:
- - os: macos-latest
- python-version: 3.7
- - os: ubuntu-20.04
- python-version: 3.7
- with-isal: true
- steps:
- - name: Install pigz and pbzip2 MacOS
- if: startsWith(matrix.os, 'macos')
- run: brew install pigz pbzip2
- - name: Install pigz and pbzip2 Linux
- if: startsWith(matrix.os, 'ubuntu')
- run: sudo apt-get install pigz pbzip2
- - name: Install isal
- if: matrix.with-isal && !startsWith(matrix.os, 'macos')
- run: sudo apt-get install isal libisal-dev
- - uses: actions/checkout@v2
- - name: Set up Python ${{ matrix.python-version }}
- uses: actions/setup-python@v2
- with:
- python-version: ${{ matrix.python-version }}
- - name: Install dependencies
- run: python -m pip install tox
- - name: Test
- run: tox -e py
- if: matrix.with-isal == null
- - name: Test with isal
- run: tox -e isal
- if: matrix.with-isal
- - name: Upload coverage report
- uses: codecov/codecov-action@v1
-
- deploy:
- timeout-minutes: 10
- runs-on: ubuntu-latest
- needs: [lint, test]
- if: startsWith(github.ref, 'refs/tags')
- steps:
- - uses: actions/checkout@v2
- with:
- fetch-depth: 0 # required for setuptools_scm
- - name: Set up Python
- uses: actions/setup-python@v2
- with:
- python-version: 3.7
- - name: Make distributions
- run: |
- python -m pip install build
- python -m build
- ls -l dist/
- - name: Publish to PyPI
- uses: pypa/gh-action-pypi-publish@v1.4.1
- with:
- user: __token__
- password: ${{ secrets.pypi_password }}
=====================================
.pre-commit-config.yaml
=====================================
@@ -0,0 +1,10 @@
+repos:
+- repo: https://github.com/pre-commit/pre-commit-hooks
+ rev: v2.3.0
+ hooks:
+ - id: end-of-file-fixer
+ - id: trailing-whitespace
+- repo: https://github.com/psf/black
+ rev: 22.3.0
+ hooks:
+ - id: black
=====================================
PKG-INFO
=====================================
@@ -1,24 +1,25 @@
Metadata-Version: 2.1
Name: xopen
-Version: 1.2.1
+Version: 1.7.0
Summary: Open compressed files transparently
Home-page: https://github.com/pycompression/xopen/
Author: Marcel Martin et al.
Author-email: mail at marcelm.net
License: MIT
-Platform: UNKNOWN
Classifier: Development Status :: 5 - Production/Stable
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
-Requires-Python: >=3.6
+Requires-Python: >=3.7
+Description-Content-Type: text/x-rst
Provides-Extra: dev
+Provides-Extra: zstd
License-File: LICENSE
.. image:: https://github.com/pycompression/xopen/workflows/CI/badge.svg
:target: https://github.com/pycompression/xopen
- :alt:
-
-.. image:: https://img.shields.io/pypi/v/xopen.svg?branch=master
+ :alt:
+
+.. image:: https://img.shields.io/pypi/v/xopen.svg?branch=main
:target: https://pypi.python.org/pypi/xopen
.. image:: https://img.shields.io/conda/v/conda-forge/xopen.svg
@@ -33,34 +34,39 @@ License-File: LICENSE
xopen
=====
-This small Python module provides an ``xopen`` function that works like the
-built-in ``open`` function, but can also deal with compressed files.
-Supported compression formats are gzip, bzip2 and xz. They are automatically
-recognized by their file extensions `.gz`, `.bz2` or `.xz`.
-
-The focus is on being as efficient as possible on all supported Python versions.
-For example, ``xopen`` uses ``pigz``, which is a parallel version of ``gzip``,
-to open ``.gz`` files, which is faster than using the built-in ``gzip.open``
-function. ``pigz`` can use multiple threads when compressing, but is also faster
-when reading ``.gz`` files, so it is used both for reading and writing if it is
-available. For gzip compression levels 1 to 3,
-`igzip <https://github.com/intel/isa-l/>`_ is used for an even greater speedup.
-
-For use cases where using only the main thread is desired xopen can be used
-with ``threads=0``. This will use `python-isal
+This Python module provides an ``xopen`` function that works like the
+built-in ``open`` function but also transparently deals with compressed files.
+Supported compression formats are currently gzip, bzip2, xz and optionally Zstandard.
+
+``xopen`` selects the most efficient method for reading or writing a compressed file.
+This often means opening a pipe to an external tool, such as
+`pigz <https://zlib.net/pigz/>`_, which is a parallel version of ``gzip``,
+or `igzip <https://github.com/intel/isa-l/>`_, which is a highly optimized
+version of ``gzip``.
+
+If ``threads=0`` is passed to ``xopen()``, no external process is used..
+For gzip files, this will then use `python-isal
<https://github.com/pycompression/python-isal>`_ (which binds isa-l) if
-python-isal is installed (automatic on Linux systems, as it is a requirement).
-For installation instructions for python-isal please
-checkout the `python-isal homepage
-<https://github.com/pycompression/python-isal>`_. If python-isal is not
-available ``gzip.open`` is used.
+it is installed (since ``python-isal`` is a dependency of ``xopen``,
+this should always be the case).
+Neither ``igzip`` nor ``python-isal`` support compression levels
+greater 3, so if no external tool is available or ``threads`` has been set to 0,
+Python’s built-in ``gzip.open`` is used.
-This module has originally been developed as part of the `Cutadapt
-tool <https://cutadapt.readthedocs.io/>`_ that is used in bioinformatics to
-manipulate sequencing data. It has been in successful use within that software
-for a few years.
+For xz files, a pipe to the ``xz`` program is used because it has built-in support for multithreaded compression.
-``xopen`` is compatible with Python versions 3.6 and later.
+For bz2 files, `pbzip2 (parallel bzip2) <http://compression.ca/pbzip2/>`_ is used.
+
+``xopen`` falls back to Python’s built-in functions
+(``gzip.open``, ``lzma.open``, ``bz2.open``)
+if none of the other methods can be used.
+
+The file format to use is determined from the file name if the extension is recognized
+(``.gz``, ``.bz2``, ``.xz`` or ``.zst``).
+When reading a file without a recognized file extension, xopen attempts to detect the format
+by reading the first couple of bytes from the file.
+
+``xopen`` is compatible with Python versions 3.7 and later.
Usage
@@ -70,81 +76,131 @@ Open a file for reading::
from xopen import xopen
- with xopen('file.txt.xz') as f:
+ with xopen("file.txt.gz") as f:
content = f.read()
-Or without context manager::
+Write to a file in binary mode,
+set the compression level
+and avoid using an external process::
from xopen import xopen
- f = xopen('file.txt.xz')
- content = f.read()
- f.close()
+ with xopen("file.txt.xz", mode="wb", threads=0, compresslevel=3)
+ f.write(b"Hello")
-Open a file in binary mode for writing::
- from xopen import xopen
+Reproducibility
+---------------
- with xopen('file.txt.gz', mode='wb') as f:
- f.write(b'Hello')
+xopen writes gzip files in a reproducible manner.
+Normally, gzip files contain a timestamp in the file header,
+which means that compressing the same data at different times results in different output files.
+xopen disables this for all of the supported gzip compression backends.
+For example, when using an external process, it sets the command-line option
+``--no-name`` (same as ``-n``).
-Credits
--------
+Note that different gzip compression backends typically do not produce
+identical output, so reproducibility may no longer be given when the execution environment changes
+from one ``xopen()`` invocation to the next.
+This includes the CPU architecture as `igzip adjusts its algorithm
+depending on it <https://github.com/intel/isa-l/issues/140#issuecomment-634877966>`_.
-The name ``xopen`` was taken from the C function of the same name in the
-`utils.h file which is part of
-BWA <https://github.com/lh3/bwa/blob/83662032a2192d5712996f36069ab02db82acf67/utils.h>`_.
+bzip2 and xz compression methods do not store timestamps in the file headers,
+so output from them is also reproducible.
-Kyle Beauchamp <https://github.com/kyleabeauchamp/> has contributed support for
-appending to files.
-Ruben Vorderman <https://github.com/rhpvorderman/> contributed improvements to
-make reading and writing gzipped files faster.
+Optional Zstandard support
+--------------------------
-Benjamin Vaisvil <https://github.com/bvaisvil> contributed support for
-format detection from content.
+For reading and writing Zstandard (``.zst``) files, either the ``zstd`` command-line
+program or the Python ``zstandard`` package needs to be installed.
-Dries Schaumont <https://github.com/DriesSchaumont> contributed support for
-faster bz2 reading and writing using pbzip2.
+* If the ``threads`` parameter to ``xopen()`` is ``None`` (the default) or any value greater than 0,
+ ``xopen`` uses an external ``zstd`` process.
+* If the above fails (because no ``zstd`` program is available) or if ``threads`` is 0,
+ the ``zstandard`` package is used.
-Some ideas were taken from the `canopener project <https://github.com/selassid/canopener>`_.
-If you also want to open S3 files, you may want to use that module instead.
+To ensure that you get the correct ``zstandard`` version, you can specify the ``zstd`` extra for
+``xopen``, that is, install it using ``pip install xopen[zstd]``.
-Changes
--------
+Changelog
+---------
-v1.2.0
-~~~~~~
+v1.7.0 (2022-11-03)
+~~~~~~~~~~~~~~~~~~~
+
+* #91: Added optional support for Zstandard (``.zst``) files.
+ This requires that the Python ``zstandard`` package is installed
+ or that the ``zstd`` command-line program is available.
+
+v1.6.0 (2022-08-10)
+~~~~~~~~~~~~~~~~~~~
+
+* #94: When writing gzip files, the timestamp and name of the original
+ file is omitted (equivalent to using ``gzip --no-name`` (or ``-n``) on the
+ command line). This allows files to be written in a reproducible manner.
+
+v1.5.0 (2022-03-23)
+~~~~~~~~~~~~~~~~~~~
+
+* #100: Dropped Python 3.6 support
+* #101: Added support for piping into and from an external ``xz`` process. Contributed by @fanninpm.
+* #102: Support setting the xz compression level. Contributed by @tsibley.
+
+v1.4.0 (2022-01-14)
+~~~~~~~~~~~~~~~~~~~
+
+* Add ``seek()`` and ``tell()`` to the ``PipedCompressionReader`` classes
+ (for Windows compatibility)
+
+v1.3.0 (2022-01-10)
+~~~~~~~~~~~~~~~~~~~
+
+* xopen is now available on Windows (in addition to Linux and macOS).
+* For greater compatibility with `the built-in open()
+ function <https://docs.python.org/3/library/functions.html#open>`_,
+ ``xopen()`` has gained the parameters *encoding*, *errors* and *newlines*
+ with the same meaning as in ``open()``. Unlike built-in ``open()``, though,
+ encoding is UTF-8 by default.
+* A parameter *format* has been added that allows to force the compression
+ file format.
+
+v1.2.0 (2021-09-21)
+~~~~~~~~~~~~~~~~~~~
* `pbzip2 <http://compression.ca/pbzip2/>`_ is now used to open ``.bz2`` files if
- ``threads`` is greater than zero.
+ ``threads`` is greater than zero (contributed by @DriesSchaumont).
+
+v1.1.0 (2021-01-20)
+~~~~~~~~~~~~~~~~~~~
-v1.1.0
-~~~~~~
* Python 3.5 support is dropped.
* On Linux systems, `python-isal <https://github.com/pycompression/python-isal>`_
is now added as a requirement. This will speed up the reading of gzip files
significantly when no external processes are used.
-v1.0.0
-~~~~~~
+v1.0.0 (2020-11-05)
+~~~~~~~~~~~~~~~~~~~
+
* If installed, the ``igzip`` program (part of
`Intel ISA-L <https://github.com/intel/isa-l/>`_) is now used for reading
and writing gzip-compressed files at compression levels 1-3, which results
in a significant speedup.
-v0.9.0
-~~~~~~
-* When the file name extension of a file to be opened for reading is not
+v0.9.0 (2020-04-02)
+~~~~~~~~~~~~~~~~~~~
+
+* #80: When the file name extension of a file to be opened for reading is not
available, the content is inspected (if possible) and used to determine
- which compression format applies.
+ which compression format applies (contributed by @bvaisvil).
* This release drops Python 2.7 and 3.4 support. Python 3.5 or later is
now required.
-v0.8.4
-~~~~~~
+v0.8.4 (2019-10-24)
+~~~~~~~~~~~~~~~~~~~
+
* When reading gzipped files, force ``pigz`` to use only a single process.
``pigz`` cannot use multiple cores anyway when decompressing. By default,
it would use extra I/O processes, which slightly reduces wall-clock time,
@@ -153,35 +209,88 @@ v0.8.4
* Allow ``threads=0`` for specifying that no external ``pigz``/``gzip``
process should be used (then regular ``gzip.open()`` is used instead).
-v0.8.3
-~~~~~~
-* When reading gzipped files, let ``pigz`` use at most four threads by default.
- This limit previously only applied when writing to a file.
+v0.8.3 (2019-10-18)
+~~~~~~~~~~~~~~~~~~~
+
+* #20: When reading gzipped files, let ``pigz`` use at most four threads by default.
+ This limit previously only applied when writing to a file. Contributed by @bernt-matthias.
* Support Python 3.8
-v0.8.0
-~~~~~~
-* Speed improvements when iterating over gzipped files.
+v0.8.0 (2019-08-14)
+~~~~~~~~~~~~~~~~~~~
+
+* #14: Speed improvements when iterating over gzipped files.
+
+v0.6.0 (2019-05-23)
+~~~~~~~~~~~~~~~~~~~
-v0.6.0
-~~~~~~
* For reading from gzipped files, xopen will now use a ``pigz`` subprocess.
This is faster than using ``gzip.open``.
* Python 2 support will be dropped in one of the next releases.
-v0.5.0
-~~~~~~
+v0.5.0 (2019-01-30)
+~~~~~~~~~~~~~~~~~~~
+
* By default, pigz is now only allowed to use at most four threads. This hopefully reduces
problems some users had with too many threads when opening many files at the same time.
* xopen now accepts pathlib.Path objects.
+v0.4.0 (2019-01-07)
+~~~~~~~~~~~~~~~~~~~
+
+* Drop Python 3.3 support
+* Add a ``threads`` parameter (passed on to ``pigz``)
+
+v0.3.2 (2017-11-22)
+~~~~~~~~~~~~~~~~~~~
+
+* #6: Make multi-block bz2 work on Python 2 by using external bz2file library.
+
+v0.3.1 (2017-11-22)
+~~~~~~~~~~~~~~~~~~~
+
+* Drop Python 2.6 support
+* #5: Fix PipedGzipReader.read() not returning anything
+
+v0.3.0 (2017-11-15)
+~~~~~~~~~~~~~~~~~~~
+
+* Add gzip compression parameter
+
+v0.2.1 (2017-05-31)
+~~~~~~~~~~~~~~~~~~~
+
+* #3: Allow appending to bz2 and lzma files where possible
+
+v0.1.1 (2016-12-02)
+~~~~~~~~~~~~~~~~~~~
+
+* Fix a deadlock
+
+v0.1.0 (2016-09-09)
+~~~~~~~~~~~~~~~~~~~
-Contributors
-------------
+* Initial release
+
+Credits
+-------
+
+The name ``xopen`` was taken from the C function of the same name in the
+`utils.h file which is part of
+BWA <https://github.com/lh3/bwa/blob/83662032a2192d5712996f36069ab02db82acf67/utils.h>`_.
+
+Some ideas were taken from the `canopener project <https://github.com/selassid/canopener>`_.
+If you also want to open S3 files, you may want to use that module instead.
+
+@kyleabeauchamp contributed support for appending to files before this repository was created.
+
+
+Maintainers
+-----------
* Marcel Martin
* Ruben Vorderman
-* For more contributors, see <https://github.com/pycompression/xopen/graphs/contributors>
+* For a list of contributors, see <https://github.com/pycompression/xopen/graphs/contributors>
Links
@@ -190,5 +299,3 @@ Links
* `Source code <https://github.com/pycompression/xopen/>`_
* `Report an issue <https://github.com/pycompression/xopen/issues>`_
* `Project page on PyPI (Python package index) <https://pypi.python.org/pypi/xopen/>`_
-
-
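Note on the "Reproducibility" section added in the diff above: the gzip header carries a modification time, so the same input compressed at different times can yield different bytes. The sketch below illustrates the effect with the standard library's gzip.GzipFile and its mtime parameter. It is an illustration only and does not show how xopen itself disables the timestamp; gzip_bytes is a hypothetical helper name.

```python
import gzip
import io
from typing import Optional


def gzip_bytes(data: bytes, mtime: Optional[float] = None) -> bytes:
    """Hypothetical helper: compress in memory.

    mtime=None lets GzipFile store the current time in the header;
    a fixed value such as 0 makes the header independent of the clock.
    """
    buf = io.BytesIO()
    with gzip.GzipFile(fileobj=buf, mode="wb", mtime=mtime) as f:
        f.write(data)
    return buf.getvalue()


payload = b"Hello\n" * 100

# Same data, same backend, fixed timestamp: the outputs are byte-identical.
first = gzip_bytes(payload, mtime=0)
second = gzip_bytes(payload, mtime=0)

# A different timestamp changes only the 4-byte MTIME header field (offset 4..7).
stamped = gzip_bytes(payload, mtime=1)
```

As the README text cautions, this only holds while the same compression backend is used; a different backend may produce different compressed bytes for the same input.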
=====================================
README.rst
=====================================
@@ -1,8 +1,8 @@
.. image:: https://github.com/pycompression/xopen/workflows/CI/badge.svg
:target: https://github.com/pycompression/xopen
- :alt:
-
-.. image:: https://img.shields.io/pypi/v/xopen.svg?branch=master
+ :alt:
+
+.. image:: https://img.shields.io/pypi/v/xopen.svg?branch=main
:target: https://pypi.python.org/pypi/xopen
.. image:: https://img.shields.io/conda/v/conda-forge/xopen.svg
@@ -17,34 +17,39 @@
xopen
=====
-This small Python module provides an ``xopen`` function that works like the
-built-in ``open`` function, but can also deal with compressed files.
-Supported compression formats are gzip, bzip2 and xz. They are automatically
-recognized by their file extensions `.gz`, `.bz2` or `.xz`.
-
-The focus is on being as efficient as possible on all supported Python versions.
-For example, ``xopen`` uses ``pigz``, which is a parallel version of ``gzip``,
-to open ``.gz`` files, which is faster than using the built-in ``gzip.open``
-function. ``pigz`` can use multiple threads when compressing, but is also faster
-when reading ``.gz`` files, so it is used both for reading and writing if it is
-available. For gzip compression levels 1 to 3,
-`igzip <https://github.com/intel/isa-l/>`_ is used for an even greater speedup.
-
-For use cases where using only the main thread is desired xopen can be used
-with ``threads=0``. This will use `python-isal
+This Python module provides an ``xopen`` function that works like the
+built-in ``open`` function but also transparently deals with compressed files.
+Supported compression formats are currently gzip, bzip2, xz and optionally Zstandard.
+
+``xopen`` selects the most efficient method for reading or writing a compressed file.
+This often means opening a pipe to an external tool, such as
+`pigz <https://zlib.net/pigz/>`_, which is a parallel version of ``gzip``,
+or `igzip <https://github.com/intel/isa-l/>`_, which is a highly optimized
+version of ``gzip``.
+
+If ``threads=0`` is passed to ``xopen()``, no external process is used..
+For gzip files, this will then use `python-isal
<https://github.com/pycompression/python-isal>`_ (which binds isa-l) if
-python-isal is installed (automatic on Linux systems, as it is a requirement).
-For installation instructions for python-isal please
-checkout the `python-isal homepage
-<https://github.com/pycompression/python-isal>`_. If python-isal is not
-available ``gzip.open`` is used.
+it is installed (since ``python-isal`` is a dependency of ``xopen``,
+this should always be the case).
+Neither ``igzip`` nor ``python-isal`` support compression levels
+greater 3, so if no external tool is available or ``threads`` has been set to 0,
+Python’s built-in ``gzip.open`` is used.
-This module has originally been developed as part of the `Cutadapt
-tool <https://cutadapt.readthedocs.io/>`_ that is used in bioinformatics to
-manipulate sequencing data. It has been in successful use within that software
-for a few years.
+For xz files, a pipe to the ``xz`` program is used because it has built-in support for multithreaded compression.
-``xopen`` is compatible with Python versions 3.6 and later.
+For bz2 files, `pbzip2 (parallel bzip2) <http://compression.ca/pbzip2/>`_ is used.
+
+``xopen`` falls back to Python’s built-in functions
+(``gzip.open``, ``lzma.open``, ``bz2.open``)
+if none of the other methods can be used.
+
+The file format to use is determined from the file name if the extension is recognized
+(``.gz``, ``.bz2``, ``.xz`` or ``.zst``).
+When reading a file without a recognized file extension, xopen attempts to detect the format
+by reading the first couple of bytes from the file.
+
+``xopen`` is compatible with Python versions 3.7 and later.
Usage
@@ -54,81 +59,131 @@ Open a file for reading::
from xopen import xopen
- with xopen('file.txt.xz') as f:
+ with xopen("file.txt.gz") as f:
content = f.read()
-Or without context manager::
+Write to a file in binary mode,
+set the compression level
+and avoid using an external process::
from xopen import xopen
- f = xopen('file.txt.xz')
- content = f.read()
- f.close()
+ with xopen("file.txt.xz", mode="wb", threads=0, compresslevel=3)
+ f.write(b"Hello")
-Open a file in binary mode for writing::
- from xopen import xopen
+Reproducibility
+---------------
- with xopen('file.txt.gz', mode='wb') as f:
- f.write(b'Hello')
+xopen writes gzip files in a reproducible manner.
+Normally, gzip files contain a timestamp in the file header,
+which means that compressing the same data at different times results in different output files.
+xopen disables this for all of the supported gzip compression backends.
+For example, when using an external process, it sets the command-line option
+``--no-name`` (same as ``-n``).
-Credits
--------
+Note that different gzip compression backends typically do not produce
+identical output, so reproducibility may no longer be given when the execution environment changes
+from one ``xopen()`` invocation to the next.
+This includes the CPU architecture as `igzip adjusts its algorithm
+depending on it <https://github.com/intel/isa-l/issues/140#issuecomment-634877966>`_.
-The name ``xopen`` was taken from the C function of the same name in the
-`utils.h file which is part of
-BWA <https://github.com/lh3/bwa/blob/83662032a2192d5712996f36069ab02db82acf67/utils.h>`_.
+bzip2 and xz compression methods do not store timestamps in the file headers,
+so output from them is also reproducible.
-Kyle Beauchamp <https://github.com/kyleabeauchamp/> has contributed support for
-appending to files.
-Ruben Vorderman <https://github.com/rhpvorderman/> contributed improvements to
-make reading and writing gzipped files faster.
+Optional Zstandard support
+--------------------------
-Benjamin Vaisvil <https://github.com/bvaisvil> contributed support for
-format detection from content.
+For reading and writing Zstandard (``.zst``) files, either the ``zstd`` command-line
+program or the Python ``zstandard`` package needs to be installed.
-Dries Schaumont <https://github.com/DriesSchaumont> contributed support for
-faster bz2 reading and writing using pbzip2.
+* If the ``threads`` parameter to ``xopen()`` is ``None`` (the default) or any value greater than 0,
+ ``xopen`` uses an external ``zstd`` process.
+* If the above fails (because no ``zstd`` program is available) or if ``threads`` is 0,
+ the ``zstandard`` package is used.
-Some ideas were taken from the `canopener project <https://github.com/selassid/canopener>`_.
-If you also want to open S3 files, you may want to use that module instead.
+To ensure that you get the correct ``zstandard`` version, you can specify the ``zstd`` extra for
+``xopen``, that is, install it using ``pip install xopen[zstd]``.
-Changes
--------
+Changelog
+---------
+
+v1.7.0 (2022-11-03)
+~~~~~~~~~~~~~~~~~~~
+
+* #91: Added optional support for Zstandard (``.zst``) files.
+ This requires that the Python ``zstandard`` package is installed
+ or that the ``zstd`` command-line program is available.
+
+v1.6.0 (2022-08-10)
+~~~~~~~~~~~~~~~~~~~
+
+* #94: When writing gzip files, the timestamp and name of the original
+ file is omitted (equivalent to using ``gzip --no-name`` (or ``-n``) on the
+ command line). This allows files to be written in a reproducible manner.
+
+v1.5.0 (2022-03-23)
+~~~~~~~~~~~~~~~~~~~
+
+* #100: Dropped Python 3.6 support
+* #101: Added support for piping into and from an external ``xz`` process. Contributed by @fanninpm.
+* #102: Support setting the xz compression level. Contributed by @tsibley.
+
+v1.4.0 (2022-01-14)
+~~~~~~~~~~~~~~~~~~~
-v1.2.0
-~~~~~~
+* Add ``seek()`` and ``tell()`` to the ``PipedCompressionReader`` classes
+ (for Windows compatibility)
+
+v1.3.0 (2022-01-10)
+~~~~~~~~~~~~~~~~~~~
+
+* xopen is now available on Windows (in addition to Linux and macOS).
+* For greater compatibility with `the built-in open()
+ function <https://docs.python.org/3/library/functions.html#open>`_,
+ ``xopen()`` has gained the parameters *encoding*, *errors* and *newlines*
+ with the same meaning as in ``open()``. Unlike built-in ``open()``, though,
+ encoding is UTF-8 by default.
+* A parameter *format* has been added that allows to force the compression
+ file format.
+
+v1.2.0 (2021-09-21)
+~~~~~~~~~~~~~~~~~~~
* `pbzip2 <http://compression.ca/pbzip2/>`_ is now used to open ``.bz2`` files if
- ``threads`` is greater than zero.
+ ``threads`` is greater than zero (contributed by @DriesSchaumont).
+
+v1.1.0 (2021-01-20)
+~~~~~~~~~~~~~~~~~~~
-v1.1.0
-~~~~~~
* Python 3.5 support is dropped.
* On Linux systems, `python-isal <https://github.com/pycompression/python-isal>`_
is now added as a requirement. This will speed up the reading of gzip files
significantly when no external processes are used.
-v1.0.0
-~~~~~~
+v1.0.0 (2020-11-05)
+~~~~~~~~~~~~~~~~~~~
+
* If installed, the ``igzip`` program (part of
`Intel ISA-L <https://github.com/intel/isa-l/>`_) is now used for reading
and writing gzip-compressed files at compression levels 1-3, which results
in a significant speedup.
-v0.9.0
-~~~~~~
-* When the file name extension of a file to be opened for reading is not
+v0.9.0 (2020-04-02)
+~~~~~~~~~~~~~~~~~~~
+
+* #80: When the file name extension of a file to be opened for reading is not
available, the content is inspected (if possible) and used to determine
- which compression format applies.
+ which compression format applies (contributed by @bvaisvil).
* This release drops Python 2.7 and 3.4 support. Python 3.5 or later is
now required.
-v0.8.4
-~~~~~~
+v0.8.4 (2019-10-24)
+~~~~~~~~~~~~~~~~~~~
+
* When reading gzipped files, force ``pigz`` to use only a single process.
``pigz`` cannot use multiple cores anyway when decompressing. By default,
it would use extra I/O processes, which slightly reduces wall-clock time,
@@ -137,35 +192,88 @@ v0.8.4
* Allow ``threads=0`` for specifying that no external ``pigz``/``gzip``
process should be used (then regular ``gzip.open()`` is used instead).
-v0.8.3
-~~~~~~
-* When reading gzipped files, let ``pigz`` use at most four threads by default.
- This limit previously only applied when writing to a file.
+v0.8.3 (2019-10-18)
+~~~~~~~~~~~~~~~~~~~
+
+* #20: When reading gzipped files, let ``pigz`` use at most four threads by default.
+ This limit previously only applied when writing to a file. Contributed by @bernt-matthias.
* Support Python 3.8
-v0.8.0
-~~~~~~
-* Speed improvements when iterating over gzipped files.
+v0.8.0 (2019-08-14)
+~~~~~~~~~~~~~~~~~~~
+
+* #14: Speed improvements when iterating over gzipped files.
+
+v0.6.0 (2019-05-23)
+~~~~~~~~~~~~~~~~~~~
-v0.6.0
-~~~~~~
* For reading from gzipped files, xopen will now use a ``pigz`` subprocess.
This is faster than using ``gzip.open``.
* Python 2 support will be dropped in one of the next releases.
-v0.5.0
-~~~~~~
+v0.5.0 (2019-01-30)
+~~~~~~~~~~~~~~~~~~~
+
* By default, pigz is now only allowed to use at most four threads. This hopefully reduces
problems some users had with too many threads when opening many files at the same time.
* xopen now accepts pathlib.Path objects.
+v0.4.0 (2019-01-07)
+~~~~~~~~~~~~~~~~~~~
+
+* Drop Python 3.3 support
+* Add a ``threads`` parameter (passed on to ``pigz``)
+
+v0.3.2 (2017-11-22)
+~~~~~~~~~~~~~~~~~~~
+
+* #6: Make multi-block bz2 work on Python 2 by using external bz2file library.
+
+v0.3.1 (2017-11-22)
+~~~~~~~~~~~~~~~~~~~
+
+* Drop Python 2.6 support
+* #5: Fix PipedGzipReader.read() not returning anything
+
+v0.3.0 (2017-11-15)
+~~~~~~~~~~~~~~~~~~~
+
+* Add gzip compression parameter
+
+v0.2.1 (2017-05-31)
+~~~~~~~~~~~~~~~~~~~
+
+* #3: Allow appending to bz2 and lzma files where possible
+
+v0.1.1 (2016-12-02)
+~~~~~~~~~~~~~~~~~~~
+
+* Fix a deadlock
+
+v0.1.0 (2016-09-09)
+~~~~~~~~~~~~~~~~~~~
+
+* Initial release
+
+Credits
+-------
+
+The name ``xopen`` was taken from the C function of the same name in the
+`utils.h file which is part of
+BWA <https://github.com/lh3/bwa/blob/83662032a2192d5712996f36069ab02db82acf67/utils.h>`_.
+
+Some ideas were taken from the `canopener project <https://github.com/selassid/canopener>`_.
+If you also want to open S3 files, you may want to use that module instead.
+
+@kyleabeauchamp contributed support for appending to files before this repository was created.
+
-Contributors
-------------
+Maintainers
+-----------
* Marcel Martin
* Ruben Vorderman
-* For more contributors, see <https://github.com/pycompression/xopen/graphs/contributors>
+* For a list of contributors, see <https://github.com/pycompression/xopen/graphs/contributors>
Links
=====================================
pyproject.toml
=====================================
@@ -1,5 +1,9 @@
[build-system]
requires = ["setuptools", "wheel", "setuptools_scm>=6.2"]
+build-backend = "setuptools.build_meta"
[tool.setuptools_scm]
write_to = "src/xopen/_version.py"
+
+[tool.pytest.ini_options]
+addopts = "--strict-markers"
=====================================
setup.cfg
=====================================
@@ -5,6 +5,7 @@ author_email = mail at marcelm.net
url = https://github.com/pycompression/xopen/
description = Open compressed files transparently
long_description = file: README.rst
+long_description_content_type = text/x-rst
license = MIT
classifiers =
Development Status :: 5 - Production/Stable
@@ -12,12 +13,13 @@ classifiers =
Programming Language :: Python :: 3
[options]
-python_requires = >=3.6
+python_requires = >=3.7
package_dir =
=src
packages = find:
install_requires =
- isal>=0.9.0; platform_machine == "x86_64" or platform_machine == "AMD64" or platform_machine == "aarch64"
+ isal>=1.0.0; platform.python_implementation == 'CPython' and (platform.machine == "x86_64" or platform.machine == "AMD64")
+ typing_extensions; python_version<'3.8'
[options.packages.find]
where = src
@@ -27,6 +29,7 @@ where = src
[options.extras_require]
dev = pytest
+zstd = zstandard<1
[egg_info]
tag_build =
=====================================
setup.py deleted
=====================================
@@ -1,3 +0,0 @@
-from setuptools import setup
-
-setup(setup_requires=["setuptools_scm"])
=====================================
src/xopen.egg-info/PKG-INFO
=====================================
@@ -1,24 +1,25 @@
Metadata-Version: 2.1
Name: xopen
-Version: 1.2.1
+Version: 1.7.0
Summary: Open compressed files transparently
Home-page: https://github.com/pycompression/xopen/
Author: Marcel Martin et al.
Author-email: mail at marcelm.net
License: MIT
-Platform: UNKNOWN
Classifier: Development Status :: 5 - Production/Stable
Classifier: License :: OSI Approved :: MIT License
Classifier: Programming Language :: Python :: 3
-Requires-Python: >=3.6
+Requires-Python: >=3.7
+Description-Content-Type: text/x-rst
Provides-Extra: dev
+Provides-Extra: zstd
License-File: LICENSE
.. image:: https://github.com/pycompression/xopen/workflows/CI/badge.svg
:target: https://github.com/pycompression/xopen
- :alt:
-
-.. image:: https://img.shields.io/pypi/v/xopen.svg?branch=master
+ :alt:
+
+.. image:: https://img.shields.io/pypi/v/xopen.svg?branch=main
:target: https://pypi.python.org/pypi/xopen
.. image:: https://img.shields.io/conda/v/conda-forge/xopen.svg
@@ -33,34 +34,39 @@ License-File: LICENSE
xopen
=====
-This small Python module provides an ``xopen`` function that works like the
-built-in ``open`` function, but can also deal with compressed files.
-Supported compression formats are gzip, bzip2 and xz. They are automatically
-recognized by their file extensions `.gz`, `.bz2` or `.xz`.
-
-The focus is on being as efficient as possible on all supported Python versions.
-For example, ``xopen`` uses ``pigz``, which is a parallel version of ``gzip``,
-to open ``.gz`` files, which is faster than using the built-in ``gzip.open``
-function. ``pigz`` can use multiple threads when compressing, but is also faster
-when reading ``.gz`` files, so it is used both for reading and writing if it is
-available. For gzip compression levels 1 to 3,
-`igzip <https://github.com/intel/isa-l/>`_ is used for an even greater speedup.
-
-For use cases where using only the main thread is desired xopen can be used
-with ``threads=0``. This will use `python-isal
+This Python module provides an ``xopen`` function that works like the
+built-in ``open`` function but also transparently deals with compressed files.
+Supported compression formats are currently gzip, bzip2, xz and optionally Zstandard.
+
+``xopen`` selects the most efficient method for reading or writing a compressed file.
+This often means opening a pipe to an external tool, such as
+`pigz <https://zlib.net/pigz/>`_, which is a parallel version of ``gzip``,
+or `igzip <https://github.com/intel/isa-l/>`_, which is a highly optimized
+version of ``gzip``.
+
+If ``threads=0`` is passed to ``xopen()``, no external process is used.
+For gzip files, this will then use `python-isal
<https://github.com/pycompression/python-isal>`_ (which binds isa-l) if
-python-isal is installed (automatic on Linux systems, as it is a requirement).
-For installation instructions for python-isal please
-checkout the `python-isal homepage
-<https://github.com/pycompression/python-isal>`_. If python-isal is not
-available ``gzip.open`` is used.
+it is installed (since ``python-isal`` is a dependency of ``xopen``,
+this should always be the case).
+Neither ``igzip`` nor ``python-isal`` supports compression levels
+greater than 3, so for higher levels, if no external tool is available or ``threads`` has been set to 0,
+Python’s built-in ``gzip.open`` is used.
-This module has originally been developed as part of the `Cutadapt
-tool <https://cutadapt.readthedocs.io/>`_ that is used in bioinformatics to
-manipulate sequencing data. It has been in successful use within that software
-for a few years.
+For xz files, a pipe to the ``xz`` program is used because it has built-in support for multithreaded compression.
-``xopen`` is compatible with Python versions 3.6 and later.
+For bz2 files, `pbzip2 (parallel bzip2) <http://compression.ca/pbzip2/>`_ is used.
+
+``xopen`` falls back to Python’s built-in functions
+(``gzip.open``, ``lzma.open``, ``bz2.open``)
+if none of the other methods can be used.
+
+The file format to use is determined from the file name if the extension is recognized
+(``.gz``, ``.bz2``, ``.xz`` or ``.zst``).
+When reading a file without a recognized file extension, xopen attempts to detect the format
+by reading the first couple of bytes from the file.
+
+``xopen`` is compatible with Python versions 3.7 and later.
Usage
@@ -70,81 +76,131 @@ Open a file for reading::
from xopen import xopen
- with xopen('file.txt.xz') as f:
+ with xopen("file.txt.gz") as f:
content = f.read()
-Or without context manager::
+Write to a file in binary mode,
+set the compression level
+and avoid using an external process::
from xopen import xopen
- f = xopen('file.txt.xz')
- content = f.read()
- f.close()
+ with xopen("file.txt.xz", mode="wb", threads=0, compresslevel=3) as f:
+ f.write(b"Hello")
-Open a file in binary mode for writing::
- from xopen import xopen
+Reproducibility
+---------------
- with xopen('file.txt.gz', mode='wb') as f:
- f.write(b'Hello')
+xopen writes gzip files in a reproducible manner.
+Normally, gzip files contain a timestamp in the file header,
+which means that compressing the same data at different times results in different output files.
+xopen disables this for all of the supported gzip compression backends.
+For example, when using an external process, it sets the command-line option
+``--no-name`` (same as ``-n``).
-Credits
--------
+Note that different gzip compression backends typically do not produce
+identical output, so reproducibility may no longer be given when the execution environment changes
+from one ``xopen()`` invocation to the next.
+This includes the CPU architecture as `igzip adjusts its algorithm
+depending on it <https://github.com/intel/isa-l/issues/140#issuecomment-634877966>`_.
-The name ``xopen`` was taken from the C function of the same name in the
-`utils.h file which is part of
-BWA <https://github.com/lh3/bwa/blob/83662032a2192d5712996f36069ab02db82acf67/utils.h>`_.
+bzip2 and xz compression methods do not store timestamps in the file headers,
+so output from them is also reproducible.
-Kyle Beauchamp <https://github.com/kyleabeauchamp/> has contributed support for
-appending to files.
-Ruben Vorderman <https://github.com/rhpvorderman/> contributed improvements to
-make reading and writing gzipped files faster.
+Optional Zstandard support
+--------------------------
-Benjamin Vaisvil <https://github.com/bvaisvil> contributed support for
-format detection from content.
+For reading and writing Zstandard (``.zst``) files, either the ``zstd`` command-line
+program or the Python ``zstandard`` package needs to be installed.
-Dries Schaumont <https://github.com/DriesSchaumont> contributed support for
-faster bz2 reading and writing using pbzip2.
+* If the ``threads`` parameter to ``xopen()`` is ``None`` (the default) or any value greater than 0,
+ ``xopen`` uses an external ``zstd`` process.
+* If the above fails (because no ``zstd`` program is available) or if ``threads`` is 0,
+ the ``zstandard`` package is used.
-Some ideas were taken from the `canopener project <https://github.com/selassid/canopener>`_.
-If you also want to open S3 files, you may want to use that module instead.
+To ensure that you get the correct ``zstandard`` version, you can specify the ``zstd`` extra for
+``xopen``, that is, install it using ``pip install xopen[zstd]``.
-Changes
--------
+Changelog
+---------
-v1.2.0
-~~~~~~
+v1.7.0 (2022-11-03)
+~~~~~~~~~~~~~~~~~~~
+
+* #91: Added optional support for Zstandard (``.zst``) files.
+ This requires that the Python ``zstandard`` package is installed
+ or that the ``zstd`` command-line program is available.
+
+v1.6.0 (2022-08-10)
+~~~~~~~~~~~~~~~~~~~
+
+* #94: When writing gzip files, the timestamp and name of the original
+ file are omitted (equivalent to using ``gzip --no-name`` (or ``-n``) on the
+ command line). This allows files to be written in a reproducible manner.
+
+v1.5.0 (2022-03-23)
+~~~~~~~~~~~~~~~~~~~
+
+* #100: Dropped Python 3.6 support
+* #101: Added support for piping into and from an external ``xz`` process. Contributed by @fanninpm.
+* #102: Support setting the xz compression level. Contributed by @tsibley.
+
+v1.4.0 (2022-01-14)
+~~~~~~~~~~~~~~~~~~~
+
+* Add ``seek()`` and ``tell()`` to the ``PipedCompressionReader`` classes
+ (for Windows compatibility)
+
+v1.3.0 (2022-01-10)
+~~~~~~~~~~~~~~~~~~~
+
+* xopen is now available on Windows (in addition to Linux and macOS).
+* For greater compatibility with `the built-in open()
+ function <https://docs.python.org/3/library/functions.html#open>`_,
+ ``xopen()`` has gained the parameters *encoding*, *errors* and *newline*
+ with the same meaning as in ``open()``. Unlike built-in ``open()``, though,
+ encoding is UTF-8 by default.
+* A parameter *format* has been added that allows forcing the compression
+ file format.
+
+v1.2.0 (2021-09-21)
+~~~~~~~~~~~~~~~~~~~
* `pbzip2 <http://compression.ca/pbzip2/>`_ is now used to open ``.bz2`` files if
- ``threads`` is greater than zero.
+ ``threads`` is greater than zero (contributed by @DriesSchaumont).
+
+v1.1.0 (2021-01-20)
+~~~~~~~~~~~~~~~~~~~
-v1.1.0
-~~~~~~
* Python 3.5 support is dropped.
* On Linux systems, `python-isal <https://github.com/pycompression/python-isal>`_
is now added as a requirement. This will speed up the reading of gzip files
significantly when no external processes are used.
-v1.0.0
-~~~~~~
+v1.0.0 (2020-11-05)
+~~~~~~~~~~~~~~~~~~~
+
* If installed, the ``igzip`` program (part of
`Intel ISA-L <https://github.com/intel/isa-l/>`_) is now used for reading
and writing gzip-compressed files at compression levels 1-3, which results
in a significant speedup.
-v0.9.0
-~~~~~~
-* When the file name extension of a file to be opened for reading is not
+v0.9.0 (2020-04-02)
+~~~~~~~~~~~~~~~~~~~
+
+* #80: When the file name extension of a file to be opened for reading is not
available, the content is inspected (if possible) and used to determine
- which compression format applies.
+ which compression format applies (contributed by @bvaisvil).
* This release drops Python 2.7 and 3.4 support. Python 3.5 or later is
now required.
-v0.8.4
-~~~~~~
+v0.8.4 (2019-10-24)
+~~~~~~~~~~~~~~~~~~~
+
* When reading gzipped files, force ``pigz`` to use only a single process.
``pigz`` cannot use multiple cores anyway when decompressing. By default,
it would use extra I/O processes, which slightly reduces wall-clock time,
@@ -153,35 +209,88 @@ v0.8.4
* Allow ``threads=0`` for specifying that no external ``pigz``/``gzip``
process should be used (then regular ``gzip.open()`` is used instead).
-v0.8.3
-~~~~~~
-* When reading gzipped files, let ``pigz`` use at most four threads by default.
- This limit previously only applied when writing to a file.
+v0.8.3 (2019-10-18)
+~~~~~~~~~~~~~~~~~~~
+
+* #20: When reading gzipped files, let ``pigz`` use at most four threads by default.
+ This limit previously only applied when writing to a file. Contributed by @bernt-matthias.
* Support Python 3.8
-v0.8.0
-~~~~~~
-* Speed improvements when iterating over gzipped files.
+v0.8.0 (2019-08-14)
+~~~~~~~~~~~~~~~~~~~
+
+* #14: Speed improvements when iterating over gzipped files.
+
+v0.6.0 (2019-05-23)
+~~~~~~~~~~~~~~~~~~~
-v0.6.0
-~~~~~~
* For reading from gzipped files, xopen will now use a ``pigz`` subprocess.
This is faster than using ``gzip.open``.
* Python 2 support will be dropped in one of the next releases.
-v0.5.0
-~~~~~~
+v0.5.0 (2019-01-30)
+~~~~~~~~~~~~~~~~~~~
+
* By default, pigz is now only allowed to use at most four threads. This hopefully reduces
problems some users had with too many threads when opening many files at the same time.
* xopen now accepts pathlib.Path objects.
+v0.4.0 (2019-01-07)
+~~~~~~~~~~~~~~~~~~~
+
+* Drop Python 3.3 support
+* Add a ``threads`` parameter (passed on to ``pigz``)
+
+v0.3.2 (2017-11-22)
+~~~~~~~~~~~~~~~~~~~
+
+* #6: Make multi-block bz2 work on Python 2 by using external bz2file library.
+
+v0.3.1 (2017-11-22)
+~~~~~~~~~~~~~~~~~~~
+
+* Drop Python 2.6 support
+* #5: Fix PipedGzipReader.read() not returning anything
+
+v0.3.0 (2017-11-15)
+~~~~~~~~~~~~~~~~~~~
+
+* Add gzip compression parameter
+
+v0.2.1 (2017-05-31)
+~~~~~~~~~~~~~~~~~~~
+
+* #3: Allow appending to bz2 and lzma files where possible
+
+v0.1.1 (2016-12-02)
+~~~~~~~~~~~~~~~~~~~
+
+* Fix a deadlock
+
+v0.1.0 (2016-09-09)
+~~~~~~~~~~~~~~~~~~~
-Contributors
-------------
+* Initial release
+
+Credits
+-------
+
+The name ``xopen`` was taken from the C function of the same name in the
+`utils.h file which is part of
+BWA <https://github.com/lh3/bwa/blob/83662032a2192d5712996f36069ab02db82acf67/utils.h>`_.
+
+Some ideas were taken from the `canopener project <https://github.com/selassid/canopener>`_.
+If you also want to open S3 files, you may want to use that module instead.
+
+@kyleabeauchamp contributed support for appending to files before this repository was created.
+
+
+Maintainers
+-----------
* Marcel Martin
* Ruben Vorderman
-* For more contributors, see <https://github.com/pycompression/xopen/graphs/contributors>
+* For a list of contributors, see <https://github.com/pycompression/xopen/graphs/contributors>
Links
@@ -190,5 +299,3 @@ Links
* `Source code <https://github.com/pycompression/xopen/>`_
* `Report an issue <https://github.com/pycompression/xopen/issues>`_
* `Project page on PyPI (Python package index) <https://pypi.python.org/pypi/xopen/>`_
-
-
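The README text above says that, for files without a recognized extension, the format is detected from the first few bytes. A minimal sketch of that idea follows, using the well-known magic numbers of the four formats. The helper name `sniff_format` is hypothetical and is not taken from xopen; the library's own detection code is not part of this hunk.

```python
import bz2
import gzip
import lzma
from typing import Optional

# Magic numbers found at the start of each compressed container format.
_MAGIC = {
    "gz": b"\x1f\x8b",
    "bz2": b"BZh",
    "xz": b"\xfd7zXZ\x00",
    "zst": b"\x28\xb5\x2f\xfd",
}


def sniff_format(head: bytes) -> Optional[str]:
    """Return a format name if `head` starts with a known magic number.

    Hypothetical helper for illustration; returns None for unknown content.
    """
    for name, magic in _MAGIC.items():
        if head.startswith(magic):
            return name
    return None


# Only the first few bytes are needed, so a caller could peek at the
# stream instead of reading the whole file.
gz_head = gzip.compress(b"example")[:6]
bz2_head = bz2.compress(b"example")[:6]
xz_head = lzma.compress(b"example")[:6]
```

Six bytes are enough to tell these four formats apart, which is why peeking at the start of the stream suffices.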
=====================================
src/xopen.egg-info/SOURCES.txt
=====================================
@@ -1,11 +1,12 @@
.codecov.yml
.editorconfig
+.gitattributes
.gitignore
+.pre-commit-config.yaml
LICENSE
README.rst
pyproject.toml
setup.cfg
-setup.py
tox.ini
.github/workflows/ci.yml
src/xopen/__init__.py
@@ -17,13 +18,12 @@ src/xopen.egg-info/SOURCES.txt
src/xopen.egg-info/dependency_links.txt
src/xopen.egg-info/requires.txt
src/xopen.egg-info/top_level.txt
+tests/conftest.py
tests/file.txt
tests/file.txt.bz2
-tests/file.txt.bz2.test
tests/file.txt.gz
-tests/file.txt.gz.test
-tests/file.txt.test
tests/file.txt.xz
-tests/file.txt.xz.test
+tests/file.txt.zst
tests/hello.gz
+tests/test_piped.py
tests/test_xopen.py
\ No newline at end of file
=====================================
src/xopen.egg-info/requires.txt
=====================================
@@ -1,6 +1,12 @@
-[:platform_machine == "x86_64" or platform_machine == "AMD64" or platform_machine == "aarch64"]
-isal>=0.9.0
+[:platform_python_implementation == "CPython" and (platform_machine == "x86_64" or platform_machine == "AMD64")]
+isal>=1.0.0
+
+[:python_version < "3.8"]
+typing_extensions
[dev]
pytest
+
+[zstd]
+zstandard<1
=====================================
src/xopen/__init__.py
=====================================
@@ -12,6 +12,10 @@ __all__ = [
"PipedPigzWriter",
"PipedPBzip2Reader",
"PipedPBzip2Writer",
+ "PipedXzReader",
+ "PipedXzWriter",
+ "PipedZstdReader",
+ "PipedZstdWriter",
"PipedPythonIsalReader",
"PipedPythonIsalWriter",
"__version__",
@@ -31,10 +35,18 @@ import tempfile
import time
from abc import ABC, abstractmethod
from subprocess import Popen, PIPE, DEVNULL
-from typing import Optional, TextIO, AnyStr, IO, List, Set
+from typing import Optional, Union, TextIO, AnyStr, IO, List, Set, overload, BinaryIO
+
+if sys.version_info >= (3, 8):
+ from typing import Literal
+else:
+ from typing_extensions import Literal
from ._version import version as __version__
+# 128K buffer size also used by cat, pigz etc. It is faster than the 8K default.
+BUFFER_SIZE = max(io.DEFAULT_BUFFER_SIZE, 128 * 1024)
+
try:
from isal import igzip, isal_zlib # type: ignore
@@ -42,8 +54,14 @@ except ImportError:
igzip = None
isal_zlib = None
+try:
+ import zstandard # type: ignore
+except ImportError:
+ zstandard = None
+
try:
import fcntl
+
# fcntl.F_SETPIPE_SZ will be available in python 3.10.
# https://github.com/python/cpython/pull/21921
# If not available: set it to the correct value for known platforms.
@@ -54,11 +72,16 @@ except ImportError:
_MAX_PIPE_SIZE_PATH = pathlib.Path("/proc/sys/fs/pipe-max-size")
try:
- _MAX_PIPE_SIZE = int(_MAX_PIPE_SIZE_PATH.read_text()) # type: Optional[int]
+ _MAX_PIPE_SIZE = int(
+ _MAX_PIPE_SIZE_PATH.read_text(encoding="ascii")
+ ) # type: Optional[int]
except OSError: # Catches file not found and permission errors. Possible other errors too.
_MAX_PIPE_SIZE = None
+FilePath = Union[str, bytes, os.PathLike]
+
+
def _available_cpu_count() -> int:
"""
Number of available virtual or physical CPUs on this system
@@ -69,21 +92,19 @@ def _available_cpu_count() -> int:
except AttributeError:
pass
import re
+
try:
- with open('/proc/self/status') as f:
+ with open("/proc/self/status") as f:
status = f.read()
- m = re.search(r'(?m)^Cpus_allowed:\s*(.*)$', status)
+ m = re.search(r"(?m)^Cpus_allowed:\s*(.*)$", status)
if m:
- res = bin(int(m.group(1).replace(',', ''), 16)).count('1')
+ res = bin(int(m.group(1).replace(",", ""), 16)).count("1")
if res > 0:
return res
except OSError:
pass
- try:
- import multiprocessing
- return multiprocessing.cpu_count()
- except (ImportError, NotImplementedError):
- return 1
+ count = os.cpu_count()
+ return 1 if count is None else count
def _set_pipe_size_to_max(fd: int) -> None:
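The hunk above counts CPUs by parsing the `Cpus_allowed` bitmask from `/proc/self/status` and falls back to `os.cpu_count()`. The sketch below isolates the mask arithmetic so it can be tried without a Linux `/proc` filesystem. The function names are hypothetical; only the expression inside `count_cpus_in_mask` is taken from the diff.

```python
import os


def count_cpus_in_mask(mask: str) -> int:
    """Count set bits in a hexadecimal CPU mask such as 'ff' or '00000000,0000000f'.

    The kernel separates 32-bit groups with commas, so they are removed first.
    """
    return bin(int(mask.replace(",", ""), 16)).count("1")


def cpu_count_with_fallback() -> int:
    """Fall back to os.cpu_count(), treating None as a single CPU."""
    count = os.cpu_count()
    return 1 if count is None else count
```

For example, the mask `ff` has eight bits set, so it corresponds to eight usable CPUs.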
@@ -111,8 +132,9 @@ def _can_read_concatenated_gz(program: str) -> bool:
with open(temp_path, "wb") as temp_file:
temp_file.write(gzip.compress(b"AB") + gzip.compress(b"CD"))
try:
- result = subprocess.run([program, "-c", "-d", temp_path],
- check=True, stderr=PIPE, stdout=PIPE)
+ result = subprocess.run(
+ [program, "-c", "-d", temp_path], check=True, stderr=PIPE, stdout=PIPE
+ )
return result.stdout == b"ABCD"
except subprocess.CalledProcessError:
# Program can't read zip
@@ -149,10 +171,20 @@ class PipedCompressionWriter(Closing):
"""
Write Compressed files by running an external process and piping into it.
"""
- def __init__(self, path, program_args: List[str], mode='wt',
- compresslevel: Optional[int] = None,
- threads_flag: Optional[str] = None,
- threads: Optional[int] = None):
+
+ def __init__(
+ self,
+ path: FilePath,
+ program_args: List[str],
+ mode="wt",
+ compresslevel: Optional[int] = None,
+ threads_flag: Optional[str] = None,
+ threads: Optional[int] = None,
+ *,
+ encoding="utf-8",
+ errors=None,
+ newline=None,
+ ):
"""
mode -- one of 'w', 'wt', 'wb', 'a', 'at', 'ab'
compresslevel -- compression level
@@ -162,14 +194,17 @@ class PipedCompressionWriter(Closing):
used. At the moment, this means that the number of available CPU cores is used, capped
at four to avoid creating too many threads. Use 0 to use all available cores.
"""
- if mode not in ('w', 'wt', 'wb', 'a', 'at', 'ab'):
+ if mode not in ("w", "wt", "wb", "a", "at", "ab"):
raise ValueError(
- "Mode is '{}', but it must be 'w', 'wt', 'wb', 'a', 'at' or 'ab'".format(mode))
+ "Mode is '{}', but it must be 'w', 'wt', 'wb', 'a', 'at' or 'ab'".format(
+ mode
+ )
+ )
# TODO use a context manager
- self.outfile = open(path, mode)
+ self.outfile = open(path, mode[0] + "b")
self.closed: bool = False
- self.name: str = path
+ self.name: str = str(os.fspath(path))
self._mode: str = mode
self._program_args: List[str] = program_args
self._threads_flag: Optional[str] = threads_flag
@@ -179,15 +214,18 @@ class PipedCompressionWriter(Closing):
self._threads = threads
try:
self.process = self._open_process(
- mode, compresslevel, threads, self.outfile)
+ mode, compresslevel, threads, self.outfile
+ )
except OSError:
self.outfile.close()
raise
assert self.process.stdin is not None
_set_pipe_size_to_max(self.process.stdin.fileno())
- if 'b' not in mode:
- self._file = io.TextIOWrapper(self.process.stdin) # type: IO
+ if "b" not in mode:
+ self._file = io.TextIOWrapper(
+ self.process.stdin, encoding=encoding, errors=errors, newline=newline
+ ) # type: IO
else:
self._file = self.process.stdin
@@ -201,14 +239,18 @@ class PipedCompressionWriter(Closing):
)
def _open_process(
- self, mode: str, compresslevel: Optional[int], threads: int, outfile: TextIO,
+ self,
+ mode: str,
+ compresslevel: Optional[int],
+ threads: int,
+ outfile: TextIO,
) -> Popen:
program_args: List[str] = self._program_args[:] # prevent list aliasing
if threads != 0 and self._threads_flag is not None:
program_args += [f"{self._threads_flag}{threads}"]
extra_args = []
- if 'w' in mode and compresslevel is not None:
- extra_args += ['-' + str(compresslevel)]
+ if "w" in mode and compresslevel is not None:
+ extra_args += ["-" + str(compresslevel)]
kwargs = dict(stdin=PIPE, stdout=outfile, stderr=DEVNULL)
@@ -216,8 +258,8 @@ class PipedCompressionWriter(Closing):
# <http://bugs.python.org/issue12786>.
# However, close_fds is not supported on Windows. See
# <https://github.com/marcelm/cutadapt/issues/315>.
- if sys.platform != 'win32':
- kwargs['close_fds'] = True
+ if sys.platform != "win32":
+ kwargs["close_fds"] = True
process = Popen(program_args + extra_args, **kwargs) # type: ignore
return process
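The argument handling in `_open_process` can be read in isolation. The sketch below reproduces only the command-line construction shown in the hunk (the threads flag is appended when `threads` is nonzero, and the level flag only when writing). `build_command` is a hypothetical standalone function and no process is started.

```python
from typing import List, Optional


def build_command(
    program_args: List[str],
    threads_flag: Optional[str],
    threads: int,
    mode: str,
    compresslevel: Optional[int],
) -> List[str]:
    """Return the argument list that would be passed to Popen."""
    args = program_args[:]  # copy, so the caller's list is not modified
    if threads != 0 and threads_flag is not None:
        args += [f"{threads_flag}{threads}"]
    if "w" in mode and compresslevel is not None:
        args += ["-" + str(compresslevel)]
    return args


pigz_cmd = build_command(["pigz", "--no-name"], "-p", 4, "wb", 6)
```

Note that in append mode (`"ab"`) the compression level is not passed on, which mirrors the `"w" in mode` check in the diff.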
@@ -233,17 +275,27 @@ class PipedCompressionWriter(Closing):
retcode = self.process.wait()
self.outfile.close()
if retcode != 0:
+ try:
+ cause = (
+ f". Possible cause: {os.strerror(retcode)}" if retcode > 1 else ""
+ )
+ except ValueError:
+ cause = ""
raise OSError(
- "Output {} process terminated with exit code {}".format(
- " ".join(self._program_args), retcode))
+ "Output process '{}' terminated with exit code {}{}".format(
+ " ".join(self._program_args),
+ retcode,
+ cause,
+ )
+ )
- def __iter__(self): # type: ignore
+ def __iter__(self):
# For compatibility with Pandas, which checks for an __iter__ method
# to determine whether an object is file-like.
return self
def __next__(self):
- raise io.UnsupportedOperation('not readable')
+ raise io.UnsupportedOperation("not readable")
class PipedCompressionReader(Closing):
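The new error message in the hunk above adds a hint by interpreting the exit code as an errno value. A small sketch of that formatting follows, pulled out into a hypothetical function `describe_exit` so it can be inspected without running a subprocess. Treating an exit code as an errno is only a heuristic, which is why the message says "Possible cause".

```python
import os
from typing import List


def describe_exit(program_args: List[str], retcode: int) -> str:
    """Format an error message for a writer process that exited abnormally."""
    try:
        # Exit code 1 is too generic to map to an errno description.
        cause = f". Possible cause: {os.strerror(retcode)}" if retcode > 1 else ""
    except ValueError:
        # Some platforms raise ValueError for unknown error numbers.
        cause = ""
    return "Output process '{}' terminated with exit code {}{}".format(
        " ".join(program_args), retcode, cause
    )


generic = describe_exit(["gzip", "--no-name"], 1)
with_hint = describe_exit(["gzip", "--no-name"], 2)
```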
@@ -259,19 +311,28 @@ class PipedCompressionReader(Closing):
def __init__(
self,
- path,
- program_args: List[str],
+ path: FilePath,
+ program_args: List[Union[str, bytes]],
mode: str = "r",
threads_flag: Optional[str] = None,
threads: Optional[int] = None,
+ *,
+ encoding="utf-8",
+ errors=None,
+ newline=None,
):
"""
Raise an OSError when pigz could not be found.
"""
- if mode not in ('r', 'rt', 'rb'):
- raise ValueError("Mode is '{}', but it must be 'r', 'rt' or 'rb'".format(mode))
+ if mode not in ("r", "rt", "rb"):
+ raise ValueError(
+ "Mode is '{}', but it must be 'r', 'rt' or 'rb'".format(mode)
+ )
self._program_args = program_args
- program_args = program_args + ['-cd', path]
+ path = os.fspath(path)
+ if isinstance(path, bytes) and sys.platform == "win32":
+ path = path.decode()
+ program_args = program_args + ["-cd", path]
if threads_flag is not None:
if threads is None:
@@ -291,8 +352,10 @@ class PipedCompressionReader(Closing):
_set_pipe_size_to_max(self.process.stdout.fileno())
self._mode = mode
- if 'b' not in mode:
- self._file: IO = io.TextIOWrapper(self.process.stdout)
+ if "b" not in mode:
+ self._file: IO = io.TextIOWrapper(
+ self.process.stdout, encoding=encoding, errors=errors, newline=newline
+ )
else:
self._file = self.process.stdout
self.closed = False
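Both the reader and the writer now pass `encoding`, `errors` and `newline` through to `io.TextIOWrapper` instead of relying on the locale default. The sketch below shows the effect on an in-memory binary stream standing in for the process pipe. The function name `wrap_text` is hypothetical.

```python
import io
from typing import BinaryIO, Optional


def wrap_text(
    stream: BinaryIO,
    encoding: str = "utf-8",
    errors: Optional[str] = None,
    newline: Optional[str] = None,
) -> io.TextIOWrapper:
    """Wrap a binary stream for text access with an explicit encoding."""
    return io.TextIOWrapper(stream, encoding=encoding, errors=errors, newline=newline)


# An in-memory buffer takes the place of process.stdout here.
raw = io.BytesIO("Grüße\r\nzweite Zeile\n".encode("utf-8"))
text = wrap_text(raw)
first_line = text.readline()
```

With `newline=None`, universal newlines are enabled on reading, so the `\r\n` in the input is returned as `\n`.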
@@ -342,13 +405,14 @@ class PipedCompressionReader(Closing):
# stdout is io.BufferedReader if set to PIPE
while True:
- first_output = self.process.stdout.peek(1) # type: ignore
+ first_output = self.process.stdout.peek(1)
if first_output or self.process.poll() is not None:
break
time.sleep(0.01)
- def _raise_if_error(self, check_allowed_code_and_message: bool = False,
- stderr_message: bytes = b"") -> None:
+ def _raise_if_error(
+ self, check_allowed_code_and_message: bool = False, stderr_message: bytes = b""
+ ) -> None:
"""
Raise OSError if process is not running anymore and the exit code is
nonzero. If check_allowed_code_and_message is set, OSError is not raised when
@@ -358,6 +422,11 @@ class PipedCompressionReader(Closing):
"""
retcode = self.process.poll()
+ if sys.platform == "win32" and retcode == 1 and stderr_message == b"":
+ # Special case for Windows. Winapi terminates processes with exit code 1
+ # and an empty error message.
+ return
+
if retcode is None:
# process still running
return
@@ -369,9 +438,8 @@ class PipedCompressionReader(Closing):
if retcode == self._allowed_exit_code:
# terminated with allowed exit code
return
- if (
+ if self._allowed_exit_message and stderr_message.startswith(
self._allowed_exit_message
- and stderr_message.startswith(self._allowed_exit_message)
):
# terminated with another exit code, but message is allowed
return
@@ -395,6 +463,12 @@ class PipedCompressionReader(Closing):
def seekable(self) -> bool:
return self._file.seekable()
+ def tell(self) -> int:
+ return self._file.tell()
+
+ def seek(self, offset, whence=0) -> int:
+ return self._file.seek(offset, whence)
+
def peek(self, n: int = None):
if hasattr(self._file, "peek"):
return self._file.peek(n) # type: ignore
@@ -415,8 +489,13 @@ class PipedGzipReader(PipedCompressionReader):
"""
Open a pipe to gzip for reading a gzipped file.
"""
- def __init__(self, path, mode: str = "r"):
- super().__init__(path, ["gzip"], mode)
+
+ def __init__(
+ self, path, mode: str = "r", *, encoding="utf-8", errors=None, newline=None
+ ):
+ super().__init__(
+ path, ["gzip"], mode, encoding=encoding, errors=errors, newline=newline
+ )
class PipedGzipWriter(PipedCompressionWriter):
@@ -426,7 +505,17 @@ class PipedGzipWriter(PipedCompressionWriter):
but running an external gzip can still reduce wall-clock time because
the compression happens in a separate process.
"""
- def __init__(self, path, mode: str = "wt", compresslevel: Optional[int] = None):
+
+ def __init__(
+ self,
+ path,
+ mode: str = "wt",
+ compresslevel: Optional[int] = None,
+ *,
+ encoding="utf-8",
+ errors=None,
+ newline=None,
+ ):
"""
mode -- one of 'w', 'wt', 'wb', 'a', 'at', 'ab'
compresslevel -- compression level
@@ -436,7 +525,16 @@ class PipedGzipWriter(PipedCompressionWriter):
"""
if compresslevel is not None and compresslevel not in range(1, 10):
raise ValueError("compresslevel must be between 1 and 9")
- super().__init__(path, ["gzip"], mode, compresslevel, None)
+ super().__init__(
+ path,
+ ["gzip", "--no-name"],
+ mode,
+ compresslevel,
+ None,
+ encoding=encoding,
+ errors=errors,
+ newline=newline,
+ )
class PipedPigzReader(PipedCompressionReader):
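The `--no-name` flag added above keeps the timestamp and file name out of the gzip header so that output is reproducible. The same effect can be checked with the standard library alone, as in the sketch below. It assumes Python 3.8 or later, where `gzip.compress` accepts `mtime`, and it does not show how xopen's in-process backends are configured.

```python
import gzip


def compress_reproducibly(data: bytes) -> bytes:
    """Compress with a zeroed MTIME header field (requires Python 3.8+)."""
    return gzip.compress(data, mtime=0)


payload = b"Hello" * 1000
first = compress_reproducibly(payload)
second = compress_reproducibly(payload)
# Bytes 4..7 of a gzip member hold the little-endian MTIME field.
mtime_field = first[4:8]
```

Two runs at different times produce identical bytes because the only time-dependent header field is fixed at zero.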
@@ -446,8 +544,27 @@ class PipedPigzReader(PipedCompressionReader):
also faster when reading, even when forced to use a single thread
(ca. 2x speedup).
"""
- def __init__(self, path, mode: str = "r", threads: Optional[int] = None):
- super().__init__(path, ["pigz"], mode, "-p", threads)
+
+ def __init__(
+ self,
+ path,
+ mode: str = "r",
+ threads: Optional[int] = None,
+ *,
+ encoding="utf-8",
+ errors=None,
+ newline=None,
+ ):
+ super().__init__(
+ path,
+ ["pigz"],
+ mode,
+ "-p",
+ threads,
+ encoding=encoding,
+ errors=errors,
+ newline=newline,
+ )
class PipedPigzWriter(PipedCompressionWriter):
@@ -457,6 +574,7 @@ class PipedPigzWriter(PipedCompressionWriter):
efficient than gzip on only one core. (But then igzip is even faster and
should be preferred if the compression level allows it.)
"""
+
_accepted_compression_levels: Set[int] = set(list(range(10)) + [11])
def __init__(
@@ -465,6 +583,10 @@ class PipedPigzWriter(PipedCompressionWriter):
mode: str = "wt",
compresslevel: Optional[int] = None,
threads: Optional[int] = None,
+ *,
+ encoding="utf-8",
+ errors=None,
+ newline=None,
):
"""
mode -- one of 'w', 'wt', 'wb', 'a', 'at', 'ab'
@@ -473,9 +595,22 @@ class PipedPigzWriter(PipedCompressionWriter):
used. At the moment, this means that the number of available CPU cores is used, capped
at four to avoid creating too many threads. Use 0 to let pigz use all available cores.
"""
- if compresslevel is not None and compresslevel not in self._accepted_compression_levels:
+ if (
+ compresslevel is not None
+ and compresslevel not in self._accepted_compression_levels
+ ):
raise ValueError("compresslevel must be between 0 and 9 or 11")
- super().__init__(path, ["pigz"], mode, compresslevel, "-p", threads)
+ super().__init__(
+ path,
+ ["pigz", "--no-name"],
+ mode,
+ compresslevel,
+ "-p",
+ threads,
+ encoding=encoding,
+ errors=errors,
+ newline=newline,
+ )
class PipedPBzip2Reader(PipedCompressionReader):
@@ -486,8 +621,26 @@ class PipedPBzip2Reader(PipedCompressionReader):
_allowed_exit_code = None
_allowed_exit_message = b"\n *Control-C or similar caught [sig=15], quitting..."
- def __init__(self, path, mode: str = "r", threads: Optional[int] = None):
- super().__init__(path, ["pbzip2"], mode, "-p", threads)
+ def __init__(
+ self,
+ path,
+ mode: str = "r",
+ threads: Optional[int] = None,
+ *,
+ encoding="utf-8",
+ errors=None,
+ newline=None,
+ ):
+ super().__init__(
+ path,
+ ["pbzip2"],
+ mode,
+ "-p",
+ threads,
+ encoding=encoding,
+ errors=errors,
+ newline=newline,
+ )
class PipedPBzip2Writer(PipedCompressionWriter):
@@ -501,9 +654,98 @@ class PipedPBzip2Writer(PipedCompressionWriter):
path,
mode: str = "wt",
threads: Optional[int] = None,
+ *,
+ encoding="utf-8",
+ errors=None,
+ newline=None,
):
# Use default compression level for pbzip2: 9
- super().__init__(path, ["pbzip2"], mode, 9, "-p", threads)
+ super().__init__(
+ path,
+ ["pbzip2"],
+ mode,
+ 9,
+ "-p",
+ threads,
+ encoding=encoding,
+ errors=errors,
+ newline=newline,
+ )
+
+
+class PipedXzReader(PipedCompressionReader):
+ """
+ Open a pipe to xz for reading an xz-compressed file. A future
+ version of xz will be able to decompress using multiple cores.
+
+ (N.B. As of 21 March 2022, this feature is only implemented in xz's
+ master branch.)
+ """
+
+ def __init__(
+ self,
+ path,
+ mode: str = "r",
+ threads: Optional[int] = None,
+ *,
+ encoding="utf-8",
+ errors=None,
+ newline=None,
+ ):
+ super().__init__(
+ path,
+ ["xz"],
+ mode,
+ "-T",
+ threads,
+ encoding=encoding,
+ errors=errors,
+ newline=newline,
+ )
+
+
+class PipedXzWriter(PipedCompressionWriter):
+ """
+ Write xz-compressed files by running an external xz process and
+ piping into it. xz can compress using multiple cores.
+ """
+
+ _accepted_compression_levels: Set[int] = set(range(10))
+
+ def __init__(
+ self,
+ path,
+ mode: str = "wt",
+ compresslevel: Optional[int] = None,
+ threads: Optional[int] = None,
+ *,
+ encoding="utf-8",
+ errors=None,
+ newline=None,
+ ):
+ """
+ mode -- one of 'w', 'wt', 'wb', 'a', 'at', 'ab'
+ compresslevel -- compression level
+ threads (int) -- number of xz threads. If this is set to None, a reasonable default is
+ used. At the moment, this means that the number of available CPU cores is used, capped
+ at four to avoid creating too many threads. Use 0 to let xz use all available cores.
+ """
+ if (
+ compresslevel is not None
+ and compresslevel not in self._accepted_compression_levels
+ ):
+ raise ValueError("compresslevel must be between 0 and 9")
+ super().__init__(
+ path,
+ ["xz"],
+ mode,
+ compresslevel,
+ "-T",
+ threads,
+ encoding=encoding,
+ errors=errors,
+ newline=newline,
+ )
class PipedIGzipReader(PipedCompressionReader):
@@ -513,7 +755,10 @@ class PipedIGzipReader(PipedCompressionReader):
can only run on x86 and ARM architectures, but is able to use more
architecture-specific optimizations as a result.
"""
- def __init__(self, path, mode: str = "r"):
+
+ def __init__(
+ self, path, mode: str = "r", *, encoding="utf-8", errors=None, newline=None
+ ):
if not _can_read_concatenated_gz("igzip"):
# Instead of elaborate version string checking once the problem is
# fixed, it is much easier to use this, "proof in the pudding" type
@@ -521,8 +766,79 @@ class PipedIGzipReader(PipedCompressionReader):
raise ValueError(
"This version of igzip does not support reading "
"concatenated gzip files and is therefore not "
- "safe to use. See: https://github.com/intel/isa-l/issues/143")
- super().__init__(path, ["igzip"], mode)
+ "safe to use. See: https://github.com/intel/isa-l/issues/143"
+ )
+ super().__init__(
+ path, ["igzip"], mode, encoding=encoding, errors=errors, newline=newline
+ )
+
+
+class PipedZstdReader(PipedCompressionReader):
+ """
+ Open a pipe to zstd for reading a zstandard-compressed file (.zst).
+ """
+
+ def __init__(
+ self,
+ path,
+ mode: str = "r",
+ *,
+ encoding="utf-8",
+ errors=None,
+ newline=None,
+ ):
+ super().__init__(
+ path,
+ ["zstd"],
+ mode,
+ encoding=encoding,
+ errors=errors,
+ newline=newline,
+ )
+
+
+class PipedZstdWriter(PipedCompressionWriter):
+ """
+ Write Zstandard-compressed files by running an external zstd process and
+ piping into it. zstd can compress using multiple cores.
+ """
+
+ _accepted_compression_levels: Set[int] = set(range(1, 20))
+
+ def __init__(
+ self,
+ path,
+ mode: str = "wt",
+ compresslevel: Optional[int] = None,
+ threads: Optional[int] = None,
+ *,
+ encoding="utf-8",
+ errors=None,
+ newline=None,
+ ):
+ """
+ mode -- one of 'w', 'wt', 'wb', 'a', 'at', 'ab'
+ compresslevel -- compression level
+ threads (int) -- number of zstd threads. If this is set to None, a reasonable default is
+ used. At the moment, this means that the number of available CPU cores is used, capped
+ at four to avoid creating too many threads. Use 0 to let zstd use all available cores.
+ """
+ if (
+ compresslevel is not None
+ and compresslevel not in self._accepted_compression_levels
+ ):
+ raise ValueError("compresslevel must be between 1 and 19")
+ super().__init__(
+ path,
+ ["zstd"],
+ mode,
+ compresslevel,
+ "-T",
+ threads,
+ encoding=encoding,
+ errors=errors,
+ newline=newline,
+ )
class PipedIGzipWriter(PipedCompressionWriter):
@@ -539,112 +855,275 @@ class PipedIGzipWriter(PipedCompressionWriter):
filesizes from their pigz/gzip counterparts.
See: https://gist.github.com/rhpvorderman/4f1201c3f39518ff28dde45409eb696b
"""
- def __init__(self, path, mode: str = "wt", compresslevel: Optional[int] = None):
+
+ def __init__(
+ self,
+ path,
+ mode: str = "wt",
+ compresslevel: Optional[int] = None,
+ *,
+ encoding="utf-8",
+ errors=None,
+ newline=None,
+ ):
if compresslevel is not None and compresslevel not in range(0, 4):
raise ValueError("compresslevel must be between 0 and 3")
- super().__init__(path, ["igzip"], mode, compresslevel)
+ super().__init__(
+ path,
+ ["igzip", "--no-name"],
+ mode,
+ compresslevel,
+ encoding=encoding,
+ errors=errors,
+ newline=newline,
+ )
class PipedPythonIsalReader(PipedCompressionReader):
- def __init__(self, path, mode: str = "r"):
- super().__init__(path, [sys.executable, "-m", "isal.igzip"], mode)
+ def __init__(
+ self, path, mode: str = "r", *, encoding="utf-8", errors=None, newline=None
+ ):
+ super().__init__(
+ path,
+ [sys.executable, "-m", "isal.igzip"],
+ mode,
+ encoding=encoding,
+ errors=errors,
+ newline=newline,
+ )
class PipedPythonIsalWriter(PipedCompressionWriter):
- def __init__(self, path, mode: str = "wt", compresslevel: Optional[int] = None):
+ def __init__(
+ self,
+ path,
+ mode: str = "wt",
+ compresslevel: Optional[int] = None,
+ *,
+ encoding="utf-8",
+ errors=None,
+ newline=None,
+ ):
if compresslevel is not None and compresslevel not in range(0, 4):
raise ValueError("compresslevel must be between 0 and 3")
- super().__init__(path, [sys.executable, "-m", "isal.igzip"], mode, compresslevel)
+ super().__init__(
+ path,
+ [sys.executable, "-m", "isal.igzip", "--no-name"],
+ mode,
+ compresslevel,
+ encoding=encoding,
+ errors=errors,
+ newline=newline,
+ )
-def _open_stdin_or_out(mode: str) -> IO:
+def _open_stdin_or_out(mode: str, **text_mode_kwargs) -> IO:
# Do not return sys.stdin or sys.stdout directly as we want the returned object
# to be closable without closing sys.stdout.
std = dict(r=sys.stdin, w=sys.stdout)[mode[0]]
- return open(std.fileno(), mode=mode, closefd=False)
+ return open(std.fileno(), mode=mode, closefd=False, **text_mode_kwargs)
-def _open_bz2(filename, mode: str, threads: Optional[int]):
+def _open_bz2(filename, mode: str, threads: Optional[int], **text_mode_kwargs):
if threads != 0:
try:
if "r" in mode:
- return PipedPBzip2Reader(filename, mode, threads)
+ return PipedPBzip2Reader(filename, mode, threads, **text_mode_kwargs)
else:
- return PipedPBzip2Writer(filename, mode, threads)
+ return PipedPBzip2Writer(filename, mode, threads, **text_mode_kwargs)
except OSError:
pass # We try without threads.
- return bz2.open(filename, mode)
+ return bz2.open(filename, mode, **text_mode_kwargs)
-def _open_xz(filename, mode: str) -> IO:
- return lzma.open(filename, mode)
+def _open_xz(
+ filename,
+ mode: str,
+ compresslevel: Optional[int],
+ threads: Optional[int],
+ **text_mode_kwargs,
+):
+ if compresslevel is None:
+ compresslevel = 6
+ if threads != 0:
+ try:
+ if "r" in mode:
+ return PipedXzReader(filename, mode, threads, **text_mode_kwargs)
+ else:
+ return PipedXzWriter(
+ filename, mode, compresslevel, threads, **text_mode_kwargs
+ )
+ except OSError:
+ pass # We try without threads.
-def _open_external_gzip_reader(filename, mode, compresslevel, threads):
- assert "r" in mode
+ return lzma.open(
+ filename,
+ mode,
+ preset=compresslevel if "w" in mode else None,
+ **text_mode_kwargs,
+ )
+
+
+def _open_zst( # noqa: C901
+ filename,
+ mode: str,
+ compresslevel: Optional[int],
+ threads: Optional[int],
+ **text_mode_kwargs,
+):
+ assert compresslevel != 0
+ if compresslevel is None:
+ compresslevel = 3
+ if threads != 0:
+ try:
+ if "r" in mode:
+ return PipedZstdReader(filename, mode, **text_mode_kwargs)
+ else:
+ return PipedZstdWriter(
+ filename, mode, compresslevel, threads, **text_mode_kwargs
+ )
+ except OSError:
+ if zstandard is None:
+ # No fallback available
+ raise
+
+ if zstandard is None:
+ raise ImportError("zstandard module (python-zstandard) not available")
+ if compresslevel is not None and "w" in mode:
+ cctx = zstandard.ZstdCompressor(level=compresslevel)
+ else:
+ cctx = None
+ f = zstandard.open(
+ filename,
+ mode,
+ cctx=cctx,
+ **text_mode_kwargs,
+ )
+ if mode == "rb":
+ return io.BufferedReader(f)
+ elif mode == "wb":
+ return io.BufferedWriter(f)
+ return f
+
+
+def _open_external_gzip_reader(
+ filename, mode, compresslevel, threads, **text_mode_kwargs
+):
+ assert mode in ("rt", "rb")
try:
- return PipedIGzipReader(filename, mode)
+ return PipedIGzipReader(filename, mode, **text_mode_kwargs)
except (OSError, ValueError):
# No igzip installed or version does not support reading
# concatenated files.
pass
if igzip:
- return PipedPythonIsalReader(filename, mode)
+ return PipedPythonIsalReader(filename, mode, **text_mode_kwargs)
try:
- return PipedPigzReader(filename, mode, threads=threads)
+ return PipedPigzReader(filename, mode, threads=threads, **text_mode_kwargs)
except OSError:
- return PipedGzipReader(filename, mode)
+ return PipedGzipReader(filename, mode, **text_mode_kwargs)
-def _open_external_gzip_writer(filename, mode, compresslevel, threads):
- assert "r" not in mode
+def _open_external_gzip_writer(
+ filename, mode, compresslevel, threads, **text_mode_kwargs
+):
+ assert mode in ("wt", "wb", "at", "ab")
try:
- return PipedIGzipWriter(filename, mode, compresslevel)
+ return PipedIGzipWriter(filename, mode, compresslevel, **text_mode_kwargs)
except (OSError, ValueError):
# No igzip installed or compression level higher than 3
pass
if igzip: # We can use the CLI from isal.igzip
try:
- return PipedPythonIsalWriter(filename, mode, compresslevel)
+ return PipedPythonIsalWriter(
+ filename, mode, compresslevel, **text_mode_kwargs
+ )
except ValueError: # Wrong compression level
pass
try:
- return PipedPigzWriter(filename, mode, compresslevel, threads=threads)
+ return PipedPigzWriter(
+ filename, mode, compresslevel, threads=threads, **text_mode_kwargs
+ )
except OSError:
- return PipedGzipWriter(filename, mode, compresslevel)
+ return PipedGzipWriter(filename, mode, compresslevel, **text_mode_kwargs)
-def _open_gz(filename, mode: str, compresslevel, threads):
+def _open_gz(filename, mode: str, compresslevel, threads, **text_mode_kwargs):
+ assert mode in ("rt", "rb", "wt", "wb", "at", "ab")
if threads != 0:
try:
if "r" in mode:
- return _open_external_gzip_reader(filename, mode, compresslevel, threads)
+ return _open_external_gzip_reader(
+ filename, mode, compresslevel, threads, **text_mode_kwargs
+ )
else:
- return _open_external_gzip_writer(filename, mode, compresslevel, threads)
+ return _open_external_gzip_writer(
+ filename, mode, compresslevel, threads, **text_mode_kwargs
+ )
except OSError:
pass # We try without threads.
- if 'r' in mode:
+ if "r" in mode:
if igzip is not None:
- return igzip.open(filename, mode)
- return gzip.open(filename, mode)
+ return igzip.open(filename, mode, **text_mode_kwargs)
+ return gzip.open(filename, mode, **text_mode_kwargs)
+
+ g = _open_reproducible_gzip(
+ filename,
+ mode=mode[0] + "b",
+ compresslevel=compresslevel,
+ )
+ if "t" in mode:
+ return io.TextIOWrapper(g, **text_mode_kwargs)
+ return g
+
+def _open_reproducible_gzip(filename, mode, compresslevel):
+ """
+ Open a gzip file for writing (without external processes)
+ that has neither mtime nor the file name in the header
+ (equivalent to gzip --no-name)
+ """
+ assert mode in ("rb", "wb", "ab")
+ # Neither gzip.open nor igzip.open have an mtime option, and they will
+ # always write the file name, so we need to open the file separately
+ # and pass it to gzip.GzipFile/igzip.IGzipFile.
+ binary_file = open(filename, mode=mode)
+ kwargs = dict(
+ fileobj=binary_file,
+ filename="",
+ mode=mode,
+ mtime=0,
+ )
+ gzip_file = None
if igzip is not None:
try:
- return igzip.open(filename, mode,
- compresslevel=isal_zlib.ISAL_DEFAULT_COMPRESSION
- if compresslevel is None else compresslevel)
+ gzip_file = igzip.IGzipFile(
+ **kwargs,
+ compresslevel=isal_zlib.ISAL_DEFAULT_COMPRESSION
+ if compresslevel is None
+ else compresslevel,
+ )
except ValueError:
# Compression level not supported, move to built-in gzip.
pass
-
- # Override gzip.open's default of 9 for consistency with command-line gzip.
- return gzip.open(filename, mode,
- compresslevel=6 if compresslevel is None else compresslevel)
+ if gzip_file is None:
+ gzip_file = gzip.GzipFile(
+ **kwargs,
+ # Override gzip.open's default of 9 for consistency
+ # with command-line gzip.
+ compresslevel=6 if compresslevel is None else compresslevel,
+ )
+ # When (I)GzipFile is created with a fileobj instead of a filename,
+ # the passed file object is not closed when (I)GzipFile.close()
+ # is called. This forces it to be closed.
+ gzip_file.myfileobj = binary_file
+ return gzip_file
-def _detect_format_from_content(filename: str) -> Optional[str]:
+def _detect_format_from_content(filename: FilePath) -> Optional[str]:
"""
Attempts to detect file format from the content by reading the first
6 bytes. Returns None if no format could be detected.
@@ -653,52 +1132,91 @@ def _detect_format_from_content(filename: str) -> Optional[str]:
if stat.S_ISREG(os.stat(filename).st_mode):
with open(filename, "rb") as fh:
bs = fh.read(6)
- if bs[:2] == b'\x1f\x8b':
+ if bs[:2] == b"\x1f\x8b":
# https://tools.ietf.org/html/rfc1952#page-6
return "gz"
- elif bs[:3] == b'\x42\x5a\x68':
+ elif bs[:3] == b"\x42\x5a\x68":
# https://en.wikipedia.org/wiki/List_of_file_signatures
return "bz2"
- elif bs[:6] == b'\xfd\x37\x7a\x58\x5a\x00':
+ elif bs[:6] == b"\xfd\x37\x7a\x58\x5a\x00":
# https://tukaani.org/xz/xz-file-format.txt
return "xz"
+ elif bs[:4] == b"\x28\xb5\x2f\xfd":
+ # https://datatracker.ietf.org/doc/html/rfc8478#section-3.1.1
+ return "zst"
except OSError:
pass
return None
-def _detect_format_from_extension(filename: str) -> Optional[str]:
+def _detect_format_from_extension(filename: Union[str, bytes]) -> Optional[str]:
"""
- Attempts to detect file format from the filename extension.
- Returns None if no format could be detected.
+ Attempt to detect file format from the filename extension.
+ Return None if no format could be detected.
"""
- if filename.endswith('.bz2'):
- return "bz2"
- elif filename.endswith('.xz'):
- return "xz"
- elif filename.endswith('.gz'):
- return "gz"
- else:
- return None
+ for ext in ("bz2", "xz", "gz", "zst"):
+ if isinstance(filename, bytes):
+ if filename.endswith(b"." + ext.encode()):
+ return ext
+ else:
+ if filename.endswith("." + ext):
+ return ext
+ return None
+@overload
def xopen(
- filename,
- mode: str = "r",
+ filename: FilePath,
+ mode: Literal["r", "w", "a", "rt", "wt", "at"] = ...,
+ compresslevel: Optional[int] = ...,
+ threads: Optional[int] = ...,
+ *,
+ encoding: str = ...,
+ errors: Optional[str] = ...,
+ newline: Optional[str] = ...,
+ format: Optional[str] = ...,
+) -> TextIO:
+ ...
+
+
+@overload
+def xopen(
+ filename: FilePath,
+ mode: Literal["rb", "wb", "ab"],
+ compresslevel: Optional[int] = ...,
+ threads: Optional[int] = ...,
+ *,
+ encoding: str = ...,
+ errors: None = ...,
+ newline: None = ...,
+ format: Optional[str] = ...,
+) -> BinaryIO:
+ ...
+
+
+def xopen( # noqa: C901 # The function is complex, but readable.
+ filename: FilePath,
+ mode: Literal["r", "w", "a", "rt", "rb", "wt", "wb", "at", "ab"] = "r",
compresslevel: Optional[int] = None,
threads: Optional[int] = None,
+ *,
+ encoding: str = "utf-8",
+ errors: Optional[str] = None,
+ newline: Optional[str] = None,
+ format: Optional[str] = None,
) -> IO:
"""
A replacement for the "open" function that can also read and write
compressed files transparently. The supported compression formats are gzip,
- bzip2 and xz. If the filename is '-', standard output (mode 'w') or
+ bzip2, xz and zstandard. If the filename is '-', standard output (mode 'w') or
standard input (mode 'r') is returned.
When writing, the file format is chosen based on the file name extension:
- .gz uses gzip compression
- .bz2 uses bzip2 compression
- .xz uses xz/lzma compression
+ - .zst uses zstandard compression
- otherwise, no compression is used
When reading, if a file name extension is available, the format is detected
@@ -707,45 +1225,81 @@ def xopen(
mode can be: 'rt', 'rb', 'at', 'ab', 'wt', or 'wb'. Also, the 't' can be omitted,
so instead of 'rt', 'wt' and 'at', the abbreviations 'r', 'w' and 'a' can be used.
- compresslevel is the compression level for writing to gzip files.
- This parameter is ignored for the other compression formats. If set to
- None (default), level 6 is used.
-
- threads only has a meaning when reading or writing gzip files.
-
- When threads is None (the default), reading or writing a gzip file is done with a pigz
- (parallel gzip) subprocess if possible. See PipedGzipWriter and PipedGzipReader.
-
- When threads = 0, no subprocess is used.
+ compresslevel is the compression level for writing to gzip, xz and zst files.
+ This parameter is ignored for the other compression formats.
+ If set to None, a default depending on the format is used:
+ gzip: 6, xz: 6, zstd: 3.
+
+ When threads is None (the default), compressed file formats are read or written
+ using a pipe to a subprocess running an external tool such as ``igzip``,
+ ``pbzip2``, ``pigz`` etc., see PipedIGzipWriter, PipedIGzipReader etc.
+ If the external tool supports multiple threads, *threads* can be set to an int
+ specifying the number of threads to use.
+ If no external tool supporting the compression format is available, the file is
+ opened calling the appropriate Python function
+ (that is, no subprocess is spawned).
+
+ Set threads to 0 to force opening the file without using a subprocess.
+
+ encoding, errors and newline are used when opening a file in text mode.
+ The parameters have the same meaning as in the built-in open function,
+ except that the default encoding is always UTF-8 instead of the
+ preferred locale encoding.
+
+ format overrides the autodetection of input and output formats. This can be
+ useful when compressed output needs to be written to a file without an
+ extension. Possible values are "gz", "xz", "bz2", "zst".
"""
- if mode in ('r', 'w', 'a'):
- mode += 't'
- if mode not in ('rt', 'rb', 'wt', 'wb', 'at', 'ab'):
+ if mode in ("r", "w", "a"):
+ mode += "t" # type: ignore
+ if mode not in ("rt", "rb", "wt", "wb", "at", "ab"):
raise ValueError("Mode '{}' not supported".format(mode))
filename = os.fspath(filename)
- if filename == '-':
- return _open_stdin_or_out(mode)
-
- detected_format = _detect_format_from_extension(filename)
+ if "b" in mode:
+ # Do not pass encoding etc. in binary mode as this raises errors.
+ text_mode_kwargs = dict()
+ else:
+ text_mode_kwargs = dict(encoding=encoding, errors=errors, newline=newline)
+ if filename == "-":
+ return _open_stdin_or_out(mode, **text_mode_kwargs)
+
+ if format not in (None, "gz", "xz", "bz2", "zst"):
+ raise ValueError(
+ f"Format not supported: {format}. "
+ f"Choose one of: 'gz', 'xz', 'bz2', 'zst'"
+ )
+ detected_format = format or _detect_format_from_extension(filename)
if detected_format is None and "w" not in mode:
detected_format = _detect_format_from_content(filename)
if detected_format == "gz":
- opened_file = _open_gz(filename, mode, compresslevel, threads)
+ opened_file = _open_gz(
+ filename, mode, compresslevel, threads, **text_mode_kwargs
+ )
elif detected_format == "xz":
- opened_file = _open_xz(filename, mode)
+ opened_file = _open_xz(
+ filename, mode, compresslevel, threads, **text_mode_kwargs
+ )
elif detected_format == "bz2":
- opened_file = _open_bz2(filename, mode, threads)
+ opened_file = _open_bz2(filename, mode, threads, **text_mode_kwargs)
+ elif detected_format == "zst":
+ opened_file = _open_zst(
+ filename, mode, compresslevel, threads, **text_mode_kwargs
+ )
else:
- opened_file = open(filename, mode)
+ opened_file = open(filename, mode, **text_mode_kwargs) # type: ignore
# The "write" method for GzipFile is very costly. Lots of python calls are
# made. To a lesser extent this is true for LzmaFile and BZ2File. By
# putting a buffer in between, the expensive write method is called much
# less. The effect is very noticeable when writing small units such as
# lines or FASTQ records.
- if (isinstance(opened_file, (gzip.GzipFile, bz2.BZ2File, lzma.LZMAFile))
- and "w" in mode):
- opened_file = io.BufferedWriter(opened_file) # type: ignore
+ if (
+ isinstance(opened_file, (gzip.GzipFile, bz2.BZ2File, lzma.LZMAFile)) # FIXME
+ and "w" in mode
+ ):
+ opened_file = io.BufferedWriter(
+ opened_file, buffer_size=BUFFER_SIZE # type: ignore
+ )
return opened_file
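
The content-based detection in _detect_format_from_content above comes down to comparing the first bytes of a file against a small table of magic numbers. A minimal sketch of that idea using only the standard library follows. The names _SIGNATURES and sniff_format are hypothetical and are not part of xopen. The signatures are the ones listed in the diff.

```python
import bz2
import gzip
import lzma
from typing import Optional

# Magic numbers copied from the diff above; the table itself is illustrative.
_SIGNATURES = (
    (b"\x1f\x8b", "gz"),
    (b"\x42\x5a\x68", "bz2"),
    (b"\xfd\x37\x7a\x58\x5a\x00", "xz"),
    (b"\x28\xb5\x2f\xfd", "zst"),
)


def sniff_format(first_bytes: bytes) -> Optional[str]:
    """Hypothetical helper: return a format name for the leading bytes, or None."""
    for magic, name in _SIGNATURES:
        if first_bytes.startswith(magic):
            return name
    return None


# Produce real headers with the standard library and keep the first 6 bytes,
# which is the amount the function in the diff reads.
gz_head = gzip.compress(b"hello")[:6]
bz2_head = bz2.compress(b"hello")[:6]
xz_head = lzma.compress(b"hello")[:6]
```

Six bytes are enough here because the longest signature in the table (xz) is six bytes long.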
=====================================
src/xopen/_version.py
=====================================
@@ -1,5 +1,5 @@
# coding: utf-8
# file generated by setuptools_scm
# don't change, don't track in version control
-version = '1.2.1'
-version_tuple = (1, 2, 1)
+__version__ = version = '1.7.0'
+__version_tuple__ = version_tuple = (1, 7, 0)
=====================================
tests/conftest.py
=====================================
@@ -0,0 +1,32 @@
+import os
+import random
+import string
+import pytest
+
+from xopen import xopen
+
+
+@pytest.fixture
+def create_large_file(tmp_path):
+ def _create_large_file(extension):
+ path = tmp_path / f"large{extension}"
+ random_text = "".join(random.choices(string.ascii_lowercase, k=1024))
+ # Make the text a lot bigger in order to ensure that it is larger than the
+ # pipe buffer size.
+ random_text *= 2048
+ with xopen(path, "w") as f:
+ f.write(random_text)
+ return path
+
+ return _create_large_file
+
+
+@pytest.fixture
+def create_truncated_file(create_large_file):
+ def _create_truncated_file(extension):
+ large_file = create_large_file(extension)
+ with open(large_file, "a", encoding="ascii") as f:
+ f.truncate(os.stat(large_file).st_size - 10)
+ return large_file
+
+ return _create_truncated_file
=====================================
tests/file.txt.bz2.test deleted
=====================================
Binary files a/tests/file.txt.bz2.test and /dev/null differ
=====================================
tests/file.txt.gz.test deleted
=====================================
Binary files a/tests/file.txt.gz.test and /dev/null differ
=====================================
tests/file.txt.xz.test deleted
=====================================
Binary files a/tests/file.txt.xz.test and /dev/null differ
=====================================
tests/file.txt.test → tests/file.txt.zst
=====================================
Binary files a/tests/file.txt.test and b/tests/file.txt.zst differ
=====================================
tests/test_piped.py
=====================================
@@ -0,0 +1,384 @@
+"""
+Tests for the PipedCompression classes
+"""
+import gzip
+import io
+import os
+import shutil
+import sys
+import time
+import pytest
+from pathlib import Path
+from itertools import cycle
+
+from xopen import (
+ xopen,
+ PipedCompressionReader,
+ PipedCompressionWriter,
+ PipedGzipReader,
+ PipedGzipWriter,
+ PipedPBzip2Reader,
+ PipedPBzip2Writer,
+ PipedPigzReader,
+ PipedPigzWriter,
+ PipedIGzipReader,
+ PipedIGzipWriter,
+ PipedPythonIsalReader,
+ PipedPythonIsalWriter,
+ PipedXzReader,
+ PipedXzWriter,
+ PipedZstdReader,
+ PipedZstdWriter,
+ _MAX_PIPE_SIZE,
+ _can_read_concatenated_gz,
+ igzip,
+)
+
+extensions = ["", ".gz", ".bz2", ".xz", ".zst"]
+
+try:
+ import fcntl
+
+ if not hasattr(fcntl, "F_GETPIPE_SZ") and sys.platform == "linux":
+ setattr(fcntl, "F_GETPIPE_SZ", 1032)
+except ImportError:
+ fcntl = None
+
+base = os.path.join(os.path.dirname(__file__), "file.txt")
+files = [base + ext for ext in extensions]
+TEST_DIR = Path(__file__).parent
+CONTENT_LINES = ["Testing, testing ...\n", "The second line.\n"]
+CONTENT = "".join(CONTENT_LINES)
+
+
+def available_gzip_readers_and_writers():
+ readers = [
+ klass
+ for prog, klass in [
+ ("gzip", PipedGzipReader),
+ ("pigz", PipedPigzReader),
+ ("igzip", PipedIGzipReader),
+ ]
+ if shutil.which(prog)
+ ]
+ if PipedIGzipReader in readers and not _can_read_concatenated_gz("igzip"):
+ readers.remove(PipedIGzipReader)
+
+ writers = [
+ klass
+ for prog, klass in [
+ ("gzip", PipedGzipWriter),
+ ("pigz", PipedPigzWriter),
+ ("igzip", PipedIGzipWriter),
+ ]
+ if shutil.which(prog)
+ ]
+ if igzip is not None:
+ readers.append(PipedPythonIsalReader)
+ writers.append(PipedPythonIsalWriter)
+ return readers, writers
+
+
+PIPED_GZIP_READERS, PIPED_GZIP_WRITERS = available_gzip_readers_and_writers()
+
+
+def available_bzip2_readers_and_writers():
+ if shutil.which("pbzip2"):
+ return [PipedPBzip2Reader], [PipedPBzip2Writer]
+ return [], []
+
+
+def available_xz_readers_and_writers():
+ result = [], []
+ if shutil.which("xz"):
+ result = [PipedXzReader], [PipedXzWriter]
+ return result
+
+
+def available_zstd_readers_and_writers():
+ result = [], []
+ if shutil.which("zstd"):
+ result = [PipedZstdReader], [PipedZstdWriter]
+ return result
+
+
+PIPED_BZIP2_READERS, PIPED_BZIP2_WRITERS = available_bzip2_readers_and_writers()
+PIPED_XZ_READERS, PIPED_XZ_WRITERS = available_xz_readers_and_writers()
+PIPED_ZST_READERS, PIPED_ZST_WRITERS = available_zstd_readers_and_writers()
+
+ALL_READERS_WITH_EXTENSION = (
+ list(zip(PIPED_GZIP_READERS, cycle([".gz"])))
+ + list(zip(PIPED_BZIP2_READERS, cycle([".bz2"])))
+ + list(zip(PIPED_XZ_READERS, cycle([".xz"])))
+ + list(zip(PIPED_ZST_READERS, cycle([".zst"])))
+)
+ALL_WRITERS_WITH_EXTENSION = (
+ list(zip(PIPED_GZIP_WRITERS, cycle([".gz"])))
+ + list(zip(PIPED_BZIP2_WRITERS, cycle([".bz2"])))
+ + list(zip(PIPED_XZ_WRITERS, cycle([".xz"])))
+ + list(zip(PIPED_ZST_WRITERS, cycle([".zst"])))
+)
+
+
+THREADED_READERS = set([(PipedPigzReader, ".gz"), (PipedPBzip2Reader, ".bz2")]) & set(
+ ALL_READERS_WITH_EXTENSION
+)
+
+
+@pytest.fixture(params=PIPED_GZIP_WRITERS)
+def gzip_writer(request):
+ return request.param
+
+
+@pytest.fixture(params=ALL_READERS_WITH_EXTENSION)
+def reader(request):
+ return request.param
+
+
+@pytest.fixture(params=THREADED_READERS)
+def threaded_reader(request):
+ return request.param
+
+
+@pytest.fixture(params=ALL_WRITERS_WITH_EXTENSION)
+def writer(request):
+ return request.param
+
+
+def test_reader_readinto(reader):
+ opener, extension = reader
+ content = CONTENT.encode("utf-8")
+ with opener(TEST_DIR / f"file.txt{extension}", "rb") as f:
+ b = bytearray(len(content) + 100)
+ length = f.readinto(b)
+ assert length == len(content)
+ assert b[:length] == content
+
+
+def test_reader_textiowrapper(reader):
+ opener, extension = reader
+ with opener(TEST_DIR / f"file.txt{extension}", "rb") as f:
+ wrapped = io.TextIOWrapper(f, encoding="utf-8")
+ assert wrapped.read() == CONTENT
+
+
+def test_reader_readline(reader):
+ opener, extension = reader
+ first_line = CONTENT_LINES[0].encode("utf-8")
+ with opener(TEST_DIR / f"file.txt{extension}", "rb") as f:
+ assert f.readline() == first_line
+
+
+def test_reader_readline_text(reader):
+ opener, extension = reader
+ with opener(TEST_DIR / f"file.txt{extension}", "r") as f:
+ assert f.readline() == CONTENT_LINES[0]
+
+
+@pytest.mark.parametrize("threads", [None, 1, 2])
+def test_piped_reader_iter(threads, threaded_reader):
+ opener, extension = threaded_reader
+ with opener(TEST_DIR / f"file.txt{extension}", mode="r", threads=threads) as f:
+ lines = list(f)
+ assert lines[0] == CONTENT_LINES[0]
+
+
+def test_writer_has_iter_method(tmp_path, writer):
+ opener, extension = writer
+ with opener(tmp_path / f"out.{extension}") as f:
+ f.write("hello")
+ assert hasattr(f, "__iter__")
+
+
+def test_reader_iter_without_with(reader):
+ opener, extension = reader
+ it = iter(opener(TEST_DIR / f"file.txt{extension}"))
+ assert CONTENT_LINES[0] == next(it)
+
+
+@pytest.mark.parametrize("mode", ["rb", "rt"])
+def test_reader_close(mode, reader, create_large_file):
+ reader, extension = reader
+ large_file = create_large_file(extension)
+ with reader(large_file, mode=mode) as f:
+ f.readline()
+ time.sleep(0.2)
+ # The subprocess should be properly terminated now
+
+
+def test_invalid_gzip_compression_level(gzip_writer, tmp_path):
+ with pytest.raises(ValueError) as e:
+ with gzip_writer(tmp_path / "out.gz", mode="w", compresslevel=17) as f:
+ f.write("hello") # pragma: no cover
+ assert "compresslevel must be" in e.value.args[0]
+
+
+def test_invalid_xz_compression_level(tmp_path):
+ with pytest.raises(ValueError) as e:
+ with PipedXzWriter(tmp_path / "out.xz", mode="w", compresslevel=10) as f:
+ f.write("hello") # pragma: no cover
+ assert "compresslevel must be" in e.value.args[0]
+
+
+def test_invalid_zstd_compression_level(tmp_path):
+ with pytest.raises(ValueError) as e:
+ with PipedZstdWriter(tmp_path / "out.zst", mode="w", compresslevel=25) as f:
+ f.write("hello") # pragma: no cover
+ assert "compresslevel must be" in e.value.args[0]
+
+
+def test_readers_read(reader):
+ opener, extension = reader
+ with opener(TEST_DIR / f"file.txt{extension}", "rt") as f:
+ assert f.read() == CONTENT
+
+
+@pytest.mark.skipif(
+ sys.platform.startswith("win"),
+ reason="Windows does not have a gzip application by default.",
+)
+def test_concatenated_gzip_function():
+ assert _can_read_concatenated_gz("gzip") is True
+ assert _can_read_concatenated_gz("pigz") is True
+ assert _can_read_concatenated_gz("cat") is False
+
+
+@pytest.mark.skipif(
+ not hasattr(fcntl, "F_GETPIPE_SZ") or _MAX_PIPE_SIZE is None,
+ reason="Pipe size modifications not available on this platform.",
+)
+def test_pipesize_changed(tmp_path):
+ with xopen(tmp_path / "hello.gz", "wb") as f:
+ assert isinstance(f, PipedCompressionWriter)
+ assert fcntl.fcntl(f._file.fileno(), fcntl.F_GETPIPE_SZ) == _MAX_PIPE_SIZE
+
+
+def test_pipedcompressionwriter_wrong_mode(tmp_path):
+ with pytest.raises(ValueError) as error:
+ PipedCompressionWriter(tmp_path / "test", ["gzip"], "xb")
+ error.match("Mode is 'xb', but it must be")
+
+
+def test_pipedcompressionwriter_wrong_program(tmp_path):
+ with pytest.raises(OSError):
+ PipedCompressionWriter(tmp_path / "test", ["XVXCLSKDLA"], "wb")
+
+
+def test_compression_level(tmp_path, gzip_writer):
+ # Currently only the gzip writers handle compression levels.
+ path = tmp_path / "test.gz"
+ with gzip_writer(path, "wt", 2) as test_h:
+ test_h.write("test")
+ assert gzip.decompress(path.read_bytes()) == b"test"
+
+
+def test_iter_method_writers(writer, tmp_path):
+ opener, extension = writer
+ writer = opener(tmp_path / f"test{extension}", "wb")
+ assert iter(writer) == writer
+
+
+def test_next_method_writers(writer, tmp_path):
+ opener, extension = writer
+ writer = opener(tmp_path / f"test.{extension}", "wb")
+ with pytest.raises(io.UnsupportedOperation) as error:
+ next(writer)
+ error.match("not readable")
+
+
+def test_pipedcompressionreader_wrong_mode():
+ with pytest.raises(ValueError) as error:
+ PipedCompressionReader("test", ["gzip"], "xb")
+ error.match("Mode is 'xb', but it must be")
+
+
+def test_piped_compression_reader_peek_binary(reader):
+ opener, extension = reader
+ filegz = TEST_DIR / f"file.txt{extension}"
+ with opener(filegz, "rb") as read_h:
+ # Peek returns at least the amount of characters but maybe more
+ # depending on underlying stream. Hence startswith not ==.
+ assert read_h.peek(1).startswith(b"T")
+
+
+@pytest.mark.skipif(
+ sys.platform != "win32", reason="seeking only works on Windows for now"
+)
+def test_piped_compression_reader_seek_and_tell(reader):
+ opener, extension = reader
+ filegz = TEST_DIR / f"file.txt{extension}"
+ with opener(filegz, "rb") as f:
+ original_position = f.tell()
+ assert f.read(4) == b"Test"
+ f.seek(original_position)
+ assert f.read(8) == b"Testing,"
+
+
+@pytest.mark.parametrize("mode", ["r", "rt"])
+def test_piped_compression_reader_peek_text(reader, mode):
+ opener, extension = reader
+ compressed_file = TEST_DIR / f"file.txt{extension}"
+ with opener(compressed_file, mode) as read_h:
+ with pytest.raises(AttributeError):
+ read_h.peek(1)
+
+
+def writers_and_levels():
+ for writer in PIPED_GZIP_WRITERS:
+ if writer == PipedGzipWriter:
+ # Levels 1-9 are supported
+ yield from ((writer, i) for i in range(1, 10))
+ elif writer == PipedPigzWriter:
+ # Levels 0-9 + 11 are supported
+ yield from ((writer, i) for i in list(range(10)) + [11])
+ elif writer == PipedIGzipWriter or writer == PipedPythonIsalWriter:
+ # Levels 0-3 are supported
+ yield from ((writer, i) for i in range(4))
+ else:
+ raise NotImplementedError(
+ f"Test should be implemented for " f"{writer}"
+ ) # pragma: no cover
+
+
+@pytest.mark.parametrize(["writer", "level"], writers_and_levels())
+def test_valid_compression_levels(writer, level, tmp_path):
+ path = tmp_path / "test.gz"
+ with writer(path, "wb", level) as handle:
+ handle.write(b"test")
+ assert gzip.decompress(path.read_bytes()) == b"test"
+
+
+@pytest.mark.skipif(
+ sys.platform.startswith("win"), reason="cat is not available on Windows"
+)
+def test_compression_writer_unusual_encoding(tmp_path):
+ with PipedCompressionWriter(
+ tmp_path / "out.txt", program_args=["cat"], mode="wt", encoding="utf-16-le"
+ ) as f:
+ f.write("Hello")
+ assert (tmp_path / "out.txt").read_bytes() == b"H\0e\0l\0l\0o\0"
+
+
+def test_reproducible_gzip_compression(gzip_writer, tmp_path):
+ path = tmp_path / "file.gz"
+ with gzip_writer(path, mode="wb") as f:
+ f.write(b"hello")
+
+ data = path.read_bytes()
+ assert (data[3] & gzip.FNAME) == 0, "gzip header contains file name"
+ assert data[4:8] == b"\0\0\0\0", "gzip header contains mtime"
+
+
+def test_piped_tool_fails_on_close(tmp_path):
+ # This test exercises the retcode != 0 case in PipedCompressionWriter.close()
+ with pytest.raises(OSError) as e:
+ with PipedCompressionWriter(
+ tmp_path / "out.txt",
+ [
+ sys.executable,
+ "-c",
+ "import sys\nfor line in sys.stdin: pass\nprint()\nsys.exit(5)",
+ ],
+ ) as f:
+ f.write(b"Hello")
+ assert "terminated with exit code 5" in e.value.args[0]
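
The reproducibility check in test_reproducible_gzip_compression above inspects the gzip header directly: byte 3 holds the flags and bytes 4 to 7 hold the mtime. The same property can be reproduced without xopen. The sketch below is a rough illustration that assumes only the standard gzip module. reproducible_gzip_bytes is a hypothetical helper name, and it mirrors the filename="" and mtime=0 arguments that _open_reproducible_gzip passes to GzipFile.

```python
import gzip
import io


def reproducible_gzip_bytes(payload: bytes, level: int = 6) -> bytes:
    """Hypothetical helper: gzip-compress payload without file name or mtime."""
    buffer = io.BytesIO()
    # filename="" keeps the FNAME flag unset; mtime=0 zeroes the timestamp field.
    with gzip.GzipFile(
        fileobj=buffer, filename="", mode="wb", compresslevel=level, mtime=0
    ) as gz:
        gz.write(payload)
    return buffer.getvalue()


data = reproducible_gzip_bytes(b"hello")
flags = data[3]           # FLG byte of the gzip header
mtime_field = data[4:8]   # MTIME, four bytes, little-endian
```

With the name and timestamp removed from the header, compressing the same payload twice at the same level should give identical bytes, which is the point of the --no-name change in this release.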
=====================================
tests/test_xopen.py
=====================================
@@ -1,126 +1,36 @@
-import gzip
+"""
+Tests for the xopen.xopen function
+"""
import bz2
-import lzma
+from contextlib import contextmanager
+import functools
+import gzip
import io
+import itertools
+import lzma
import os
-import random
+from pathlib import Path
import shutil
-import signal
-import sys
-import time
+
import pytest
-from pathlib import Path
-from contextlib import contextmanager
-from itertools import cycle
-
-from xopen import (
- xopen,
- PipedCompressionReader,
- PipedCompressionWriter,
- PipedGzipReader,
- PipedGzipWriter,
- PipedPBzip2Reader,
- PipedPBzip2Writer,
- PipedPigzReader,
- PipedPigzWriter,
- PipedIGzipReader,
- PipedIGzipWriter,
- PipedPythonIsalReader,
- PipedPythonIsalWriter,
- _MAX_PIPE_SIZE,
- _can_read_concatenated_gz,
- igzip,
-)
-extensions = ["", ".gz", ".bz2", ".xz"]
+
+from xopen import xopen, _detect_format_from_content
try:
- import fcntl
- if not hasattr(fcntl, "F_GETPIPE_SZ") and sys.platform == "linux":
- setattr(fcntl, "F_GETPIPE_SZ", 1032)
+ import zstandard
except ImportError:
- fcntl = None
-
-base = "tests/file.txt"
-files = [base + ext for ext in extensions]
-CONTENT_LINES = ['Testing, testing ...\n', 'The second line.\n']
-CONTENT = ''.join(CONTENT_LINES)
-
-
-def available_gzip_readers_and_writers():
- readers = [
- klass for prog, klass in [
- ("gzip", PipedGzipReader),
- ("pigz", PipedPigzReader),
- ("igzip", PipedIGzipReader),
- ]
- if shutil.which(prog)
- ]
- if PipedIGzipReader in readers and not _can_read_concatenated_gz("igzip"):
- readers.remove(PipedIGzipReader)
-
- writers = [
- klass for prog, klass in [
- ("gzip", PipedGzipWriter),
- ("pigz", PipedPigzWriter),
- ("igzip", PipedIGzipWriter),
- ]
- if shutil.which(prog)
- ]
- if igzip is not None:
- readers.append(PipedPythonIsalReader)
- writers.append(PipedPythonIsalWriter)
- return readers, writers
-
-
-PIPED_GZIP_READERS, PIPED_GZIP_WRITERS = available_gzip_readers_and_writers()
-
-
-def available_bzip2_readers_and_writers():
- if shutil.which("pbzip2"):
- return [PipedPBzip2Reader], [PipedPBzip2Writer]
- return [], []
-
-
-PIPED_BZIP2_READERS, PIPED_BZIP2_WRITERS = available_bzip2_readers_and_writers()
-
-ALL_READERS_WITH_EXTENSION = list(zip(PIPED_GZIP_READERS, cycle([".gz"]))) + \
- list(zip(PIPED_BZIP2_READERS, cycle([".bz2"])))
-ALL_WRITERS_WITH_EXTENSION = list(zip(PIPED_GZIP_WRITERS, cycle([".gz"]))) + \
- list(zip(PIPED_BZIP2_WRITERS, cycle([".bz2"])))
-
-
-THREADED_READERS = set([(PipedPigzReader, ".gz"), (PipedPBzip2Reader, ".bz2")]) & \
- set(ALL_READERS_WITH_EXTENSION)
-
-
-@pytest.fixture(params=PIPED_GZIP_WRITERS)
-def gzip_writer(request):
- return request.param
-
-
-@pytest.fixture(params=extensions)
-def ext(request):
- return request.param
-
-
-@pytest.fixture(params=files)
-def fname(request):
- return request.param
-
-
-@pytest.fixture(params=ALL_READERS_WITH_EXTENSION)
-def reader(request):
- return request.param
+ zstandard = None
-@pytest.fixture(params=THREADED_READERS)
-def threaded_reader(request):
- return request.param
-
-
-@pytest.fixture(params=ALL_WRITERS_WITH_EXTENSION)
-def writer(request):
- return request.param
+# TODO this is duplicated in test_piped.py
+TEST_DIR = Path(__file__).parent
+CONTENT_LINES = ["Testing, testing ...\n", "The second line.\n"]
+CONTENT = "".join(CONTENT_LINES)
+extensions = ["", ".gz", ".bz2", ".xz"]
+if shutil.which("zstd") or zstandard:
+ extensions += [".zst"]
+base = os.path.join(os.path.dirname(__file__), "file.txt")
+files = [base + ext for ext in extensions]
@contextmanager
@@ -130,18 +40,28 @@ def disable_binary(tmp_path, binary_name):
the binary with permissions set to 000. If no suitable binary could be found,
PATH is set to an empty directory
"""
+ binary_path = shutil.which(binary_name)
+ if binary_path:
+ shutil.copy(binary_path, tmp_path)
+ os.chmod(tmp_path / Path(binary_path).name, 0)
+ path = os.environ["PATH"]
try:
- binary_path = shutil.which(binary_name)
- if binary_path:
- shutil.copy(binary_path, str(tmp_path))
- os.chmod(str(tmp_path / binary_name), 0)
- path = os.environ["PATH"]
os.environ["PATH"] = str(tmp_path)
yield
finally:
os.environ["PATH"] = path
+@pytest.fixture(params=extensions)
+def ext(request):
+ return request.param
+
+
+@pytest.fixture(params=files)
+def fname(request):
+ return request.param
+
+
@pytest.fixture
def lacking_pigz_permissions(tmp_path):
with disable_binary(tmp_path, "pigz"):
@@ -154,163 +74,138 @@ def lacking_pbzip2_permissions(tmp_path):
yield
-@pytest.fixture(params=[1024, 2048, 4096])
-def create_large_file(tmpdir, request):
- def _create_large_file(extension):
- path = str(tmpdir.join(f"large{extension}"))
- random_text = ''.join(random.choice('ABCDEFGHIJKLMNOPQRSTUVWXYZ\n') for _ in range(1024))
- # Make the text a lot bigger in order to ensure that it is larger than the
- # pipe buffer size.
- random_text *= request.param
- with xopen(path, 'w') as f:
- f.write(random_text)
- return path
- return _create_large_file
-
-
@pytest.fixture
-def create_truncated_file(create_large_file):
- def _create_truncated_file(extension):
- large_file = create_large_file(extension)
- with open(large_file, 'a') as f:
- f.truncate(os.stat(large_file).st_size - 10)
- return large_file
- return _create_truncated_file
+def lacking_xz_permissions(tmp_path):
+ with disable_binary(tmp_path, "xz"):
+ yield
@pytest.fixture
def xopen_without_igzip(monkeypatch):
import xopen # xopen local overrides xopen global variable
+
monkeypatch.setattr(xopen, "igzip", None)
return xopen.xopen
-def test_xopen_text(fname):
- with xopen(fname, 'rt') as f:
+def test_text(fname):
+ with xopen(fname, "rt") as f:
lines = list(f)
assert len(lines) == 2
- assert lines[1] == 'The second line.\n', fname
+ assert lines[1] == "The second line.\n", fname
-def test_xopen_binary(fname):
- with xopen(fname, 'rb') as f:
+def test_binary(fname):
+ with xopen(fname, "rb") as f:
lines = list(f)
assert len(lines) == 2
- assert lines[1] == b'The second line.\n', fname
-
-
-def test_xopen_binary_no_isal_no_threads(fname, xopen_without_igzip):
- with xopen_without_igzip(fname, 'rb', threads=0) as f:
+ assert lines[1] == b"The second line.\n", fname
+
+
+@pytest.mark.parametrize("mode", ["b", "", "t"])
+@pytest.mark.parametrize("threads", [None, 0])
+def test_roundtrip(ext, tmp_path, threads, mode):
+ if ext == ".zst" and threads == 0 and zstandard is None:
+ return
+ path = tmp_path / f"file{ext}"
+ data = b"Hello" if mode == "b" else "Hello"
+ with xopen(path, "w" + mode, threads=threads) as f:
+ f.write(data)
+ with xopen(path, "r" + mode, threads=threads) as f:
+ assert f.read() == data
+
+
+def test_binary_no_isal_no_threads(fname, xopen_without_igzip):
+ if fname.endswith(".zst") and zstandard is None:
+ return
+ with xopen_without_igzip(fname, "rb", threads=0) as f:
lines = list(f)
assert len(lines) == 2
- assert lines[1] == b'The second line.\n', fname
+ assert lines[1] == b"The second line.\n", fname
-def test_xopen_binary_no_isal(fname, xopen_without_igzip):
- with xopen_without_igzip(fname, 'rb', threads=1) as f:
+def test_binary_no_isal(fname, xopen_without_igzip):
+ with xopen_without_igzip(fname, "rb", threads=1) as f:
lines = list(f)
assert len(lines) == 2
- assert lines[1] == b'The second line.\n', fname
+ assert lines[1] == b"The second line.\n", fname
def test_no_context_manager_text(fname):
- f = xopen(fname, 'rt')
+ f = xopen(fname, "rt")
lines = list(f)
assert len(lines) == 2
- assert lines[1] == 'The second line.\n', fname
+ assert lines[1] == "The second line.\n", fname
f.close()
assert f.closed
def test_no_context_manager_binary(fname):
- f = xopen(fname, 'rb')
+ f = xopen(fname, "rb")
lines = list(f)
assert len(lines) == 2
- assert lines[1] == b'The second line.\n', fname
+ assert lines[1] == b"The second line.\n", fname
f.close()
assert f.closed
-def test_readinto(fname):
- content = CONTENT.encode('utf-8')
- with xopen(fname, 'rb') as f:
- b = bytearray(len(content) + 100)
- length = f.readinto(b)
- assert length == len(content)
- assert b[:length] == content
+def test_bytes_path(fname):
+ path = fname.encode("utf-8")
+ with xopen(path, "rt") as f:
+ lines = list(f)
+ assert len(lines) == 2
+ assert lines[1] == "The second line.\n", fname
-def test_reader_readinto(reader):
- opener, extension = reader
- content = CONTENT.encode('utf-8')
- with opener(f"tests/file.txt{extension}", "rb") as f:
+def test_readinto(fname):
+ content = CONTENT.encode("utf-8")
+ with xopen(fname, "rb") as f:
b = bytearray(len(content) + 100)
length = f.readinto(b)
assert length == len(content)
assert b[:length] == content
-def test_reader_textiowrapper(reader):
- opener, extension = reader
- with opener(f"tests/file.txt{extension}", "rb") as f:
- wrapped = io.TextIOWrapper(f)
- assert wrapped.read() == CONTENT
+def test_detect_format_from_content(ext):
+ detected = _detect_format_from_content(Path(__file__).parent / f"file.txt{ext}")
+ if ext == "":
+ assert detected is None
+ else:
+ assert ext[1:] == detected
-def test_detect_file_format_from_content(ext):
- with xopen(f"tests/file.txt{ext}.test", "rb") as fh:
+def test_detect_file_format_from_content(ext, tmp_path):
+ path = tmp_path / f"file.txt{ext}.test"
+ shutil.copy(TEST_DIR / f"file.txt{ext}", path)
+ with xopen(path, "rb") as fh:
assert fh.readline() == CONTENT_LINES[0].encode("utf-8")
def test_readline(fname):
- first_line = CONTENT_LINES[0].encode('utf-8')
- with xopen(fname, 'rb') as f:
+ first_line = CONTENT_LINES[0].encode("utf-8")
+ with xopen(fname, "rb") as f:
assert f.readline() == first_line
def test_readline_text(fname):
- with xopen(fname, 'r') as f:
- assert f.readline() == CONTENT_LINES[0]
-
-
-def test_reader_readline(reader):
- opener, extension = reader
- first_line = CONTENT_LINES[0].encode('utf-8')
- with opener(f"tests/file.txt{extension}", "rb") as f:
- assert f.readline() == first_line
-
-
-def test_reader_readline_text(reader):
- opener, extension = reader
- with opener(f"tests/file.txt{extension}", "r") as f:
+ with xopen(fname, "r") as f:
assert f.readline() == CONTENT_LINES[0]
-@pytest.mark.parametrize("threads", [None, 1, 2])
-def test_piped_reader_iter(threads, threaded_reader):
- opener, extension = threaded_reader
- with opener(f"tests/file.txt{extension}", mode="r", threads=threads) as f:
- lines = list(f)
- assert lines[0] == CONTENT_LINES[0]
-
-
def test_next(fname):
with xopen(fname, "rt") as f:
_ = next(f)
line2 = next(f)
- assert line2 == 'The second line.\n', fname
+ assert line2 == "The second line.\n", fname
-def test_xopen_has_iter_method(ext, tmpdir):
- path = str(tmpdir.join("out" + ext))
- with xopen(path, mode='w') as f:
- assert hasattr(f, '__iter__')
-
-
-def test_writer_has_iter_method(tmpdir, writer):
- opener, extension = writer
- with opener(str(tmpdir.join(f"out.{extension}"))) as f:
- assert hasattr(f, '__iter__')
+def test_has_iter_method(ext, tmp_path):
+ path = tmp_path / f"out{ext}"
+ with xopen(path, mode="w") as f:
+ # Writing anything isn’t strictly necessary, but if we don’t, then
+ # pbzip2 causes a delay of one second
+ f.write("hello")
+ assert hasattr(f, "__iter__")
def test_iter_without_with(fname):
@@ -320,22 +215,6 @@ def test_iter_without_with(fname):
f.close()
-def test_reader_iter_without_with(reader):
- opener, extension = reader
- it = iter(opener(f"tests/file.txt{extension}"))
- assert CONTENT_LINES[0] == next(it)
-
-
-@pytest.mark.parametrize("mode", ["rb", "rt"])
-def test_reader_close(mode, reader, create_large_file):
- reader, extension = reader
- large_file = create_large_file(extension)
- with reader(large_file, mode=mode) as f:
- f.readline()
- time.sleep(0.2)
- # The subprocess should be properly terminated now
-
-
@pytest.mark.parametrize("extension", [".gz", ".bz2"])
def test_partial_iteration_closes_correctly(extension, create_large_file):
class LineReader:
@@ -343,8 +222,9 @@ def test_partial_iteration_closes_correctly(extension, create_large_file):
self.file = xopen(file, "rb")
def __iter__(self):
- wrapper = io.TextIOWrapper(self.file)
+ wrapper = io.TextIOWrapper(self.file, encoding="utf-8")
yield from wrapper
+
large_file = create_large_file(extension)
f = LineReader(large_file)
next(iter(f))
@@ -353,19 +233,19 @@ def test_partial_iteration_closes_correctly(extension, create_large_file):
def test_nonexisting_file(ext):
with pytest.raises(IOError):
- with xopen('this-file-does-not-exist' + ext):
+ with xopen("this-file-does-not-exist" + ext):
pass # pragma: no cover
def test_write_to_nonexisting_dir(ext):
with pytest.raises(IOError):
- with xopen('this/path/does/not/exist/file.txt' + ext, 'w'):
+ with xopen("this/path/does/not/exist/file.txt" + ext, "w"):
pass # pragma: no cover
def test_invalid_mode(ext):
with pytest.raises(ValueError):
- with xopen(f"tests/file.txt.{ext}", mode="hallo"):
+ with xopen(TEST_DIR / f"file.txt.{ext}", mode="hallo"):
pass # pragma: no cover
@@ -375,28 +255,18 @@ def test_filename_not_a_string():
pass # pragma: no cover
-def test_invalid_compression_level(tmpdir):
- path = str(tmpdir.join("out.gz"))
+def test_invalid_compression_level(tmp_path):
with pytest.raises(ValueError) as e:
- with xopen(path, mode="w", compresslevel=17) as f:
- f.write("hello") # pragma: no cover
- assert "compresslevel must be" in e.value.args[0]
-
-
-def test_invalid_compression_level_writers(gzip_writer, tmpdir):
- # Currently only gzip writers handle compression levels
- path = str(tmpdir.join("out.gz"))
- with pytest.raises(ValueError) as e:
- with gzip_writer(path, mode="w", compresslevel=17) as f:
+ with xopen(tmp_path / "out.gz", mode="w", compresslevel=17) as f:
f.write("hello") # pragma: no cover
assert "compresslevel must be" in e.value.args[0]
@pytest.mark.parametrize("ext", extensions)
-def test_append(ext, tmpdir):
+def test_append(ext, tmp_path):
text = b"AB"
reference = text + text
- path = str(tmpdir.join("the-file" + ext))
+ path = tmp_path / f"the-file{ext}"
with xopen(path, "ab") as f:
f.write(text)
with xopen(path, "ab") as f:
@@ -409,10 +279,10 @@ def test_append(ext, tmpdir):
@pytest.mark.parametrize("ext", extensions)
-def test_append_text(ext, tmpdir):
+def test_append_text(ext, tmp_path):
text = "AB"
reference = text + text
- path = str(tmpdir.join("the-file" + ext))
+ path = tmp_path / f"the-file{ext}"
with xopen(path, "at") as f:
f.write(text)
with xopen(path, "at") as f:
@@ -423,91 +293,50 @@ def test_append_text(ext, tmpdir):
assert appended == reference
-class TookTooLongError(Exception):
- pass
-
-
-class timeout:
- # copied from https://stackoverflow.com/a/22348885/715090
- def __init__(self, seconds=1):
- self.seconds = seconds
-
- def handle_timeout(self, signum, frame):
- raise TookTooLongError() # pragma: no cover
-
- def __enter__(self):
- signal.signal(signal.SIGALRM, self.handle_timeout)
- signal.alarm(self.seconds)
-
- def __exit__(self, type, value, traceback):
- signal.alarm(0)
-
-
-@pytest.mark.parametrize("extension", [".gz", ".bz2"])
+@pytest.mark.timeout(5)
+@pytest.mark.parametrize("extension", [".gz", ".bz2", ".xz"])
def test_truncated_file(extension, create_truncated_file):
truncated_file = create_truncated_file(extension)
- with timeout(seconds=2):
- with pytest.raises((EOFError, IOError)):
- f = xopen(truncated_file, "r")
- f.read()
- f.close() # pragma: no cover
+ with pytest.raises((EOFError, IOError)):
+ f = xopen(truncated_file, "r")
+ f.read()
+ f.close() # pragma: no cover
-@pytest.mark.parametrize("extension", [".gz", ".bz2"])
+@pytest.mark.timeout(5)
+@pytest.mark.parametrize("extension", [".gz", ".bz2", ".xz"])
def test_truncated_iter(extension, create_truncated_file):
truncated_file = create_truncated_file(extension)
- with timeout(seconds=2):
- with pytest.raises((EOFError, IOError)):
- f = xopen(truncated_file, 'r')
- for line in f:
- pass
- f.close() # pragma: no cover
+ with pytest.raises((EOFError, IOError)):
+ f = xopen(truncated_file, "r")
+ for line in f:
+ pass
+ f.close() # pragma: no cover
-@pytest.mark.parametrize("extension", [".gz", ".bz2"])
+@pytest.mark.timeout(5)
+@pytest.mark.parametrize("extension", [".gz", ".bz2", ".xz"])
def test_truncated_with(extension, create_truncated_file):
truncated_file = create_truncated_file(extension)
- with timeout(seconds=2):
- with pytest.raises((EOFError, IOError)):
- with xopen(truncated_file, 'r') as f:
- f.read()
+ with pytest.raises((EOFError, IOError)):
+ with xopen(truncated_file, "r") as f:
+ f.read()
-@pytest.mark.parametrize("extension", [".gz", ".bz2"])
+@pytest.mark.timeout(5)
+@pytest.mark.parametrize("extension", [".gz", ".bz2", ".xz"])
def test_truncated_iter_with(extension, create_truncated_file):
truncated_file = create_truncated_file(extension)
- with timeout(seconds=2):
- with pytest.raises((EOFError, IOError)):
- with xopen(truncated_file, 'r') as f:
- for line in f:
- pass
+ with pytest.raises((EOFError, IOError)):
+ with xopen(truncated_file, "r") as f:
+ for line in f:
+ pass
def test_bare_read_from_gz():
- with xopen('tests/hello.gz', 'rt') as f:
- assert f.read() == 'hello'
-
-
-def test_readers_read(reader):
- opener, extension = reader
- with opener(f'tests/file.txt{extension}', 'rt') as f:
- assert f.read() == CONTENT
-
-
-def test_write_threads(tmpdir, ext):
- path = str(tmpdir.join(f'out.{ext}'))
- with xopen(path, mode='w', threads=3) as f:
- f.write('hello')
- with xopen(path) as f:
- assert f.read() == 'hello'
-
-
-def test_write_pigz_threads_no_isal(tmpdir, xopen_without_igzip):
- path = str(tmpdir.join('out.gz'))
- with xopen_without_igzip(path, mode='w', threads=3) as f:
- f.write('hello')
- with xopen_without_igzip(path) as f:
- assert f.read() == 'hello'
+ hello_file = TEST_DIR / "hello.gz"
+ with xopen(hello_file, "rt") as f:
+ assert f.read() == "hello"
def test_read_no_threads(ext):
@@ -515,37 +344,59 @@ def test_read_no_threads(ext):
".bz2": bz2.BZ2File,
".gz": gzip.GzipFile,
".xz": lzma.LZMAFile,
+ ".zst": io.BufferedReader,
"": io.BufferedReader,
}
+ if ext == ".zst" and zstandard is None:
+ return
klass = klasses[ext]
- with xopen(f"tests/file.txt{ext}", "rb", threads=0) as f:
+ with xopen(TEST_DIR / f"file.txt{ext}", "rb", threads=0) as f:
assert isinstance(f, klass), f
-def test_write_no_threads(tmpdir, ext):
+def test_write_threads(tmp_path, ext):
+ path = tmp_path / f"out.{ext}"
+ with xopen(path, mode="w", threads=3) as f:
+ f.write("hello")
+ with xopen(path) as f:
+ assert f.read() == "hello"
+
+
+def test_write_pigz_threads_no_isal(tmp_path, xopen_without_igzip):
+ path = tmp_path / "out.gz"
+ with xopen_without_igzip(path, mode="w", threads=3) as f:
+ f.write("hello")
+ with xopen_without_igzip(path) as f:
+ assert f.read() == "hello"
+
+
+def test_write_no_threads(tmp_path, ext):
klasses = {
".bz2": bz2.BZ2File,
".gz": gzip.GzipFile,
".xz": lzma.LZMAFile,
"": io.BufferedWriter,
}
+ if ext == ".zst":
+ # Skip zst because if python-zstandard is not installed,
+ # we fall back to an external process even when threads=0
+ return
klass = klasses[ext]
- path = str(tmpdir.join(f"out.{ext}"))
- with xopen(path, "wb", threads=0) as f:
+ with xopen(tmp_path / f"out{ext}", "wb", threads=0) as f:
assert isinstance(f, io.BufferedWriter)
if ext:
assert isinstance(f.raw, klass), f
-def test_write_gzip_no_threads_no_isal(tmpdir, xopen_without_igzip):
+def test_write_gzip_no_threads_no_isal(tmp_path, xopen_without_igzip):
import gzip
- path = str(tmpdir.join("out.gz"))
- with xopen_without_igzip(path, "wb", threads=0) as f:
+
+ with xopen_without_igzip(tmp_path / "out.gz", "wb", threads=0) as f:
assert isinstance(f.raw, gzip.GzipFile), f
def test_write_stdout():
- f = xopen('-', mode='w')
+ f = xopen("-", mode="w")
print("Hello", file=f)
f.close()
# ensure stdout is not closed
@@ -554,7 +405,7 @@ def test_write_stdout():
def test_write_stdout_contextmanager():
# Do not close stdout
- with xopen('-', mode='w') as f:
+ with xopen("-", mode="w") as f:
print("Hello", file=f)
# ensure stdout is not closed
print("Still there?")
@@ -562,71 +413,58 @@ def test_write_stdout_contextmanager():
def test_read_pathlib(fname):
path = Path(fname)
- with xopen(path, mode='rt') as f:
+ with xopen(path, mode="rt") as f:
assert f.read() == CONTENT
def test_read_pathlib_binary(fname):
path = Path(fname)
- with xopen(path, mode='rb') as f:
- assert f.read() == bytes(CONTENT, 'ascii')
-
-
-def test_write_pathlib(ext, tmpdir):
- path = Path(str(tmpdir)) / ('hello.txt' + ext)
- with xopen(path, mode='wt') as f:
- f.write('hello')
- with xopen(path, mode='rt') as f:
- assert f.read() == 'hello'
-
+ with xopen(path, mode="rb") as f:
+ assert f.read() == bytes(CONTENT, "ascii")
-def test_write_pathlib_binary(ext, tmpdir):
- path = Path(str(tmpdir)) / ('hello.txt' + ext)
- with xopen(path, mode='wb') as f:
- f.write(b'hello')
- with xopen(path, mode='rb') as f:
- assert f.read() == b'hello'
+def test_write_pathlib(ext, tmp_path):
+ path = tmp_path / f"hello.txt{ext}"
+ with xopen(path, mode="wt") as f:
+ f.write("hello")
+ with xopen(path, mode="rt") as f:
+ assert f.read() == "hello"
-def test_concatenated_gzip_function():
- assert _can_read_concatenated_gz("gzip") is True
- assert _can_read_concatenated_gz("pigz") is True
- assert _can_read_concatenated_gz("xz") is False
-
-@pytest.mark.skipif(
- not hasattr(fcntl, "F_GETPIPE_SZ") or _MAX_PIPE_SIZE is None,
- reason="Pipe size modifications not available on this platform.")
-def test_pipesize_changed(tmpdir):
- path = Path(str(tmpdir), "hello.gz")
- with xopen(path, "wb") as f:
- assert isinstance(f, PipedCompressionWriter)
- assert fcntl.fcntl(f._file.fileno(),
- fcntl.F_GETPIPE_SZ) == _MAX_PIPE_SIZE
+def test_write_pathlib_binary(ext, tmp_path):
+ path = tmp_path / f"hello.txt{ext}"
+ with xopen(path, mode="wb") as f:
+ f.write(b"hello")
+ with xopen(path, mode="rb") as f:
+ assert f.read() == b"hello"
-def test_xopen_falls_back_to_gzip_open(lacking_pigz_permissions):
- with xopen("tests/file.txt.gz", "rb") as f:
+def test_falls_back_to_gzip_open(lacking_pigz_permissions):
+ with xopen(TEST_DIR / "file.txt.gz", "rb") as f:
assert f.readline() == CONTENT_LINES[0].encode("utf-8")
-def test_xopen_falls_back_to_gzip_open_no_isal(lacking_pigz_permissions,
- xopen_without_igzip):
- with xopen_without_igzip("tests/file.txt.gz", "rb") as f:
+def test_falls_back_to_gzip_open_no_isal(lacking_pigz_permissions, xopen_without_igzip):
+ with xopen_without_igzip(TEST_DIR / "file.txt.gz", "rb") as f:
assert f.readline() == CONTENT_LINES[0].encode("utf-8")
-def test_xopen_fals_back_to_gzip_open_write_no_isal(lacking_pigz_permissions,
- xopen_without_igzip,
- tmp_path):
+def test_fals_back_to_gzip_open_write_no_isal(
+ lacking_pigz_permissions, xopen_without_igzip, tmp_path
+):
tmp = tmp_path / "test.gz"
with xopen_without_igzip(tmp, "wb") as f:
f.write(b"hello")
assert gzip.decompress(tmp.read_bytes()) == b"hello"
-def test_xopen_falls_back_to_bzip2_open(lacking_pbzip2_permissions):
- with xopen("tests/file.txt.bz2", "rb") as f:
+def test_falls_back_to_bzip2_open(lacking_pbzip2_permissions):
+ with xopen(TEST_DIR / "file.txt.bz2", "rb") as f:
+ assert f.readline() == CONTENT_LINES[0].encode("utf-8")
+
+
+def test_falls_back_to_lzma_open(lacking_xz_permissions):
+ with xopen(TEST_DIR / "file.txt.xz", "rb") as f:
assert f.readline() == CONTENT_LINES[0].encode("utf-8")
@@ -644,83 +482,87 @@ def test_open_many_writers(tmp_path, ext):
f.close()
-def test_pipedcompressionwriter_wrong_mode(tmpdir):
+def test_override_output_format(tmp_path):
+ path = tmp_path / "test_gzip_compressed"
+ with xopen(path, mode="wb", format="gz") as f:
+ f.write(b"test")
+ test_contents = path.read_bytes()
+ assert test_contents.startswith(b"\x1f\x8b") # Gzip magic
+ assert gzip.decompress(test_contents) == b"test"
+
+
+def test_override_output_format_unsupported_format(tmp_path):
+ path = tmp_path / "test_fairy_format_compressed"
with pytest.raises(ValueError) as error:
- PipedCompressionWriter(tmpdir.join("test"), ["gzip"], "xb")
- error.match("Mode is 'xb', but it must be")
+ xopen(path, mode="wb", format="fairy")
+ error.match("not supported")
+ error.match("fairy")
-def test_pipedcompressionwriter_wrong_program(tmpdir):
- with pytest.raises(OSError):
- PipedCompressionWriter(tmpdir.join("test"), ["XVXCLSKDLA"], "wb")
+def test_override_output_format_wrong_format(tmp_path):
+ path = tmp_path / "not_compressed"
+ path.write_text("I am not compressed.", encoding="utf-8")
+ with pytest.raises(OSError): # BadGzipFile is a subclass of OSError
+ with xopen(path, "rt", format="gz") as opened_file:
+ opened_file.read()
-def test_compression_level(tmpdir, gzip_writer):
- # Currently only the gzip writers handle compression levels.
- with gzip_writer(tmpdir.join("test.gz"), "wt", 2) as test_h:
- test_h.write("test")
- assert gzip.decompress(Path(tmpdir.join("test.gz")).read_bytes()) == b"test"
+# Test for threaded and non-threaded.
+OPENERS = (xopen, functools.partial(xopen, threads=0))
-def test_iter_method_writers(writer, tmpdir):
- opener, extension = writer
- test_path = tmpdir.join(f"test{extension}")
- writer = opener(test_path, "wb")
- assert iter(writer) == writer
+@pytest.mark.parametrize(
+ ["opener", "extension"], itertools.product(OPENERS, extensions)
+)
+def test_text_encoding_newline_passthrough(opener, extension, tmp_path):
+ if extension == ".zst" and zstandard is None:
+ return
+ # "Eén ree\nTwee reeën\n" latin-1 encoded with \r for as line separator.
+ encoded_text = b"E\xe9n ree\rTwee ree\xebn\r"
+ path = tmp_path / f"test.txt{extension}"
+ with opener(path, "wb") as f:
+ f.write(encoded_text)
+ with opener(path, "rt", encoding="latin-1", newline="\r") as f:
+ result = f.read()
+ assert result == "Eén ree\rTwee reeën\r"
+
+
+@pytest.mark.parametrize(
+ ["opener", "extension"], itertools.product(OPENERS, extensions)
+)
+def test_text_encoding_errors(opener, extension, tmp_path):
+ if extension == ".zst" and zstandard is None:
+ return
+ # "Eén ree\nTwee reeën\n" latin-1 encoded. This is not valid ascii.
+ encoded_text = b"E\xe9n ree\nTwee ree\xebn\n"
+ path = tmp_path / f"test.txt{extension}"
+ with opener(path, "wb") as f:
+ f.write(encoded_text)
+ with opener(path, "rt", encoding="ascii", errors="replace") as f:
+ result = f.read()
+ assert result == "E�n ree\nTwee ree�n\n"
+
+
+@pytest.mark.parametrize("compresslevel", [1, 6])
+def test_gzip_compression_is_reproducible_without_piping(tmp_path, compresslevel):
+ # compresslevel 1 should give us igzip and 6 should give us regular gzip
+ path = tmp_path / "test.gz"
+ with xopen(path, mode="wb", compresslevel=compresslevel, threads=0) as f:
+ f.write(b"hello")
+ data = path.read_bytes()
+ assert (data[3] & gzip.FNAME) == 0, "gzip header contains file name"
+ assert data[4:8] == b"\0\0\0\0", "gzip header contains mtime"
-def test_next_method_writers(writer, tmpdir):
- opener, extension = writer
- test_path = tmpdir.join(f"test.{extension}")
- writer = opener(test_path, "wb")
- with pytest.raises(io.UnsupportedOperation) as error:
- next(writer)
- error.match('not readable')
+def test_read_devnull():
+ with xopen(os.devnull):
+ pass
-def test_pipedcompressionreader_wrong_mode():
- with pytest.raises(ValueError) as error:
- PipedCompressionReader("test", ["gzip"], "xb")
- error.match("Mode is 'xb', but it must be")
-
-
-def test_piped_compression_reader_peek_binary(reader):
- opener, extension = reader
- filegz = Path(__file__).parent / f"file.txt{extension}"
- with opener(filegz, "rb") as read_h:
- # Peek returns at least the amount of characters but maybe more
- # depending on underlying stream. Hence startswith not ==.
- assert read_h.peek(1).startswith(b"T")
-
-
-@pytest.mark.parametrize("mode", ["r", "rt"])
-def test_piped_compression_reader_peek_text(reader, mode):
- opener, extension = reader
- compressed_file = Path(__file__).parent / f"file.txt{extension}"
- with opener(compressed_file, mode) as read_h:
- with pytest.raises(AttributeError):
- read_h.peek(1)
-
-
-def writers_and_levels():
- for writer in PIPED_GZIP_WRITERS:
- if writer == PipedGzipWriter:
- # Levels 1-9 are supported
- yield from ((writer, i) for i in range(1, 10))
- elif writer == PipedPigzWriter:
- # Levels 0-9 + 11 are supported
- yield from ((writer, i) for i in list(range(10)) + [11])
- elif writer == PipedIGzipWriter or writer == PipedPythonIsalWriter:
- # Levels 0-3 are supported
- yield from ((writer, i) for i in range(4))
- else:
- raise NotImplementedError(f"Test should be implemented for "
- f"{writer}") # pragma: no cover
-
-
-@pytest.mark.parametrize(["writer", "level"], writers_and_levels())
-def test_valid_compression_levels(writer, level, tmpdir):
- test_file = tmpdir.join("test.gz")
- with writer(test_file, "wb", level) as handle:
- handle.write(b"test")
- assert gzip.decompress(Path(test_file).read_bytes()) == b"test"
+def test_xopen_zst_fails_when_zstandard_not_available(monkeypatch):
+ import xopen
+
+ monkeypatch.setattr(xopen, "zstandard", None)
+ with pytest.raises(ImportError):
+ with xopen.xopen(TEST_DIR / "file.txt.zst", mode="rb", threads=0) as f:
+ f.read()
=====================================
tox.ini
=====================================
@@ -1,22 +1,31 @@
[tox]
-envlist = flake8,mypy,py36,py37,py38,py39,pypy3
+envlist = flake8,mypy,py37,py38,py39,py310,py311,pypy3
+isolated_build = True
[testenv]
deps =
pytest
+ pytest-timeout
coverage
-setenv = PYTHONDEVMODE = 1
+setenv =
+ PYTHONDEVMODE = 1
+ PYTHONWARNDEFAULTENCODING = 1
commands =
coverage run --branch --source=xopen,tests -m pytest -v --doctest-modules tests
coverage report
coverage xml
coverage html
-[testenv:isal]
+[testenv:zstd]
deps =
- pytest
- coverage
- isal
+ {[testenv]deps}
+ zstandard
+
+[testenv:black]
+basepython = python3.7
+deps = black==22.3.0
+skip_install = true
+commands = black --check src/ tests/
[testenv:flake8]
basepython = python3.7
@@ -39,3 +48,4 @@ extend_ignore = E731
exclude_lines =
pragma: no cover
def __repr__
+ @overload
View it on GitLab: https://salsa.debian.org/python-team/packages/python-xopen/-/commit/0cba8325fc85a54dcfb54ed137de37aeaa2a657b