[med-svn] [Git][med-team/python-dnaio][upstream] New upstream version 0.5.0

Nilesh Patra gitlab at salsa.debian.org
Sun Dec 20 12:22:26 GMT 2020



Nilesh Patra pushed to branch upstream at Debian Med / python-dnaio


Commits:
1ebf37f8 by Nilesh Patra at 2020-12-20T17:49:29+05:30
New upstream version 0.5.0
- - - - -


14 changed files:

- + .github/workflows/ci.yml
- − .travis.yml
- README.md
- buildwheels.sh
- setup.py
- src/dnaio/__init__.py
- src/dnaio/_core.pyi
- src/dnaio/_util.py
- src/dnaio/chunks.py
- src/dnaio/exceptions.py
- src/dnaio/readers.py
- src/dnaio/writers.py
- tests/test_internal.py
- tox.ini


Changes:

=====================================
.github/workflows/ci.yml
=====================================
@@ -0,0 +1,72 @@
+name: CI
+
+on: [push, pull_request]
+
+jobs:
+  lint:
+    timeout-minutes: 5
+    runs-on: ubuntu-latest
+    strategy:
+      matrix:
+        python-version: [3.7]
+        toxenv: [flake8, mypy]
+    steps:
+    - uses: actions/checkout@v2
+    - name: Set up Python ${{ matrix.python-version }}
+      uses: actions/setup-python@v2
+      with:
+        python-version: ${{ matrix.python-version }}
+    - name: Install dependencies
+      run: python -m pip install tox
+    - name: Run tox ${{ matrix.toxenv }}
+      run: tox -e ${{ matrix.toxenv }}
+
+  test:
+    timeout-minutes: 5
+    runs-on: ${{ matrix.os }}
+    strategy:
+      matrix:
+        python-version: [3.6, 3.7, 3.8, 3.9]
+        os: [ubuntu-latest]
+        include:
+        - python-version: 3.8
+          os: macos-latest
+    steps:
+    - uses: actions/checkout@v2
+    - name: Set up Python ${{ matrix.python-version }}
+      uses: actions/setup-python@v2
+      with:
+        python-version: ${{ matrix.python-version }}
+    - name: Install dependencies
+      run: python -m pip install tox
+    - name: Test
+      run: tox -e py
+    - name: Upload coverage report
+      uses: codecov/codecov-action@v1
+
+  deploy:
+    timeout-minutes: 5
+    runs-on: ubuntu-latest
+    needs: [lint, test]
+    if: startsWith(github.ref, 'refs/tags')
+    steps:
+    - uses: actions/checkout@v2
+      with:
+        fetch-depth: 0  # required for setuptools_scm
+    - name: Set up Python
+      uses: actions/setup-python@v2
+      with:
+        python-version: 3.7
+    - name: Make distributions
+      run: |
+        python -m pip install Cython
+        python setup.py sdist
+        ./buildwheels.sh
+        ls -l dist/
+    - name: Publish to PyPI
+      uses: pypa/gh-action-pypi-publish@v1.4.1
+      with:
+        user: __token__
+        password: ${{ secrets.pypi_password }}
+        #password: ${{ secrets.test_pypi_password }}
+        #repository_url: https://test.pypi.org/legacy/


=====================================
.travis.yml deleted
=====================================
@@ -1,54 +0,0 @@
-language: python
-
-cache:
-  directories:
-    - $HOME/.cache/pip
-
-python:
-  - "3.5"
-  - "3.6"
-  - "3.7"
-  - "3.8"
-  - "3.9"
-  - "nightly"
-
-install:
-  - pip install --upgrade coverage codecov
-  - pip install .[dev]
-
-script:
-  - coverage run -m pytest
-
-after_success:
-  - coverage combine
-  - codecov
-
-env:
-  global:
-#    - TWINE_REPOSITORY_URL=https://test.pypi.org/legacy/
-    - TWINE_USERNAME=marcelm
-    # TWINE_PASSWORD is set in Travis settings
-
-jobs:
-  include:
-    - stage: deploy
-      services:
-        - docker
-      python: "3.6"
-      install: python3 -m pip install Cython twine
-      if: tag IS present
-      script:
-        - |
-          python3 setup.py sdist
-          ./buildwheels.sh
-          ls -l dist/
-          python3 -m twine upload dist/*
-
-    - stage: test
-      name: flake8
-      python: "3.6"
-      install: python3 -m pip install flake8
-      script: flake8 src/ tests/
-
-  allow_failures:
-    - python: "nightly"


=====================================
README.md
=====================================
@@ -1,10 +1,10 @@
-[![Travis](https://travis-ci.org/marcelm/dnaio.svg?branch=master)](https://travis-ci.org/marcelm/dnaio)
+![CI](https://github.com/marcelm/dnaio/workflows/CI/badge.svg)
 [![PyPI](https://img.shields.io/pypi/v/dnaio.svg?branch=master)](https://pypi.python.org/pypi/dnaio)
 [![Codecov](https://codecov.io/gh/marcelm/dnaio/branch/master/graph/badge.svg)](https://codecov.io/gh/marcelm/dnaio)
 
 # dnaio parses FASTQ and FASTA
 
-`dnaio` is a Python 3.5+ library for fast parsing of FASTQ and also FASTA files. The code was previously part of the
+`dnaio` is a Python 3.6+ library for fast parsing of FASTQ and also FASTA files. The code was previously part of the
 [Cutadapt](https://cutadapt.readthedocs.io/) tool and has been improved since it has been split out.
 
 


=====================================
buildwheels.sh
=====================================
@@ -16,7 +16,7 @@ manylinux=quay.io/pypa/manylinux2010_x86_64
 
 # For convenience, if this script is called from outside of a docker container,
 # it starts a container and runs itself inside of it.
-if ! grep -q docker /proc/1/cgroup; then
+if ! grep -q docker /proc/1/cgroup && ! test -d /opt/python; then
   # We are not inside a container
   docker pull ${manylinux}
   exec docker run --rm -v $(pwd):/io ${manylinux} /io/$0
@@ -32,7 +32,7 @@ STRIP_FLAGS=${STRIP_FLAGS:-"-Wl,-strip-all"}
 export CFLAGS="${CFLAGS:-$STRIP_FLAGS}"
 export CXXFLAGS="${CXXFLAGS:-$STRIP_FLAGS}"
 
-for PYBIN in /opt/python/cp3[5678]-*/bin; do
+for PYBIN in /opt/python/cp3[6789]-*/bin; do
     ${PYBIN}/pip wheel --no-deps /io/ -w wheelhouse/
 done
 ls wheelhouse/


=====================================
setup.py
=====================================
@@ -1,13 +1,8 @@
-import sys
 import os.path
 from setuptools import setup, Extension, find_packages
 from distutils.command.sdist import sdist as _sdist
 from distutils.command.build_ext import build_ext as _build_ext
 
-if sys.version_info[:2] < (3, 5):
-    sys.stdout.write('Python 3.5 or later is required\n')
-    sys.exit(1)
-
 
 def no_cythonize(extensions, **_ignore):
     """Change .pyx to .c or .cpp (copied from Cython documentation)"""
@@ -75,7 +70,7 @@ setup(
     ext_modules=extensions,
     cmdclass={'build_ext': BuildExt, 'sdist': SDist},
     install_requires=['xopen>=0.8.2'],
-    python_requires='>=3.5',
+    python_requires='>=3.6',
     classifiers=[
             "Development Status :: 5 - Production/Stable",
             "Intended Audience :: Science/Research",


=====================================
src/dnaio/__init__.py
=====================================
@@ -18,17 +18,19 @@ __all__ = [
     'PairedSequenceReader',
     'read_chunks',
     'read_paired_chunks',
+    'record_names_match',
     '__version__',
 ]
 
 import os
+from os import fspath, PathLike
 from contextlib import ExitStack
 import functools
-import pathlib
+from typing import Optional, Union, BinaryIO, Tuple, Iterator
 
 from xopen import xopen
 
-from ._core import Sequence, record_names_match as _record_names_match
+from ._core import Sequence, record_names_match
 from .readers import FastaReader, FastqReader
 from .writers import FastaWriter, FastqWriter
 from .exceptions import UnknownFileFormat, FileFormatError, FastaFormatError, FastqFormatError
@@ -36,21 +38,28 @@ from .chunks import read_chunks, read_paired_chunks
 from ._version import version as __version__
 from ._util import _is_path
 
-try:
-    from os import fspath  # Exists in Python 3.6+
-except ImportError:
-    def fspath(path):
-        if hasattr(path, "__fspath__"):
-            return path.__fspath__()
-        # Python 3.4 and 3.5 do not support the file system path protocol
-        if isinstance(path, pathlib.Path):
-            return str(path)
-        return path
-
 
 def open(
-    file1, *, file2=None, fileformat=None, interleaved=False, mode="r", qualities=None, opener=xopen
-):
+    file1: Union[str, PathLike, BinaryIO],
+    *,
+    file2: Optional[Union[str, PathLike, BinaryIO]] = None,
+    fileformat: Optional[str] = None,
+    interleaved: bool = False,
+    mode: str = "r",
+    qualities: Optional[bool] = None,
+    opener=xopen
+) -> Union[
+    FastaReader,
+    FastaWriter,
+    FastqReader,
+    FastqWriter,
+    "PairedSequenceReader",
+    "PairedSequenceWriter",
+    "PairedSequenceAppender",
+    "InterleavedSequenceReader",
+    "InterleavedSequenceWriter",
+    "InterleavedSequenceAppender",
+]:
     """
     Open sequence files in FASTA or FASTQ format for reading or writing. This is
     a factory that returns an instance of one of the ...Reader or ...Writer
@@ -109,7 +118,7 @@ def open(
         file1, opener=opener, fileformat=fileformat, mode=mode, qualities=qualities)
 
 
-def _detect_format_from_name(name):
+def _detect_format_from_name(name: str) -> Optional[str]:
     """
     name -- file name
 
@@ -128,32 +137,37 @@ def _detect_format_from_name(name):
     return None
 
 
-def _open_single(file, opener, *, fileformat=None, mode="r", qualities=None):
+def _open_single(
+    file_or_path: Union[str, PathLike, BinaryIO],
+    opener,
+    *,
+    fileformat: Optional[str] = None,
+    mode: str = "r",
+    qualities: Optional[bool] = None,
+) -> Union[FastaReader, FastaWriter, FastqReader, FastqWriter]:
     """
     Open a single sequence file. See description of open() above.
     """
     if mode not in ("r", "w", "a"):
         raise ValueError("Mode must be 'r', 'w' or 'a'")
 
-    if _is_path(file):
-        path = fspath(file)
+    path: Optional[str]
+    if _is_path(file_or_path):
+        path = fspath(file_or_path)  # type: ignore
         file = opener(path, mode + "b")
         close_file = True
     else:
-        if mode == 'r' and not hasattr(file, 'readinto'):
+        if mode == 'r' and not hasattr(file_or_path, 'readinto'):
             raise ValueError(
                 'When passing in an open file-like object, it must have been opened in binary mode')
+        file = file_or_path
         if hasattr(file, "name") and isinstance(file.name, str):
             path = file.name
         else:
             path = None
         close_file = False
-    if mode == 'r':
-        fastq_handler = FastqReader
-        fasta_handler = FastaReader
-    else:
-        fastq_handler = FastqWriter
-        fasta_handler = FastaWriter
+    fastq_handler = FastqReader if mode == "r" else FastqWriter
+    fasta_handler = FastaReader if mode == "r" else FastaWriter
     handlers = {
         'fastq': functools.partial(fastq_handler, _close_file=close_file),
         'fasta': functools.partial(fasta_handler, _close_file=close_file),
@@ -198,7 +212,7 @@ def _open_single(file, opener, *, fileformat=None, mode="r", qualities=None):
     return handlers[fileformat](file)
 
 
-def _detect_format_from_content(file):
+def _detect_format_from_content(file: BinaryIO) -> Optional[str]:
     """
     Return 'fasta', 'fastq' or None
     """
@@ -207,7 +221,7 @@ def _detect_format_from_content(file):
         if file.tell() > 0:
             file.seek(-1, 1)
     else:
-        first_char = file.peek(1)[0:1]
+        first_char = file.peek(1)[0:1]  # type: ignore
     formats = {
         b'@': 'fastq',
         b'>': 'fasta',
@@ -226,17 +240,23 @@ class PairedSequenceReader:
     """
     paired = True
 
-    def __init__(self, file1, file2, fileformat=None, opener=xopen):
+    def __init__(
+        self,
+        file1: Union[str, PathLike, BinaryIO],
+        file2: Union[str, PathLike, BinaryIO],
+        fileformat: Optional[str] = None,
+        opener=xopen,
+    ):
         with ExitStack() as stack:
             self.reader1 = stack.enter_context(_open_single(file1, opener=opener, fileformat=fileformat))
             self.reader2 = stack.enter_context(_open_single(file2, opener=opener, fileformat=fileformat))
             self._close = stack.pop_all().close
         self.delivers_qualities = self.reader1.delivers_qualities
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return "PairedSequenceReader(file1={}, file2={})".format(self.reader1, self.reader2)
 
-    def __iter__(self):
+    def __iter__(self) -> Iterator[Tuple[Sequence, Sequence]]:
         """
         Iterate over the paired reads. Each item is a pair of Sequence objects.
         """
@@ -261,13 +281,13 @@ class PairedSequenceReader:
                 raise FileFormatError(
                     "Reads are improperly paired. There are more reads in "
                     "file 1 than in file 2.", line=None) from None
-            if not _record_names_match(r1.name, r2.name):
+            if not record_names_match(r1.name, r2.name):
                 raise FileFormatError(
                     "Reads are improperly paired. Read name '{}' "
                     "in file 1 does not match '{}' in file 2.".format(r1.name, r2.name), line=None) from None
             yield (r1, r2)
 
-    def close(self):
+    def close(self) -> None:
         self._close()
 
     def __enter__(self):
@@ -283,14 +303,21 @@ class InterleavedSequenceReader:
     """
     paired = True
 
-    def __init__(self, file, fileformat=None, opener=xopen):
-        self.reader = _open_single(file, opener=opener, fileformat=fileformat)
+    def __init__(
+        self,
+        file: Union[str, PathLike, BinaryIO],
+        fileformat: Optional[str] = None,
+        opener=xopen,
+    ):
+        reader = _open_single(file, opener=opener, fileformat=fileformat)
+        assert isinstance(reader, (FastaReader, FastqReader))  # for Mypy
+        self.reader = reader
         self.delivers_qualities = self.reader.delivers_qualities
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return "InterleavedSequenceReader({})".format(self.reader)
 
-    def __iter__(self):
+    def __iter__(self) -> Iterator[Tuple[Sequence, Sequence]]:
         it = iter(self.reader)
         for r1 in it:
             try:
@@ -299,13 +326,13 @@ class InterleavedSequenceReader:
                 raise FileFormatError(
                     "Interleaved input file incomplete: Last record "
                     "{!r} has no partner.".format(r1.name), line=None) from None
-            if not _record_names_match(r1.name, r2.name):
+            if not record_names_match(r1.name, r2.name):
                 raise FileFormatError(
                     "Reads are improperly paired. Name {!r} "
                     "(first) does not match {!r} (second).".format(r1.name, r2.name), line=None)
             yield (r1, r2)
 
-    def close(self):
+    def close(self) -> None:
         self.reader.close()
 
     def __enter__(self):
@@ -318,8 +345,17 @@ class InterleavedSequenceReader:
 class PairedSequenceWriter:
     _mode = "w"
 
-    def __init__(self, file1, file2, fileformat='fastq', qualities=None, opener=xopen):
+    def __init__(
+        self,
+        file1: Union[str, PathLike, BinaryIO],
+        file2: Union[str, PathLike, BinaryIO],
+        fileformat: Optional[str] = "fastq",
+        qualities: Optional[bool] = None,
+        opener=xopen,
+    ):
         with ExitStack() as stack:
+            self._writer1: Union[FastaWriter, FastqWriter]
+            self._writer2: Union[FastaWriter, FastqWriter]
             self._writer1 = stack.enter_context(
                 _open_single(
                     file1, opener=opener, fileformat=fileformat, mode=self._mode, qualities=qualities))
@@ -328,14 +364,14 @@ class PairedSequenceWriter:
                     file2, opener=opener, fileformat=fileformat, mode=self._mode, qualities=qualities))
             self._close = stack.pop_all().close
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return "{}({}, {})".format(self.__class__.__name__, self._writer1, self._writer2)
 
-    def write(self, read1, read2):
+    def write(self, read1, read2) -> None:
         self._writer1.write(read1)
         self._writer2.write(read2)
 
-    def close(self):
+    def close(self) -> None:
         self._close()
 
     def __enter__(self):
@@ -356,19 +392,26 @@ class InterleavedSequenceWriter:
     """
     _mode = "w"
 
-    def __init__(self, file, fileformat='fastq', qualities=None, opener=xopen):
-
-        self._writer = _open_single(
+    def __init__(
+        self,
+        file: Union[str, PathLike, BinaryIO],
+        fileformat: Optional[str] = "fastq",
+        qualities: Optional[bool] = None,
+        opener=xopen,
+    ):
+        writer = _open_single(
             file, opener=opener, fileformat=fileformat, mode=self._mode, qualities=qualities)
+        assert isinstance(writer, (FastaWriter, FastqWriter))  # only for Mypy
+        self._writer = writer
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return "{}({})".format(self.__class__.__name__, self._writer)
 
-    def write(self, read1, read2):
+    def write(self, read1: Sequence, read2: Sequence) -> None:
         self._writer.write(read1)
         self._writer.write(read2)
 
-    def close(self):
+    def close(self) -> None:
         self._writer.close()
 
     def __enter__(self):
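
The path-or-file branch that the new `_open_single` signature describes can be sketched on its own. This is a minimal illustration, not the dnaio implementation: `resolve_binary_file` is a hypothetical name, the built-in `open()` stands in for the `opener` argument (`xopen` in the diff), and `isinstance(..., (str, PathLike))` is assumed as a stand-in for `_is_path`.

```python
import io
import os
import pathlib
import tempfile
from os import PathLike, fspath
from typing import BinaryIO, Optional, Tuple, Union


def resolve_binary_file(
    file_or_path: Union[str, PathLike, BinaryIO], mode: str = "r"
) -> Tuple[BinaryIO, Optional[str], bool]:
    """Return (file, path, close_file) for a path or an already-open binary file."""
    if mode not in ("r", "w", "a"):
        raise ValueError("Mode must be 'r', 'w' or 'a'")
    if isinstance(file_or_path, (str, PathLike)):
        # A path was given: open it ourselves, so we are responsible for closing it.
        path = fspath(file_or_path)
        return open(path, mode + "b"), path, True
    if mode == "r" and not hasattr(file_or_path, "readinto"):
        # Text-mode objects such as io.StringIO have no readinto().
        raise ValueError("An open file-like object must be in binary mode")
    # An open file was given: the caller keeps ownership, so do not close it.
    name = getattr(file_or_path, "name", None)
    return file_or_path, name if isinstance(name, str) else None, False


# Usage: an in-memory buffer has no path and is not closed by us.
buffer_file, buffer_path, buffer_close = resolve_binary_file(io.BytesIO(b">r1\nACGT\n"))

# Usage: a pathlib.Path is converted with fspath() and opened in binary mode.
with tempfile.TemporaryDirectory() as tmpdir:
    fasta_path = pathlib.Path(tmpdir) / "reads.fasta"
    fasta_path.write_bytes(b">r1\nACGT\n")
    path_file, path_str, path_close = resolve_binary_file(fasta_path)
    first_byte = path_file.read(1)
    path_file.close()
```

Returning the `close_file` flag alongside the file mirrors how the diff passes `_close_file=close_file` to the reader and writer classes, so that only files opened by the library are closed by it.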


=====================================
src/dnaio/_core.pyi
=====================================
@@ -1,4 +1,5 @@
-from typing import Optional, Tuple, Union, Iterable, BinaryIO
+from typing import Optional, Tuple, Union, BinaryIO, Iterator
+
 
 class Sequence:
     name: str
@@ -14,5 +15,5 @@ class Sequence:
 
 def paired_fastq_heads(buf1: Union[bytes,bytearray], buf2: Union[bytes,bytearray], end1: int, end2: int) -> Tuple[int, int]: ...
 # TODO Sequence should be sequence_class, first yielded value is a bool
-def fastq_iter(file: BinaryIO, sequence_class, buffer_size: int) -> Iterable[Sequence]: ...
+def fastq_iter(file: BinaryIO, sequence_class, buffer_size: int) -> Iterator[Sequence]: ...
 def record_names_match(header1: str, header2: str) -> bool: ...


=====================================
src/dnaio/_util.py
=====================================
@@ -1,7 +1,7 @@
 import pathlib
 
 
-def _is_path(obj):
+def _is_path(obj: object) -> bool:
     """
     Return whether the given object looks like a path (str, pathlib.Path or pathlib2.Path)
     """
@@ -9,14 +9,15 @@ def _is_path(obj):
     # On Python 3.6+, this function can be replaced with isinstance(obj, os.PathLike)
     import sys
     if "pathlib2" in sys.modules:
-        import pathlib2
-        path_classes = (str, pathlib.Path, pathlib2.Path)
+        import pathlib2  # type: ignore
+        path_classes = [str, pathlib.Path, pathlib2.Path]
     else:
-        path_classes = (str, pathlib.Path)
-    return isinstance(obj, path_classes)
+        path_classes = [str, pathlib.Path]
+    return isinstance(obj, tuple(path_classes))
 
 
-def shorten(s, n=100):
+def shorten(s: str, n: int = 100) -> str:
+
     """Shorten string s to at most n characters, appending "..." if necessary."""
     if s is None:
         return None
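
The comment in `_is_path` notes that on Python 3.6+ the function could be replaced by a check against `os.PathLike`. A minimal sketch of that replacement follows; `is_path` is a hypothetical name, and whether `pathlib2.Path` objects satisfy `os.PathLike` is not confirmed by the diff, so that case is left out here.

```python
import io
import os
import pathlib


def is_path(obj: object) -> bool:
    """Return whether obj is a str or implements the os.PathLike protocol."""
    # pathlib.Path defines __fspath__, so it is recognized as os.PathLike.
    return isinstance(obj, (str, os.PathLike))


# Strings and pathlib.Path objects count as paths; open files and bytes do not.
checks = {
    "str": is_path("reads.fastq"),
    "Path": is_path(pathlib.Path("reads.fastq")),
    "BytesIO": is_path(io.BytesIO()),
    "bytes": is_path(b"reads.fastq"),
}
```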


=====================================
src/dnaio/chunks.py
=====================================
@@ -1,10 +1,12 @@
 """Chunked reading of FASTA and FASTQ files"""
+from io import RawIOBase
+from typing import Optional, Iterator, Tuple
 
 from ._core import paired_fastq_heads as _paired_fastq_heads
 from .exceptions import FileFormatError, FastaFormatError, UnknownFileFormat
 
 
-def _fasta_head(buf, end):
+def _fasta_head(buf: bytes, end: Optional[int] = None) -> int:
     """
     Search for the end of the last complete FASTA record within buf[:end]
 
@@ -19,7 +21,7 @@ def _fasta_head(buf, end):
     raise FastaFormatError('File does not start with ">"', line=None)
 
 
-def _fastq_head(buf, end=None):
+def _fastq_head(buf: bytes, end: Optional[int] = None) -> int:
     """
     Search for the end of the last complete *two* FASTQ records in buf[:end].
 
@@ -33,10 +35,10 @@ def _fastq_head(buf, end=None):
     # Note that this works even if linebreaks == 0:
     # rfind() returns -1 and adding 1 gives index 0,
     # which is correct.
-    return right + 1
+    return right + 1  # type: ignore
 
 
-def read_chunks(f, buffer_size=4*1024**2):
+def read_chunks(f: RawIOBase, buffer_size: int = 4 * 1024**2) -> Iterator[memoryview]:
     """
     Read a chunk of complete FASTA or FASTQ records from a file.
     The size of a chunk is at most buffer_size.
@@ -76,7 +78,7 @@ def read_chunks(f, buffer_size=4*1024**2):
     while True:
         if start == len(buf):
             raise OverflowError('FASTA/FASTQ record does not fit into buffer')
-        bufend = f.readinto(memoryview(buf)[start:]) + start
+        bufend = f.readinto(memoryview(buf)[start:]) + start  # type: ignore
         if start == bufend:
             # End of file
             break
@@ -92,7 +94,11 @@ def read_chunks(f, buffer_size=4*1024**2):
         yield memoryview(buf)[0:start]
 
 
-def read_paired_chunks(f, f2, buffer_size=4*1024**2):
+def read_paired_chunks(
+    f: RawIOBase,
+    f2: RawIOBase,
+    buffer_size: int = 4 * 1024**2,
+) -> Iterator[Tuple[memoryview, memoryview]]:
     if buffer_size < 1:
         raise ValueError("Buffer size too small")
 
@@ -100,8 +106,8 @@ def read_paired_chunks(f, f2, buffer_size=4*1024**2):
     buf2 = bytearray(buffer_size)
 
     # Read one byte to make sure we are processing FASTQ
-    start1 = f.readinto(memoryview(buf1)[0:1])
-    start2 = f2.readinto(memoryview(buf2)[0:1])
+    start1 = f.readinto(memoryview(buf1)[0:1])  # type: ignore
+    start2 = f2.readinto(memoryview(buf2)[0:1])  # type: ignore
     if (start1 == 1 and buf1[0:1] != b'@') or (start2 == 1 and buf2[0:1] != b'@'):
         raise FileFormatError(
             "Paired-end data must be in FASTQ format when using multiple cores", line=None)
@@ -109,8 +115,8 @@ def read_paired_chunks(f, f2, buffer_size=4*1024**2):
     while True:
         if start1 == len(buf1) or start2 == len(buf2):
             raise ValueError("FASTQ record does not fit into buffer")
-        bufend1 = f.readinto(memoryview(buf1)[start1:]) + start1
-        bufend2 = f2.readinto(memoryview(buf2)[start2:]) + start2
+        bufend1 = f.readinto(memoryview(buf1)[start1:]) + start1  # type: ignore
+        bufend2 = f2.readinto(memoryview(buf2)[start2:]) + start2  # type: ignore
         if start1 == bufend1 and start2 == bufend2:
             break
 


=====================================
src/dnaio/exceptions.py
=====================================
@@ -1,10 +1,13 @@
+from typing import Optional
+
+
 class FileFormatError(Exception):
     """
     The file is not formatted correctly
     """
     format = 'sequence'  # Something generic that works for both FASTA and FASTQ
 
-    def __init__(self, msg, line):
+    def __init__(self, msg: str, line: Optional[int]):
         super().__init__(msg, line)
         self.message = msg
         self.line = line  # starts at 0!


=====================================
src/dnaio/readers.py
=====================================
@@ -4,6 +4,8 @@ Classes for reading FASTA and FASTQ files
 __all__ = ['FastaReader', 'FastqReader']
 
 import io
+from typing import Union, BinaryIO, Optional, Iterator, List
+
 from xopen import xopen
 from ._core import fastq_iter as _fastq_iter, Sequence
 from ._util import shorten as _shorten
@@ -15,25 +17,27 @@ class BinaryFileReader:
     A mixin for readers that ensures that a file or a path can be passed in to the constructor.
     """
     _close_on_exit = False
-    paired = False
-    mode = 'rb'
+    paired: bool = False
+    mode: str = 'rb'
 
-    def __init__(self, file, opener=xopen, _close_file=None):
+    def __init__(self, file: Union[str, BinaryIO], opener=xopen, _close_file: Optional[bool] = None):
         """
         The file is a path or a file-like object. In both cases, the file may
         be compressed (.gz, .bz2, .xz).
         """
         if isinstance(file, str):
-            file = opener(file, self.mode)
+            self._file = opener(file, self.mode)
             self._close_on_exit = True
         elif _close_file:
             self._close_on_exit = True
-        self._file = file
+            self._file = file
+        else:
+            self._file = file
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return "{}({!r})".format(self.__class__.__name__, getattr(self._file, "name", self._file))
 
-    def close(self):
+    def close(self) -> None:
         if self._close_on_exit and self._file is not None:
             self._file.close()
             self._file = None
@@ -52,7 +56,14 @@ class FastaReader(BinaryFileReader):
     Reader for FASTA files.
     """
 
-    def __init__(self, file, keep_linebreaks=False, sequence_class=Sequence, opener=xopen, _close_file=None):
+    def __init__(
+        self,
+        file: Union[str, BinaryIO],
+        keep_linebreaks: bool = False,
+        sequence_class=Sequence,
+        opener=xopen,
+        _close_file: Optional[bool] = None,
+    ):
         """
         file is a path or a file-like object. In both cases, the file may
         be compressed (.gz, .bz2, .xz).
@@ -64,12 +75,12 @@ class FastaReader(BinaryFileReader):
         self.delivers_qualities = False
         self._delimiter = '\n' if keep_linebreaks else ''
 
-    def __iter__(self):
+    def __iter__(self) -> Iterator[Sequence]:
         """
         Read next entry from the file (single entry at a time).
         """
         name = None
-        seq = []
+        seq: List[str] = []
         f = io.TextIOWrapper(self._file)
         for i, line in enumerate(f):
             # strip() also removes DOS line breaks
@@ -101,7 +112,14 @@ class FastqReader(BinaryFileReader):
     Reader for FASTQ files. Does not support multi-line FASTQ files.
     """
 
-    def __init__(self, file, sequence_class=Sequence, buffer_size=1048576, opener=xopen, _close_file=None):
+    def __init__(
+        self,
+        file: Union[str, BinaryIO],
+        sequence_class=Sequence,
+        buffer_size: int = 1048576,
+        opener=xopen,
+        _close_file: Optional[bool] = None,
+    ):
         """
         file is a filename or a file-like object.
         If file is a filename, then .gz files are supported.
@@ -114,8 +132,9 @@ class FastqReader(BinaryFileReader):
         # whether the file has repeated headers
         self._iter = _fastq_iter(self._file, self.sequence_class, self.buffer_size)
         try:
-            self.two_headers = next(self._iter)
-            assert self.two_headers in (True, False)
+            th = next(self._iter)
+            assert isinstance(th, bool)
+            self.two_headers: bool = th
         except StopIteration:
             # Empty file
             self.two_headers = False
@@ -124,5 +143,5 @@ class FastqReader(BinaryFileReader):
             self.close()
             raise
 
-    def __iter__(self):
+    def __iter__(self) -> Iterator[Sequence]:
         return self._iter


=====================================
src/dnaio/writers.py
=====================================
@@ -1,21 +1,30 @@
+from os import PathLike
+from typing import Union, BinaryIO, Optional
+
 from xopen import xopen
 
+from . import Sequence
 from ._util import _is_path
 
 
 class FileWriter:
-    def __init__(self, file, opener=xopen, _close_file=None):
-        self._file = file
+    def __init__(
+        self,
+        file: Union[PathLike, str, BinaryIO],
+        opener=xopen,
+        _close_file: Optional[bool] = None,
+    ):
         if _is_path(file):
             self._file = opener(file, "wb")
             self._close_on_exit = True
         else:
+            self._file = file
             self._close_on_exit = bool(_close_file)
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return "{}({!r})".format(self.__class__.__name__, getattr(self._file, "name", self._file))
 
-    def close(self):
+    def close(self) -> None:
         if self._close_on_exit:
             self._file.close()
 
@@ -33,7 +42,13 @@ class FastaWriter(FileWriter):
     Write FASTA-formatted sequences to a file.
     """
 
-    def __init__(self, file, line_length=None, opener=xopen, _close_file=None):
+    def __init__(
+        self,
+        file: Union[PathLike, str, BinaryIO],
+        line_length: Optional[int] = None,
+        opener=xopen,
+        _close_file: Optional[bool] = None,
+    ):
         """
         If line_length is not None, the lines will
         be wrapped after line_length characters.
@@ -41,10 +56,10 @@ class FastaWriter(FileWriter):
         super().__init__(file, opener=opener, _close_file=_close_file)
         self.line_length = line_length if line_length != 0 else None
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return "FastaWriter('{}')".format(getattr(self._file, "name", self._file))
 
-    def write(self, name_or_record, sequence=None):
+    def write(self, name_or_record, sequence: Optional[str] = None):
         """Write an entry to the the FASTA file.
 
         If only one parameter (name_or_record) is given, it must have
@@ -70,8 +85,8 @@ class FastaWriter(FileWriter):
                 s.append(sequence[i:i + self.line_length] + '\n')
             self._file.write(''.join(s).encode('ascii'))
         else:
-            s = '>' + name + '\n' + sequence + '\n'
-            self._file.write(s.encode('ascii'))
+            text = '>' + name + '\n' + sequence + '\n'
+            self._file.write(text.encode('ascii'))
 
 
 class FastqWriter(FileWriter):
@@ -86,28 +101,34 @@ class FastqWriter(FileWriter):
     """
     file_mode = 'wb'
 
-    def __init__(self, file, two_headers=False, opener=xopen, _close_file=None):
+    def __init__(
+        self,
+        file: Union[PathLike, str, BinaryIO],
+        two_headers: bool = False,
+        opener=xopen,
+        _close_file: Optional[bool] = None,
+    ):
         super().__init__(file, opener=opener, _close_file=_close_file)
         self._two_headers = two_headers
         self.write = self._write_two_headers if self._two_headers else self._write
 
-    def __repr__(self):
+    def __repr__(self) -> str:
         return "FastqWriter('{}')".format(getattr(self._file, "name", self._file))
 
-    def _write(self, record):
+    def _write(self, record: Sequence) -> None:
         """
         Write a Sequence record to the FASTQ file.
 
         """
         self._file.write(record.fastq_bytes())
 
-    def _write_two_headers(self, record):
+    def _write_two_headers(self, record: Sequence) -> None:
         """
         Write a Sequence record to the FASTQ file, repeating the header
         in the third line after the "+" .
         """
         self._file.write(record.fastq_bytes_two_headers())
 
-    def writeseq(self, name, sequence, qualities):
+    def writeseq(self, name: str, sequence: str, qualities: str) -> None:
         self._file.write("@{0:s}\n{1:s}\n+\n{2:s}\n".format(
             name, sequence, qualities).encode('ascii'))


=====================================
tests/test_internal.py
=====================================
@@ -15,7 +15,7 @@ from dnaio import (
     FastaWriter, FastqWriter, InterleavedSequenceWriter,
     PairedSequenceReader,
 )
-from dnaio import _record_names_match, Sequence
+from dnaio import record_names_match, Sequence
 from dnaio.writers import FileWriter
 from dnaio.readers import BinaryFileReader
 
@@ -468,7 +468,7 @@ class TestPairedSequenceReader:
             ] == list(psr)
 
     def test_record_names_match(self):
-        match = _record_names_match
+        match = record_names_match
         assert match('abc', 'abc')
         assert match('abc/1', 'abc/2')
         assert match('abc.1', 'abc.2')
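
For readers without the compiled `_core` module at hand, the behavior these assertions exercise can be approximated in pure Python. This is only a sketch that is consistent with the three assertions visible in this hunk. The function name `names_match_sketch`, the use of the text before the first whitespace as the read ID, and the set of ignored trailing characters (`1`, `2`, `3`) are assumptions, not taken from the diff.

```python
def names_match_sketch(header1: str, header2: str) -> bool:
    """Compare two read IDs, ignoring a single trailing '1', '2' or '3'."""

    def read_id(header: str) -> str:
        # Assumption: the ID is the header text before the first whitespace.
        fields = header.split()
        name = fields[0] if fields else ""
        # Assumption: one trailing mate digit is dropped before comparing.
        return name[:-1] if name and name[-1] in "123" else name

    return read_id(header1) == read_id(header2)


# The same inputs as in the test hunk above, plus one mismatch.
same_name = names_match_sketch("abc", "abc")
slash_suffix = names_match_sketch("abc/1", "abc/2")
dot_suffix = names_match_sketch("abc.1", "abc.2")
different = names_match_sketch("abc", "def")
```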


=====================================
tox.ini
=====================================
@@ -1,5 +1,6 @@
 [tox]
-envlist = flake8,py35,py36,py37,py38,py39
+envlist = flake8,mypy,py36,py37,py38,py39
+requires = Cython>=0.29.13
 
 [testenv]
 deps =
@@ -16,6 +17,11 @@ basepython = python3.6
 deps = flake8
 commands = flake8 src/ tests/
 
+[testenv:mypy]
+basepython = python3.6
+deps = mypy
+commands = mypy src/
+
 [coverage:run]
 branch = True
 parallel = True



View it on GitLab: https://salsa.debian.org/med-team/python-dnaio/-/commit/1ebf37f8dd8f850a5aef3a4e0cc8407ffaa4e18f