[med-svn] [Git][med-team/python-cutadapt][master] 4 commits: routine-update: New upstream version
Steffen Möller
gitlab at salsa.debian.org
Mon Apr 27 15:18:15 BST 2020
Steffen Möller pushed to branch master at Debian Med / python-cutadapt
Commits:
2c52a82a by Steffen Moeller at 2020-04-27T16:16:12+02:00
routine-update: New upstream version
- - - - -
cf8bf1fa by Steffen Moeller at 2020-04-27T16:16:13+02:00
New upstream version 2.10
- - - - -
b655714a by Steffen Moeller at 2020-04-27T16:16:13+02:00
Update upstream source from tag 'upstream/2.10'
Update to upstream version '2.10'
with Debian dir fab641706ac25bbe4c1c6115bff51b9d0bd215a6
- - - - -
28e1f017 by Steffen Moeller at 2020-04-27T16:16:43+02:00
routine-update: Ready to upload to unstable
- - - - -
21 changed files:
- CHANGES.rst
- debian/changelog
- doc/guide.rst
- doc/ideas.rst
- setup.py
- src/cutadapt/__main__.py
- src/cutadapt/_align.pyx
- src/cutadapt/adapters.py
- src/cutadapt/filters.py
- src/cutadapt/modifiers.py
- src/cutadapt/parser.py
- src/cutadapt/pipeline.py
- src/cutadapt/report.py
- src/cutadapt/utils.py
- tests/test_adapters.py
- tests/test_align.py
- tests/test_commandline.py
- tests/test_modifiers.py
- tests/test_paired.py
- tests/test_parser.py
- tests/test_trim.py
Changes:
=====================================
CHANGES.rst
=====================================
@@ -2,6 +2,21 @@
Changes
=======
+v2.10 (2020-04-22)
+------------------
+
+* Fixed a performance regression introduced in version 2.9.
+* :pr:`449`: ``--action=`` could not be used with ``--pair-adapters``.
+ Fix contributed by wlokhorst.
+* :issue:`450`: ``--untrimmed-output``, ``--too-short-output`` and ``--too-long-output`` can
+ now be written interleaved.
+* :issue:`453`: Fix problem that ``N`` wildcards in adapters did not match ``N`` characters
+ in the read. ``N`` characters now match any character in the read, independent of whether
+ ``--match-read-wildcards`` is used or not.
+* With ``--action=lowercase``/``mask``, print which sequences would have been
+ removed in the “Overview of removed sequences” statistics. Previously, it
+ would show that no sequences have been removed.
+
v2.9 (2020-03-18)
-----------------
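As an illustration of the :issue:`453` fix above, here is a minimal Python sketch (not part of this patch; the adapter and read sequences are made up, and cutadapt 2.10 is assumed to be importable) that uses the adapter API shown further down in this diff::

    # Illustration only: with the 2.10 tables, an "N" in the adapter also matches
    # a literal "N" in the read. Under 2.9 the three N-vs-N positions counted as
    # mismatches, so no match was found for this example.
    from cutadapt.adapters import BackAdapter

    adapter = BackAdapter("ACGTNNN", max_error_rate=0.1, min_overlap=3)
    match = adapter.match_to("TTTTACGTNNNGGGG")  # read contains literal N characters
    if match is not None:
        print(match.rstart, match.rstop, match.errors)   # 4 11 0
        print(match.trimmed("TTTTACGTNNNGGGG"))          # TTTT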
=====================================
debian/changelog
=====================================
@@ -1,3 +1,9 @@
+python-cutadapt (2.10-1) unstable; urgency=medium
+
+ * New upstream version
+
+ -- Steffen Moeller <moeller at debian.org> Mon, 27 Apr 2020 16:16:18 +0200
+
python-cutadapt (2.9-1) unstable; urgency=medium
* New upstream version
=====================================
doc/guide.rst
=====================================
@@ -1355,15 +1355,19 @@ fit as well would have a partner that *can* be found. Some read pairs may theref
Interleaved paired-end reads
----------------------------
-Paired-end reads can be read from a single FASTQ file in which the entries for
-the first and second read from each pair alternate. The first read in each pair
-comes before the second. Enable this file format by adding the ``--interleaved``
-option to the command-line. For example::
+Cutadapt supports reading and writing paired-end reads from a single FASTQ file
+in which the entries for the first and second read from each pair alternate.
+The first read in each pair comes before the second. This is called “interleaved”
+format. Enable this file format by adding the ``--interleaved`` option to the
+command-line. Then, if you provide only a single file where usually two would be
+expected, reads are automatically read or written interleaved.
+
+For example, to read interleaved from ``reads.fastq`` and to write interleaved to ``trimmed.fastq``::
cutadapt --interleaved -q 20 -a ACGT -A TGCA -o trimmed.fastq reads.fastq
-To read from an interleaved file, but write regular two-file output, provide the
-second output file as usual with the ``-p`` option::
+In the following example, the input ``reads.fastq`` is interleaved, but output is
+written to two files ``trimmed.1.fastq`` and ``trimmed.2.fastq``::
cutadapt --interleaved -q 20 -a ACGT -A TGCA -o trimmed.1.fastq -p trimmed.2.fastq reads.fastq
@@ -1372,6 +1376,14 @@ a second input file::
cutadapt --interleaved -q 20 -a ACGT -A TGCA -o trimmed.1.fastq reads.1.fastq reads.2.fastq
+The following options also support interleaved output::
+
+ * ``--untrimmed-output`` (omit ``--untrimmed-paired-output``)
+ * ``--too-short-output`` (omit ``--too-short-paired-output``)
+ * ``--too-long-output`` (omit ``--too-long-paired-output``)
+
+If you omit ``--interleaved`` but trim paired-end files, the above options must be used in pairs.
+
Cutadapt will detect if an input file is not properly interleaved by checking
whether read names match and whether the file contains an even number of entries.
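As an illustration of the newly interleaved-capable filter outputs, here is a hypothetical invocation (not part of this patch; all file names are made up) through the ``main()`` entry point shown later in this diff. Because ``--too-short-paired-output`` is omitted, the too-short reads are written interleaved::

    # Hypothetical invocation (file names made up): reads.fastq is interleaved,
    # and reads shorter than 20 bp are written interleaved to tooshort.fastq,
    # which is new in 2.10.
    from cutadapt.__main__ import main

    main([
        "--interleaved",
        "-q", "20",
        "-a", "ACGT", "-A", "TGCA",
        "--minimum-length", "20",
        "--too-short-output", "tooshort.fastq",
        "-o", "trimmed.fastq",
        "reads.fastq",
    ])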
=====================================
doc/ideas.rst
=====================================
@@ -8,12 +8,7 @@ improvements.
- show average error rate
- run pylint, pychecker
- length histogram
-- search for adapters in the order in which they are given on the
- command line
-- more tests for the alignment algorithm
- ``--detect`` prints out best guess which of the given adapters is the correct one
-- it seems the str.find optimization isn't very helpful. In any case, it should be
- moved into the Aligner class.
- allow to remove not the adapter itself, but the sequence before or after it
- warn when given adapter sequence contains non-IUPAC characters
- extensible file type detection
@@ -90,8 +85,9 @@ Paired-end trimming
* Could also use a paired-end read merger, then remove adapters with -a and -g
Available letters for command-line options
------------------------------------------------
+------------------------------------------
-* Remaining characters: All uppercase letters except A, B, G, M, N, O, U
* Lowercase letters: i, k, s, w
-* Planned/reserved: Q (paired-end quality trimming)
+* Uppercase letters: C, D, E, F, H, I, J, K, L, P, R, S, T, V, W
+* Deprecated, could be re-used: c, d, t
+* Planned/reserved: Q (paired-end quality trimming), V (alias for --version)
=====================================
setup.py
=====================================
@@ -103,7 +103,7 @@ setup(
packages=find_packages('src'),
entry_points={'console_scripts': ['cutadapt = cutadapt.__main__:main']},
install_requires=[
- 'dnaio~=0.4.1',
+ 'dnaio~=0.4.2',
'xopen~=0.8.4',
],
extras_require={
=====================================
src/cutadapt/__main__.py
=====================================
@@ -116,8 +116,9 @@ def get_argument_parser() -> ArgumentParser:
group.add_argument("-h", "--help", action="help", help="Show this help message and exit")
group.add_argument("--version", action="version", help="Show version number and exit",
version=__version__)
- group.add_argument("--debug", action='store_true', default=False,
- help="Print debugging information.")
+ group.add_argument("--debug", nargs="?", const=True, default=False,
+ choices=("trace",),
+ help="Print debug log. 'trace' prints also DP matrices")
group.add_argument("--profile", action="store_true", default=False, help=SUPPRESS)
group.add_argument('-j', '--cores', type=int, default=1,
help='Number of CPU cores to use. Use 0 to auto-detect. Default: %(default)s')
@@ -320,7 +321,7 @@ def get_argument_parser() -> ArgumentParser:
"filtering criterion in order for the pair to be filtered. "
"Default: any")
group.add_argument("--interleaved", action='store_true', default=False,
- help="Read and write interleaved paired-end reads.")
+ help="Read and/or write interleaved paired-end reads.")
group.add_argument("--untrimmed-paired-output", metavar="FILE",
help="Write second read in a pair to this FILE when no adapter "
"was found. Use with --untrimmed-output. Default: output "
@@ -392,7 +393,7 @@ def parse_lengths(s: str) -> Tuple[Optional[int], ...]:
def open_output_files(
- args, default_outfile, interleaved: bool, file_opener: FileOpener
+ args, default_outfile, file_opener: FileOpener
) -> OutputFiles:
"""
Return an OutputFiles instance. If demultiplex is True, the untrimmed, untrimmed2, out and out2
@@ -471,7 +472,6 @@ def open_output_files(
out=out,
out2=out2,
demultiplex=bool(demultiplex_mode),
- interleaved=interleaved,
force_fasta=args.fasta,
)
@@ -515,22 +515,11 @@ def determine_paired(args) -> bool:
or args.adapters2
or args.cut2
or args.pair_filter
+ or args.untrimmed_paired_output
or args.too_short_paired_output
or args.too_long_paired_output)
-def determine_interleaved(args) -> Tuple[bool, bool]:
- is_interleaved_input = False
- is_interleaved_output = False
- if args.interleaved:
- is_interleaved_input = len(args.inputs) == 1
- is_interleaved_output = not args.paired_output
- if not is_interleaved_input and not is_interleaved_output:
- raise CommandLineError("When --interleaved is used, you cannot provide both two "
- "input files and two output files")
- return is_interleaved_input, is_interleaved_output
-
-
def setup_input_files(
inputs: Sequence[str], paired: bool, interleaved: bool
) -> Tuple[str, Optional[str]]:
@@ -545,7 +534,7 @@ def setup_input_files(
"You provided {} input file names, but either one or two are expected. ".format(
len(inputs))
+ "The file names were:\n - "
- + "\n - ".join("{!r}".format(p) for p in inputs)
+ + "\n - ".join("'{}'".format(p) for p in inputs)
+ "\nHint: If your path contains spaces, you need to enclose it in quotes")
input_filename = inputs[0]
if paired and not interleaved:
@@ -568,7 +557,7 @@ def setup_input_files(
return input_filename, input_paired_filename
-def check_arguments(args, paired: bool, is_interleaved_output: bool) -> None:
+def check_arguments(args, paired: bool) -> None:
if not paired:
if args.untrimmed_paired_output:
raise CommandLineError("Option --untrimmed-paired-output can only be used when "
@@ -578,24 +567,23 @@ def check_arguments(args, paired: bool, is_interleaved_output: bool) -> None:
raise CommandLineError("Option --pair-adapters can only be used when trimming "
"paired-end reads")
- if paired:
- if not is_interleaved_output:
- if not args.paired_output:
- raise CommandLineError("When a paired-end trimming option such as -A/-G/-B/-U, "
- "is used, a second output file needs to be specified via -p (--paired-output).")
- if not args.output:
- raise CommandLineError("When you use -p or --paired-output, you must also "
- "use the -o option.")
-
- if bool(args.untrimmed_output) != bool(args.untrimmed_paired_output):
- raise CommandLineError("When trimming paired-end reads, you must use either none "
- "or both of the --untrimmed-output/--untrimmed-paired-output options.")
- if args.too_short_output and not args.too_short_paired_output:
- raise CommandLineError("When using --too-short-output with paired-end "
- "reads, you also need to use --too-short-paired-output")
- if args.too_long_output and not args.too_long_paired_output:
- raise CommandLineError("When using --too-long-output with paired-end "
- "reads, you also need to use --too-long-paired-output")
+ if paired and not args.interleaved:
+ if not args.paired_output:
+ raise CommandLineError("When a paired-end trimming option such as -A/-G/-B/-U, "
+ "is used, a second output file needs to be specified via -p (--paired-output).")
+ if not args.output:
+ raise CommandLineError("When you use -p or --paired-output, you must also "
+ "use the -o option.")
+ for out, paired_out, argname in [
+ (args.untrimmed_output, args.untrimmed_paired_output, "untrimmed"),
+ (args.too_short_output, args.too_short_paired_output, "too-short"),
+ (args.too_long_output, args.too_long_paired_output, "too-long"),
+ ]:
+ if bool(out) != bool(paired_out):
+ raise CommandLineError(
+ "When trimming paired-end data, you must use either none or both of the"
+ " --{name}-output/--{name}-paired-output options.".format(name=argname)
+ )
if args.format is not None:
logger.warning("Option --format is deprecated and ignored because the input file format is "
@@ -699,7 +687,7 @@ def adapters_from_args(args) -> Tuple[List[Adapter], List[Adapter]]:
raise CommandLineError(e)
warn_duplicate_adapters(adapters)
warn_duplicate_adapters(adapters2)
- if args.debug:
+ if args.debug == "trace":
for adapter in adapters + adapters2:
adapter.enable_debug()
return adapters, adapters2
@@ -805,7 +793,7 @@ def main(cmdlineargs=None, default_outfile=sys.stdout.buffer):
parser = get_argument_parser()
if cmdlineargs is None:
cmdlineargs = sys.argv[1:]
- args = parser.parse_args(args=cmdlineargs)
+ args, leftover_args = parser.parse_known_args(args=cmdlineargs)
# log to stderr if results are to be sent to stdout
log_to_stdout = args.output is not None and args.output != "-" and args.paired_output != "-"
# Setup logging only if there are not already any handlers (can happen when
@@ -813,12 +801,7 @@ def main(cmdlineargs=None, default_outfile=sys.stdout.buffer):
if not logging.root.handlers:
setup_logging(logger, stdout=log_to_stdout,
quiet=args.quiet, minimal=args.report == 'minimal', debug=args.debug)
- if args.profile:
- import cProfile
- profiler = cProfile.Profile()
- profiler.enable()
- else:
- profiler = None
+ profiler = setup_profiler_if_requested(args.profile)
if args.quiet and args.report:
parser.error("Options --quiet and --report cannot be used at the same time")
@@ -835,6 +818,10 @@ def main(cmdlineargs=None, default_outfile=sys.stdout.buffer):
# Print the header now because some of the functions below create logging output
log_header(cmdlineargs)
+ if leftover_args:
+ warn_if_en_dashes(cmdlineargs)
+ parser.error("unrecognized arguments: " + " ".join(leftover_args))
+
if args.cores < 0:
parser.error('Value for --cores cannot be negative')
@@ -847,12 +834,12 @@ def main(cmdlineargs=None, default_outfile=sys.stdout.buffer):
progress = DummyProgress()
try:
- is_interleaved_input, is_interleaved_output = determine_interleaved(args)
+ is_interleaved_input = args.interleaved and len(args.inputs) == 1
input_filename, input_paired_filename = setup_input_files(args.inputs,
paired, is_interleaved_input)
- check_arguments(args, paired, is_interleaved_output)
+ check_arguments(args, paired)
pipeline = pipeline_from_parsed_args(args, paired, file_opener)
- outfiles = open_output_files(args, default_outfile, is_interleaved_output, file_opener)
+ outfiles = open_output_files(args, default_outfile, file_opener)
infiles = InputFiles(input_filename, file2=input_paired_filename,
interleaved=is_interleaved_input)
runner = setup_runner(pipeline, infiles, outfiles, progress, cores, args.buffer_size)
@@ -901,11 +888,31 @@ def setup_runner(pipeline: Pipeline, infiles, outfiles, progress, cores, buffer_
runner_kwargs = dict()
try:
runner = runner_class(pipeline, infiles, outfiles, progress, **runner_kwargs)
- except (dnaio.UnknownFileFormat, IOError) as e:
+ except (dnaio.UnknownFileFormat, OSError) as e:
raise CommandLineError(e)
return runner
+def setup_profiler_if_requested(requested):
+ if requested:
+ import cProfile
+ profiler = cProfile.Profile()
+ profiler.enable()
+ else:
+ profiler = None
+ return profiler
+
+
+def warn_if_en_dashes(args):
+ for arg in args:
+ if arg.startswith("–"):
+ logger.warning(
+ "The first character in argument '%s' is '–' (an en-dash, Unicode U+2013)"
+ " and will therefore be interpreted as a file name. If you wanted to"
+ " provide an option, use a regular hyphen '-'.", arg
+ )
+
+
if __name__ == '__main__':
main()
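For reference, the reworked ``--debug`` option relies on argparse's optional-value pattern (``nargs="?"`` with a ``const``). A standalone sketch of that pattern, not the project's actual parser::

    # Standalone sketch: bare "--debug" stores True (the const), "--debug trace"
    # stores the string "trace", and omitting the option keeps the default False.
    from argparse import ArgumentParser

    parser = ArgumentParser()
    parser.add_argument("--debug", nargs="?", const=True, default=False,
                        choices=("trace",),
                        help="Print debug log. 'trace' also prints the DP matrices")

    print(parser.parse_args([]).debug)                    # False
    print(parser.parse_args(["--debug"]).debug)           # True
    print(parser.parse_args(["--debug", "trace"]).debug)  # trace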
=====================================
src/cutadapt/_align.pyx
=====================================
@@ -20,12 +20,12 @@ def _acgt_table():
"""
Return a translation table that maps A, C, G, T characters to the lower
four bits of a byte. Other characters (including possibly IUPAC characters)
- are mapped to zero.
+ are mapped to the most significant bit (0x80).
Lowercase versions are also translated, and U is treated the same as T.
"""
d = dict(A=1, C=2, G=4, T=8, U=8)
- t = bytearray(b'\0') * 256
+ t = bytearray([0x80]) * 256
for c, v in d.items():
t[ord(c)] = v
t[ord(c.lower())] = v
@@ -39,7 +39,11 @@ def _iupac_table():
The table maps ASCII-encoded IUPAC nucleotide characters to bytes in which
the four least significant bits are used to represent one nucleotide each.
- Whether two characters x and y match can then be checked with the
+ For the "N" wildcard, additionally the most significant bit is set (0x80),
+ which allows it to match characters that are not A, C, G or T if _acgt_table
+ was used to encode them.
+
+ Whether two encoded characters x and y match can then be checked with the
expression "x & y != 0".
"""
A = 1
@@ -63,7 +67,7 @@ def _iupac_table():
D=A|G|T,
H=A|C|T,
V=A|C|G,
- N=A|C|G|T
+ N=A|C|G|T + 0x80,
)
t = bytearray(b'\0') * 256
for c, v in iupac.items():
@@ -72,8 +76,9 @@ def _iupac_table():
return bytes(t)
-cdef bytes ACGT_TABLE = _acgt_table()
-cdef bytes IUPAC_TABLE = _iupac_table()
+cdef:
+ bytes ACGT_TABLE = _acgt_table()
+ bytes IUPAC_TABLE = _iupac_table()
class DPMatrix:
@@ -232,6 +237,18 @@ cdef class Aligner:
self._insertion_cost = indel_cost
self._deletion_cost = indel_cost
+ def __reduce__(self):
+ cdef int flags = 0
+ if self.start_in_reference:
+ flags &=1
+ if self.start_in_query:
+ flags &= 2
+ if self.stop_in_reference:
+ flags &= 4
+ if self.stop_in_query:
+ flags &= 8
+ return (Aligner, (self.reference, self.max_error_rate, flags, self.wildcard_ref, self.wildcard_query, self._insertion_cost, self._min_overlap))
+
def _set_reference(self, str reference):
mem = <_Entry*> PyMem_Realloc(self.column, (len(reference) + 1) * sizeof(_Entry))
if not mem:
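A pure-Python sketch of the encoding scheme described in the docstrings above (simplified to A/C/G/T/N; the real tables are built in Cython and also cover lowercase letters, U and the remaining IUPAC codes)::

    # Read characters are encoded with the ACGT table: A=1, C=2, G=4, T=8,
    # anything else (e.g. a literal N in the read) becomes 0x80.
    ACGT_BITS = {"A": 1, "C": 2, "G": 4, "T": 8}

    def encode_read_char(c):
        return ACGT_BITS.get(c.upper(), 0x80)

    def encode_adapter_char(c):
        # IUPAC table, simplified: N carries all four bits plus 0x80
        if c.upper() == "N":
            return 1 | 2 | 4 | 8 | 0x80
        return ACGT_BITS.get(c.upper(), 0)

    def chars_match(adapter_char, read_char):
        # two encoded characters match iff their codes share a bit
        return encode_adapter_char(adapter_char) & encode_read_char(read_char) != 0

    print(chars_match("N", "N"))  # True with the 2.10 tables (the 0x80 bits overlap)
    print(chars_match("A", "N"))  # False: a plain A does not match an unknown base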
=====================================
src/cutadapt/adapters.py
=====================================
@@ -30,36 +30,6 @@ class Where(Enum):
LINKED = 'linked'
-class WhereToRemove(Enum):
- PREFIX = 1
- SUFFIX = 2
- AUTO = 3
-
-
-WHERE_TO_REMOVE_MAP = {
- Where.PREFIX: WhereToRemove.PREFIX,
- Where.FRONT_NOT_INTERNAL: WhereToRemove.PREFIX,
- Where.FRONT: WhereToRemove.PREFIX,
- Where.BACK: WhereToRemove.SUFFIX,
- Where.SUFFIX: WhereToRemove.SUFFIX,
- Where.BACK_NOT_INTERNAL: WhereToRemove.SUFFIX,
- Where.ANYWHERE: WhereToRemove.AUTO,
-}
-
-
-# TODO could become a property/attribute of the Adapter classes
-ADAPTER_TYPE_NAMES = {
- Where.BACK: "regular 3'",
- Where.BACK_NOT_INTERNAL: "non-internal 3'",
- Where.FRONT: "regular 5'",
- Where.FRONT_NOT_INTERNAL: "non-internal 5'",
- Where.PREFIX: "anchored 5'",
- Where.SUFFIX: "anchored 3'",
- Where.ANYWHERE: "variable 5'/3'",
- Where.LINKED: "linked",
-}
-
-
def returns_defaultdict_int():
# We need this function to make EndStatistics picklable.
# Even a @staticmethod of EndStatistics is not sufficient
@@ -71,20 +41,19 @@ class EndStatistics:
"""Statistics about the 5' or 3' end"""
def __init__(self, adapter: "SingleAdapter"):
- self.where = adapter.where # type: Where
self.max_error_rate = adapter.max_error_rate # type: float
self.sequence = adapter.sequence # type: str
self.effective_length = adapter.effective_length # type: int
self.has_wildcards = adapter.adapter_wildcards # type: bool
# self.errors[l][e] == n iff n times a sequence of length l matching at e errors was removed
self.errors = defaultdict(returns_defaultdict_int) # type: Dict[int, Dict[int, int]]
- self._remove = adapter.remove # type: Optional[WhereToRemove]
self.adjacent_bases = {'A': 0, 'C': 0, 'G': 0, 'T': 0, '': 0}
+ # TODO avoid hard-coding the list of classes
+ self._remove_prefix = isinstance(adapter, (FrontAdapter, NonInternalFrontAdapter, PrefixAdapter))
def __repr__(self):
errors = {k: dict(v) for k, v in self.errors.items()}
- return "EndStatistics(where={}, max_error_rate={}, errors={}, adjacent_bases={})".format(
- self.where,
+ return "EndStatistics(max_error_rate={}, errors={}, adjacent_bases={})".format(
self.max_error_rate,
errors,
self.adjacent_bases,
@@ -94,9 +63,7 @@ class EndStatistics:
if not isinstance(other, self.__class__):
raise ValueError("Cannot compare")
if (
- self.where != other.where
- or self._remove != other._remove
- or self.max_error_rate != other.max_error_rate
+ self.max_error_rate != other.max_error_rate
or self.sequence != other.sequence
or self.effective_length != other.effective_length
):
@@ -113,7 +80,7 @@ class EndStatistics:
d = {length: sum(errors.values()) for length, errors in self.errors.items()}
return d
- def random_match_probabilities(self, gc_content: float):
+ def random_match_probabilities(self, gc_content: float) -> List[float]:
"""
Estimate probabilities that this adapter end matches a
random sequence. Indels are not taken into account.
@@ -121,13 +88,10 @@ class EndStatistics:
Returns a list p, where p[i] is the probability that
i bases of this adapter match a random sequence with
GC content gc_content.
-
- The where parameter is necessary for linked adapters to
- specify which (front or back) of the two adapters is meant.
"""
seq = self.sequence
- # FIXME this is broken for self._remove == 'auto'
- if self._remove == WhereToRemove.PREFIX:
+ # FIXME this is broken for 'anywhere' adapters
+ if self._remove_prefix:
seq = seq[::-1]
allowed_bases = 'CGRYSKMBDHVN' if self.has_wildcards else 'GC'
p = 1.
@@ -150,30 +114,27 @@ class AdapterStatistics:
def __init__(
self,
- adapter: "SingleAdapter",
- adapter2: Optional["SingleAdapter"] = None,
- where: Optional[Where] = None,
+ adapter: "Adapter",
+ front: "SingleAdapter",
+ back: Optional["SingleAdapter"] = None,
):
self.name = adapter.name
- self.where = where if where is not None else adapter.where
- self.front = EndStatistics(adapter)
- self.reverse_complemented = 0
- if adapter2 is None:
- self.back = EndStatistics(adapter)
+ self.adapter = adapter
+ self.front = EndStatistics(front)
+ if back is None:
+ self.back = EndStatistics(front)
else:
- self.back = EndStatistics(adapter2)
+ self.back = EndStatistics(back)
+ self.reverse_complemented = 0
def __repr__(self):
- return "AdapterStatistics(name={}, where={}, front={}, back={})".format(
+ return "AdapterStatistics(name={}, front={}, back={})".format(
self.name,
- self.where,
self.front,
self.back,
)
def __iadd__(self, other: "AdapterStatistics"):
- if self.where != other.where: # TODO self.name != other.name or
- raise ValueError('incompatible objects')
self.front += other.front
self.back += other.back
self.reverse_complemented += other.reverse_complemented
@@ -187,18 +148,21 @@ class Match(ABC):
pass
@abstractmethod
- def get_info_records(self) -> List[List]:
+ def get_info_records(self, read) -> List[List]:
+ pass
+
+ @abstractmethod
+ def trimmed(self, read):
pass
-class SingleMatch(Match):
+class SingleMatch(Match, ABC):
"""
- Representation of a single adapter matched to a single read.
+ Representation of a single adapter matched to a single string
"""
- __slots__ = ['astart', 'astop', 'rstart', 'rstop', 'matches', 'errors', 'remove_before',
- 'adapter', 'read', 'length', '_trimmed_read', 'adjacent_base']
+ __slots__ = ['astart', 'astop', 'rstart', 'rstop', 'matches', 'errors',
+ 'adapter', 'sequence', 'length', 'adjacent_base']
- # TODO Can remove_before be removed from the constructor parameters?
def __init__(
self,
astart: int,
@@ -207,13 +171,10 @@ class SingleMatch(Match):
rstop: int,
matches: int,
errors: int,
- remove_before: bool,
adapter: "SingleAdapter",
- read,
+ sequence: str,
):
- """
- remove_before -- True: remove bases before adapter. False: remove after
- """
+ self.adjacent_base = ""
self.astart = astart # type: int
self.astop = astop # type: int
self.rstart = rstart # type: int
@@ -221,16 +182,7 @@ class SingleMatch(Match):
self.matches = matches # type: int
self.errors = errors # type: int
self.adapter = adapter # type: SingleAdapter
- self.read = read
- if remove_before:
- # Compute the trimmed read, assuming it’s a 'front' adapter
- self._trimmed_read = read[rstop:]
- self.adjacent_base = "" # type: str
- else:
- # Compute the trimmed read, assuming it’s a 'back' adapter
- self.adjacent_base = read.sequence[rstart - 1:rstart]
- self._trimmed_read = read[:rstart]
- self.remove_before = remove_before # type: bool
+ self.sequence = sequence
# Number of aligned characters in the adapter. If there are
# indels, this may be different from the number of characters
# in the read.
@@ -249,36 +201,14 @@ class SingleMatch(Match):
If there are indels, this is not reliable as the full alignment
is not available.
"""
- wildcards = [self.read.sequence[self.rstart + i] for i in range(self.length)
+ wildcards = [self.sequence[self.rstart + i] for i in range(self.length)
if self.adapter.sequence[self.astart + i] == wildcard_char and
- self.rstart + i < len(self.read.sequence)]
+ self.rstart + i < len(self.sequence)]
return ''.join(wildcards)
- def rest(self) -> str:
- """
- Return the part of the read before this match if this is a
- 'front' (5') adapter,
- return the part after the match if this is not a 'front' adapter (3').
- This can be an empty string.
- """
- if self.remove_before:
- return self.read.sequence[:self.rstart]
- else:
- return self.read.sequence[self.rstop:]
-
- def remainder_interval(self) -> Tuple[int, int]:
- """
- Return an interval (start, stop) that describes the part of the read that would
- remain after trimming
- """
- if self.remove_before:
- return self.rstop, len(self.read.sequence)
- else:
- return 0, self.rstart
-
- def get_info_records(self) -> List[List]:
- seq = self.read.sequence
- qualities = self.read.qualities
+ def get_info_records(self, read) -> List[List]:
+ seq = read.sequence
+ qualities = read.qualities
info = [
"",
self.errors,
@@ -293,26 +223,87 @@ class SingleMatch(Match):
info += [
qualities[0:self.rstart],
qualities[self.rstart:self.rstop],
- qualities[self.rstop:]
+ qualities[self.rstop:],
]
else:
info += ["", "", ""]
return [info]
- def trimmed(self):
- return self._trimmed_read
+
+class RemoveBeforeMatch(SingleMatch):
+ """A match that removes sequence before the match"""
+
+ def __repr__(self):
+ return 'RemoveBeforeMatch(astart={}, astop={}, rstart={}, rstop={}, matches={}, errors={})'.format(
+ self.astart, self.astop, self.rstart, self.rstop, self.matches, self.errors)
+
+ def rest(self) -> str:
+ """
+ Return the part of the read before this match if this is a
+ 'front' (5') adapter,
+ return the part after the match if this is not a 'front' adapter (3').
+ This can be an empty string.
+ """
+ return self.sequence[:self.rstart]
+
+ def remainder_interval(self) -> Tuple[int, int]:
+ """
+ Return an interval (start, stop) that describes the part of the read that would
+ remain after trimming
+ """
+ return self.rstop, len(self.sequence)
+
+ def trim_slice(self):
+ # Same as remainder_interval, but as a slice() object
+ return slice(self.rstop, None)
+
+ def trimmed(self, read):
+ return read[self.rstop:]
def update_statistics(self, statistics: AdapterStatistics):
"""Update AdapterStatistics in place"""
- if self.remove_before:
- statistics.front.errors[self.rstop][self.errors] += 1
- else:
- statistics.back.errors[len(self.read) - len(self._trimmed_read)][self.errors] += 1
- try:
- statistics.back.adjacent_bases[self.adjacent_base] += 1
- except KeyError:
- statistics.back.adjacent_bases[''] = 1
+ statistics.front.errors[self.rstop][self.errors] += 1
+
+
+class RemoveAfterMatch(SingleMatch):
+ """A match that removes sequence after the match"""
+
+ def __repr__(self):
+ return "RemoveAfterMatch(astart={}, astop={}, rstart={}, rstop={}, matches={}, errors={})".format(
+ self.astart, self.astop, self.rstart, self.rstop, self.matches, self.errors)
+
+ def rest(self) -> str:
+ """
+ Return the part of the read before this match if this is a
+ 'front' (5') adapter,
+ return the part after the match if this is not a 'front' adapter (3').
+ This can be an empty string.
+ """
+ return self.sequence[self.rstop:]
+
+ def remainder_interval(self) -> Tuple[int, int]:
+ """
+ Return an interval (start, stop) that describes the part of the read that would
+ remain after trimming
+ """
+ return 0, self.rstart
+
+ def trim_slice(self):
+ # Same as remainder_interval, but as a slice() object
+ return slice(None, self.rstart)
+
+ def trimmed(self, read):
+ return read[:self.rstart]
+
+ def update_statistics(self, statistics: AdapterStatistics):
+ """Update AdapterStatistics in place"""
+ adjacent_base = self.sequence[self.rstart - 1:self.rstart]
+ statistics.back.errors[len(self.sequence) - self.rstart][self.errors] += 1
+ try:
+ statistics.back.adjacent_bases[adjacent_base] += 1
+ except KeyError:
+ statistics.back.adjacent_bases[''] = 1
def _generate_adapter_name(_start=[1]) -> str:
@@ -322,19 +313,22 @@ def _generate_adapter_name(_start=[1]) -> str:
class Adapter(ABC):
- def __init__(self, *args, **kwargs):
- pass
+
+ description = "adapter with one component" # this is overriden in subclasses
+
+ def __init__(self, name: Optional[str], *args, **kwargs):
+ self.name = name
@abstractmethod
def enable_debug(self):
pass
@abstractmethod
- def match_to(self, read):
+ def match_to(self, sequence: str):
pass
-class SingleAdapter(Adapter):
+class SingleAdapter(Adapter, ABC):
"""
This class can find a single adapter characterized by sequence, error rate,
type etc. within reads.
@@ -342,14 +336,6 @@ class SingleAdapter(Adapter):
where -- A Where enum value. This influences where the adapter is allowed to appear within the
read.
- remove -- a WhereToRemove enum value. This describes which part of the read to remove if the
- adapter was found:
- * WhereToRemove.PREFIX (for a 3' adapter)
- * WhereToRemove.SUFFIX (for a 5' adapter)
- * WhereToRemove.AUTO for a 5'/3' mixed adapter (if the match involves the first base of
- the read, it is assumed to be a 5' adapter and a 3' otherwise)
- * None: One of the above is chosen depending on the 'where' parameter
-
sequence -- The adapter sequence as string. Will be converted to uppercase.
Also, Us will be converted to Ts.
@@ -372,8 +358,6 @@ class SingleAdapter(Adapter):
def __init__(
self,
sequence: str,
- where: Where,
- remove: Optional[WhereToRemove] = None,
max_error_rate: float = 0.1,
min_overlap: int = 3,
read_wildcards: bool = False,
@@ -381,15 +365,12 @@ class SingleAdapter(Adapter):
name: Optional[str] = None,
indels: bool = True,
):
- super().__init__()
- assert not isinstance(remove, str)
+ super().__init__(name)
self._debug = False # type: bool
self.name = _generate_adapter_name() if name is None else name # type: str
self.sequence = sequence.upper().replace("U", "T") # type: str
if not self.sequence:
raise ValueError("Adapter sequence is empty")
- self.where = where # type: Where
- self.remove = WHERE_TO_REMOVE_MAP[where] if remove is None else remove # type: WhereToRemove
self.max_error_rate = max_error_rate # type: float
self.min_overlap = min(min_overlap, len(self.sequence)) # type: int
iupac = frozenset('XACGTURYSWKMBDHVN')
@@ -403,36 +384,29 @@ class SingleAdapter(Adapter):
self.adapter_wildcards = adapter_wildcards and not set(self.sequence) <= set("ACGT") # type: bool
self.read_wildcards = read_wildcards # type: bool
self.indels = indels # type: bool
- if self.is_anchored and not self.indels:
- aligner_class = align.PrefixComparer if self.where is Where.PREFIX else align.SuffixComparer
- self.aligner = aligner_class(
- self.sequence,
- self.max_error_rate,
- wildcard_ref=self.adapter_wildcards,
- wildcard_query=self.read_wildcards,
- min_overlap=self.min_overlap
- )
- else:
- # TODO
- # Indels are suppressed by setting their cost very high, but a different algorithm
- # should be used instead.
- indel_cost = 1 if self.indels else 100000
- self.aligner = align.Aligner(
- self.sequence,
- self.max_error_rate,
- flags=self.where.value,
- wildcard_ref=self.adapter_wildcards,
- wildcard_query=self.read_wildcards,
- indel_cost=indel_cost,
- min_overlap=self.min_overlap,
- )
+ self.aligner = self._aligner()
+
+ def _make_aligner(self, flags):
+ # TODO
+ # Indels are suppressed by setting their cost very high, but a different algorithm
+ # should be used instead.
+ indel_cost = 1 if self.indels else 100000
+ return align.Aligner(
+ self.sequence,
+ self.max_error_rate,
+ flags=flags,
+ wildcard_ref=self.adapter_wildcards,
+ wildcard_query=self.read_wildcards,
+ indel_cost=indel_cost,
+ min_overlap=self.min_overlap,
+ )
def __repr__(self):
- return '<SingleAdapter(name={name!r}, sequence={sequence!r}, where={where}, '\
- 'remove={remove}, max_error_rate={max_error_rate}, min_overlap={min_overlap}, '\
+ return '<{cls}(name={name!r}, sequence={sequence!r}, '\
+ 'max_error_rate={max_error_rate}, min_overlap={min_overlap}, '\
'read_wildcards={read_wildcards}, '\
'adapter_wildcards={adapter_wildcards}, '\
- 'indels={indels})>'.format(**vars(self))
+ 'indels={indels})>'.format(cls=self.__class__.__name__, **vars(self))
@property
def is_anchored(self):
@@ -451,79 +425,68 @@ class SingleAdapter(Adapter):
self._debug = True
self.aligner.enable_debug()
- def match_to(self, read):
+ @abstractmethod
+ def _aligner(self):
+ pass
+
+ @abstractmethod
+ def match_to(self, sequence: str):
"""
- Attempt to match this adapter to the given read.
+ Attempt to match this adapter to the given string.
Return a Match instance if a match was found;
return None if no match was found given the matching criteria (minimum
overlap length, maximum error rate).
"""
- read_seq = read.sequence
- pos = -1
-
- # try to find an exact match first unless wildcards are allowed
- if not self.adapter_wildcards:
- if self.where is Where.PREFIX:
- pos = 0 if read_seq.startswith(self.sequence) else -1
- elif self.where is Where.SUFFIX:
- pos = (len(read_seq) - len(self.sequence)) if read_seq.endswith(self.sequence) else -1
- elif self.where is Where.BACK or self.where is Where.FRONT:
- pos = read_seq.find(self.sequence)
- # TODO BACK_NOT_INTERNAL, FRONT_NOT_INTERNAL
- if pos >= 0:
- match_args = (
- 0, len(self.sequence), pos, pos + len(self.sequence),
- len(self.sequence), 0)
- else:
- # try approximate matching
- alignment = self.aligner.locate(read_seq)
- if self._debug:
- try:
- print(self.aligner.dpmatrix) # pragma: no cover
- except AttributeError:
- pass
- if alignment is None:
- match_args = None
- else:
- astart, astop, rstart, rstop, matches, errors = alignment
- match_args = (astart, astop, rstart, rstop, matches, errors)
-
- if match_args is None:
- return None
- if self.remove == WhereToRemove.AUTO:
- # guess: if alignment starts at pos 0, it’s a 5' adapter
- remove_before = match_args[2] == 0 # index 2 is rstart
- else:
- remove_before = self.remove == WhereToRemove.PREFIX
- match = SingleMatch(*match_args, remove_before=remove_before, adapter=self, read=read)
-
- assert match.length >= self.min_overlap
- return match
def __len__(self):
return len(self.sequence)
def create_statistics(self):
- return AdapterStatistics(self)
+ return AdapterStatistics(self, self)
-class BackOrFrontAdapter(SingleAdapter):
- """A 5' or 3' adapter.
+class FrontAdapter(SingleAdapter):
+ """A 5' adapter"""
- This is separate from the Adapter class so that a specialized match_to
- method can be implemented that reduces some of the runtime checks.
+ description = "regular 5'"
- TODO The generic Adapter class should become abstract, and the other
- adapter types should also get their own classes.
- """
+ def __init__(self, *args, **kwargs):
+ self._force_anywhere = kwargs.pop("force_anywhere", False)
+ super().__init__(*args, **kwargs)
+
+ def _aligner(self):
+ return self._make_aligner(Where.ANYWHERE.value if self._force_anywhere else Where.FRONT.value)
+
+ def match_to(self, sequence: str):
+ """
+ Attempt to match this adapter to the given read.
+
+ Return a Match instance if a match was found;
+ return None if no match was found given the matching criteria (minimum
+ overlap length, maximum error rate).
+ """
+ alignment = self.aligner.locate(sequence.upper()) # type: Optional[Tuple[int,int,int,int,int,int]]
+ if self._debug:
+ print(self.aligner.dpmatrix) # pragma: no cover
+ if alignment is None:
+ return None
+ return RemoveBeforeMatch(*alignment, adapter=self, sequence=sequence)
+
+
+class BackAdapter(SingleAdapter):
+ """A 3' adapter"""
+
+ description = "regular 3'"
def __init__(self, *args, **kwargs):
+ self._force_anywhere = kwargs.pop("force_anywhere", False)
super().__init__(*args, **kwargs)
- assert self.where is Where.BACK or self.where is Where.FRONT
- self._remove_before = self.remove is WhereToRemove.PREFIX
- def match_to(self, read):
+ def _aligner(self):
+ return self._make_aligner(Where.ANYWHERE.value if self._force_anywhere else Where.BACK.value)
+
+ def match_to(self, sequence: str):
"""
Attempt to match this adapter to the given read.
@@ -531,35 +494,142 @@ class BackOrFrontAdapter(SingleAdapter):
return None if no match was found given the matching criteria (minimum
overlap length, maximum error rate).
"""
- read_seq = read.sequence.upper() # temporary copy
- pos = -1
-
- # try to find an exact match first unless wildcards are allowed
- if not self.adapter_wildcards:
- pos = read_seq.find(self.sequence)
- if pos >= 0:
- alignment = (
- 0, len(self.sequence), pos, pos + len(self.sequence),
- len(self.sequence), 0)
- else:
- alignment = self.aligner.locate(read_seq)
+ alignment = self.aligner.locate(sequence.upper()) # type: Optional[Tuple[int,int,int,int,int,int]]
if self._debug:
print(self.aligner.dpmatrix) # pragma: no cover
if alignment is None:
return None
+ return RemoveAfterMatch(*alignment, adapter=self, sequence=sequence)
+
+
+class AnywhereAdapter(SingleAdapter):
+ """
+ An adapter that can be 5' or 3'. If a match involves the first base of
+ the read, it is assumed to be a 5' adapter and a 3' otherwise.
+ """
+
+ description = "variable 5'/3'"
+
+ def _aligner(self):
+ return self._make_aligner(Where.ANYWHERE.value)
- match = SingleMatch(*alignment, remove_before=self._remove_before, adapter=self, read=read)
+ def match_to(self, sequence: str):
+ """
+ Attempt to match this adapter to the given string.
+
+ Return a Match instance if a match was found;
+ return None if no match was found given the matching criteria (minimum
+ overlap length, maximum error rate).
+ """
+ alignment = self.aligner.locate(sequence)
+ if self._debug:
+ print(self.aligner.dpmatrix) # pragma: no cover
+ if alignment is None:
+ return None
+ # guess: if alignment starts at pos 0, it’s a 5' adapter
+ if alignment[2] == 0: # index 2 is rstart
+ match = RemoveBeforeMatch(*alignment, adapter=self, sequence=sequence) # type: ignore
+ else:
+ match = RemoveAfterMatch(*alignment, adapter=self, sequence=sequence) # type: ignore
return match
+class NonInternalFrontAdapter(FrontAdapter):
+ """A non-internal 5' adapter"""
+
+ description = "non-internal 5'"
+
+ def _aligner(self):
+ return self._make_aligner(Where.FRONT_NOT_INTERNAL.value)
+
+ def match_to(self, sequence: str):
+ if not self.adapter_wildcards and sequence.startswith(self.sequence):
+ n = len(self.sequence)
+ return RemoveBeforeMatch(
+ 0, n, 0, n, n, 0, adapter=self, sequence=sequence
+ ) # type: ignore
+ alignment = self.aligner.locate(sequence)
+ if self._debug:
+ try:
+ print(self.aligner.dpmatrix) # pragma: no cover
+ except AttributeError:
+ pass
+ if alignment is None:
+ return None
+ return RemoveBeforeMatch(*alignment, adapter=self, sequence=sequence) # type: ignore
+
+
+class NonInternalBackAdapter(BackAdapter):
+ """A non-internal 3' adapter"""
+
+ description = "non-internal 3'"
+
+ def _aligner(self):
+ return self._make_aligner(Where.BACK_NOT_INTERNAL.value)
+
+ def match_to(self, sequence: str):
+ if not self.adapter_wildcards and sequence.endswith(self.sequence):
+ # Exact match found
+ # astart, astop, rstart, rstop, matches, errors
+ n = len(self.sequence)
+ return RemoveAfterMatch(
+ 0, n, len(sequence) - n, len(sequence), n, 0, adapter=self, sequence=sequence
+ ) # type: ignore
+ alignment = self.aligner.locate(sequence)
+ if self._debug:
+ try:
+ print(self.aligner.dpmatrix) # pragma: no cover
+ except AttributeError:
+ pass
+ if alignment is None:
+ return None
+ return RemoveAfterMatch(*alignment, adapter=self, sequence=sequence) # type: ignore
+
+
+class PrefixAdapter(NonInternalFrontAdapter):
+ """An anchored 5' adapter"""
+
+ description = "anchored 5'"
+
+ def _aligner(self):
+ if not self.indels: # TODO or if error rate allows 0 errors anyway
+ return align.PrefixComparer(
+ self.sequence,
+ self.max_error_rate,
+ wildcard_ref=self.adapter_wildcards,
+ wildcard_query=self.read_wildcards,
+ min_overlap=self.min_overlap
+ )
+ else:
+ return self._make_aligner(Where.PREFIX.value)
+
+
+class SuffixAdapter(NonInternalBackAdapter):
+ """An anchored 3' adapter"""
+
+ description = "anchored 3'"
+
+ def _aligner(self):
+ if not self.indels: # TODO or if error rate allows 0 errors anyway
+ return align.SuffixComparer(
+ self.sequence,
+ self.max_error_rate,
+ wildcard_ref=self.adapter_wildcards,
+ wildcard_query=self.read_wildcards,
+ min_overlap=self.min_overlap
+ )
+ else:
+ return self._make_aligner(Where.SUFFIX.value)
+
+
class LinkedMatch(Match):
"""
Represent a match of a LinkedAdapter
"""
- def __init__(self, front_match: SingleMatch, back_match: SingleMatch, adapter: "LinkedAdapter"):
+ def __init__(self, front_match: RemoveBeforeMatch, back_match: RemoveAfterMatch, adapter: "LinkedAdapter"):
assert front_match is not None or back_match is not None
- self.front_match = front_match # type: SingleMatch
- self.back_match = back_match # type: SingleMatch
+ self.front_match = front_match # type: RemoveBeforeMatch
+ self.back_match = back_match # type: RemoveAfterMatch
self.adapter = adapter # type: LinkedAdapter
def __repr__(self):
@@ -585,14 +655,12 @@ class LinkedMatch(Match):
e += self.back_match.errors
return e
- def trimmed(self):
+ def trimmed(self, read):
+ if self.front_match:
+ read = self.front_match.trimmed(read)
if self.back_match:
- # back match is relative to front match, so even if a front match exists,
- # this is correct
- return self.back_match.trimmed()
- else:
- assert self.front_match
- return self.front_match.trimmed()
+ read = self.back_match.trimmed(read)
+ return read
@property
def adjacent_base(self):
@@ -603,13 +671,14 @@ class LinkedMatch(Match):
if self.front_match:
statistics.front.errors[self.front_match.rstop][self.front_match.errors] += 1
if self.back_match:
- statistics.back.errors[len(self.back_match.read) - self.back_match.rstart][self.back_match.errors] += 1
+ length = len(self.back_match.sequence) - self.back_match.rstart
+ statistics.back.errors[length][self.back_match.errors] += 1
def remainder_interval(self) -> Tuple[int, int]:
matches = [match for match in [self.front_match, self.back_match] if match is not None]
return remainder(matches)
- def get_info_records(self) -> List[List]:
+ def get_info_records(self, read) -> List[List]:
records = []
for match, namesuffix in [
(self.front_match, ";1"),
@@ -617,15 +686,18 @@ class LinkedMatch(Match):
]:
if match is None:
continue
- record = match.get_info_records()[0]
- record[7] = self.adapter.name + namesuffix
+ record = match.get_info_records(read)[0]
+ record[7] = ("none" if self.adapter.name is None else self.adapter.name) + namesuffix
records.append(record)
+ read = match.trimmed(read)
return records
class LinkedAdapter(Adapter):
- """
- """
+ """A 5' adapter combined with a 3' adapter"""
+
+ description = "linked"
+
def __init__(
self,
front_adapter,
@@ -634,7 +706,7 @@ class LinkedAdapter(Adapter):
back_required,
name,
):
- super().__init__()
+ super().__init__(name)
self.front_required = front_required
self.back_required = back_required
@@ -649,26 +721,22 @@ class LinkedAdapter(Adapter):
self.front_adapter.enable_debug()
self.back_adapter.enable_debug()
- def match_to(self, read):
+ def match_to(self, sequence: str) -> Optional[LinkedMatch]:
"""
- Match the linked adapters against the given read. Any anchored adapters are
- required to exist for a successful match. If both adapters are unanchored,
- both need to match.
+ Match the two linked adapters against a string
"""
- front_match = self.front_adapter.match_to(read)
+ front_match = self.front_adapter.match_to(sequence)
if self.front_required and front_match is None:
return None
-
if front_match is not None:
- # TODO statistics
- read = front_match.trimmed()
- back_match = self.back_adapter.match_to(read)
+ sequence = sequence[front_match.trim_slice()]
+ back_match = self.back_adapter.match_to(sequence)
if back_match is None and (self.back_required or front_match is None):
return None
return LinkedMatch(front_match, back_match, self)
def create_statistics(self):
- return AdapterStatistics(self.front_adapter, self.back_adapter, where=Where.LINKED)
+ return AdapterStatistics(self, self.front_adapter, self.back_adapter)
@property
def sequence(self):
@@ -679,14 +747,13 @@ class LinkedAdapter(Adapter):
return None
-class MultiAdapter(Adapter):
+class MultiAdapter(Adapter, ABC):
"""
Represent multiple adapters of the same type at once and use an index data structure
to speed up matching. This acts like a "normal" Adapter as it provides a match_to
method, but is faster with lots of adapters.
There are quite a few restrictions:
- - the adapters need to be either all PREFIX or all SUFFIX adapters
- no indels are allowed
- the error rate allows at most 2 mismatches
- wildcards in the adapter are not allowed
@@ -694,48 +761,45 @@ class MultiAdapter(Adapter):
Use the is_acceptable() method to check individual adapters.
"""
+ MultiAdapterIndex = Dict[str, Tuple[SingleAdapter, int, int]]
def __init__(self, adapters):
- """All given adapters must be of the same type, either Where.PREFIX or Where.SUFFIX"""
- super().__init__()
+ """All given adapters must be of the same type"""
+ super().__init__(name="multi_adapter")
if not adapters:
raise ValueError("Adapter list is empty")
- self._where = adapters[0].where
for adapter in adapters:
self._accept(adapter)
- if adapter.where is not self._where:
- raise ValueError("All adapters must have identical 'where' attributes")
- assert self._where in (Where.PREFIX, Where.SUFFIX)
self._adapters = adapters
self._lengths, self._index = self._make_index()
- if self._where is Where.PREFIX:
- def make_affix(read, n):
- return read.sequence[:n]
- else:
- def make_affix(read, n):
- return read.sequence[-n:]
- self._make_affix = make_affix
+ self._make_affix = self._get_make_affix()
def __repr__(self):
- return 'MultiAdapter(adapters={!r}, where={})'.format(self._adapters, self._where)
+ return "MultiAdapter(adapters={!r})".format(self._adapters)
- @staticmethod
- def _accept(adapter):
+ @abstractmethod
+ def _get_make_affix(self):
+ pass
+
+ @abstractmethod
+ def _make_match(self, adapter, length, matches, errors, sequence) -> SingleMatch:
+ pass
+
+ @classmethod
+ def _accept(cls, adapter):
"""Raise a ValueError if the adapter is not acceptable"""
- if adapter.where is not Where.PREFIX and adapter.where is not Where.SUFFIX:
- raise ValueError("Only anchored adapter types are allowed")
if adapter.read_wildcards:
raise ValueError("Wildcards in the read not supported")
if adapter.adapter_wildcards:
raise ValueError("Wildcards in the adapter not supported")
- if adapter.indels:
- raise ValueError("Indels not allowed")
k = int(len(adapter) * adapter.max_error_rate)
+ if k > 0 and adapter.indels:
+ raise ValueError("Indels not allowed")
if k > 2:
raise ValueError("Error rate too high")
- @staticmethod
- def is_acceptable(adapter):
+ @classmethod
+ def is_acceptable(cls, adapter):
"""
Return whether this adapter is acceptable for being used by MultiAdapter
@@ -743,14 +807,14 @@ class MultiAdapter(Adapter):
or would lead to a very large index.
"""
try:
- MultiAdapter._accept(adapter)
+ cls._accept(adapter)
except ValueError:
return False
return True
- def _make_index(self):
+ def _make_index(self) -> Tuple[List[int], "MultiAdapterIndex"]:
logger.info('Building index of %s adapters ...', len(self._adapters))
- index = dict()
+ index = dict() # type: MultiAdapter.MultiAdapterIndex
lengths = set()
has_warned = False
for adapter in self._adapters:
@@ -777,13 +841,13 @@ class MultiAdapter(Adapter):
return sorted(lengths, reverse=True), index
- def match_to(self, read):
+ def match_to(self, sequence: str):
"""
- Match the adapters against the read and return a Match that represents
+ Match the adapters against a string and return a Match that represents
the best match or None if no match was found
"""
- # Check all the prefixes or suffixes (affixes) of the read that could match
- best_adapter = None
+ # Check all the prefixes or suffixes (affixes) that could match
+ best_adapter = None # type: Optional[SingleAdapter]
best_length = 0
best_m = -1
best_e = 1000
@@ -792,7 +856,7 @@ class MultiAdapter(Adapter):
# No chance of getting the same or a higher number of matches, so we can stop early
break
- affix = self._make_affix(read, length)
+ affix = self._make_affix(sequence, length)
try:
adapter, e, m = self._index[affix]
except KeyError:
@@ -806,35 +870,77 @@ class MultiAdapter(Adapter):
if best_m == -1:
return None
else:
- if self._where is Where.PREFIX:
- rstart, rstop = 0, best_length
- else:
- assert self._where is Where.SUFFIX
- rstart, rstop = len(read) - best_length, len(read)
- return SingleMatch(
- astart=0,
- astop=len(best_adapter.sequence),
- rstart=rstart,
- rstop=rstop,
- matches=best_m,
- errors=best_e,
- remove_before=best_adapter.remove is WhereToRemove.PREFIX,
- adapter=best_adapter,
- read=read
- )
+ assert best_adapter is not None
+ return self._make_match(best_adapter, best_length, best_m, best_e, sequence)
def enable_debug(self):
pass
+class MultiPrefixAdapter(MultiAdapter):
+
+ @classmethod
+ def _accept(cls, adapter):
+ if not isinstance(adapter, PrefixAdapter):
+ raise ValueError("Only 5' anchored adapters are allowed")
+ return super()._accept(adapter)
+
+ def _make_match(self, adapter, length, matches, errors, sequence):
+ return RemoveBeforeMatch(
+ astart=0,
+ astop=len(adapter.sequence),
+ rstart=0,
+ rstop=length,
+ matches=matches,
+ errors=errors,
+ adapter=adapter,
+ sequence=sequence,
+ )
+
+ def _get_make_affix(self):
+ return self._make_prefix
+
+ @staticmethod
+ def _make_prefix(s, n):
+ return s[:n]
+
+
+class MultiSuffixAdapter(MultiAdapter):
+
+ @classmethod
+ def _accept(cls, adapter):
+ if not isinstance(adapter, SuffixAdapter):
+ raise ValueError("Only anchored 3' adapters are allowed")
+ return super()._accept(adapter)
+
+ def _make_match(self, adapter, length, matches, errors, sequence):
+ return RemoveAfterMatch(
+ astart=0,
+ astop=len(adapter.sequence),
+ rstart=len(sequence) - length,
+ rstop=len(sequence),
+ matches=matches,
+ errors=errors,
+ adapter=adapter,
+ sequence=sequence,
+ )
+
+ def _get_make_affix(self):
+ return self._make_suffix
+
+ @staticmethod
+ def _make_suffix(s, n):
+ return s[-n:]
+
+
def warn_duplicate_adapters(adapters):
d = dict()
for adapter in adapters:
- key = (adapter.sequence, adapter.where, adapter.remove)
+ key = (adapter.__class__, adapter.sequence)
if key in d:
logger.warning("Adapter %r (%s) was specified multiple times! "
"Please make sure that this is what you want.",
- adapter.sequence, ADAPTER_TYPE_NAMES[adapter.where])
+ adapter.sequence, adapter.description)
d[key] = adapter.name
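A minimal sketch of the reworked adapter API (not part of this patch; the sequences are invented and cutadapt 2.10 is assumed to be installed): ``match_to()`` now takes a plain string, and the returned ``RemoveBeforeMatch``/``RemoveAfterMatch`` object knows which side to cut when its ``trimmed()`` method is applied to a read::

    # Minimal sketch (sequences invented): adapters match against a plain string,
    # and the match object applies the trim to whatever read is passed to trimmed().
    from cutadapt.adapters import BackAdapter, FrontAdapter

    read = "TTCCAAGGACGTACGTTTTT"

    back = BackAdapter("ACGTACGT", max_error_rate=0.1, min_overlap=3)
    m = back.match_to(read)             # a str, not a read object, as of 2.10
    print(type(m).__name__)             # RemoveAfterMatch
    print(m.rstart, m.rstop, m.errors)  # 8 16 0 for this made-up read
    print(m.trimmed(read))              # TTCCAAGG

    front = FrontAdapter("TTCCAAGG", max_error_rate=0.1)
    print(front.match_to(read).trimmed(read))  # everything after the 5' match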
=====================================
src/cutadapt/filters.py
=====================================
@@ -13,9 +13,9 @@ filters is created and each redirector is called in turn until one returns True.
The read is then assumed to have been "consumed", that is, either written
somewhere or filtered (should be discarded).
"""
-from collections import Counter
+from collections import defaultdict, Counter
from abc import ABC, abstractmethod
-from typing import List, Tuple, Optional, Dict, Any
+from typing import Tuple, Optional, Dict, Any, DefaultDict
from .qualtrim import expected_errors
from .utils import FileOpener
@@ -33,8 +33,6 @@ class SingleEndFilter(ABC):
def __call__(self, read, info: ModificationInfo):
"""
Called to process a single-end read
-
- Any adapter matches are append to the matches list.
"""
@@ -43,25 +41,31 @@ class PairedEndFilter(ABC):
def __call__(self, read1, read2, info1: ModificationInfo, info2: ModificationInfo):
"""
Called to process the read pair (read1, read2)
-
- Any adapter matches are append to the info.matches list.
"""
class WithStatistics(ABC):
def __init__(self) -> None:
- self._written = 0 # no. of written reads or read pairs
- self._written_bp = [0, 0]
- self._written_lengths = [Counter(), Counter()] # type: List[Counter]
+ # A defaultdict is much faster than a Counter
+ self._written_lengths1 = defaultdict(int) # type: DefaultDict[int, int]
+ self._written_lengths2 = defaultdict(int) # type: DefaultDict[int, int]
def written_reads(self) -> int:
- return self._written
+ """Return number of written reads or read pairs"""
+ return sum(self._written_lengths1.values())
- def written_bp(self) -> Tuple[int, ...]:
- return tuple(self._written_bp)
+ def written_bp(self) -> Tuple[int, int]:
+ return (
+ self._compute_total_bp(self._written_lengths1),
+ self._compute_total_bp(self._written_lengths2),
+ )
def written_lengths(self) -> Tuple[Counter, Counter]:
- return (self._written_lengths[0].copy(), self._written_lengths[1].copy())
+ return (Counter(self._written_lengths1), Counter(self._written_lengths2))
+
+ @staticmethod
+ def _compute_total_bp(counts: DefaultDict[int, int]) -> int:
+ return sum(length * count for length, count in counts.items())
class SingleEndFilterWithStatistics(SingleEndFilter, WithStatistics, ABC):
@@ -69,9 +73,7 @@ class SingleEndFilterWithStatistics(SingleEndFilter, WithStatistics, ABC):
super().__init__()
def update_statistics(self, read) -> None:
- self._written += 1
- self._written_bp[0] += len(read)
- self._written_lengths[0][len(read)] += 1
+ self._written_lengths1[len(read)] += 1
class PairedEndFilterWithStatistics(PairedEndFilter, WithStatistics, ABC):
@@ -79,11 +81,8 @@ class PairedEndFilterWithStatistics(PairedEndFilter, WithStatistics, ABC):
super().__init__()
def update_statistics(self, read1, read2):
- self._written += 1
- self._written_bp[0] += len(read1)
- self._written_bp[1] += len(read2)
- self._written_lengths[0][len(read1)] += 1
- self._written_lengths[1][len(read2)] += 1
+ self._written_lengths1[len(read1)] += 1
+ self._written_lengths2[len(read2)] += 1
class NoFilter(SingleEndFilterWithStatistics):
@@ -111,7 +110,6 @@ class PairedNoFilter(PairedEndFilterWithStatistics):
def __call__(self, read1, read2, info1: ModificationInfo, info2: ModificationInfo):
self.writer.write(read1, read2)
self.update_statistics(read1, read2)
-
return DISCARD
@@ -344,10 +342,10 @@ class PairedDemultiplexer(PairedEndFilterWithStatistics):
qualities, file_opener)
def written(self) -> int:
- return self._demultiplexer1._written + self._demultiplexer2._written
+ return self._demultiplexer1.written_reads()
def written_bp(self) -> Tuple[int, int]:
- return (self._demultiplexer1._written_bp[0], self._demultiplexer2._written_bp[0])
+ return (self._demultiplexer1.written_bp()[0], self._demultiplexer2.written_bp()[0])
def __call__(self, read1, read2, info1: ModificationInfo, info2: ModificationInfo):
assert read2 is not None
@@ -461,11 +459,13 @@ class InfoFileWriter(SingleEndFilter):
self.file = file
def __call__(self, read, info: ModificationInfo):
+ current_read = info.original_read
if info.matches:
for match in info.matches:
- for info_record in match.get_info_records():
+ for info_record in match.get_info_records(current_read):
# info_record[0] is the read name suffix
print(read.name + info_record[0], *info_record[1:], sep='\t', file=self.file)
+ current_read = match.trimmed(current_read)
else:
seq = read.sequence
qualities = read.qualities if read.qualities is not None else ''
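A standalone sketch of the new statistics bookkeeping (the lengths are made up): only a per-length histogram is kept, and both the number of written reads and the written bp are derived from it::

    # Only a {length: count} histogram is stored per output.
    from collections import defaultdict

    written_lengths = defaultdict(int)
    for length in (100, 100, 76, 51):   # made-up trimmed read lengths
        written_lengths[length] += 1

    written_reads = sum(written_lengths.values())
    written_bp = sum(length * count for length, count in written_lengths.items())
    print(written_reads, written_bp)    # 4 327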
=====================================
src/cutadapt/modifiers.py
=====================================
@@ -9,7 +9,7 @@ from abc import ABC, abstractmethod
from collections import OrderedDict
from .qualtrim import quality_trim_index, nextseq_trim_index
-from .adapters import Where, MultiAdapter, Match, remainder
+from .adapters import MultiPrefixAdapter, MultiSuffixAdapter, Match, remainder
from .utils import reverse_complemented_sequence
@@ -19,10 +19,11 @@ class ModificationInfo:
Any information (except the read itself) that needs to be passed from one modifier
to one later in the pipeline or from one modifier to the filters is recorded here.
"""
- __slots__ = ["matches"]
+ __slots__ = ["matches", "original_read"]
- def __init__(self):
+ def __init__(self, read):
self.matches = [] # type: List[Match]
+ self.original_read = read
class SingleEndModifier(ABC):
@@ -85,11 +86,14 @@ class AdapterCutter(SingleEndModifier):
# the adapters when we don’t need to
if len(prefix) > 1 or len(suffix) > 1:
adapters = other
- for affix in (prefix, suffix):
- if len(affix) > 1:
- adapters.append(MultiAdapter(affix))
- else:
- adapters.extend(affix)
+ if len(prefix) > 1:
+ adapters.append(MultiPrefixAdapter(prefix))
+ else:
+ adapters.extend(prefix)
+ if len(suffix) > 1:
+ adapters.append(MultiSuffixAdapter(suffix))
+ else:
+ adapters.extend(suffix)
self.adapters = adapters
def __repr__(self):
@@ -107,12 +111,10 @@ class AdapterCutter(SingleEndModifier):
"""
prefix, suffix, other = [], [], []
for a in adapters:
- if MultiAdapter.is_acceptable(a):
- if a.where == Where.PREFIX:
- lst = prefix
- else:
- assert a.where == Where.SUFFIX
- lst = suffix
+ if MultiPrefixAdapter.is_acceptable(a):
+ lst = prefix
+ elif MultiSuffixAdapter.is_acceptable(a):
+ lst = suffix
else:
lst = other
lst.append(a)
@@ -127,7 +129,7 @@ class AdapterCutter(SingleEndModifier):
"""
best_match = None
for adapter in adapters:
- match = adapter.match_to(read)
+ match = adapter.match_to(read.sequence)
if match is None:
continue
@@ -139,28 +141,25 @@ class AdapterCutter(SingleEndModifier):
return best_match
@staticmethod
- def masked_read(read, trimmed_read, matches: Sequence[Match]):
+ def masked_read(read, matches: Sequence[Match]):
start, stop = remainder(matches)
- # TODO modification in place
- trimmed_read.sequence = (
+ result = read[:]
+ result.sequence = (
'N' * start
+ read.sequence[start:stop]
+ 'N' * (len(read) - stop))
- trimmed_read.qualities = read.qualities
- return trimmed_read
+ return result
@staticmethod
- def lowercased_read(read, trimmed_read, matches: Sequence[Match]):
+ def lowercased_read(read, matches: Sequence[Match]):
start, stop = remainder(matches)
- read_sequence = read.sequence
- # TODO modification in place
- trimmed_read.sequence = (
- read_sequence[:start].lower()
- + read_sequence[start:stop].upper()
- + read_sequence[stop:].lower()
+ result = read[:]
+ result.sequence = (
+ read.sequence[:start].lower()
+ + read.sequence[start:stop].upper()
+ + read.sequence[stop:].lower()
)
- trimmed_read.qualities = read.qualities
- return trimmed_read
+ return result
def __call__(self, read, info: ModificationInfo):
trimmed_read, matches = self.match_and_trim(read)
@@ -193,7 +192,7 @@ class AdapterCutter(SingleEndModifier):
# if nothing found, attempt no further rounds
break
matches.append(match)
- trimmed_read = match.trimmed()
+ trimmed_read = match.trimmed(trimmed_read)
if not matches:
return trimmed_read, []
@@ -202,9 +201,9 @@ class AdapterCutter(SingleEndModifier):
# read is already trimmed, nothing to do
pass
elif self.action == 'mask':
- trimmed_read = self.masked_read(read, trimmed_read, matches)
+ trimmed_read = self.masked_read(read, matches)
elif self.action == 'lowercase':
- trimmed_read = self.lowercased_read(read, trimmed_read, matches)
+ trimmed_read = self.lowercased_read(read, matches)
assert len(trimmed_read.sequence) == len(read)
elif self.action is None: # --no-trim
trimmed_read = read[:]
@@ -269,7 +268,7 @@ class PairedAdapterCutter(PairedModifier):
Both lists must have the same, non-zero length.
read pair is trimmed if adapters1[i] is found in R1 and adapters2[i] in R2.
- action -- What to do with a found adapter: None, 'trim', or 'mask'
+ action -- What to do with a found adapter: None, 'trim', 'lowercase' or 'mask'
"""
super().__init__()
if len(adapters1) != len(adapters2):
@@ -299,7 +298,7 @@ class PairedAdapterCutter(PairedModifier):
return read1, read2
adapter1 = match1.adapter
adapter2 = self._adapters2[self._adapter_indices[adapter1]]
- match2 = adapter2.match_to(read2)
+ match2 = adapter2.match_to(read2.sequence)
if match2 is None:
return read1, read2
@@ -310,16 +309,16 @@ class PairedAdapterCutter(PairedModifier):
if self.action == 'lowercase':
trimmed_read.sequence = trimmed_read.sequence.upper()
- trimmed_read = match.trimmed()
+ trimmed_read = match.trimmed(trimmed_read)
match.update_statistics(self.adapter_statistics[i][match.adapter])
if self.action == 'trim':
# read is already trimmed, nothing to do
pass
elif self.action == 'mask':
- trimmed_read = AdapterCutter.masked_read(trimmed_read, [match])
+ trimmed_read = AdapterCutter.masked_read(read, [match])
elif self.action == 'lowercase':
- trimmed_read = AdapterCutter.lowercased_read(trimmed_read, [match])
+ trimmed_read = AdapterCutter.lowercased_read(read, [match])
assert len(trimmed_read.sequence) == len(read)
elif self.action is None: # --no-trim
trimmed_read = read[:]
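
With the refactored AdapterCutter, the mask and lowercase actions are rebuilt from the untouched read instead of patching the trimmed read in place, so the read length is preserved (see the added assertions). A minimal usage sketch, assuming the constructor accepts an action keyword as the hunks suggest; the expected output matches the new PairedAdapterCutter test further below:

    from dnaio import Sequence
    from cutadapt.adapters import BackAdapter
    from cutadapt.modifiers import AdapterCutter, ModificationInfo

    read = Sequence("r1", "CCCCGGTTAACCCC")
    cutter = AdapterCutter([BackAdapter("GGTTAA")], action="lowercase")
    trimmed = cutter(read, ModificationInfo(read))
    # The adapter and the 3' tail are lowercased rather than removed:
    # trimmed.sequence == "CCCCggttaacccc" and len(trimmed) == len(read)
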
=====================================
src/cutadapt/parser.py
=====================================
@@ -6,7 +6,10 @@ import logging
from typing import Type, Optional, List, Tuple, Iterator, Any, Dict
from xopen import xopen
from dnaio.readers import FastaReader
-from .adapters import Where, WHERE_TO_REMOVE_MAP, Adapter, SingleAdapter, BackOrFrontAdapter, LinkedAdapter
+from .adapters import (
+ Adapter, FrontAdapter, NonInternalFrontAdapter, BackAdapter, NonInternalBackAdapter,
+ AnywhereAdapter, PrefixAdapter, SuffixAdapter, LinkedAdapter
+)
logger = logging.getLogger(__name__)
@@ -189,7 +192,7 @@ class AdapterSpecification:
name, spec = cls._extract_name(spec)
spec = spec.strip()
parameters = cls._parse_parameters(parameters_spec)
- spec = AdapterSpecification.expand_braces(spec)
+ spec = cls.expand_braces(spec)
# Special case for adapters consisting of only X characters:
# This needs to be supported for backwards-compatibilitity
@@ -240,36 +243,36 @@ class AdapterSpecification:
return name, restriction, spec, parameters
@staticmethod
- def _restriction_to_where(cmdline_type, restriction):
+ def _restriction_to_class(cmdline_type, restriction):
if cmdline_type == 'front':
if restriction is None:
- return Where.FRONT
+ return FrontAdapter
elif restriction == 'anchored':
- return Where.PREFIX
+ return PrefixAdapter
elif restriction == 'noninternal':
- return Where.FRONT_NOT_INTERNAL
+ return NonInternalFrontAdapter
else:
raise ValueError(
'Value {} for a front restriction not allowed'.format(restriction))
elif cmdline_type == 'back':
if restriction is None:
- return Where.BACK
+ return BackAdapter
elif restriction == 'anchored':
- return Where.SUFFIX
+ return SuffixAdapter
elif restriction == 'noninternal':
- return Where.BACK_NOT_INTERNAL
+ return NonInternalBackAdapter
else:
raise ValueError(
'Value {} for a back restriction not allowed'.format(restriction))
else:
assert cmdline_type == 'anywhere'
if restriction is None:
- return Where.ANYWHERE
+ return AnywhereAdapter
else:
raise ValueError('No placement may be specified for "anywhere" adapters')
- def where(self):
- return self._restriction_to_where(self.cmdline_type, self.restriction)
+ def adapter_class(self):
+ return self._restriction_to_class(self.cmdline_type, self.restriction)
class AdapterParser:
@@ -327,19 +330,15 @@ class AdapterParser:
def _parse_not_linked(self, spec: str, name: Optional[str], cmdline_type: str) -> Adapter:
aspec = AdapterSpecification.parse(spec, cmdline_type)
- where = aspec.where()
+ adapter_class = aspec.adapter_class() # type: Type[Adapter]
if not name:
name = aspec.name
- if aspec.parameters.pop('anywhere', False):
- aspec.parameters['remove'] = WHERE_TO_REMOVE_MAP[where]
- where = Where.ANYWHERE
+ if aspec.parameters.pop('anywhere', False) and adapter_class in (FrontAdapter, BackAdapter):
+ aspec.parameters['force_anywhere'] = True
parameters = self.default_parameters.copy()
parameters.update(aspec.parameters)
- if where in (Where.FRONT, Where.BACK):
- adapter_class = BackOrFrontAdapter # type: Type[Adapter]
- else:
- adapter_class = SingleAdapter
- return adapter_class(sequence=aspec.sequence, where=where, name=name, **parameters)
+
+ return adapter_class(sequence=aspec.sequence, name=name, **parameters)
def _parse_linked(self, spec1: str, spec2: str, name: Optional[str], cmdline_type: str) -> LinkedAdapter:
"""Return a linked adapter from two specification strings"""
@@ -382,9 +381,9 @@ class AdapterParser:
front_required = front_parameters.pop('required', front_required)
back_required = back_parameters.pop('required', back_required)
- front_adapter = SingleAdapter(front_spec.sequence, where=front_spec.where(), name=None,
+ front_adapter = front_spec.adapter_class()(front_spec.sequence, name=None,
**front_parameters)
- back_adapter = SingleAdapter(back_spec.sequence, where=back_spec.where(), name=None,
+ back_adapter = back_spec.adapter_class()(back_spec.sequence, name=None,
**back_parameters)
return LinkedAdapter(
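
Adapter placement is now expressed through the class hierarchy rather than the Where enum, so AdapterSpecification.adapter_class() hands the parser a class it can instantiate directly. An illustrative check of the mapping, using the usual ^/$ anchoring syntax and constructor arguments mirroring those in the test hunks further below (the spec sequences are placeholders):

    from cutadapt.parser import AdapterParser

    parser = AdapterParser(max_error_rate=0.2, min_overlap=4, read_wildcards=False,
                           adapter_wildcards=False, indels=True)
    back = list(parser.parse("ACGTACGT", "back"))[0]              # BackAdapter
    anchored_back = list(parser.parse("ACGTACGT$", "back"))[0]    # SuffixAdapter
    anchored_front = list(parser.parse("^ACGTACGT", "front"))[0]  # PrefixAdapter
    print(type(back).__name__, type(anchored_back).__name__, type(anchored_front).__name__)
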
=====================================
src/cutadapt/pipeline.py
=====================================
@@ -33,6 +33,10 @@ class InputFiles:
self.file2 = file2
self.interleaved = interleaved
+ def open(self):
+ return dnaio.open(self.file1, file2=self.file2,
+ interleaved=self.interleaved, mode="r")
+
class OutputFiles:
"""
@@ -40,11 +44,8 @@ class OutputFiles:
untrimmed, untrimmed2, out and out2 are file names or templates
as required by the used demultiplexer ('{name}' etc.).
- If interleaved is True, then out is written interleaved.
-
Files may also be None.
"""
- # TODO interleaving for the other file pairs (too_short, too_long, untrimmed)?
def __init__(
self,
out: Optional[BinaryIO] = None,
@@ -59,7 +60,6 @@ class OutputFiles:
rest: Optional[BinaryIO] = None,
wildcard: Optional[BinaryIO] = None,
demultiplex: bool = False,
- interleaved: bool = False,
force_fasta: Optional[bool] = None,
):
self.out = out
@@ -74,7 +74,6 @@ class OutputFiles:
self.rest = rest
self.wildcard = wildcard
self.demultiplex = demultiplex
- self.interleaved = interleaved
self.force_fasta = force_fasta
def __iter__(self):
@@ -116,27 +115,17 @@ class Pipeline(ABC):
self.file_opener = file_opener
def connect_io(self, infiles: InputFiles, outfiles: OutputFiles):
- self._reader = dnaio.open(infiles.file1, file2=infiles.file2,
- interleaved=infiles.interleaved, mode='r')
+ self._reader = infiles.open()
self._set_output(outfiles)
+ @abstractmethod
def _open_writer(
self,
file: BinaryIO,
file2: Optional[BinaryIO] = None,
force_fasta: Optional[bool] = None,
- **kwargs
):
- # TODO backwards-incompatible change (?) would be to use outfiles.interleaved
- # for all outputs
- if force_fasta:
- kwargs['fileformat'] = 'fasta'
- # file and file2 must already be file-like objects because we don’t want to
- # take care of threads and compression levels here.
- for f in (file, file2):
- assert not isinstance(f, (str, bytes, Path))
- return dnaio.open(file, file2=file2, mode='w', qualities=self.uses_qualities,
- **kwargs)
+ pass
def _set_output(self, outfiles: OutputFiles):
self._filters = []
@@ -272,15 +261,14 @@ class SingleEndPipeline(Pipeline):
def process_reads(self, progress: Progress = None):
"""Run the pipeline. Return statistics"""
- n = 0 # no. of processed reads # TODO turn into attribute
+ n = 0 # no. of processed reads
total_bp = 0
- assert self._reader is not None
for read in self._reader:
n += 1
if n % 10000 == 0 and progress:
progress.update(n)
total_bp += len(read)
- info = ModificationInfo()
+ info = ModificationInfo(read)
for modifier in self._modifiers:
read = modifier(read, info)
for filter_ in self._filters:
@@ -288,6 +276,17 @@ class SingleEndPipeline(Pipeline):
break
return (n, total_bp, None)
+ def _open_writer(
+ self,
+ file: BinaryIO,
+ file2: Optional[BinaryIO] = None,
+ force_fasta: Optional[bool] = None,
+ ):
+ assert file2 is None
+ assert not isinstance(file, (str, bytes, Path))
+ return dnaio.open(
+ file, mode="w", qualities=self.uses_qualities, fileformat="fasta" if force_fasta else None)
+
def _filter_wrapper(self):
return Redirector
@@ -367,8 +366,8 @@ class PairedEndPipeline(Pipeline):
progress.update(n)
total1_bp += len(read1)
total2_bp += len(read2)
- info1 = ModificationInfo()
- info2 = ModificationInfo()
+ info1 = ModificationInfo(read1)
+ info2 = ModificationInfo(read2)
for modifier in self._modifiers:
read1, read2 = modifier(read1, read2, info1, info2)
for filter_ in self._filters:
@@ -377,6 +376,25 @@ class PairedEndPipeline(Pipeline):
break
return (n, total1_bp, total2_bp)
+ def _open_writer(
+ self,
+ file: BinaryIO,
+ file2: Optional[BinaryIO] = None,
+ force_fasta: Optional[bool] = None,
+ ):
+ # file and file2 must already be file-like objects because we don’t want to
+ # take care of threads and compression levels here.
+ for f in (file, file2):
+ assert not isinstance(f, (str, bytes, Path))
+ return dnaio.open(
+ file,
+ file2=file2,
+ mode="w",
+ qualities=self.uses_qualities,
+ fileformat="fasta" if force_fasta else None,
+ interleaved=file2 is None,
+ )
+
def _filter_wrapper(self, pair_filter_mode=None):
if pair_filter_mode is None:
pair_filter_mode = self._pair_filter_mode
@@ -393,9 +411,7 @@ class PairedEndPipeline(Pipeline):
return self._filter_wrapper()
def _final_filter(self, outfiles):
- writer = self._open_writer(
- outfiles.out, outfiles.out2, interleaved=outfiles.interleaved,
- force_fasta=outfiles.force_fasta)
+ writer = self._open_writer(outfiles.out, outfiles.out2, force_fasta=outfiles.force_fasta)
return PairedNoFilter(writer)
def _create_demultiplexer(self, outfiles):
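
Opening of writers is now split between the single-end and paired-end pipelines, and whether an output is interleaved is inferred from the absence of a second file rather than taken from a separate OutputFiles.interleaved flag. A sketch of the effective dnaio call for the paired case, with a placeholder output path (the arguments mirror the hunk above):

    import dnaio

    out2 = None  # no second output file, so the single output is written interleaved
    writer = dnaio.open("trimmed.fastq", file2=out2, mode="w", qualities=True,
                        interleaved=out2 is None)
    writer.close()
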
=====================================
src/cutadapt/report.py
=====================================
@@ -5,7 +5,10 @@ from io import StringIO
import textwrap
from collections import Counter
from typing import Any, Optional, List
-from .adapters import Where, EndStatistics, AdapterStatistics, ADAPTER_TYPE_NAMES
+from .adapters import (
+ EndStatistics, AdapterStatistics, FrontAdapter, NonInternalFrontAdapter, PrefixAdapter,
+ BackAdapter, NonInternalBackAdapter, SuffixAdapter, AnywhereAdapter, LinkedAdapter,
+)
from .modifiers import (SingleEndModifier, PairedModifier, QualityTrimmer, NextseqQualityTrimmer,
AdapterCutter, PairedAdapterCutter, ReverseComplementer)
from .filters import WithStatistics, TooShortReadFilter, TooLongReadFilter, NContentFilter
@@ -342,12 +345,11 @@ def full_report(stats: Statistics, time: float, gc_content: float) -> str: # no
total_back = sum(adapter_statistics.back.lengths.values())
total = total_front + total_back
reverse_complemented = adapter_statistics.reverse_complemented
- where = adapter_statistics.where
- where_backs = (Where.BACK, Where.BACK_NOT_INTERNAL, Where.SUFFIX)
- where_fronts = (Where.FRONT, Where.FRONT_NOT_INTERNAL, Where.PREFIX)
- assert (where in (Where.ANYWHERE, Where.LINKED)
- or (where in where_backs and total_front == 0)
- or (where in where_fronts and total_back == 0)), (where, total_front, total_back)
+ adapter = adapter_statistics.adapter
+ if isinstance(adapter, (BackAdapter, NonInternalBackAdapter, SuffixAdapter)):
+ assert total_front == 0
+ if isinstance(adapter, (FrontAdapter, NonInternalFrontAdapter, PrefixAdapter)):
+ assert total_back == 0
if stats.paired:
extra = 'First read: ' if which_in_pair == 0 else 'Second read: '
@@ -357,7 +359,7 @@ def full_report(stats: Statistics, time: float, gc_content: float) -> str: # no
print_s("=" * 3, extra + "Adapter", adapter_statistics.name, "=" * 3)
print_s()
- if where is Where.LINKED:
+ if isinstance(adapter, LinkedAdapter):
print_s("Sequence: {}...{}; Type: linked; Length: {}+{}; "
"5' trimmed: {} times; 3' trimmed: {} times".format(
adapter_statistics.front.sequence,
@@ -367,7 +369,7 @@ def full_report(stats: Statistics, time: float, gc_content: float) -> str: # no
total_front, total_back))
else:
print_s("Sequence: {}; Type: {}; Length: {}; Trimmed: {} times".
- format(adapter_statistics.front.sequence, ADAPTER_TYPE_NAMES[adapter_statistics.where],
+ format(adapter_statistics.front.sequence, adapter.description,
len(adapter_statistics.front.sequence), total), end="")
if stats.reverse_complemented is not None:
print_s("; Reverse-complemented: {} times".format(reverse_complemented))
@@ -376,7 +378,7 @@ def full_report(stats: Statistics, time: float, gc_content: float) -> str: # no
if total == 0:
print_s()
continue
- if where is Where.ANYWHERE:
+ if isinstance(adapter, AnywhereAdapter):
print_s(total_front, "times, it overlapped the 5' end of a read")
print_s(total_back, "times, it overlapped the 3' end or was within the read")
print_s()
@@ -386,7 +388,7 @@ def full_report(stats: Statistics, time: float, gc_content: float) -> str: # no
print_s()
print_s("Overview of removed sequences (3' or within)")
print_s(histogram(adapter_statistics.back, stats.n, gc_content))
- elif where is Where.LINKED:
+ elif isinstance(adapter, LinkedAdapter):
print_s()
print_s(error_ranges(adapter_statistics.front))
print_s(error_ranges(adapter_statistics.back))
@@ -395,13 +397,13 @@ def full_report(stats: Statistics, time: float, gc_content: float) -> str: # no
print_s()
print_s("Overview of removed sequences at 3' end")
print_s(histogram(adapter_statistics.back, stats.n, gc_content))
- elif where in (Where.FRONT, Where.PREFIX, Where.FRONT_NOT_INTERNAL):
+ elif isinstance(adapter, (FrontAdapter, NonInternalFrontAdapter, PrefixAdapter)):
print_s()
print_s(error_ranges(adapter_statistics.front))
print_s("Overview of removed sequences")
print_s(histogram(adapter_statistics.front, stats.n, gc_content))
else:
- assert where in (Where.BACK, Where.SUFFIX, Where.BACK_NOT_INTERNAL)
+ assert isinstance(adapter, (BackAdapter, NonInternalBackAdapter, SuffixAdapter))
print_s()
print_s(error_ranges(adapter_statistics.back))
base_stats = AdjacentBaseStatistics(adapter_statistics.back.adjacent_bases)
@@ -418,7 +420,7 @@ def full_report(stats: Statistics, time: float, gc_content: float) -> str: # no
return sio.getvalue().rstrip()
-def minimal_report(stats, _time, _gc_content) -> str:
+def minimal_report(stats: Statistics, _time, _gc_content) -> str:
"""Create a minimal tabular report suitable for concatenation"""
def none(value):
@@ -446,7 +448,7 @@ def minimal_report(stats, _time, _gc_content) -> str:
warning = False
for which_in_pair in (0, 1):
for adapter_statistics in stats.adapter_stats[which_in_pair]:
- if adapter_statistics.where in (Where.BACK, Where.SUFFIX, Where.BACK_NOT_INTERNAL):
+ if isinstance(adapter_statistics.adapter, (BackAdapter, NonInternalBackAdapter, SuffixAdapter)):
if AdjacentBaseStatistics(adapter_statistics.back.adjacent_bases).should_warn:
warning = True
break
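
The report now inspects the adapter's class directly instead of comparing Where values. Reduced to its core, the dispatch looks like the hypothetical helper below (class names are from the import hunk above; the helper itself is illustrative):

    from cutadapt.adapters import (
        FrontAdapter, NonInternalFrontAdapter, PrefixAdapter,
        BackAdapter, NonInternalBackAdapter, SuffixAdapter,
    )

    def removal_end(adapter):
        # Which end removals for this adapter are reported at.
        if isinstance(adapter, (FrontAdapter, NonInternalFrontAdapter, PrefixAdapter)):
            return "5'"
        if isinstance(adapter, (BackAdapter, NonInternalBackAdapter, SuffixAdapter)):
            return "3'"
        return "anywhere or linked"
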
=====================================
src/cutadapt/utils.py
=====================================
@@ -3,10 +3,20 @@ import sys
import time
import errno
import multiprocessing
+import logging
from xopen import xopen
import dnaio
+try:
+ import resource
+except ImportError:
+ # Windows
+ resource = None # type: ignore
+
+
+logger = logging.getLogger(__name__)
+
def available_cpu_count():
"""
@@ -25,20 +35,17 @@ def available_cpu_count():
res = bin(int(m.group(1).replace(",", ""), 16)).count("1")
if res > 0:
return min(res, multiprocessing.cpu_count())
- except IOError:
+ except OSError:
pass
return multiprocessing.cpu_count()
def raise_open_files_limit(n):
- try:
- import resource
- except ImportError:
- # Windows
+ if resource is None:
return
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
- soft += n
+ soft = min(soft + n, hard)
resource.setrlimit(resource.RLIMIT_NOFILE, (soft, hard))
@@ -152,6 +159,7 @@ class FileOpener:
self.threads = threads
def xopen(self, path, mode):
+ logger.debug("Opening file '%s', mode '%s' with xopen", path, mode)
return xopen(path, mode, compresslevel=self.compression_level, threads=self.threads)
def xopen_or_none(self, path, mode):
@@ -161,17 +169,15 @@ class FileOpener:
return self.xopen(path, mode)
def xopen_pair(self, path1, path2, mode):
- file1 = file2 = None
- if path1 is not None:
- file1 = self.xopen(path1, mode)
- if path2 is not None:
- file2 = self.xopen(path2, mode)
- elif path2 is not None:
+ if path1 is None and path2 is not None:
raise ValueError("When giving paths for paired-end files, only providing the second"
" file is not supported")
+ file1 = self.xopen_or_none(path1, mode)
+ file2 = self.xopen_or_none(path2, mode)
return file1, file2
def dnaio_open(self, *args, **kwargs):
+ logger.debug("Opening file '%s', mode '%s' with dnaio", args[0], kwargs['mode'])
kwargs["opener"] = self.xopen
return dnaio.open(*args, **kwargs)
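
raise_open_files_limit() now clamps the new soft limit to the hard limit, so requesting more file descriptors than the hard limit allows no longer makes setrlimit() fail. The change in isolation (POSIX only; the new import guard leaves resource as None on Windows):

    import resource

    def raise_open_files_limit(n):
        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
        soft = min(soft + n, hard)  # previously soft += n, which could exceed the hard limit
        resource.setrlimit(resource.RLIMIT_NOFILE, (soft, hard))
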
=====================================
tests/test_adapters.py
=====================================
@@ -1,21 +1,32 @@
import pytest
from dnaio import Sequence
-from cutadapt.adapters import SingleAdapter, SingleMatch, Where, LinkedAdapter, WhereToRemove
+from cutadapt.adapters import (
+ RemoveAfterMatch, FrontAdapter, BackAdapter, PrefixAdapter, SuffixAdapter, LinkedAdapter,
+ MultiPrefixAdapter, MultiSuffixAdapter,
+ )
+
+
+def test_front_adapter_partial_occurrence_in_back():
+ adapter = FrontAdapter("CTGAATT", max_error_rate=0, min_overlap=4)
+ assert adapter.match_to("GGGGGCTGAA") is None
+
+
+def test_back_adapter_partial_occurrence_in_front():
+ adapter = BackAdapter("CTGAATT", max_error_rate=0, min_overlap=4)
+ assert adapter.match_to("AATTGGGGGGG") is None
def test_issue_52():
- adapter = SingleAdapter(
+ adapter = BackAdapter(
sequence='GAACTCCAGTCACNNNNN',
- where=Where.BACK,
- remove=WhereToRemove.SUFFIX,
max_error_rate=0.12,
min_overlap=5,
read_wildcards=False,
adapter_wildcards=True)
- read = Sequence(name="abc", sequence='CCCCAGAACTACAGTCCCGGC')
- am = SingleMatch(astart=0, astop=17, rstart=5, rstop=21, matches=15, errors=2,
- remove_before=False, adapter=adapter, read=read)
+ sequence = "CCCCAGAACTACAGTCCCGGC"
+ am = RemoveAfterMatch(astart=0, astop=17, rstart=5, rstop=21, matches=15, errors=2,
+ adapter=adapter, sequence=sequence)
assert am.wildcards() == 'GGC'
"""
The result above should actually be 'CGGC' since the correct
@@ -42,55 +53,51 @@ def test_issue_80():
# This is correct, albeit a little surprising, since an alignment without
# indels would have only two errors.
- adapter = SingleAdapter(
+ adapter = BackAdapter(
sequence="TCGTATGCCGTCTTC",
- where=Where.BACK,
- remove=WhereToRemove.SUFFIX,
max_error_rate=0.2,
min_overlap=3,
read_wildcards=False,
adapter_wildcards=False)
- read = Sequence(name="seq2", sequence="TCGTATGCCCTCC")
- result = adapter.match_to(read)
+ result = adapter.match_to("TCGTATGCCCTCC")
assert result.errors == 3, result
assert result.astart == 0, result
assert result.astop == 15, result
def test_str():
- a = SingleAdapter('ACGT', where=Where.BACK, remove=WhereToRemove.SUFFIX, max_error_rate=0.1)
+ a = BackAdapter('ACGT', max_error_rate=0.1)
str(a)
- str(a.match_to(Sequence(name='seq', sequence='TTACGT')))
+ str(a.match_to("TTACGT"))
def test_linked_adapter():
- front_adapter = SingleAdapter('AAAA', where=Where.PREFIX, min_overlap=4)
- back_adapter = SingleAdapter('TTTT', where=Where.BACK, min_overlap=3)
+ front_adapter = PrefixAdapter('AAAA', min_overlap=4)
+ back_adapter = BackAdapter('TTTT', min_overlap=3)
linked_adapter = LinkedAdapter(
front_adapter, back_adapter, front_required=True, back_required=False, name='name')
assert linked_adapter.front_adapter.min_overlap == 4
assert linked_adapter.back_adapter.min_overlap == 3
- sequence = Sequence(name='seq', sequence='AAAACCCCCTTTT')
- trimmed = linked_adapter.match_to(sequence).trimmed()
+ read = Sequence(name='seq', sequence='AAAACCCCCTTTT')
+ trimmed = linked_adapter.match_to(read.sequence).trimmed(read)
assert trimmed.name == 'seq'
assert trimmed.sequence == 'CCCCC'
def test_info_record():
- adapter = SingleAdapter(
+ adapter = BackAdapter(
sequence='GAACTCCAGTCACNNNNN',
- where=Where.BACK,
max_error_rate=0.12,
min_overlap=5,
read_wildcards=False,
adapter_wildcards=True,
name="Foo")
read = Sequence(name="abc", sequence='CCCCAGAACTACAGTCCCGGC')
- am = SingleMatch(astart=0, astop=17, rstart=5, rstop=21, matches=15, errors=2, remove_before=False,
- adapter=adapter, read=read)
- assert am.get_info_records() == [[
+ am = RemoveAfterMatch(astart=0, astop=17, rstart=5, rstop=21, matches=15, errors=2,
+ adapter=adapter, sequence=read.sequence)
+ assert am.get_info_records(read) == [[
"",
2,
5,
@@ -106,22 +113,22 @@ def test_info_record():
def test_random_match_probabilities():
- a = SingleAdapter('A', where=Where.BACK, max_error_rate=0.1).create_statistics()
+ a = BackAdapter('A', max_error_rate=0.1).create_statistics()
assert a.back.random_match_probabilities(0.5) == [1, 0.25]
assert a.back.random_match_probabilities(0.2) == [1, 0.4]
for s in ('ACTG', 'XMWH'):
- a = SingleAdapter(s, where=Where.BACK, max_error_rate=0.1).create_statistics()
+ a = BackAdapter(s, max_error_rate=0.1).create_statistics()
assert a.back.random_match_probabilities(0.5) == [1, 0.25, 0.25**2, 0.25**3, 0.25**4]
assert a.back.random_match_probabilities(0.2) == [1, 0.4, 0.4*0.1, 0.4*0.1*0.4, 0.4*0.1*0.4*0.1]
- a = SingleAdapter('GTCA', where=Where.FRONT, max_error_rate=0.1).create_statistics()
+ a = FrontAdapter('GTCA', max_error_rate=0.1).create_statistics()
assert a.front.random_match_probabilities(0.5) == [1, 0.25, 0.25**2, 0.25**3, 0.25**4]
assert a.front.random_match_probabilities(0.2) == [1, 0.4, 0.4*0.1, 0.4*0.1*0.4, 0.4*0.1*0.4*0.1]
def test_add_adapter_statistics():
- stats = SingleAdapter('A', name='name', where=Where.BACK, max_error_rate=0.1).create_statistics()
+ stats = BackAdapter('A', name='name', max_error_rate=0.1).create_statistics()
end_stats = stats.back
end_stats.adjacent_bases['A'] = 7
end_stats.adjacent_bases['C'] = 19
@@ -136,7 +143,7 @@ def test_add_adapter_statistics():
end_stats.errors[20][1] = 66
end_stats.errors[20][2] = 6
- stats2 = SingleAdapter('A', name='name', where=Where.BACK, max_error_rate=0.1).create_statistics()
+ stats2 = BackAdapter('A', name='name', max_error_rate=0.1).create_statistics()
end_stats2 = stats2.back
end_stats2.adjacent_bases['A'] = 43
end_stats2.adjacent_bases['C'] = 31
@@ -161,18 +168,73 @@ def test_add_adapter_statistics():
}
-def test_issue_265():
- """Crash when accessing the matches property of non-anchored linked adapters"""
- s = Sequence('name', 'AAAATTTT')
- front_adapter = SingleAdapter('GGG', where=Where.FRONT)
- back_adapter = SingleAdapter('TTT', where=Where.BACK)
+def test_linked_matches_property():
+ """Accessing matches property of non-anchored linked adapters"""
+ # Issue #265
+ front_adapter = FrontAdapter("GGG")
+ back_adapter = BackAdapter("TTT")
la = LinkedAdapter(front_adapter, back_adapter, front_required=False, back_required=False, name='name')
- assert la.match_to(s).matches == 3
+ assert la.match_to("AAAATTTT").matches == 3
- at pytest.mark.parametrize("where", [Where.PREFIX, Where.SUFFIX])
-def test_no_indels_empty_read(where):
+ at pytest.mark.parametrize("adapter_class", [PrefixAdapter, SuffixAdapter])
+def test_no_indels_empty_read(adapter_class):
# Issue #376
- adapter = SingleAdapter('ACGT', where=where, indels=False)
- empty = Sequence('name', '')
- adapter.match_to(empty)
+ adapter = adapter_class("ACGT", indels=False)
+ adapter.match_to("")
+
+
+def test_prefix_match_with_n_wildcard_in_read():
+ adapter = PrefixAdapter("NNNACGT", indels=False)
+ match = adapter.match_to("TTTACGTAAAA")
+ assert match is not None and (0, 7) == (match.rstart, match.rstop)
+ match = adapter.match_to("NTTACGTAAAA")
+ assert match is not None and (0, 7) == (match.rstart, match.rstop)
+
+
+def test_suffix_match_with_n_wildcard_in_read():
+ adapter = SuffixAdapter("ACGTNNN", indels=False)
+ match = adapter.match_to("TTTTACGTTTT")
+ assert match is not None and (4, 11) == (match.rstart, match.rstop)
+ match = adapter.match_to("TTTTACGTCNC")
+ assert match is not None and (4, 11) == (match.rstart, match.rstop)
+
+
+def test_multi_prefix_adapter():
+ adapters = [
+ PrefixAdapter("GAAC", indels=False),
+ PrefixAdapter("TGCT", indels=False),
+ ]
+ ma = MultiPrefixAdapter(adapters)
+ match = ma.match_to("GAACTT")
+ assert match.adapter is adapters[0]
+ match = ma.match_to("TGCTAA")
+ assert match.adapter is adapters[1]
+
+
+def test_multi_prefix_adapter_incorrect_type():
+ with pytest.raises(ValueError):
+ MultiPrefixAdapter([
+ PrefixAdapter("GAAC", indels=False),
+ SuffixAdapter("TGCT", indels=False),
+ ])
+
+
+def test_multi_suffix_adapter():
+ adapters = [
+ SuffixAdapter("GAAC", indels=False),
+ SuffixAdapter("TGCT", indels=False),
+ ]
+ ma = MultiSuffixAdapter(adapters)
+ match = ma.match_to("TTGAAC")
+ assert match.adapter is adapters[0]
+ match = ma.match_to("AATGCT")
+ assert match.adapter is adapters[1]
+
+
+def test_multi_suffix_adapter_incorrect_type():
+ with pytest.raises(ValueError):
+ MultiSuffixAdapter([
+ SuffixAdapter("GAAC", indels=False),
+ PrefixAdapter("TGCT", indels=False),
+ ])
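
As the rewritten tests show, adapters now match against a plain sequence string and the resulting match is applied to a read afterwards via its trimmed() method. A minimal round-trip using only calls that appear in the hunks above (the sequences are illustrative):

    from dnaio import Sequence
    from cutadapt.adapters import BackAdapter

    adapter = BackAdapter("GGTTAA", max_error_rate=0.1)
    read = Sequence("r1", "CCCCGGTTAACCCC")
    match = adapter.match_to(read.sequence)  # match against the string, not the read
    if match is not None:
        trimmed = match.trimmed(read)        # apply the match to the read object
        # trimmed.sequence == "CCCC"
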
=====================================
tests/test_align.py
=====================================
@@ -95,7 +95,7 @@ def test_compare_prefixes():
assert result == (0, 10, 0, 10, 10, 0)
for s in WILDCARD_SEQUENCES:
- for t in WILDCARD_SEQUENCES:
+ for t in WILDCARD_SEQUENCES: # FIXME what is this t doing?
r = s + 'GCCAGGG'
result = compare_prefixes(s, r, )
assert result == (0, 10, 0, 10, 10, 0)
@@ -110,6 +110,23 @@ def test_compare_prefixes():
assert result == (0, 10, 0, 10, 8, 2)
+def test_n_wildcard_in_ref_matches_n_wildcard_in_query_prefix():
+ # With allowed wildcards in the ref, an N wildcard in the ref should never count as an error,
+ # even if matched against an N wildcard in the query while wildcard_query is False
+ # Issue #453
+ match = compare_prefixes("NNACGT", "NTACGTAA", wildcard_ref=True, wildcard_query=False)
+ assert match == (0, 6, 0, 6, 6, 0)
+
+ match = compare_prefixes("NNACGT", "YTACGTAA", wildcard_ref=True, wildcard_query=False)
+ assert match == (0, 6, 0, 6, 6, 0)
+
+
+def test_n_wildcard_in_ref_matches_n_wildcard_in_query_back():
+ aligner = Aligner("NNACGT", max_error_rate=0, wildcard_ref=True, flags=Where.BACK.value)
+ match = aligner.locate("AAANTACGTAAA")
+ assert match == (0, 6, 3, 9, 6, 0)
+
+
def test_compare_suffixes():
assert compare_suffixes('AAXAA', 'TTTTTTTAAAAA') == (0, 5, 7, 12, 4, 1)
assert compare_suffixes('AANAA', 'TTTTTTTAACAA', wildcard_ref=True) == (0, 5, 7, 12, 5, 0)
=====================================
tests/test_commandline.py
=====================================
@@ -736,3 +736,8 @@ def test_max_expected_errors_fasta(tmp_path):
path = tmp_path / "input.fasta"
path.write_text(">read\nACGTACGT\n")
main(["--max-ee=0.001", "-o", "/dev/null", str(path)])
+
+
+def test_warn_if_en_dashes_used():
+ with pytest.raises(SystemExit):
+ main(["–q", "25", "-o", "/dev/null", "in.fastq"])
=====================================
tests/test_modifiers.py
=====================================
@@ -1,6 +1,8 @@
+import pytest
+
from dnaio import Sequence
from cutadapt.modifiers import (UnconditionalCutter, NEndTrimmer, QualityTrimmer,
- Shortener, AdapterCutter)
+ Shortener, AdapterCutter, PairedAdapterCutter, ModificationInfo)
def test_unconditional_cutter():
@@ -19,41 +21,61 @@ def test_nend_trimmer():
for seq, trimmed in zip(seqs, trims):
_seq = Sequence('read1', seq, qualities='#'*len(seq))
_trimmed = Sequence('read1', trimmed, qualities='#'*len(trimmed))
- assert trimmer(_seq, []) == _trimmed
+ assert trimmer(_seq, ModificationInfo(_seq)) == _trimmed
def test_quality_trimmer():
read = Sequence('read1', 'ACGTTTACGTA', '##456789###')
qt = QualityTrimmer(10, 10, 33)
- assert qt(read, []) == Sequence('read1', 'GTTTAC', '456789')
+ assert qt(read, ModificationInfo(read)) == Sequence('read1', 'GTTTAC', '456789')
qt = QualityTrimmer(0, 10, 33)
- assert qt(read, []) == Sequence('read1', 'ACGTTTAC', '##456789')
+ assert qt(read, ModificationInfo(read)) == Sequence('read1', 'ACGTTTAC', '##456789')
qt = QualityTrimmer(10, 0, 33)
- assert qt(read, []) == Sequence('read1', 'GTTTACGTA', '456789###')
+ assert qt(read, ModificationInfo(read)) == Sequence('read1', 'GTTTACGTA', '456789###')
def test_shortener():
read = Sequence('read1', 'ACGTTTACGTA', '##456789###')
shortener = Shortener(0)
- assert shortener(read, []) == Sequence('read1', '', '')
+ assert shortener(read, ModificationInfo(read)) == Sequence('read1', '', '')
shortener = Shortener(1)
- assert shortener(read, []) == Sequence('read1', 'A', '#')
+ assert shortener(read, ModificationInfo(read)) == Sequence('read1', 'A', '#')
shortener = Shortener(5)
- assert shortener(read, []) == Sequence('read1', 'ACGTT', '##456')
+ assert shortener(read, ModificationInfo(read)) == Sequence('read1', 'ACGTT', '##456')
shortener = Shortener(100)
- assert shortener(read, []) == read
+ assert shortener(read, ModificationInfo(read)) == read
def test_adapter_cutter():
- from cutadapt.adapters import SingleAdapter, Where
- a1 = SingleAdapter('GTAGTCCCGC', where=Where.BACK)
- a2 = SingleAdapter('GTAGTCCCCC', where=Where.BACK)
+ from cutadapt.adapters import BackAdapter
+ a1 = BackAdapter("GTAGTCCCGC")
+ a2 = BackAdapter("GTAGTCCCCC")
match = AdapterCutter.best_match([a1, a2], Sequence("name", "ATACCCCTGTAGTCCCC"))
assert match.adapter is a2
+
+
+ at pytest.mark.parametrize("action,expected_trimmed1,expected_trimmed2", [
+ (None, "CCCCGGTTAACCCC", "TTTTAACCGGTTTT"),
+ ("trim", "CCCC", "TTTT"),
+ ("lowercase", "CCCCggttaacccc", "TTTTaaccggtttt"),
+ ("mask", "CCCCNNNNNNNNNN", "TTTTNNNNNNNNNN")
+])
+def test_paired_adapter_cutter_actions(action, expected_trimmed1, expected_trimmed2):
+ from cutadapt.adapters import BackAdapter
+ a1 = BackAdapter("GGTTAA")
+ a2 = BackAdapter("AACCGG")
+ s1 = Sequence("name", "CCCCGGTTAACCCC")
+ s2 = Sequence("name", "TTTTAACCGGTTTT")
+ pac = PairedAdapterCutter([a1], [a2], action=action)
+ info1 = ModificationInfo(s1)
+ info2 = ModificationInfo(s2)
+ trimmed1, trimmed2 = pac(s1, s2, info1, info2)
+ assert expected_trimmed1 == trimmed1.sequence
+ assert expected_trimmed2 == trimmed2.sequence
=====================================
tests/test_paired.py
=====================================
@@ -329,6 +329,21 @@ def test_interleaved_neither_nor(tmpdir):
main(params)
+def test_interleaved_untrimmed_output(tmpdir):
+ o1 = str(tmpdir.join("out.1.fastq"))
+ o2 = str(tmpdir.join("out.2.fastq"))
+ untrimmed = str(tmpdir.join("untrimmed.interleaved.fastq"))
+ main([
+ "--interleaved",
+ "-a", "XXXX",
+ "-o", o1,
+ "-p", o2,
+ "--untrimmed-output", untrimmed,
+ datapath("interleaved.fastq")
+ ])
+ assert_files_equal(datapath("interleaved.fastq"), untrimmed)
+
+
def test_pair_filter_both(run_paired, cores):
run_paired(
"--pair-filter=both -a TTAGACATAT -A GGAGTA -m 14",
=====================================
tests/test_parser.py
=====================================
@@ -2,7 +2,7 @@ from textwrap import dedent
import pytest
from dnaio import Sequence
-from cutadapt.adapters import Where, WhereToRemove, LinkedAdapter, SingleAdapter
+from cutadapt.adapters import LinkedAdapter, BackAdapter, FrontAdapter
from cutadapt.parser import AdapterParser, AdapterSpecification
from cutadapt.modifiers import ModificationInfo
@@ -169,15 +169,31 @@ def test_linked_adapter_name():
assert a.create_statistics().name == "the_name"
-def test_anywhere_parameter():
+def test_anywhere_parameter_back():
parser = AdapterParser(max_error_rate=0.2, min_overlap=4, read_wildcards=False,
adapter_wildcards=False, indels=True)
adapter = list(parser.parse('CTGAAGTGAAGTACACGGTT;anywhere', 'back'))[0]
- assert isinstance(adapter, SingleAdapter)
- assert adapter.remove == WhereToRemove.SUFFIX
- assert adapter.where is Where.ANYWHERE
+ assert isinstance(adapter, BackAdapter)
+ assert adapter._force_anywhere
+
+ # TODO move the rest to a separate test
read = Sequence('foo1', 'TGAAGTACACGGTTAAAAAAAAAA')
from cutadapt.modifiers import AdapterCutter
cutter = AdapterCutter([adapter])
- trimmed_read = cutter(read, ModificationInfo())
+ trimmed_read = cutter(read, ModificationInfo(read))
+ assert trimmed_read.sequence == ''
+
+
+def test_anywhere_parameter_front():
+ parser = AdapterParser(max_error_rate=0.2, min_overlap=4, read_wildcards=False,
+ adapter_wildcards=False, indels=True)
+ adapter = list(parser.parse('CTGAAGTGAAGTACACGGTT;anywhere', 'front'))[0]
+ assert isinstance(adapter, FrontAdapter)
+ assert adapter._force_anywhere
+
+ # TODO move the rest to a separate test
+ read = Sequence('foo1', 'AAAAAAAAAACTGAAGTGAA')
+ from cutadapt.modifiers import AdapterCutter
+ cutter = AdapterCutter([adapter])
+ trimmed_read = cutter(read, ModificationInfo(read))
assert trimmed_read.sequence == ''
=====================================
tests/test_trim.py
=====================================
@@ -1,13 +1,13 @@
from dnaio import Sequence
-from cutadapt.adapters import SingleAdapter, Where
+from cutadapt.adapters import BackAdapter, AnywhereAdapter
from cutadapt.modifiers import AdapterCutter, ModificationInfo
def test_statistics():
read = Sequence('name', 'AAAACCCCAAAA')
- adapters = [SingleAdapter('CCCC', Where.BACK, max_error_rate=0.1)]
+ adapters = [BackAdapter("CCCC", max_error_rate=0.1)]
cutter = AdapterCutter(adapters, times=3)
- cutter(read, ModificationInfo())
+ cutter(read, ModificationInfo(read))
# TODO make this a lot simpler
trimmed_bp = 0
for adapter in adapters:
@@ -25,11 +25,11 @@ def test_end_trim_with_mismatch():
the hit and so the match is considered good. An insertion or substitution
at the same spot is not a match.
"""
- adapter = SingleAdapter('TCGATCGATCGAT', Where.BACK, max_error_rate=0.1)
+ adapter = BackAdapter("TCGATCGATCGAT", max_error_rate=0.1)
read = Sequence('foo1', 'AAAAAAAAAAATCGTCGATC')
cutter = AdapterCutter([adapter], times=1)
- trimmed_read = cutter(read, ModificationInfo())
+ trimmed_read = cutter(read, ModificationInfo(read))
assert trimmed_read.sequence == 'AAAAAAAAAAA'
assert cutter.adapter_statistics[adapter].back.lengths == {9: 1}
@@ -39,14 +39,14 @@ def test_end_trim_with_mismatch():
read = Sequence('foo2', 'AAAAAAAAAAATCGAACGA')
cutter = AdapterCutter([adapter], times=1)
- trimmed_read = cutter(read, ModificationInfo())
+ trimmed_read = cutter(read, ModificationInfo(read))
assert trimmed_read.sequence == read.sequence
assert cutter.adapter_statistics[adapter].back.lengths == {}
def test_anywhere_with_errors():
- adapter = SingleAdapter('CCGCATTTAG', Where.ANYWHERE, max_error_rate=0.1)
+ adapter = AnywhereAdapter("CCGCATTTAG", max_error_rate=0.1)
for seq, expected_trimmed in (
('AACCGGTTccgcatttagGATC', 'AACCGGTT'),
('AACCGGTTccgcgtttagGATC', 'AACCGGTT'), # one mismatch
@@ -57,5 +57,5 @@ def test_anywhere_with_errors():
):
read = Sequence('foo', seq)
cutter = AdapterCutter([adapter], times=1)
- trimmed_read = cutter(read, ModificationInfo())
+ trimmed_read = cutter(read, ModificationInfo(read))
assert trimmed_read.sequence == expected_trimmed
View it on GitLab: https://salsa.debian.org/med-team/python-cutadapt/-/compare/34e57ab55812767fb499cd3738b1ce33f22fac49...28e1f017f62630a283934338b1cad96654212cfb