[med-svn] [Git][med-team/python-cutadapt][upstream] New upstream version 2.7

Steffen Möller gitlab at salsa.debian.org
Mon Nov 25 13:20:38 GMT 2019



Steffen Möller pushed to branch upstream at Debian Med / python-cutadapt


Commits:
31e5476e by Steffen Moeller at 2019-11-25T13:17:52Z
New upstream version 2.7
- - - - -


14 changed files:

- .travis.yml
- CHANGES.rst
- buildwheels.sh
- doc/conf.py
- doc/guide.rst
- doc/installation.rst
- setup.py
- src/cutadapt/__main__.py
- src/cutadapt/adapters.py
- src/cutadapt/filters.py
- src/cutadapt/parser.py
- src/cutadapt/pipeline.py
- src/cutadapt/utils.py
- tests/test_commandline.py


Changes:

=====================================
.travis.yml
=====================================
@@ -10,7 +10,7 @@ python:
   - "3.5"
   - "3.6"
   - "3.7"
-  - "3.8-dev"
+  - "3.8"
   - "nightly"
 
 install:


=====================================
CHANGES.rst
=====================================
@@ -2,6 +2,18 @@
 Changes
 =======
 
+v2.7 (2019-11-22)
+-----------------
+
+* :issue:`427`: Multicore is now supported even when using ``--info-file``,
+  ``--rest-file`` or ``--wildcard-file``. The only remaining feature that
+  still does not work with multicore is now demultiplexing.
+* :issue:`290`: When running on a single core, Cutadapt no longer spawns
+  external ``pigz`` processes for writing gzip-compressed files. This is a first
+  step towards ensuring that using ``--cores=n`` uses only at most *n* CPU
+  cores.
+* This release adds support for Python 3.8.
+
 v2.6 (2019-10-26)
 -----------------
 


=====================================
buildwheels.sh
=====================================
@@ -1,28 +1,28 @@
 #!/bin/bash
 #
-# Build manylinux1 wheels for cutadapt. Based on the example at
+# Build manylinux wheels. Based on the example at
 # <https://github.com/pypa/python-manylinux-demo>
 #
 # It is best to run this in a fresh clone of the repository!
 #
 # Run this within the repository root:
-#   docker run --rm -v $(pwd):/io quay.io/pypa/manylinux1_x86_64 /io/buildwheels.sh
+#   ./buildwheels.sh
 #
 # The wheels will be put into the wheelhouse/ subdirectory.
 #
 # For interactive tests:
-#   docker run -it -v $(pwd):/io quay.io/pypa/manylinux1_x86_64 /bin/bash
+#   docker run -it -v $(pwd):/io quay.io/pypa/manylinux2010_x86_64 /bin/bash
 
 set -xeuo pipefail
 
-MANYLINUX=quay.io/pypa/manylinux2010_x86_64
+manylinux=quay.io/pypa/manylinux2010_x86_64
 
 # For convenience, if this script is called from outside of a docker container,
 # it starts a container and runs itself inside of it.
 if ! grep -q docker /proc/1/cgroup; then
   # We are not inside a container
-  docker pull ${MANYLINUX}
-  exec docker run --rm -v $(pwd):/io ${MANYLINUX} /io/$0
+  docker pull ${manylinux}
+  exec docker run --rm -v $(pwd):/io ${manylinux} /io/$0
 fi
 
 # Strip binaries (copied from multibuild)
@@ -37,11 +37,12 @@ PYBINS="/opt/python/*/bin"
 HAS_CYTHON=0
 for PYBIN in ${PYBINS}; do
 #    ${PYBIN}/pip install -r /io/requirements.txt
-    ${PYBIN}/pip wheel /io/ -w wheelhouse/
+    ${PYBIN}/pip wheel --no-deps /io/ -w wheelhouse/
 done
+ls wheelhouse/
 
 # Bundle external shared libraries into the wheels
