[med-svn] [Git][med-team/python-dnaio][upstream] New upstream version 0.5.0
Nilesh Patra
gitlab at salsa.debian.org
Sun Dec 20 12:22:26 GMT 2020
Nilesh Patra pushed to branch upstream at Debian Med / python-dnaio
Commits:
1ebf37f8 by Nilesh Patra at 2020-12-20T17:49:29+05:30
New upstream version 0.5.0
- - - - -
14 changed files:
- + .github/workflows/ci.yml
- − .travis.yml
- README.md
- buildwheels.sh
- setup.py
- src/dnaio/__init__.py
- src/dnaio/_core.pyi
- src/dnaio/_util.py
- src/dnaio/chunks.py
- src/dnaio/exceptions.py
- src/dnaio/readers.py
- src/dnaio/writers.py
- tests/test_internal.py
- tox.ini
Changes:
=====================================
.github/workflows/ci.yml
=====================================
@@ -0,0 +1,72 @@
+name: CI
+
+on: [push, pull_request]
+
+jobs:
+ lint:
+ timeout-minutes: 5
+ runs-on: ubuntu-latest
+ strategy:
+ matrix:
+ python-version: [3.7]
+ toxenv: [flake8, mypy]
+ steps:
+ - uses: actions/checkout@v2
+ - name: Set up Python ${{ matrix.python-version }}
+ uses: actions/setup-python@v2
+ with:
+ python-version: ${{ matrix.python-version }}
+ - name: Install dependencies
+ run: python -m pip install tox
+ - name: Run tox ${{ matrix.toxenv }}
+ run: tox -e ${{ matrix.toxenv }}
+
+ test:
+ timeout-minutes: 5
+ runs-on: ${{ matrix.os }}
+ strategy:
+ matrix:
+ python-version: [3.6, 3.7, 3.8, 3.9]
+ os: [ubuntu-latest]
+ include:
+ - python-version: 3.8
+ os: macos-latest
+ steps:
+ - uses: actions/checkout@v2
+ - name: Set up Python ${{ matrix.python-version }}
+ uses: actions/setup-python@v2
+ with:
+ python-version: ${{ matrix.python-version }}
+ - name: Install dependencies
+ run: python -m pip install tox
+ - name: Test
+ run: tox -e py
+ - name: Upload coverage report
+ uses: codecov/codecov-action@v1
+
+ deploy:
+ timeout-minutes: 5
+ runs-on: ubuntu-latest
+ needs: [lint, test]
+ if: startsWith(github.ref, 'refs/tags')
+ steps:
+ - uses: actions/checkout@v2
+ with:
+ fetch-depth: 0 # required for setuptools_scm
+ - name: Set up Python
+ uses: actions/setup-python@v2
+ with:
+ python-version: 3.7
+ - name: Make distributions
+ run: |
+ python -m pip install Cython
+ python setup.py sdist
+ ./buildwheels.sh
+ ls -l dist/
+ - name: Publish to PyPI
+ uses: pypa/gh-action-pypi-publish@v1.4.1
+ with:
+ user: __token__
+ password: ${{ secrets.pypi_password }}
+ #password: ${{ secrets.test_pypi_password }}
+ #repository_url: https://test.pypi.org/legacy/
=====================================
.travis.yml deleted
=====================================
@@ -1,54 +0,0 @@
-language: python
-
-cache:
- directories:
- - $HOME/.cache/pip
-
-python:
- - "3.5"
- - "3.6"
- - "3.7"
- - "3.8"
- - "3.9"
- - "nightly"
-
-install:
- - pip install --upgrade coverage codecov
- - pip install .[dev]
-
-script:
- - coverage run -m pytest
-
-after_success:
- - coverage combine
- - codecov
-
-env:
- global:
-# - TWINE_REPOSITORY_URL=https://test.pypi.org/legacy/
- - TWINE_USERNAME=marcelm
- # TWINE_PASSWORD is set in Travis settings
-
-jobs:
- include:
- - stage: deploy
- services:
- - docker
- python: "3.6"
- install: python3 -m pip install Cython twine
- if: tag IS present
- script:
- - |
- python3 setup.py sdist
- ./buildwheels.sh
- ls -l dist/
- python3 -m twine upload dist/*
-
- - stage: test
- name: flake8
- python: "3.6"
- install: python3 -m pip install flake8
- script: flake8 src/ tests/
-
- allow_failures:
- - python: "nightly"
=====================================
README.md
=====================================
@@ -1,10 +1,10 @@
-[![Travis](https://travis-ci.org/marcelm/dnaio.svg?branch=master)](https://travis-ci.org/marcelm/dnaio)
+![CI](https://github.com/marcelm/dnaio/workflows/CI/badge.svg)
[![PyPI](https://img.shields.io/pypi/v/dnaio.svg?branch=master)](https://pypi.python.org/pypi/dnaio)
[![Codecov](https://codecov.io/gh/marcelm/dnaio/branch/master/graph/badge.svg)](https://codecov.io/gh/marcelm/dnaio)
# dnaio parses FASTQ and FASTA
-`dnaio` is a Python 3.5+ library for fast parsing of FASTQ and also FASTA files. The code was previously part of the
+`dnaio` is a Python 3.6+ library for fast parsing of FASTQ and also FASTA files. The code was previously part of the
[Cutadapt](https://cutadapt.readthedocs.io/) tool and has been improved since it has been split out.
=====================================
buildwheels.sh
=====================================
@@ -16,7 +16,7 @@ manylinux=quay.io/pypa/manylinux2010_x86_64
# For convenience, if this script is called from outside of a docker container,
# it starts a container and runs itself inside of it.
-if ! grep -q docker /proc/1/cgroup; then
+if ! grep -q docker /proc/1/cgroup && ! test -d /opt/python; then
# We are not inside a container
docker pull ${manylinux}
exec docker run --rm -v $(pwd):/io ${manylinux} /io/$0
@@ -32,7 +32,7 @@ STRIP_FLAGS=${STRIP_FLAGS:-"-Wl,-strip-all"}
export CFLAGS="${CFLAGS:-$STRIP_FLAGS}"
export CXXFLAGS="${CXXFLAGS:-$STRIP_FLAGS}"
-for PYBIN in /opt/python/cp3[5678]-*/bin; do
+for PYBIN in /opt/python/cp3[6789]-*/bin; do
${PYBIN}/pip wheel --no-deps /io/ -w wheelhouse/
done
ls wheelhouse/
=====================================
setup.py
=====================================
@@ -1,13 +1,8 @@
-import sys
import os.path
from setuptools import setup, Extension, find_packages
from distutils.command.sdist import sdist as _sdist
from distutils.command.build_ext import build_ext as _build_ext
-if sys.version_info[:2] < (3, 5):
- sys.stdout.write('Python 3.5 or later is required\n')
- sys.exit(1)
-
def no_cythonize(extensions, **_ignore):
"""Change .pyx to .c or .cpp (copied from Cython documentation)"""
@@ -75,7 +70,7 @@ setup(
ext_modules=extensions,
cmdclass={'build_ext': BuildExt, 'sdist': SDist},
install_requires=['xopen>=0.8.2'],
- python_requires='>=3.5',
+ python_requires='>=3.6',
classifiers=[
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Science/Research",
=====================================
src/dnaio/__init__.py
=====================================
@@ -18,17 +18,19 @@ __all__ = [
'PairedSequenceReader',
'read_chunks',
'read_paired_chunks',
+ 'record_names_match',
'__version__',
]
import os
+from os import fspath, PathLike
from contextlib import ExitStack
import functools
-import pathlib
+from typing import Optional, Union, BinaryIO, Tuple, Iterator
from xopen import xopen
-from ._core import Sequence, record_names_match as _record_names_match
+from ._core import Sequence, record_names_match
from .readers import FastaReader, FastqReader
from .writers import FastaWriter, FastqWriter
from .exceptions import UnknownFileFormat, FileFormatError, FastaFormatError, FastqFormatError
@@ -36,21 +38,28 @@ from .chunks import read_chunks, read_paired_chunks
from ._version import version as __version__
from ._util import _is_path
-try:
- from os import fspath # Exists in Python 3.6+
-except ImportError:
- def fspath(path):
- if hasattr(path, "__fspath__"):
- return path.__fspath__()
- # Python 3.4 and 3.5 do not support the file system path protocol
- if isinstance(path, pathlib.Path):
- return str(path)
- return path
-
def open(
- file1, *, file2=None, fileformat=None, interleaved=False, mode="r", qualities=None, opener=xopen
-):
+ file1: Union[str, PathLike, BinaryIO],
+ *,
+ file2: Optional[Union[str, PathLike, BinaryIO]] = None,
+ fileformat: Optional[str] = None,
+ interleaved: bool = False,
+ mode: str = "r",
+ qualities: Optional[bool] = None,
+ opener=xopen
+) -> Union[
+ FastaReader,
+ FastaWriter,
+ FastqReader,
+ FastqWriter,
+ "PairedSequenceReader",
+ "PairedSequenceWriter",
+ "PairedSequenceAppender",
+ "InterleavedSequenceReader",
+ "InterleavedSequenceWriter",
+ "InterleavedSequenceAppender",
+]:
"""
Open sequence files in FASTA or FASTQ format for reading or writing. This is
a factory that returns an instance of one of the ...Reader or ...Writer
@@ -109,7 +118,7 @@ def open(
file1, opener=opener, fileformat=fileformat, mode=mode, qualities=qualities)
-def _detect_format_from_name(name):
+def _detect_format_from_name(name: str) -> Optional[str]:
"""
name -- file name
@@ -128,32 +137,37 @@ def _detect_format_from_name(name):
return None
-def _open_single(file, opener, *, fileformat=None, mode="r", qualities=None):
+def _open_single(
+ file_or_path: Union[str, PathLike, BinaryIO],
+ opener,
+ *,
+ fileformat: Optional[str] = None,
+ mode: str = "r",
+ qualities: Optional[bool] = None,
+) -> Union[FastaReader, FastaWriter, FastqReader, FastqWriter]:
"""
Open a single sequence file. See description of open() above.
"""
if mode not in ("r", "w", "a"):
raise ValueError("Mode must be 'r', 'w' or 'a'")
- if _is_path(file):
- path = fspath(file)
+ path: Optional[str]
+ if _is_path(file_or_path):
+ path = fspath(file_or_path) # type: ignore
file = opener(path, mode + "b")
close_file = True
else:
- if mode == 'r' and not hasattr(file, 'readinto'):
+ if mode == 'r' and not hasattr(file_or_path, 'readinto'):
raise ValueError(
'When passing in an open file-like object, it must have been opened in binary mode')
+ file = file_or_path
if hasattr(file, "name") and isinstance(file.name, str):
path = file.name
else:
path = None
close_file = False
- if mode == 'r':
- fastq_handler = FastqReader
- fasta_handler = FastaReader
- else:
- fastq_handler = FastqWriter
- fasta_handler = FastaWriter
+ fastq_handler = FastqReader if mode == "r" else FastqWriter
+ fasta_handler = FastaReader if mode == "r" else FastaWriter
handlers = {
'fastq': functools.partial(fastq_handler, _close_file=close_file),
'fasta': functools.partial(fasta_handler, _close_file=close_file),
@@ -198,7 +212,7 @@ def _open_single(file, opener, *, fileformat=None, mode="r", qualities=None):
return handlers[fileformat](file)
-def _detect_format_from_content(file):
+def _detect_format_from_content(file: BinaryIO) -> Optional[str]:
"""
Return 'fasta', 'fastq' or None
"""
@@ -207,7 +221,7 @@ def _detect_format_from_content(file):
if file.tell() > 0:
file.seek(-1, 1)
else:
- first_char = file.peek(1)[0:1]
+ first_char = file.peek(1)[0:1] # type: ignore
formats = {
b'@': 'fastq',
b'>': 'fasta',
@@ -226,17 +240,23 @@ class PairedSequenceReader:
"""
paired = True
- def __init__(self, file1, file2, fileformat=None, opener=xopen):
+ def __init__(
+ self,
+ file1: Union[str, PathLike, BinaryIO],
+ file2: Union[str, PathLike, BinaryIO],
+ fileformat: Optional[str] = None,
+ opener=xopen,
+ ):
with ExitStack() as stack:
self.reader1 = stack.enter_context(_open_single(file1, opener=opener, fileformat=fileformat))
self.reader2 = stack.enter_context(_open_single(file2, opener=opener, fileformat=fileformat))
self._close = stack.pop_all().close
self.delivers_qualities = self.reader1.delivers_qualities
- def __repr__(self):
+ def __repr__(self) -> str:
return "PairedSequenceReader(file1={}, file2={})".format(self.reader1, self.reader2)
- def __iter__(self):
+ def __iter__(self) -> Iterator[Tuple[Sequence, Sequence]]:
"""
Iterate over the paired reads. Each item is a pair of Sequence objects.
"""
@@ -261,13 +281,13 @@ class PairedSequenceReader:
raise FileFormatError(
"Reads are improperly paired. There are more reads in "
"file 1 than in file 2.", line=None) from None
- if not _record_names_match(r1.name, r2.name):
+ if not record_names_match(r1.name, r2.name):
raise FileFormatError(
"Reads are improperly paired. Read name '{}' "
"in file 1 does not match '{}' in file 2.".format(r1.name, r2.name), line=None) from None
yield (r1, r2)
- def close(self):
+ def close(self) -> None:
self._close()
def __enter__(self):
@@ -283,14 +303,21 @@ class InterleavedSequenceReader:
"""
paired = True
- def __init__(self, file, fileformat=None, opener=xopen):
- self.reader = _open_single(file, opener=opener, fileformat=fileformat)
+ def __init__(
+ self,
+ file: Union[str, PathLike, BinaryIO],
+ fileformat: Optional[str] = None,
+ opener=xopen,
+ ):
+ reader = _open_single(file, opener=opener, fileformat=fileformat)
+ assert isinstance(reader, (FastaReader, FastqReader)) # for Mypy
+ self.reader = reader
self.delivers_qualities = self.reader.delivers_qualities
- def __repr__(self):
+ def __repr__(self) -> str:
return "InterleavedSequenceReader({})".format(self.reader)
- def __iter__(self):
+ def __iter__(self) -> Iterator[Tuple[Sequence, Sequence]]:
it = iter(self.reader)
for r1 in it:
try:
@@ -299,13 +326,13 @@ class InterleavedSequenceReader:
raise FileFormatError(
"Interleaved input file incomplete: Last record "
"{!r} has no partner.".format(r1.name), line=None) from None
- if not _record_names_match(r1.name, r2.name):
+ if not record_names_match(r1.name, r2.name):
raise FileFormatError(
"Reads are improperly paired. Name {!r} "
"(first) does not match {!r} (second).".format(r1.name, r2.name), line=None)
yield (r1, r2)
- def close(self):
+ def close(self) -> None:
self.reader.close()
def __enter__(self):
@@ -318,8 +345,17 @@ class InterleavedSequenceReader:
class PairedSequenceWriter:
_mode = "w"
- def __init__(self, file1, file2, fileformat='fastq', qualities=None, opener=xopen):
+ def __init__(
+ self,
+ file1: Union[str, PathLike, BinaryIO],
+ file2: Union[str, PathLike, BinaryIO],
+ fileformat: Optional[str] = "fastq",
+ qualities: Optional[bool] = None,
+ opener=xopen,
+ ):
with ExitStack() as stack:
+ self._writer1: Union[FastaWriter, FastqWriter]
+ self._writer2: Union[FastaWriter, FastqWriter]
self._writer1 = stack.enter_context(
_open_single(
file1, opener=opener, fileformat=fileformat, mode=self._mode, qualities=qualities))
@@ -328,14 +364,14 @@ class PairedSequenceWriter:
file2, opener=opener, fileformat=fileformat, mode=self._mode, qualities=qualities))
self._close = stack.pop_all().close
- def __repr__(self):
+ def __repr__(self) -> str:
return "{}({}, {})".format(self.__class__.__name__, self._writer1, self._writer2)
- def write(self, read1, read2):
+ def write(self, read1, read2) -> None:
self._writer1.write(read1)
self._writer2.write(read2)
- def close(self):
+ def close(self) -> None:
self._close()
def __enter__(self):
@@ -356,19 +392,26 @@ class InterleavedSequenceWriter:
"""
_mode = "w"
- def __init__(self, file, fileformat='fastq', qualities=None, opener=xopen):
-
- self._writer = _open_single(
+ def __init__(
+ self,
+ file: Union[str, PathLike, BinaryIO],
+ fileformat: Optional[str] = "fastq",
+ qualities: Optional[bool] = None,
+ opener=xopen,
+ ):
+ writer = _open_single(
file, opener=opener, fileformat=fileformat, mode=self._mode, qualities=qualities)
+ assert isinstance(writer, (FastaWriter, FastqWriter)) # only for Mypy
+ self._writer = writer
- def __repr__(self):
+ def __repr__(self) -> str:
return "{}({})".format(self.__class__.__name__, self._writer)
- def write(self, read1, read2):
+ def write(self, read1: Sequence, read2: Sequence) -> None:
self._writer.write(read1)
self._writer.write(read2)
- def close(self):
+ def close(self) -> None:
self._writer.close()
def __enter__(self):
=====================================
src/dnaio/_core.pyi
=====================================
@@ -1,4 +1,5 @@
-from typing import Optional, Tuple, Union, Iterable, BinaryIO
+from typing import Optional, Tuple, Union, BinaryIO, Iterator
+
class Sequence:
name: str
@@ -14,5 +15,5 @@ class Sequence:
def paired_fastq_heads(buf1: Union[bytes,bytearray], buf2: Union[bytes,bytearray], end1: int, end2: int) -> Tuple[int, int]: ...
# TODO Sequence should be sequence_class, first yielded value is a bool
-def fastq_iter(file: BinaryIO, sequence_class, buffer_size: int) -> Iterable[Sequence]: ...
+def fastq_iter(file: BinaryIO, sequence_class, buffer_size: int) -> Iterator[Sequence]: ...
def record_names_match(header1: str, header2: str) -> bool: ...
=====================================
src/dnaio/_util.py
=====================================
@@ -1,7 +1,7 @@
import pathlib
-def _is_path(obj):
+def _is_path(obj: object) -> bool:
"""
Return whether the given object looks like a path (str, pathlib.Path or pathlib2.Path)
"""
@@ -9,14 +9,15 @@ def _is_path(obj):
# On Python 3.6+, this function can be replaced with isinstance(obj, os.PathLike)
import sys
if "pathlib2" in sys.modules:
- import pathlib2
- path_classes = (str, pathlib.Path, pathlib2.Path)
+ import pathlib2 # type: ignore
+ path_classes = [str, pathlib.Path, pathlib2.Path]
else:
- path_classes = (str, pathlib.Path)
- return isinstance(obj, path_classes)
+ path_classes = [str, pathlib.Path]
+ return isinstance(obj, tuple(path_classes))
-def shorten(s, n=100):
+def shorten(s: str, n: int = 100) -> str:
+
"""Shorten string s to at most n characters, appending "..." if necessary."""
if s is None:
return None
=====================================
src/dnaio/chunks.py
=====================================
@@ -1,10 +1,12 @@
"""Chunked reading of FASTA and FASTQ files"""
+from io import RawIOBase
+from typing import Optional, Iterator, Tuple
from ._core import paired_fastq_heads as _paired_fastq_heads
from .exceptions import FileFormatError, FastaFormatError, UnknownFileFormat
-def _fasta_head(buf, end):
+def _fasta_head(buf: bytes, end: Optional[int] = None) -> int:
"""
Search for the end of the last complete FASTA record within buf[:end]
@@ -19,7 +21,7 @@ def _fasta_head(buf, end):
raise FastaFormatError('File does not start with ">"', line=None)
-def _fastq_head(buf, end=None):
+def _fastq_head(buf: bytes, end: Optional[int] = None) -> int:
"""
Search for the end of the last complete *two* FASTQ records in buf[:end].
@@ -33,10 +35,10 @@ def _fastq_head(buf, end=None):
# Note that this works even if linebreaks == 0:
# rfind() returns -1 and adding 1 gives index 0,
# which is correct.
- return right + 1
+ return right + 1 # type: ignore
-def read_chunks(f, buffer_size=4*1024**2):
+def read_chunks(f: RawIOBase, buffer_size: int = 4 * 1024**2) -> Iterator[memoryview]:
"""
Read a chunk of complete FASTA or FASTQ records from a file.
The size of a chunk is at most buffer_size.
@@ -76,7 +78,7 @@ def read_chunks(f, buffer_size=4*1024**2):
while True:
if start == len(buf):
raise OverflowError('FASTA/FASTQ record does not fit into buffer')
- bufend = f.readinto(memoryview(buf)[start:]) + start
+ bufend = f.readinto(memoryview(buf)[start:]) + start # type: ignore
if start == bufend:
# End of file
break
@@ -92,7 +94,11 @@ def read_chunks(f, buffer_size=4*1024**2):
yield memoryview(buf)[0:start]
-def read_paired_chunks(f, f2, buffer_size=4*1024**2):
+def read_paired_chunks(
+ f: RawIOBase,
+ f2: RawIOBase,
+ buffer_size: int = 4 * 1024**2,
+) -> Iterator[Tuple[memoryview, memoryview]]:
if buffer_size < 1:
raise ValueError("Buffer size too small")
@@ -100,8 +106,8 @@ def read_paired_chunks(f, f2, buffer_size=4*1024**2):
buf2 = bytearray(buffer_size)
# Read one byte to make sure we are processing FASTQ
- start1 = f.readinto(memoryview(buf1)[0:1])
- start2 = f2.readinto(memoryview(buf2)[0:1])
+ start1 = f.readinto(memoryview(buf1)[0:1]) # type: ignore
+ start2 = f2.readinto(memoryview(buf2)[0:1]) # type: ignore
if (start1 == 1 and buf1[0:1] != b'@') or (start2 == 1 and buf2[0:1] != b'@'):
raise FileFormatError(
"Paired-end data must be in FASTQ format when using multiple cores", line=None)
@@ -109,8 +115,8 @@ def read_paired_chunks(f, f2, buffer_size=4*1024**2):
while True:
if start1 == len(buf1) or start2 == len(buf2):
raise ValueError("FASTQ record does not fit into buffer")
- bufend1 = f.readinto(memoryview(buf1)[start1:]) + start1
- bufend2 = f2.readinto(memoryview(buf2)[start2:]) + start2
+ bufend1 = f.readinto(memoryview(buf1)[start1:]) + start1 # type: ignore
+ bufend2 = f2.readinto(memoryview(buf2)[start2:]) + start2 # type: ignore
if start1 == bufend1 and start2 == bufend2:
break
=====================================
src/dnaio/exceptions.py
=====================================
@@ -1,10 +1,13 @@
+from typing import Optional
+
+
class FileFormatError(Exception):
"""
The file is not formatted correctly
"""
format = 'sequence' # Something generic that works for both FASTA and FASTQ
- def __init__(self, msg, line):
+ def __init__(self, msg: str, line: Optional[int]):
super().__init__(msg, line)
self.message = msg
self.line = line # starts at 0!
=====================================
src/dnaio/readers.py
=====================================
@@ -4,6 +4,8 @@ Classes for reading FASTA and FASTQ files
__all__ = ['FastaReader', 'FastqReader']
import io
+from typing import Union, BinaryIO, Optional, Iterator, List
+
from xopen import xopen
from ._core import fastq_iter as _fastq_iter, Sequence
from ._util import shorten as _shorten
@@ -15,25 +17,27 @@ class BinaryFileReader:
A mixin for readers that ensures that a file or a path can be passed in to the constructor.
"""
_close_on_exit = False
- paired = False
- mode = 'rb'
+ paired: bool = False
+ mode: str = 'rb'
- def __init__(self, file, opener=xopen, _close_file=None):
+ def __init__(self, file: Union[str, BinaryIO], opener=xopen, _close_file: Optional[bool] = None):
"""
The file is a path or a file-like object. In both cases, the file may
be compressed (.gz, .bz2, .xz).
"""
if isinstance(file, str):
- file = opener(file, self.mode)
+ self._file = opener(file, self.mode)
self._close_on_exit = True
elif _close_file:
self._close_on_exit = True
- self._file = file
+ self._file = file
+ else:
+ self._file = file
- def __repr__(self):
+ def __repr__(self) -> str:
return "{}({!r})".format(self.__class__.__name__, getattr(self._file, "name", self._file))
- def close(self):
+ def close(self) -> None:
if self._close_on_exit and self._file is not None:
self._file.close()
self._file = None
@@ -52,7 +56,14 @@ class FastaReader(BinaryFileReader):
Reader for FASTA files.
"""
- def __init__(self, file, keep_linebreaks=False, sequence_class=Sequence, opener=xopen, _close_file=None):
+ def __init__(
+ self,
+ file: Union[str, BinaryIO],
+ keep_linebreaks: bool = False,
+ sequence_class=Sequence,
+ opener=xopen,
+ _close_file: Optional[bool] = None,
+ ):
"""
file is a path or a file-like object. In both cases, the file may
be compressed (.gz, .bz2, .xz).
@@ -64,12 +75,12 @@ class FastaReader(BinaryFileReader):
self.delivers_qualities = False
self._delimiter = '\n' if keep_linebreaks else ''
- def __iter__(self):
+ def __iter__(self) -> Iterator[Sequence]:
"""
Read next entry from the file (single entry at a time).
"""
name = None
- seq = []
+ seq: List[str] = []
f = io.TextIOWrapper(self._file)
for i, line in enumerate(f):
# strip() also removes DOS line breaks
@@ -101,7 +112,14 @@ class FastqReader(BinaryFileReader):
Reader for FASTQ files. Does not support multi-line FASTQ files.
"""
- def __init__(self, file, sequence_class=Sequence, buffer_size=1048576, opener=xopen, _close_file=None):
+ def __init__(
+ self,
+ file: Union[str, BinaryIO],
+ sequence_class=Sequence,
+ buffer_size: int = 1048576,
+ opener=xopen,
+ _close_file: Optional[bool] = None,
+ ):
"""
file is a filename or a file-like object.
If file is a filename, then .gz files are supported.
@@ -114,8 +132,9 @@ class FastqReader(BinaryFileReader):
# whether the file has repeated headers
self._iter = _fastq_iter(self._file, self.sequence_class, self.buffer_size)
try:
- self.two_headers = next(self._iter)
- assert self.two_headers in (True, False)
+ th = next(self._iter)
+ assert isinstance(th, bool)
+ self.two_headers: bool = th
except StopIteration:
# Empty file
self.two_headers = False
@@ -124,5 +143,5 @@ class FastqReader(BinaryFileReader):
self.close()
raise
- def __iter__(self):
+ def __iter__(self) -> Iterator[Sequence]:
return self._iter
=====================================
src/dnaio/writers.py
=====================================
@@ -1,21 +1,30 @@
+from os import PathLike
+from typing import Union, BinaryIO, Optional
+
from xopen import xopen
+from . import Sequence
from ._util import _is_path
class FileWriter:
- def __init__(self, file, opener=xopen, _close_file=None):
- self._file = file
+ def __init__(
+ self,
+ file: Union[PathLike, str, BinaryIO],
+ opener=xopen,
+ _close_file: Optional[bool] = None,
+ ):
if _is_path(file):
self._file = opener(file, "wb")
self._close_on_exit = True
else:
+ self._file = file
self._close_on_exit = bool(_close_file)
- def __repr__(self):
+ def __repr__(self) -> str:
return "{}({!r})".format(self.__class__.__name__, getattr(self._file, "name", self._file))
- def close(self):
+ def close(self) -> None:
if self._close_on_exit:
self._file.close()
@@ -33,7 +42,13 @@ class FastaWriter(FileWriter):
Write FASTA-formatted sequences to a file.
"""
- def __init__(self, file, line_length=None, opener=xopen, _close_file=None):
+ def __init__(
+ self,
+ file: Union[PathLike, str, BinaryIO],
+ line_length: Optional[int] = None,
+ opener=xopen,
+ _close_file: Optional[bool] = None,
+ ):
"""
If line_length is not None, the lines will
be wrapped after line_length characters.
@@ -41,10 +56,10 @@ class FastaWriter(FileWriter):
super().__init__(file, opener=opener, _close_file=_close_file)
self.line_length = line_length if line_length != 0 else None
- def __repr__(self):
+ def __repr__(self) -> str:
return "FastaWriter('{}')".format(getattr(self._file, "name", self._file))
- def write(self, name_or_record, sequence=None):
+ def write(self, name_or_record, sequence: Optional[str] = None):
"""Write an entry to the the FASTA file.
If only one parameter (name_or_record) is given, it must have
@@ -70,8 +85,8 @@ class FastaWriter(FileWriter):
s.append(sequence[i:i + self.line_length] + '\n')
self._file.write(''.join(s).encode('ascii'))
else:
- s = '>' + name + '\n' + sequence + '\n'
- self._file.write(s.encode('ascii'))
+ text = '>' + name + '\n' + sequence + '\n'
+ self._file.write(text.encode('ascii'))
class FastqWriter(FileWriter):
@@ -86,28 +101,34 @@ class FastqWriter(FileWriter):
"""
file_mode = 'wb'
- def __init__(self, file, two_headers=False, opener=xopen, _close_file=None):
+ def __init__(
+ self,
+ file: Union[PathLike, str, BinaryIO],
+ two_headers: bool = False,
+ opener=xopen,
+ _close_file: Optional[bool] = None,
+ ):
super().__init__(file, opener=opener, _close_file=_close_file)
self._two_headers = two_headers
self.write = self._write_two_headers if self._two_headers else self._write
- def __repr__(self):
+ def __repr__(self) -> str:
return "FastqWriter('{}')".format(getattr(self._file, "name", self._file))
- def _write(self, record):
+ def _write(self, record: Sequence) -> None:
"""
Write a Sequence record to the FASTQ file.
"""
self._file.write(record.fastq_bytes())
- def _write_two_headers(self, record):
+ def _write_two_headers(self, record: Sequence) -> None:
"""
Write a Sequence record to the FASTQ file, repeating the header
in the third line after the "+" .
"""
self._file.write(record.fastq_bytes_two_headers())
- def writeseq(self, name, sequence, qualities):
+ def writeseq(self, name: str, sequence: str, qualities: str) -> None:
self._file.write("@{0:s}\n{1:s}\n+\n{2:s}\n".format(
name, sequence, qualities).encode('ascii'))
=====================================
tests/test_internal.py
=====================================
@@ -15,7 +15,7 @@ from dnaio import (
FastaWriter, FastqWriter, InterleavedSequenceWriter,
PairedSequenceReader,
)
-from dnaio import _record_names_match, Sequence
+from dnaio import record_names_match, Sequence
from dnaio.writers import FileWriter
from dnaio.readers import BinaryFileReader
@@ -468,7 +468,7 @@ class TestPairedSequenceReader:
] == list(psr)
def test_record_names_match(self):
- match = _record_names_match
+ match = record_names_match
assert match('abc', 'abc')
assert match('abc/1', 'abc/2')
assert match('abc.1', 'abc.2')
=====================================
tox.ini
=====================================
@@ -1,5 +1,6 @@
[tox]
-envlist = flake8,py35,py36,py37,py38,py39
+envlist = flake8,mypy,py36,py37,py38,py39
+requires = Cython>=0.29.13
[testenv]
deps =
@@ -16,6 +17,11 @@ basepython = python3.6
deps = flake8
commands = flake8 src/ tests/
+[testenv:mypy]
+basepython = python3.6
+deps = mypy
+commands = mypy src/
+
[coverage:run]
branch = True
parallel = True
View it on GitLab: https://salsa.debian.org/med-team/python-dnaio/-/commit/1ebf37f8dd8f850a5aef3a4e0cc8407ffaa4e18f
--
View it on GitLab: https://salsa.debian.org/med-team/python-dnaio/-/commit/1ebf37f8dd8f850a5aef3a4e0cc8407ffaa4e18f
You're receiving this email because of your account on salsa.debian.org.
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/debian-med-commit/attachments/20201220/b77289cb/attachment-0001.html>
More information about the debian-med-commit
mailing list