-for whl in wheelhouse/cutadapt-*.whl; do
+for whl in wheelhouse/*.whl; do
     auditwheel repair "$whl" --plat manylinux1_x86_64 -w repaired/
 done
 


=====================================
doc/conf.py
=====================================
@@ -46,7 +46,7 @@ source_suffix = '.rst'
 master_doc = 'index'
 
 # General information about the project.
-project = u'cutadapt'
+project = u'Cutadapt'
 copyright = u'2010-2019, Marcel Martin'
 
 # The version info for the project you're documenting, acts as replacement for


=====================================
doc/guide.rst
=====================================
@@ -129,22 +129,9 @@ Make also sure that you have ``pigz`` (parallel gzip) installed if you use
 multiple cores and write to a ``.gz`` output file. Otherwise, compression of
 the output will be done in a single thread and therefore be a bottleneck.
 
-There are some limitations at the moment:
-
-* The following command-line arguments are not compatible with
-  multi-core:
-
-      - ``--info-file``
-      - ``--rest-file``
-      - ``--wildcard-file``
-      - ``--format``
-
-* Multi-core is not available when you use Cutadapt for demultiplexing.
-
-If you try to use multiple cores with an incompatible commandline option, you
-will get an error message.
-
-Some of these limitations will be lifted in the future, as time allows.
+Currently, multi-core support is not available when demultiplexing. You
+will get an error message if you try to use it. This limitations may be
+lifted in the future.
 
 .. versionadded:: 1.15
 
@@ -154,6 +141,9 @@ Some of these limitations will be lifted in the future, as time allows.
 .. versionadded:: 2.5
     Multicore works with ``--untrimmed/too-short/too-long-(paired)-output``
 
+.. versionadded:: 2.7
+    Muticore works with ``--info-file``, ``--rest-file``, ``--wildcard-file``
+
 
 Speed-up tricks
 ---------------
@@ -1037,7 +1027,7 @@ each read. Steps not requested on the command-line are skipped.
 Filtering reads
 ===============
 
-By default, all processed reads, no matter whether they were trimmed are not,
+By default, all processed reads, no matter whether they were trimmed or not,
 are written to the output file specified by the ``-o`` option (or to standard
 output if ``-o`` was not provided). For paired-end reads, the second read in a
 pair is always written to the file specified by the ``-p`` option.
@@ -1047,23 +1037,22 @@ them entirely or by redirecting them to other files. When redirecting reads,
 the basic rule is that *each read is written to at most one file*. You cannot
 write reads to more than one output file.
 
-In the following, the term "processed read" refers to a read to which all
-modifications have been applied (adapter removal, quality trimming etc.). A
-processed read can be identical to the input read if no modifications were done.
-
+Filters are applied to *all* processed reads, no matter whether they have been
+modified by adapter- or quality trimming.
 
 ``--minimum-length LENGTH`` or ``-m LENGTH``
-    Discard processed reads that are shorter than LENGTH. Reads that are too
-    short even before adapter removal are also discarded. Without this option,
-    reads that have a length of zero (empty reads) are kept in the output.
+    Discard processed reads that are shorter than LENGTH.
+
+    If you do not use this option, reads that have a length of zero (empty
+    reads) are kept in the output. Some downstream tools may have problems
+    with zero-length sequences. In that case, specify at least ``-m 1``.
 
 ``--too-short-output FILE``
     Instead of discarding the reads that are too short according to ``-m``,
     write them to *FILE* (in FASTA/FASTQ format).
 
 ``--maximum-length LENGTH`` or ``-M LENGTH``
-    Discard processed reads that are longer than LENGTH. Reads that are too
-    long even before adapter removal are also discarded.
+    Discard processed reads that are longer than LENGTH.
 
 ``--too-long-output FILE``
     Instead of discarding reads that are too long (according to ``-M``),
@@ -1074,11 +1063,11 @@ processed read can be identical to the input read if no modifications were done.
     of writing them to the regular output file.
 
 ``--discard-trimmed``
-   Discard reads in which an adapter was found.
+    Discard reads in which an adapter was found.
 
 ``--discard-untrimmed``
-   Discard reads in which *no* adapter was found. This has the same effect as
-   specifying ``--untrimmed-output /dev/null``.
+    Discard reads in which *no* adapter was found. This has the same effect as
+    specifying ``--untrimmed-output /dev/null``.
 
 The options ``--too-short-output`` and ``--too-long-output`` are applied first.
 This means, for example, that a read that is too long will never end up in the
@@ -1089,11 +1078,11 @@ The options ``--untrimmed-output``, ``--discard-trimmed`` and ``-discard-untrimm
 are mutually exclusive.
 
 The following filtering options do not have a corresponding option for redirecting
-reads. They always discard reads for which the filtering criterion applies.
+reads. They always discard those reads for which the filtering criterion applies.
 
 ``--max-n COUNT_or_FRACTION``
-    Discard reads with more than COUNT ``N`` bases. If ``COUNT_or_FRACTION`` is an
-    number between 0 and 1, it is interpreted as a fraction of the read length
+    Discard reads with more than COUNT ``N`` bases. If ``COUNT_or_FRACTION`` is
+    a number between 0 and 1, it is interpreted as a fraction of the read length
 
 ``--discard-casava``
     Discard reads that did not pass CASAVA filtering. Illumina’s CASAVA pipeline in


=====================================
doc/installation.rst
=====================================
@@ -38,6 +38,24 @@ Then install Cutadapt like this::
 If neither ``pip`` nor ``conda`` installation works, keep reading.
 
 
+Installation on a Debian-based Linux distribution
+-------------------------------------------------
+
+Cutadapt is also included in Debian-based Linux distributions, such as Ubuntu.
+Simply use your favorite package manager to install Cutadapt. On the
+command-line, this should work ::
+
+    sudo apt install cutadapt
+
+or possibly ::
+
+    sudo apt install python3-cutadapt
+
+Please be aware that this will likely give you an old version of Cutadapt. If
+you encounter unexpected behavior, please use one of the other installation
+methods to get an up-to-date version before reporting bugs.
+
+
 .. _dependencies:
 
 Dependencies


=====================================
setup.py
=====================================
@@ -103,11 +103,11 @@ setup(
     packages=find_packages('src'),
     entry_points={'console_scripts': ['cutadapt = cutadapt.__main__:main']},
     install_requires=[
-        'dnaio~=0.4.0',
+        'dnaio~=0.4.1',
         'xopen~=0.8.4',
     ],
     extras_require={
-        'dev': ['Cython', 'pytest', 'pytest-timeout', 'sphinx', 'sphinx_issues'],
+        'dev': ['Cython', 'pytest', 'pytest-timeout', 'pytest-mock', 'sphinx', 'sphinx_issues'],
     },
     python_requires='>=3.5',
     classifiers=[


=====================================
src/cutadapt/__main__.py
=====================================
@@ -60,7 +60,6 @@ import logging
 import platform
 from argparse import ArgumentParser, SUPPRESS, HelpFormatter
 
-from xopen import xopen
 import dnaio
 
 from cutadapt import __version__
@@ -72,7 +71,7 @@ from cutadapt.modifiers import (LengthTagModifier, SuffixRemover, PrefixSuffixAd
 from cutadapt.report import full_report, minimal_report
 from cutadapt.pipeline import (SingleEndPipeline, PairedEndPipeline, InputFiles, OutputFiles,
     SerialPipelineRunner, ParallelPipelineRunner)
-from cutadapt.utils import available_cpu_count, Progress, DummyProgress
+from cutadapt.utils import available_cpu_count, Progress, DummyProgress, FileOpener
 from cutadapt.log import setup_logging, REPORT
 
 logger = logging.getLogger()
@@ -185,7 +184,7 @@ def get_argument_parser():
             "mask: replace with 'N' characters; "
             "lowercase: convert to lowercase; "
             "none: leave unchanged (useful with "
-            "--discard-untrimmed). Default: trim")
+            "--discard-untrimmed). Default: %(default)s")
     group.add_argument("--no-trim", dest='action', action='store_const', const='none',
         help=SUPPRESS)  # Deprecated, use --action=none
     group.add_argument("--mask-adapter", dest='action', action='store_const', const='mask',
@@ -381,25 +380,24 @@ def parse_lengths(s):
     return tuple(values)
 
 
-def open_output_files(args, default_outfile, interleaved):
+def open_output_files(args, default_outfile, interleaved, file_opener):
     """
     Return an OutputFiles instance. If demultiplex is True, the untrimmed, untrimmed2, out and out2
     attributes are not opened files, but paths (out and out2 with the '{name}' template).
     """
-    compression_level = args.compression_level
 
     def open1(path):
         """Return opened file (or None if path is None)"""
         if path is None:
             return None
-        return xopen(path, "w", compresslevel=compression_level)
+        return file_opener.xopen(path, "wb")
 
     def open2(path1, path2):
         file1 = file2 = None
         if path1 is not None:
-            file1 = xopen(path1, 'wb', compresslevel=compression_level)
+            file1 = file_opener.xopen(path1, "wb")
             if path2 is not None:
-                file2 = xopen(path2, 'wb', compresslevel=compression_level)
+                file2 = file_opener.xopen(path2, "wb")
         return file1, file2
 
     rest_file = open1(args.rest_file)
@@ -599,7 +597,7 @@ def check_arguments(args, paired, is_interleaved_output):
         raise CommandLineError("--pair-adapters cannot be used with --times")
 
 
-def pipeline_from_parsed_args(args, paired, is_interleaved_output):
+def pipeline_from_parsed_args(args, paired, is_interleaved_output, file_opener):
     """
     Setup a processing pipeline from parsed command-line arguments.
 
@@ -632,9 +630,9 @@ def pipeline_from_parsed_args(args, paired, is_interleaved_output):
     # Create the processing pipeline
     if paired:
         pair_filter_mode = 'any' if args.pair_filter is None else args.pair_filter
-        pipeline = PairedEndPipeline(pair_filter_mode)
+        pipeline = PairedEndPipeline(pair_filter_mode, file_opener)
     else:
-        pipeline = SingleEndPipeline()
+        pipeline = SingleEndPipeline(file_opener)
 
     # When adapters are being trimmed only in R1 or R2, override the pair filter mode
     # as using the default of 'any' would regard all read pairs as untrimmed.
@@ -780,30 +778,30 @@ def main(cmdlineargs=None, default_outfile=sys.stdout.buffer):
 
     # Print the header now because some of the functions below create logging output
     log_header(cmdlineargs)
+    if args.cores < 0:
+        parser.error('Value for --cores cannot be negative')
+
+    cores = available_cpu_count() if args.cores == 0 else args.cores
+    file_opener = FileOpener(
+        compression_level=args.compression_level, threads=0 if cores == 1 else None)
     try:
         is_interleaved_input, is_interleaved_output = determine_interleaved(args)
         input_filename, input_paired_filename = input_files_from_parsed_args(args.inputs,
             paired, is_interleaved_input)
-        pipeline = pipeline_from_parsed_args(args, paired, is_interleaved_output)
-        outfiles = open_output_files(args, default_outfile, is_interleaved_output)
+        pipeline = pipeline_from_parsed_args(args, paired, is_interleaved_output, file_opener)
+        outfiles = open_output_files(args, default_outfile, is_interleaved_output, file_opener)
     except CommandLineError as e:
         parser.error(str(e))
         return  # avoid IDE warnings below
 
-    if args.cores < 0:
-        parser.error('Value for --cores cannot be negative')
-    cores = available_cpu_count() if args.cores == 0 else args.cores
     if cores > 1:
         if ParallelPipelineRunner.can_output_to(outfiles):
             runner_class = ParallelPipelineRunner
             runner_kwargs = dict(n_workers=cores, buffer_size=args.buffer_size)
         else:
-            parser.error('Running in parallel is currently not supported for '
-                'the given combination of command-line parameters.\nThese '
-                'options are not supported: --info-file, --rest-file, '
-                '--wildcard-file, --format\n'
-                'Also, demultiplexing is not supported.\n'
-                'Omit --cores/-j to continue.')
+            parser.error("Running in parallel is currently not supported "
+                "when using --format or when demultiplexing.\n"
+                "Omit --cores/-j to continue.")
             return  # avoid IDE warnings below
     else:
         runner_class = SerialPipelineRunner


=====================================
src/cutadapt/adapters.py
=====================================
@@ -73,6 +73,15 @@ class EndStatistics:
         self._remove = adapter.remove
         self.adjacent_bases = {'A': 0, 'C': 0, 'G': 0, 'T': 0, '': 0}
 
+    def __repr__(self):
+        errors = {k: dict(v) for k, v in self.errors.items()}
+        return "EndStatistics(where={}, max_error_rate={}, errors={}, adjacent_bases={})".format(
+            self.where,
+            self.max_error_rate,
+            errors,
+            self.adjacent_bases,
+        )
+
     def __iadd__(self, other):
         if not isinstance(other, self.__class__):
             raise ValueError("Cannot compare")
@@ -140,6 +149,14 @@ class AdapterStatistics:
         else:
             self.back = EndStatistics(adapter2)
 
+    def __repr__(self):
+        return "AdapterStatistics(name={}, where={}, front={}, back={})".format(
+            self.name,
+            self.where,
+            self.front,
+            self.back,
+        )
+
     def __iadd__(self, other):
         if self.where != other.where:  # TODO self.name != other.name or
             raise ValueError('incompatible objects')
@@ -297,7 +314,7 @@ class Adapter:
         self.name = _generate_adapter_name() if name is None else name
         self.sequence = sequence.upper().replace('U', 'T')
         if not self.sequence:
-            raise ValueError('Sequence is empty')
+            raise ValueError("Adapter sequence is empty")
         self.where = where
         if remove not in (None, 'prefix', 'suffix', 'auto'):
             raise ValueError('remove parameter must be "prefix", "suffix", "auto" or None')


=====================================
src/cutadapt/filters.py
=====================================
@@ -14,11 +14,6 @@ The read is then assumed to have been "consumed", that is, either written
 somewhere or filtered (should be discarded).
 """
 from abc import ABC, abstractmethod
-import errno
-
-import dnaio
-
-from .utils import raise_open_files_limit
 
 
 # Constants used when returning from a Filter’s __call__ method to improve
@@ -230,29 +225,13 @@ class CasavaFilter(SingleEndFilter):
         return right[1:4] == ':Y:'  # discard if :Y: found
 
 
-def _open_raise_limit(path, qualities):
-    """
-    Open a FASTA/FASTQ file for writing. If it fails because the number of open files
-    would be exceeded, try to raise the soft limit and re-try.
-    """
-    try:
-        f = dnaio.open(path, mode="w", qualities=qualities)
-    except OSError as e:
-        if e.errno == errno.EMFILE:  # Too many open files
-            raise_open_files_limit(8)
-            f = dnaio.open(path, mode="w", qualities=qualities)
-        else:
-            raise
-    return f
-
-
 class Demultiplexer(SingleEndFilter):
     """
     Demultiplex trimmed reads. Reads are written to different output files
     depending on which adapter matches. Files are created when the first read
     is written to them.
     """
-    def __init__(self, path_template, untrimmed_path, qualities):
+    def __init__(self, path_template, untrimmed_path, qualities, file_opener):
         """
         path_template must contain the string '{name}', which will be replaced
         with the name of the adapter to form the final output path.
@@ -267,6 +246,7 @@ class Demultiplexer(SingleEndFilter):
         self.written = 0
         self.written_bp = [0, 0]
         self.qualities = qualities
+        self.file_opener = file_opener
 
     def __call__(self, read, matches):
         """
@@ -275,14 +255,14 @@ class Demultiplexer(SingleEndFilter):
         if matches:
             name = matches[-1].adapter.name
             if name not in self.writers:
-                self.writers[name] = _open_raise_limit(
+                self.writers[name] = self.file_opener.dnaio_open_raise_limit(
                     self.template.replace('{name}', name), self.qualities)
             self.written += 1
             self.written_bp[0] += len(read)
             self.writers[name].write(read)
         else:
             if self.untrimmed_writer is None and self.untrimmed_path is not None:
-                self.untrimmed_writer = _open_raise_limit(
+                self.untrimmed_writer = self.file_opener.dnaio_open_raise_limit(
                     self.untrimmed_path, self.qualities)
             if self.untrimmed_writer is not None:
                 self.written += 1
@@ -303,16 +283,16 @@ class PairedDemultiplexer(PairedEndFilter):
     depending on which adapter (in read 1) matches.
     """
     def __init__(self, path_template, path_paired_template, untrimmed_path, untrimmed_paired_path,
-            qualities):
+            qualities, file_opener):
         """
         The path templates must contain the string '{name}', which will be replaced
         with the name of the adapter to form the final output path.
         Read pairs without an adapter match are written to the files named by
         untrimmed_path.
         """
-        self._demultiplexer1 = Demultiplexer(path_template, untrimmed_path, qualities)
+        self._demultiplexer1 = Demultiplexer(path_template, untrimmed_path, qualities, file_opener)
         self._demultiplexer2 = Demultiplexer(path_paired_template, untrimmed_paired_path,
-            qualities)
+            qualities, file_opener)
 
     @property
     def written(self):
@@ -337,7 +317,7 @@ class CombinatorialDemultiplexer(PairedEndFilter):
     Demultiplex reads depending on which adapter matches, taking into account both matches
     on R1 and R2.
     """
-    def __init__(self, path_template, path_paired_template, untrimmed_name, qualities):
+    def __init__(self, path_template, path_paired_template, untrimmed_name, qualities, file_opener):
         """
         path_template must contain the string '{name1}' and '{name2}', which will be replaced
         with the name of the adapters found on R1 and R2, respectively to form the final output
@@ -355,6 +335,7 @@ class CombinatorialDemultiplexer(PairedEndFilter):
         self.written = 0
         self.written_bp = [0, 0]
         self.qualities = qualities
+        self.file_opener = file_opener
 
     @staticmethod
     def _make_path(template, name1, name2):
@@ -379,8 +360,8 @@ class CombinatorialDemultiplexer(PairedEndFilter):
             path1 = self._make_path(self.template, name1, name2)
             path2 = self._make_path(self.paired_template, name1, name2)
             self.writers[key] = (
-                _open_raise_limit(path1, qualities=self.qualities),
-                _open_raise_limit(path2, qualities=self.qualities),
+                self.file_opener.dnaio_open_raise_limit(path1, qualities=self.qualities),
+                self.file_opener.dnaio_open_raise_limit(path2, qualities=self.qualities),
             )
         writer1, writer2 = self.writers[key]
         self.written += 1


=====================================
src/cutadapt/parser.py
=====================================
@@ -3,6 +3,7 @@ Parse adapter specifications
 """
 import re
 import logging
+from xopen import xopen
 from dnaio.readers import FastaReader
 from .adapters import Where, WHERE_TO_REMOVE_MAP, Adapter, BackOrFrontAdapter, LinkedAdapter
 
@@ -397,7 +398,8 @@ class AdapterParser:
         """
         if spec.startswith('file:'):
             # read adapter sequences from a file
-            with FastaReader(spec[5:]) as fasta:
+            with xopen(spec[5:], mode="rb", threads=0) as f:
+                fasta = FastaReader(f)
                 for record in fasta:
                     name = record.name.split(None, 1)
                     name = name[0] if name else None


=====================================
src/cutadapt/pipeline.py
=====================================
@@ -6,13 +6,14 @@ import logging
 import functools
 from abc import ABC, abstractmethod
 from multiprocessing import Process, Pipe, Queue
+from pathlib import Path
 import multiprocessing.connection
 import traceback
 
 from xopen import xopen
 import dnaio
 
-from .utils import Progress
+from .utils import Progress, FileOpener
 from .modifiers import PairedModifier
 from .report import Statistics
 from .filters import (Redirector, PairedRedirector, NoFilter, PairedNoFilter, InfoFileWriter,
@@ -93,13 +94,14 @@ class Pipeline(ABC):
     """
     n_adapters = 0
 
-    def __init__(self):
+    def __init__(self, file_opener: FileOpener):
         self._close_files = []
         self._reader = None
         self._filters = None
         self._modifiers = []
         self._outfiles = None
         self._demultiplexer = None
+        self._textiowrappers = []
 
         # Filter settings
         self._minimum_length = None
@@ -108,6 +110,7 @@ class Pipeline(ABC):
         self.discard_casava = False
         self.discard_trimmed = False
         self.discard_untrimmed = False
+        self.file_opener = file_opener
 
     def connect_io(self, infiles: InputFiles, outfiles: OutputFiles):
         self._reader = dnaio.open(infiles.file1, file2=infiles.file2,
@@ -119,6 +122,10 @@ class Pipeline(ABC):
         # for all outputs
         if force_fasta:
             kwargs['fileformat'] = 'fasta'
+        # file and file2 must already be file-like objects because we don’t want to
+        # take care of threads and compression levels here.
+        for f in (file, file2):
+            assert not (f is None and isinstance(f, (str, bytes, Path)))
         return dnaio.open(file, file2=file2, mode='w', qualities=self.uses_qualities,
             **kwargs)
 
@@ -133,7 +140,9 @@ class Pipeline(ABC):
             (WildcardFileWriter, outfiles.wildcard),
         ):
             if outfile:
-                self._filters.append(filter_wrapper(None, filter_class(outfile), None))
+                textiowrapper = io.TextIOWrapper(outfile)
+                self._textiowrappers.append(textiowrapper)
+                self._filters.append(filter_wrapper(None, filter_class(textiowrapper), None))
 
         # minimum length and maximum length
         for lengths, file1, file2, filter_class in (
@@ -184,7 +193,16 @@ class Pipeline(ABC):
                     untrimmed_filter_wrapper(untrimmed_writer, DiscardUntrimmedFilter(), DiscardUntrimmedFilter()))
             self._filters.append(self._final_filter(outfiles))
 
+    def flush(self):
+        for f in self._textiowrappers:
+            f.flush()
+        for f in self._outfiles:
+            if f is not None:
+                f.flush()
+
     def close(self):
+        for f in self._textiowrappers:
+            f.close()  # This also closes the underlying files; a second close occurs below
         for f in self._outfiles:
             # TODO do not use hasattr
             if f is not None and f is not sys.stdin and f is not sys.stdout and hasattr(f, 'close'):
@@ -223,8 +241,8 @@ class SingleEndPipeline(Pipeline):
     """
     paired = False
 
-    def __init__(self):
-        super().__init__()
+    def __init__(self, file_opener: FileOpener):
+        super().__init__(file_opener)
         self._modifiers = []
 
     def add(self, modifier):
@@ -261,7 +279,8 @@ class SingleEndPipeline(Pipeline):
         return NoFilter(writer)
 
     def _create_demultiplexer(self, outfiles):
-        return Demultiplexer(outfiles.out, outfiles.untrimmed, qualities=self.uses_qualities)
+        return Demultiplexer(outfiles.out, outfiles.untrimmed, qualities=self.uses_qualities,
+            file_opener=self.file_opener)
 
     @property
     def minimum_length(self):
@@ -288,8 +307,8 @@ class PairedEndPipeline(Pipeline):
     """
     paired = True
 
-    def __init__(self, pair_filter_mode):
-        super().__init__()
+    def __init__(self, pair_filter_mode, file_opener: FileOpener):
+        super().__init__(file_opener)
         self._pair_filter_mode = pair_filter_mode
         self._reader = None
         # Whether to ignore pair_filter mode for discard-untrimmed filter
@@ -359,10 +378,11 @@ class PairedEndPipeline(Pipeline):
     def _create_demultiplexer(self, outfiles):
         if '{name1}' in outfiles.out and '{name2}' in outfiles.out:
             return CombinatorialDemultiplexer(outfiles.out, outfiles.out2,
-                outfiles.untrimmed, qualities=self.uses_qualities)
+                outfiles.untrimmed, qualities=self.uses_qualities, file_opener=self.file_opener)
         else:
             return PairedDemultiplexer(outfiles.out, outfiles.out2,
-            outfiles.untrimmed, outfiles.untrimmed2, qualities=self.uses_qualities)
+                outfiles.untrimmed, outfiles.untrimmed2, qualities=self.uses_qualities,
+                file_opener=self.file_opener)
 
     @property
     def minimum_length(self):
@@ -472,6 +492,7 @@ class WorkerProcess(Process):
                 outfiles = self._make_output_files()
                 self._pipeline.connect_io(infiles, outfiles)
                 (n, bp1, bp2) = self._pipeline.process_reads()
+                self._pipeline.flush()
                 cur_stats = Statistics().collect(n, bp1, bp2, [], self._pipeline._filters)
                 stats += cur_stats
                 self._send_outfiles(outfiles, chunk_index, n)
@@ -502,7 +523,6 @@ class WorkerProcess(Process):
         that has BytesIO instances for each non-None output file
         """
         output_files = copy.copy(self._orig_outfiles)
-        # TODO info, rest, wildcard need to be StringIO()
         for attr in (
             "out", "out2", "untrimmed", "untrimmed2", "too_short", "too_short2", "too_long",
             "too_long2", "info", "rest", "wildcard"
@@ -632,13 +652,7 @@ class ParallelPipelineRunner(PipelineRunner):
 
     @staticmethod
     def can_output_to(outfiles):
-        return (
-            outfiles.out is not None
-            and outfiles.rest is None
-            and outfiles.info is None
-            and outfiles.wildcard is None
-            and not outfiles.demultiplex
-        )
+        return outfiles.out is not None and not outfiles.demultiplex
 
     def _assign_output(self, outfiles):
         if not self.can_output_to(outfiles):
@@ -682,8 +696,7 @@ class ParallelPipelineRunner(PipelineRunner):
                         # this happens only when there is an exception sending
                         # the statistics)
                         e, tb_str = connection.recv()
-                        # TODO traceback should only be printed in development
-                        logger.debug('%s', tb_str)
+                        logger.error('%s', tb_str)
                         raise e
                     if stats is None:
                         stats = cur_stats
@@ -695,17 +708,16 @@ class ParallelPipelineRunner(PipelineRunner):
                     # An exception has occurred in the worker
                     e, tb_str = connection.recv()
 
-                    # TODO traceback should only be printed in development
                     # We should use the worker's actual traceback object
                     # here, but traceback objects are not picklable.
-                    logger.debug('%s', tb_str)
+                    logger.error('%s', tb_str)
                     raise e
 
                 # No. of reads processed in this chunk
                 chunk_n = connection.recv()
                 if chunk_n == -2:
                     e, tb_str = connection.recv()
-                    logger.debug('%s', tb_str)
+                    logger.error('%s', tb_str)
                     raise e
                 n += chunk_n
                 self._progress.update(n)


=====================================
src/cutadapt/utils.py
=====================================
@@ -1,8 +1,10 @@
 import re
 import sys
 import time
+import errno
 import multiprocessing
 
+from xopen import xopen
 import dnaio
 
 
@@ -142,3 +144,31 @@ def reverse_complemented_sequence(sequence: dnaio.Sequence):
     else:
         qualities = sequence.qualities[::-1]
     return dnaio.Sequence(sequence.name, reverse_complement(sequence.sequence), qualities)
+
+
+class FileOpener:
+    def __init__(self, compression_level: int = 6, threads: int = None):
+        self.compression_level = compression_level
+        self.threads = threads
+
+    def xopen(self, path, mode):
+        return xopen(path, mode, compresslevel=self.compression_level, threads=self.threads)
+
+    def dnaio_open(self, *args, **kwargs):
+        kwargs["opener"] = self.xopen
+        return dnaio.open(*args, **kwargs)
+
+    def dnaio_open_raise_limit(self, path, qualities):
+        """
+        Open a FASTA/FASTQ file for writing. If it fails because the number of open files
+        would be exceeded, try to raise the soft limit and re-try.
+        """
+        try:
+            f = self.dnaio_open(path, mode="w", qualities=qualities)
+        except OSError as e:
+            if e.errno == errno.EMFILE:  # Too many open files
+                raise_open_files_limit(8)
+                f = self.dnaio_open(path, mode="w", qualities=qualities)
+            else:
+                raise
+        return f


=====================================
tests/test_commandline.py
=====================================
@@ -46,10 +46,10 @@ def test_lowercase(run):
     run('-a ttagacatatctccgtcg', 'lowercase.fastq', 'small.fastq')
 
 
-def test_rest(run, tmpdir):
+def test_rest(run, tmpdir, cores):
     """-r/--rest-file"""
     rest = str(tmpdir.join("rest.tmp"))
-    run(['-b', 'ADAPTER', '-N', '-r', rest], "rest.fa", "rest.fa")
+    run(['--cores', str(cores), '-b', 'ADAPTER', '-N', '-r', rest], "rest.fa", "rest.fa")
     assert_files_equal(datapath('rest.txt'), rest)
 
 
@@ -228,11 +228,14 @@ def test_read_wildcard(run):
     ("-a", "wildcard_adapter.fa"),
     ("-b", "wildcard_adapter_anywhere.fa"),
 ])
-def test_adapter_wildcard(adapter_type, expected, run, tmpdir):
+def test_adapter_wildcard(adapter_type, expected, run, tmpdir, cores):
     """wildcards in adapter"""
     wildcard_path = str(tmpdir.join("wildcards.txt"))
-    run("--wildcard-file {} {} ACGTNNNACGT".format(wildcard_path, adapter_type),
-        expected, "wildcard_adapter.fa")
+    run([
+            "--cores", str(cores),
+            "--wildcard-file", wildcard_path,
+            adapter_type, "ACGTNNNACGT"
+        ],  expected, "wildcard_adapter.fa")
     with open(wildcard_path) as wct:
         lines = wct.readlines()
     lines = [line.strip() for line in lines]
@@ -309,26 +312,26 @@ def test_strip_suffix(run):
     run("--strip-suffix _sequence -a XXXXXXX", "stripped.fasta", "simple.fasta")
 
 
-def test_info_file(run, tmpdir):
+def test_info_file(run, tmpdir, cores):
     # The true adapter sequence in the illumina.fastq.gz data set is
     # GCCTAACTTCTTAGACTGCCTTAAGGACGT (fourth base is different from the sequence shown here)
     info_path = str(tmpdir.join("info.txt"))
-    run(["--info-file", info_path, "-a", "adapt=GCCGAACTTCTTAGACTGCCTTAAGGACGT"],
+    run(["--cores", str(cores), "--info-file", info_path, "-a", "adapt=GCCGAACTTCTTAGACTGCCTTAAGGACGT"],
         "illumina.fastq", "illumina.fastq.gz")
     assert_files_equal(cutpath("illumina.info.txt"), info_path)
 
 
-def test_info_file_times(run, tmpdir):
+def test_info_file_times(run, tmpdir, cores):
     info_path = str(tmpdir.join("info.txt"))
-    run(["--info-file", info_path, "--times", "2", "-a", "adapt=GCCGAACTTCTTA",
+    run(["--cores", str(cores), "--info-file", info_path, "--times", "2", "-a", "adapt=GCCGAACTTCTTA",
         "-a", "adapt2=GACTGCCTTAAGGACGT"], "illumina5.fastq", "illumina5.fastq")
     assert_files_equal(cutpath('illumina5.info.txt'), info_path)
 
 
-def test_info_file_fasta(run, tmpdir):
+def test_info_file_fasta(run, tmpdir, cores):
     info_path = str(tmpdir.join("info.txt"))
     # Just make sure that it runs
-    run(["--info-file", info_path, "-a", "TTAGACATAT", "-g", "GAGATTGCCA", "--no-indels"],
+    run(["--cores", str(cores), "--info-file", info_path, "-a", "TTAGACATAT", "-g", "GAGATTGCCA", "--no-indels"],
         "no_indels.fasta", "no_indels.fasta")
 
 



View it on GitLab: https://salsa.debian.org/med-team/python-cutadapt/commit/31e5476ee9dc8503e7d2a812af0957c982687b8b

-- 
View it on GitLab: https://salsa.debian.org/med-team/python-cutadapt/commit/31e5476ee9dc8503e7d2a812af0957c982687b8b
You're receiving this email because of your account on salsa.debian.org.


-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://alioth-lists.debian.net/pipermail/debian-med-commit/attachments/20191125/73e235c9/attachment-0001.html>


More information about the debian-med-commit mailing list