[med-svn] [snakemake] 02/04: New upstream version 3.8.2+dfsg
Kevin Murray
daube-guest at moszumanska.debian.org
Sun Nov 13 11:17:03 UTC 2016
This is an automated email from the git hooks/post-receive script.
daube-guest pushed a commit to branch master
in repository snakemake.
commit 1881744243dacc3077c5989fd5958ecc569c2247
Author: Kevin Murray <spam at kdmurray.id.au>
Date: Sun Nov 13 22:04:12 2016 +1100
New upstream version 3.8.2+dfsg
---
.gitignore | 1 +
CHANGELOG.md | 23 ++
bitbucket-pipelines.yml | 9 +
environment.yml | 2 +
setup.py | 15 +-
snakemake/__init__.py | 37 ++-
snakemake/dag.py | 138 ++++++----
snakemake/executors.py | 108 ++++----
snakemake/io.py | 120 ++++++--
snakemake/jobs.py | 108 ++++++--
snakemake/logging.py | 30 +-
snakemake/parser.py | 25 +-
snakemake/remote/FTP.py | 76 +++---
snakemake/remote/SFTP.py | 18 +-
snakemake/remote/__init__.py | 35 ++-
snakemake/rules.py | 304 ++++++++++++++-------
snakemake/scheduler.py | 11 +-
snakemake/script.py | 49 ++++
snakemake/stats.py | 2 +-
snakemake/utils.py | 107 ++++++--
snakemake/version.py | 2 +-
snakemake/workflow.py | 37 ++-
tests/test_deferred_func_eval/Snakefile | 22 ++
.../expected-results/test.out | 1 +
tests/test_deferred_func_eval/test1.in | 0
tests/test_deferred_func_eval/test2.in | 0
tests/test_delete_output/Snakefile | 23 ++
.../expected-results/foo/output.foo.file | 0
.../expected-results/output.file | 0
tests/test_format_params/Snakefile | 5 +
tests/test_format_params/expected-results/test.out | 1 +
tests/test_get_log_both/Snakefile | 6 +
tests/test_get_log_both/expected-results/test.log | 2 +
tests/test_get_log_both/expected-results/test.out | 2 +
tests/test_get_log_both/test.in | 2 +
tests/test_get_log_both/test.out | 2 +
tests/test_get_log_both/wrapper.py | 7 +
tests/test_get_log_complex/Snakefile | 6 +
.../test_get_log_complex/expected-results/test.log | 3 +
.../test_get_log_complex/expected-results/test.out | 2 +
tests/test_get_log_complex/test.in | 2 +
tests/test_get_log_complex/test.out | 2 +
tests/test_get_log_complex/wrapper.py | 11 +
tests/test_get_log_none/Snakefile | 5 +
tests/test_get_log_none/expected-results/test.out | 2 +
tests/test_get_log_none/test.in | 2 +
tests/test_get_log_none/wrapper.py | 7 +
tests/test_get_log_stderr/Snakefile | 6 +
.../test_get_log_stderr/expected-results/test.log | 1 +
.../test_get_log_stderr/expected-results/test.out | 2 +
tests/test_get_log_stderr/test.in | 2 +
tests/test_get_log_stderr/test.out | 2 +
tests/test_get_log_stderr/wrapper.py | 7 +
tests/test_get_log_stdout/Snakefile | 6 +
.../test_get_log_stdout/expected-results/test.log | 1 +
.../test_get_log_stdout/expected-results/test.out | 2 +
tests/test_get_log_stdout/test.in | 2 +
tests/test_get_log_stdout/test.out | 2 +
tests/test_get_log_stdout/wrapper.py | 7 +
tests/test_io.py | 33 +++
tests/test_issue328/Snakefile | 26 ++
tests/test_issue328/expected-results/out.txt | 0
tests/test_issue328/in.txt | 0
tests/test_remote_sftp/Snakefile | 10 +
tests/test_remote_sftp/expected-results/readme.txt | 10 +
tests/test_shadow/Snakefile | 68 ++++-
tests/test_spaces_in_fnames/Snakefile | 21 ++
.../expected-results/test bam file realigned.bam | 0
tests/test_spaces_in_fnames/qsub | 6 +
tests/test_spaces_in_fnames/test bam file.bam | 0
tests/test_static_remote/S3MockedForStaticTest.py | 141 ++++++++++
tests/test_static_remote/Snakefile | 42 +++
tests/test_static_remote/__init__.py | 0
tests/test_static_remote/expected-results/out1.txt | 1 +
tests/test_static_remote/expected-results/out2.txt | 1 +
tests/test_static_remote/out1.txt | 1 +
tests/test_static_remote/out2.txt | 1 +
tests/test_static_remote/test.txt | 1 +
tests/test_subworkflows/Snakefile | 1 +
tests/test_subworkflows/subconfig.yaml | 2 +
tests/test_symlink_time_handling/Snakefile | 68 +++++
.../expected-results/time_diff.txt | 1 +
tests/test_wildcard_keyword/Snakefile | 35 +++
.../expected-results/globalbar.log | 0
.../expected-results/globalbar.txt | 1 +
.../expected-results/localbar.log | 0
.../expected-results/localbar.txt | 1 +
.../expected-results/stringbar.log | 0
.../expected-results/stringbar.txt | 1 +
tests/tests.py | 81 +++++-
90 files changed, 1604 insertions(+), 361 deletions(-)
diff --git a/.gitignore b/.gitignore
index 95876b8..d3d8b8b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,5 +1,6 @@
*~
*.pyc
+*.swp
build/
dist/
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 43dddda..639b25c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,28 @@
# Change Log
+## [3.8.1] - 2016-09-14
+### Changed
+- Snakemake now warns when using relative paths starting with "./".
+- The option -R now also accepts an empty list of arguments.
+- Fixed a bug in the handling of the benchmark directive.
+- Jobscripts exit with code 1 in case of failure. This should improve the error messages of cluster systems.
+- Fixed a bug in the SFTP remote provider.
+
+
+## [3.8.0] - 2016-08-26
+### Added
+- Wildcards can now be constrained by rule and globally via the new `wildcard_constraints` directive (see the [docs](https://bitbucket.org/snakemake/snakemake/wiki/Documentation#markdown-header-wildcards)).
+- Subworkflows now allow overwriting their config file via the configfile directive in the calling Snakefile.
+- A method `log_fmt_shell` in the snakemake proxy object that is available in scripts and wrappers allows obtaining a formatted string to redirect logging output from STDOUT or STDERR.
+- Functions given to resources can now optionally take an additional argument `input` that refers to the input files.
+- Functions given to params can now optionally take additional arguments `input` (see above) and `resources`. The latter refers to the resources.
+- It is now possible to let items in shell commands be automatically quoted (see the [docs](https://bitbucket.org/snakemake/snakemake/wiki/Documentation#markdown-header-rules)). This is useful when dealing with filenames that contain whitespace.
+### Changed
+- Snakemake now deletes output files before job execution. Further, it touches output files after job execution. This solves various problems with slow NFS filesystems.
+- A bug was fixed that caused dynamic output rules to be executed multiple times when forcing their execution with -R.
+- A bug causing double uploads with remote files was fixed. Various additional bug fixes related to remote files.
+- Various minor bug fixes.
+
## [3.7.1] - 2016-05-16
### Changed
- Fixed a missing import of the multiprocessing module.
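To illustrate the headline 3.8.0 feature, here is a minimal hypothetical Snakefile using the new wildcard_constraints directive both globally and per rule (rule names, file patterns and regexes are invented; the {input:q} quoting is assumed to be the automatic-quoting support referenced in the changelog):

    wildcard_constraints:
        sample="[A-Za-z0-9]+"            # global: applies to all rules

    rule call:
        input: "mapped/{sample}.bam"
        output: "calls/{sample}.vcf"
        wildcard_constraints:
            sample="A.*"                 # rule level: overrides the global one
        shell: "caller {input:q} > {output:q}"   # quote items containing whitespace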
diff --git a/bitbucket-pipelines.yml b/bitbucket-pipelines.yml
new file mode 100644
index 0000000..50041c7
--- /dev/null
+++ b/bitbucket-pipelines.yml
@@ -0,0 +1,9 @@
+image: continuumio/anaconda
+pipelines:
+ default:
+ - step:
+ script:
+ - conda update -y conda conda-env
+ - conda env create --file environment.yml --name snakemake
+ - source activate snakemake
+ - python setup.py nosetests
diff --git a/environment.yml b/environment.yml
index 828922d..0db7017 100644
--- a/environment.yml
+++ b/environment.yml
@@ -13,3 +13,5 @@ dependencies:
- pysftp
- requests
- dropbox
+ - numpy
+ - appdirs
diff --git a/setup.py b/setup.py
index 3536c17..22cda87 100644
--- a/setup.py
+++ b/setup.py
@@ -26,10 +26,17 @@ exec(open("snakemake/version.py").read())
class NoseTestCommand(TestCommand):
+ user_options = [
+ ('test-suite=', 's', "Test to run (e.g. test_shadow)")
+ ]
+
def run_tests(self):
# Run nose ensuring that argv simulates running nosetests directly
+ argv = ['nosetests']
+ if self.test_suite != 'all':
+ argv.append('tests/tests.py:' + self.test_suite)
import nose
- nose.run_exit(argv=['nosetests'])
+ nose.run_exit(argv=argv)
setup(
@@ -53,8 +60,10 @@ setup(
"snakemake-bash-completion = snakemake:bash_completion"]
},
package_data={'': ['*.css', '*.sh', '*.html']},
- tests_require=['rpy2', 'httpretty==0.8.10', 'docutils', 'nose>=1.3', 'boto>=2.38.0', 'filechunkio>=1.6',
- 'moto>=0.4.14', 'ftputil>=3.2', 'pysftp>=0.2.8', 'requests>=2.8.1', 'dropbox>=5.2'],
+ install_requires=['wrapt',],
+ tests_require=['pytools', 'rpy2', 'httpretty==0.8.10', 'docutils', 'nose>=1.3', 'boto>=2.38.0', 'filechunkio>=1.6',
+ 'moto>=0.4.14', 'ftputil>=3.2', 'pysftp>=0.2.8', 'requests>=2.8.1', 'dropbox>=5.2', 'pyyaml'],
+ test_suite='all',
cmdclass={'test': NoseTestCommand},
classifiers=
["Development Status :: 5 - Production/Stable", "Environment :: Console",
diff --git a/snakemake/__init__.py b/snakemake/__init__.py
index a5f040a..13f7a3c 100644
--- a/snakemake/__init__.py
+++ b/snakemake/__init__.py
@@ -24,7 +24,6 @@ from snakemake.io import load_configfile
from snakemake.shell import shell
from snakemake.utils import update_config, available_cpu_count
-
def snakemake(snakefile,
listrules=False,
list_target_rules=False,
@@ -81,6 +80,7 @@ def snakemake(snakefile,
print_compilation=False,
debug=False,
notemp=False,
+ keep_remote_local=False,
nodeps=False,
keep_target_files=False,
keep_shadow=False,
@@ -94,7 +94,8 @@ def snakemake(snakefile,
log_handler=None,
keep_logger=False,
max_jobs_per_second=None,
- verbose=False):
+ verbose=False,
+ force_use_threads=False):
"""Run snakemake on a given snakefile.
This function provides access to the whole snakemake functionality. It is not thread-safe.
@@ -114,7 +115,7 @@ def snakemake(snakefile,
touch (bool): only touch all output files if present (default False)
forcetargets (bool): force given targets to be re-created (default False)
forceall (bool): force all output files to be re-created (default False)
- forcerun (list): list of files and rules that shall be re-created/re-executed (default [])
+ forcerun (list): list of files and rules that shall be re-created/re-executed (default [])
prioritytargets (list): list of targets that shall be run with maximum priority (default [])
stats (str): path to file that shall contain stats about the workflow execution (default None)
printreason (bool): print the reason for the execution of each job (default false)
@@ -153,6 +154,7 @@ def snakemake(snakefile,
print_compilation (bool): print the compilation of the snakefile (default False)
debug (bool): allow to use the debugger within rules
notemp (bool): ignore temp file flags, e.g. do not delete output files marked as temp after use (default False)
+ keep_remote_local (bool): keep local copies of remote files (default False)
nodeps (bool): ignore dependencies (default False)
keep_target_files (bool): Do not adjust the paths of given target files relative to the working directory.
keep_shadow (bool): Do not delete the shadow directory on snakemake startup.
@@ -165,6 +167,7 @@ def snakemake(snakefile,
verbose(bool): show additional debug output (default False)
log_handler (function): redirect snakemake output to this custom log handler, a function that takes a log message dictionary (see below) as its only argument (default None). The log message dictionary for the log handler has the following entries:
max_jobs_per_second: maximal number of cluster/drmaa jobs per second, None to impose no limit (default None)
+ force_use_threads: whether to force use of threads over processes. Helpful if shared memory is full or unavailable (default False)
:level:
the log level ("info", "error", "debug", "progress", "job_info")
@@ -233,6 +236,8 @@ def snakemake(snakefile,
else:
cluster_config = dict()
+ # force thread use for any kind of cluster
+ use_threads = force_use_threads or (os.name != "posix") or cluster or cluster_sync or drmaa
if not keep_logger:
setup_logger(handler=log_handler,
quiet=quiet,
@@ -241,7 +246,8 @@ def snakemake(snakefile,
nocolor=nocolor,
stdout=dryrun,
debug=verbose,
- timestamp=timestamp)
+ timestamp=timestamp,
+ use_threads=use_threads)
if greediness is None:
greediness = 0.5 if prioritytargets else 1.0
@@ -335,6 +341,7 @@ def snakemake(snakefile,
benchmark_repeats=benchmark_repeats,
verbose=verbose,
notemp=notemp,
+ keep_remote_local=keep_remote_local,
nodeps=nodeps,
jobscript=jobscript,
timestamp=timestamp,
@@ -344,7 +351,8 @@ def snakemake(snakefile,
config=config,
config_args=config_args,
keep_logger=True,
- keep_shadow=True)
+ keep_shadow=True,
+ force_use_threads=use_threads)
success = workflow.execute(
targets=targets,
dryrun=dryrun,
@@ -388,6 +396,7 @@ def snakemake(snakefile,
unlock=unlock,
resources=resources,
notemp=notemp,
+ keep_remote_local=keep_remote_local,
nodeps=nodeps,
keep_target_files=keep_target_files,
keep_shadow=keep_shadow,
@@ -396,7 +405,8 @@ def snakemake(snakefile,
updated_files=updated_files,
allowed_rules=allowed_rules,
greediness=greediness,
- no_hooks=no_hooks)
+ no_hooks=no_hooks,
+ force_use_threads=use_threads)
except BrokenPipeError:
# ignore this exception and stop. It occurs if snakemake output is piped into 'less' and 'less' quits before reading the whole output.
@@ -559,6 +569,11 @@ def get_argument_parser():
"acyclic graph of jobs in the dot language. Recommended "
"use on Unix systems: snakemake --dag | dot | display")
parser.add_argument(
+ "--force-use-threads",
+ dest="force_use_threads",
+ action="store_true",
+ help="Force threads rather than processes. Helpful if shared memory (/dev/shm) is full or unavailable.")
+ parser.add_argument(
"--rulegraph",
action="store_true",
help="Do not execute anything and print the dependency graph "
@@ -620,7 +635,7 @@ def get_argument_parser():
"output."))
parser.add_argument(
"--forcerun", "-R",
- nargs="+",
+ nargs="*",
metavar="TARGET",
help=("Force the re-execution or creation of the given rules or files."
" Use this option if you changed a rule and want to have all its "
@@ -813,6 +828,10 @@ def get_argument_parser():
"a part of the workflow, since temp() would lead to deletion of "
"probably needed files by other parts of the workflow.")
parser.add_argument(
+ "--keep-remote",
+ action="store_true",
+ help="Keep local copies of remote input files.")
+ parser.add_argument(
"--keep-target-files",
action="store_true",
help=
@@ -1004,6 +1023,7 @@ def main():
debug=args.debug,
jobscript=args.jobscript,
notemp=args.notemp,
+ keep_remote_local=args.keep_remote,
timestamp=args.timestamp,
greediness=args.greediness,
no_hooks=args.no_hooks,
@@ -1014,7 +1034,8 @@ def main():
keep_target_files=args.keep_target_files,
keep_shadow=args.keep_shadow,
allowed_rules=args.allowed_rules,
- max_jobs_per_second=args.max_jobs_per_second)
+ max_jobs_per_second=args.max_jobs_per_second,
+ force_use_threads=args.force_use_threads)
if args.profile:
with open(args.profile, "w") as out:
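As the docstring above notes, a custom log_handler receives a plain dictionary whose "level" entry is one of "info", "error", "debug", "progress", "job_info". A minimal sketch (the handler and its filtering are invented for illustration):

    from snakemake import snakemake

    def my_log_handler(msg):
        # msg is a dict; "level" tells us what kind of message this is
        if msg["level"] == "error":
            print("workflow error:", msg.get("msg"))

    # assuming a Snakefile in the working directory
    snakemake("Snakefile", log_handler=my_log_handler)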
diff --git a/snakemake/dag.py b/snakemake/dag.py
index cdc57a0..8b4f1d1 100644
--- a/snakemake/dag.py
+++ b/snakemake/dag.py
@@ -32,7 +32,6 @@ try:
except NameError:
RecursionError = RuntimeError
-
class DAG:
def __init__(self,
workflow,
@@ -52,7 +51,8 @@ class DAG:
ignore_ambiguity=False,
force_incomplete=False,
ignore_incomplete=False,
- notemp=False):
+ notemp=False,
+ keep_remote_local=False):
self.dryrun = dryrun
self.dependencies = defaultdict(partial(defaultdict, set))
@@ -76,6 +76,7 @@ class DAG:
self.prioritytargetjobs = set()
self._ready_jobs = set()
self.notemp = notemp
+ self.keep_remote_local = keep_remote_local
self._jobid = dict()
self.forcerules = set()
@@ -122,6 +123,9 @@ class DAG:
self.update_needrun()
self.set_until_jobs()
self.delete_omitfrom_jobs()
+ # check if remaining jobs are valid
+ for job in self.jobs:
+ job.is_valid()
def update_output_index(self):
self.output_index = OutputIndex(self.rules)
@@ -246,32 +250,38 @@ class DAG:
return True
return False
- def check_output(self, job, wait=3):
+ def check_and_touch_output(self, job, wait=3):
""" Raise exception if output files of job are missing. """
+ expanded_output = [job.shadowed_path(path) for path in job.expanded_output]
try:
- wait_for_files(job.expanded_shadowed_output, latency_wait=wait)
+ wait_for_files(expanded_output, latency_wait=wait)
except IOError as e:
raise MissingOutputException(str(e), rule=job.rule)
- input_maxtime = job.input_maxtime
- if input_maxtime is not None:
- output_mintime = job.output_mintime_local
- if output_mintime is not None and output_mintime < input_maxtime:
- raise RuleException(
- "Output files {} are older than input "
- "files. Did you extract an archive? Make sure that output "
- "files have a more recent modification date than the "
- "archive, e.g. by using 'touch'.".format(", ".join(
- job.expanded_output)),
- rule=job.rule)
+ #It is possible, due to archive expansion or cluster clock skew, that
+ #the files appear older than the input. But we know they must be new,
+ #so touch them to update timestamps.
+ #Note that if the input files somehow have a future date then this will
+ #not currently be spotted and the job will always be re-run.
+ #Also, don't touch directories, as we can't guarantee they were removed.
+ for f in expanded_output:
+ if not os.path.isdir(f):
+ f.touch()
def unshadow_output(self, job):
""" Move files from shadow directory to real output paths. """
if not job.shadow_dir or not job.expanded_output:
return
- cwd = os.getcwd()
- for real_output in job.expanded_output:
- shadow_output = os.path.join(job.shadow_dir, real_output)
+ for real_output in chain(job.expanded_output, job.log):
+ shadow_output = job.shadowed_path(real_output).file
+ # Remake absolute symlinks as relative
+ if os.path.islink(shadow_output):
+ dest = os.readlink(shadow_output)
+ if os.path.isabs(dest):
+ rel_dest = os.path.relpath(dest, job.shadow_dir)
+ os.remove(shadow_output)
+ os.symlink(rel_dest, shadow_output)
+
if os.path.realpath(shadow_output) == os.path.realpath(
real_output):
continue
@@ -305,6 +315,7 @@ class DAG:
""" Touches those output files that are marked for touching. """
for f in job.expanded_output:
if f in job.touch_output:
+ f = job.shadowed_path(f)
logger.info("Touching output file {}.".format(f))
f.touch_or_create()
@@ -335,50 +346,53 @@ class DAG:
logger.info("Removing temporary output file {}.".format(f))
f.remove(remove_non_empty_dir=True)
- def handle_remote(self, job):
+ def handle_remote(self, job, upload=True):
""" Remove local files if they are no longer needed, and upload to S3. """
-
- needed = lambda job_, f: any(
- f in files for j, files in self.depending[job_].items()
- if not self.finished(j) and self.needrun(j) and j != job)
-
- def unneeded_files():
- putative = lambda f: f.is_remote and not f.protected and not f.should_keep_local
- generated_input = set()
- for job_, files in self.dependencies[job].items():
- generated_input |= files
- for f in filter(putative, files):
- if not needed(job_, f):
- yield f
- for f in filter(putative, job.output):
- if not needed(job, f) and not f in self.targetfiles:
- for f_ in job.expand_dynamic(f):
+ if upload:
+ # handle output files
+ for f in job.expanded_output:
+ if f.is_remote:
+ f.upload_to_remote()
+ remote_mtime = f.mtime
+ # immediately force local mtime to match remote,
+ # since conversions from S3 headers are not 100% reliable
+ # without this, newness comparisons may fail down the line
+ f.touch(times=(remote_mtime, remote_mtime))
+
+ if not f.exists_remote:
+ raise RemoteFileException(
+ "The file upload was attempted, but it does not "
+ "exist on remote. Check that your credentials have "
+ "read AND write permissions.")
+
+ if not self.keep_remote_local:
+ # handle input files
+ needed = lambda job_, f: any(
+ f in files for j, files in self.depending[job_].items()
+ if not self.finished(j) and self.needrun(j) and j != job)
+
+ def unneeded_files():
+ putative = lambda f: f.is_remote and not f.protected and not f.should_keep_local
+ generated_input = set()
+ for job_, files in self.dependencies[job].items():
+ generated_input |= files
+ for f in filter(putative, files):
+ if not needed(job_, f):
+ yield f
+ for f in filter(putative, job.output):
+ if not needed(job, f) and not f in self.targetfiles:
+ for f_ in job.expand_dynamic(f):
+ yield f
+ for f in filter(putative, job.input):
+ # TODO what about remote inputs that are used by multiple jobs?
+ if f not in generated_input:
yield f
- for f in filter(putative, job.input):
- # TODO what about remote inputs that are used by multiple jobs?
- if f not in generated_input:
- yield f
-
- for f in job.expanded_output:
- if f.is_remote:
- f.upload_to_remote()
- remote_mtime = f.mtime
- # immediately force local mtime to match remote,
- # since conversions from S3 headers are not 100% reliable
- # without this, newness comparisons may fail down the line
- f.touch(times=(remote_mtime, remote_mtime))
-
- if not f.exists_remote:
- raise RemoteFileException(
- "The file upload was attempted, but it does not "
- "exist on remote. Check that your credentials have "
- "read AND write permissions.")
- for f in unneeded_files():
- logger.info("Removing local output file: {}".format(f))
- f.remove()
+ for f in unneeded_files():
+ logger.info("Removing local output file: {}".format(f))
+ f.remove()
- job.rmdir_empty_remote_dirs()
+ job.rmdir_empty_remote_dirs()
def jobid(self, job):
if job not in self._jobid:
@@ -626,18 +640,26 @@ class DAG:
for job in self.needrun_jobs:
self._temp_input_count[job] = sum(1 for _ in self.temp_input(job))
+ def close_remote_objects(self):
+ for job in self.jobs:
+ if not self.needrun(job):
+ job.close_remote()
+
def postprocess(self):
self.update_needrun()
self.update_priority()
self.update_ready()
self.update_downstream_size()
self.update_temp_input_count()
+ self.close_remote_objects()
def _ready(self, job):
return self._finished.issuperset(filter(self.needrun,
self.dependencies[job]))
def finish(self, job, update_dynamic=True):
+ job.close_remote()
+
self._finished.add(job)
try:
self._ready_jobs.remove(job)
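The touch added in check_and_touch_output addresses the scenario the removed error message used to complain about: archive extraction restores archived mtimes, so a fresh output can look older than its input. A self-contained sketch of the problem and the fix:

    import os, tarfile, pathlib

    pathlib.Path("in.txt").write_text("x")
    os.utime("in.txt", (0, 0))               # simulate an ancient file
    with tarfile.open("a.tar", "w") as t:
        t.add("in.txt")
    with tarfile.open("a.tar") as t:
        t.extract("in.txt", path="out")      # tar restores the old mtime
    print(os.stat("out/in.txt").st_mtime)    # 0.0 -> output looks outdated
    os.utime("out/in.txt", None)             # the fix: touch after the job ran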
diff --git a/snakemake/executors.py b/snakemake/executors.py
index 310cf50..93d320d 100644
--- a/snakemake/executors.py
+++ b/snakemake/executors.py
@@ -13,8 +13,6 @@ import json
import textwrap
import stat
import shutil
-import random
-import string
import threading
import concurrent.futures
import subprocess
@@ -22,6 +20,7 @@ import signal
from functools import partial
from itertools import chain
from collections import namedtuple
+from tempfile import mkdtemp
from snakemake.jobs import Job
from snakemake.shell import shell
@@ -74,9 +73,8 @@ class AbstractExecutor:
if self.dag.dynamic(job):
return
- def format_files(job, io, ruleio, dynamicio):
+ def format_files(job, io, dynamicio):
for f in io:
- f_ = ruleio[f]
if f in dynamicio:
yield "{} (dynamic)".format(f.format_dynamic())
else:
@@ -87,15 +85,15 @@ class AbstractExecutor:
msg=job.message,
name=job.rule.name,
local=self.workflow.is_local(job.rule),
- input=list(format_files(job, job.input, job.ruleio,
+ input=list(format_files(job, job.input,
job.dynamic_input)),
- output=list(format_files(job, job.output, job.ruleio,
+ output=list(format_files(job, job.output,
job.dynamic_output)),
log=list(job.log),
benchmark=job.benchmark,
wildcards=job.wildcards_dict,
reason=str(self.dag.reason(job)),
- resources=job.resources_dict,
+ resources=job.resources,
priority="highest"
if priority == Job.HIGHEST_PRIORITY else priority,
threads=job.threads)
@@ -108,11 +106,11 @@ class AbstractExecutor:
logger.error("Error in job {} while creating output file{} {}.".format(
job, "s" if len(job.output) > 1 else "", ", ".join(job.output)))
- def finish_job(self, job):
+ def finish_job(self, job, upload_remote=True):
self.dag.handle_touch(job)
- self.dag.check_output(job, wait=self.latency_wait)
+ self.dag.check_and_touch_output(job, wait=self.latency_wait)
self.dag.unshadow_output(job)
- self.dag.handle_remote(job)
+ self.dag.handle_remote(job, upload=upload_remote)
self.dag.handle_protected(job)
self.dag.handle_temp(job)
@@ -151,8 +149,8 @@ class RealExecutor(AbstractExecutor):
"Please ensure write permissions for the "
"directory {}".format(e, self.workflow.persistence.path))
- def finish_job(self, job):
- super().finish_job(job)
+ def finish_job(self, job, upload_remote=True):
+ super().finish_job(job, upload_remote=upload_remote)
self.stats.report_job_end(job)
try:
self.workflow.persistence.finished(job)
@@ -170,8 +168,7 @@ class TouchExecutor(RealExecutor):
error_callback=None):
super()._run(job)
try:
- for f in job.expanded_output:
- f.touch()
+ #Touching of output files will be done by finish_job
if job.benchmark:
job.benchmark.touch()
time.sleep(0.1)
@@ -298,18 +295,21 @@ class ClusterExecutor(RealExecutor):
raise WorkflowError(
"Defined jobname (\"{}\") has to contain the wildcard {jobid}.")
- self.exec_job = (
- 'cd {workflow.workdir_init} && '
- '{workflow.snakemakepath} --snakefile {workflow.snakefile} '
- '--force -j{cores} --keep-target-files --keep-shadow '
- '--wait-for-files {wait_for_files} --latency-wait {latency_wait} '
- '--benchmark-repeats {benchmark_repeats} '
- '{overwrite_workdir} {overwrite_config} --nocolor '
- '--notemp --quiet --no-hooks --nolock {target}')
+ self.exec_job = '\\\n'.join((
+ 'cd {workflow.workdir_init} && ',
+ '{workflow.snakemakepath} --snakefile {workflow.snakefile} ',
+ '--force -j{cores} --keep-target-files --keep-shadow --keep-remote ',
+ '--wait-for-files {wait_for_files} --latency-wait {latency_wait} ',
+ '--benchmark-repeats {benchmark_repeats} ',
+ '{overwrite_workdir} {overwrite_config} --nocolor ',
+ '--notemp --quiet --no-hooks --nolock {target}'))
if printshellcmds:
self.exec_job += " --printshellcmds "
+ # force threading.Lock() for cluster jobs
+ self.exec_job += " --force-use-threads "
+
if not any(dag.dynamic_output_jobs):
# disable restriction to target rule in case of dynamic rules!
self.exec_job += " --allowed-rules {job.rule.name} "
@@ -352,18 +352,14 @@ class ClusterExecutor(RealExecutor):
def _run(self, job, callback=None, error_callback=None):
if self.max_jobs_per_second:
self._limit_rate()
+ job.remove_existing_output()
super()._run(job, callback=callback, error_callback=error_callback)
logger.shellcmd(job.shellcmd)
@property
def tmpdir(self):
if self._tmpdir is None:
- while True:
- self._tmpdir = ".snakemake/tmp." + "".join(
- random.sample(string.ascii_uppercase + string.digits, 6))
- if not os.path.exists(self._tmpdir):
- os.mkdir(self._tmpdir)
- break
+ self._tmpdir = mkdtemp(dir=".snakemake", prefix="tmp.")
return os.path.abspath(self._tmpdir)
def get_jobscript(self, job):
@@ -375,41 +371,40 @@ class ClusterExecutor(RealExecutor):
cluster=self.cluster_wildcards(job)))
def spawn_jobscript(self, job, jobscript, **kwargs):
- overwrite_workdir = ""
+ overwrite_workdir = []
if self.workflow.overwrite_workdir:
- overwrite_workdir = "--directory {} ".format(
- self.workflow.overwrite_workdir)
- overwrite_config = ""
+ overwrite_workdir.extend(("--directory", self.workflow.overwrite_workdir))
+ overwrite_config = []
if self.workflow.overwrite_configfile:
- overwrite_config = "--configfile {} ".format(
- self.workflow.overwrite_configfile)
+ overwrite_config.extend(("--configfile", self.workflow.overwrite_configfile))
if self.workflow.config_args:
- overwrite_config += "--config {} ".format(
- " ".join(self.workflow.config_args))
+ overwrite_config.append("--config")
+ overwrite_config.extend(self.workflow.config_args)
target = job.output if job.output else job.rule.name
wait_for_files = list(job.local_input) + [self.tmpdir]
if job.shadow_dir:
wait_for_files.append(job.shadow_dir)
- format = partial(str.format,
- job=job,
- overwrite_workdir=overwrite_workdir,
- overwrite_config=overwrite_config,
- workflow=self.workflow,
- cores=self.cores,
- properties=json.dumps(job.properties(cluster=self.cluster_params(job))),
- latency_wait=self.latency_wait,
- benchmark_repeats=self.benchmark_repeats,
- target=target, wait_for_files=" ".join(wait_for_files),
- **kwargs)
+ format_p = partial(format,
+ job=job,
+ overwrite_workdir=overwrite_workdir,
+ overwrite_config=overwrite_config,
+ workflow=self.workflow,
+ cores=self.cores,
+ properties=json.dumps(job.properties(cluster=self.cluster_params(job))),
+ latency_wait=self.latency_wait,
+ benchmark_repeats=self.benchmark_repeats,
+ target=target,
+ wait_for_files=wait_for_files,
+ **kwargs)
try:
- exec_job = format(self.exec_job)
+ exec_job = format_p(self.exec_job, _quote_all=True)
with open(jobscript, "w") as f:
- print(format(self.jobscript, exec_job=exec_job), file=f)
+ print(format_p(self.jobscript, exec_job=exec_job), file=f)
except KeyError as e:
raise WorkflowError(
"Error formatting jobscript: {} not found\n"
- "Make sure that your custom jobscript it up to date.".format(e))
+ "Make sure that your custom jobscript is up to date.".format(e))
os.chmod(jobscript, os.stat(jobscript).st_mode | stat.S_IXUSR)
def cluster_params(self, job):
@@ -427,6 +422,9 @@ class ClusterExecutor(RealExecutor):
def cluster_wildcards(self, job):
return Wildcards(fromdict=self.cluster_params(job))
+ def finish_job(self, job):
+ super().finish_job(job, upload_remote=False)
+
GenericClusterJob = namedtuple("GenericClusterJob", "job callback error_callback jobscript jobfinished jobfailed")
@@ -453,7 +451,9 @@ class GenericClusterExecutor(ClusterExecutor):
max_jobs_per_second=max_jobs_per_second)
self.submitcmd = submitcmd
self.external_jobid = dict()
- self.exec_job += ' && touch "{jobfinished}" || touch "{jobfailed}"'
+ # TODO wrap with watch and touch {jobrunning}
+ # check modification date of {jobrunning} in the wait_for_job method
+ self.exec_job += ' && touch "{jobfinished}" || (touch "{jobfailed}"; exit 1)'
def cancel(self):
logger.info("Will exit after finishing currently running jobs.")
@@ -660,7 +660,10 @@ class DRMAAExecutor(ClusterExecutor):
def cancel(self):
from drmaa.const import JobControlAction
for jobid in self.submitted:
- self.session.control(jobid, JobControlAction.TERMINATE)
+ try:
+ self.session.control(jobid, JobControlAction.TERMINATE)
+ except drmaa.errors.InvalidJobException:
+ pass
self.shutdown()
def run(self, job,
@@ -683,6 +686,7 @@ class DRMAAExecutor(ClusterExecutor):
jt = self.session.createJobTemplate()
jt.remoteCommand = jobscript
jt.nativeSpecification = drmaa_args
+ jt.jobName = os.path.basename(jobscript)
jobid = self.session.runJob(jt)
except (drmaa.errors.InternalException,
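The tmpdir change above replaces a hand-rolled name-guessing loop with tempfile.mkdtemp, which creates the directory atomically and therefore without races. The equivalent standalone call:

    import os
    from tempfile import mkdtemp

    os.makedirs(".snakemake", exist_ok=True)
    tmpdir = mkdtemp(dir=".snakemake", prefix="tmp.")   # atomic, race-free
    print(os.path.abspath(tmpdir))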
diff --git a/snakemake/io.py b/snakemake/io.py
index 384e300..5a3fe17 100644
--- a/snakemake/io.py
+++ b/snakemake/io.py
@@ -9,6 +9,7 @@ import re
import stat
import time
import json
+import copy
import functools
from itertools import product, chain
from collections import Iterable, namedtuple
@@ -24,10 +25,19 @@ def lstat(f):
def lutime(f, times):
- return os.utime(
- f,
- times,
- follow_symlinks=os.utime not in os.supports_follow_symlinks)
+#In some cases, we have a platform where os.supports_follow_symlinks includes stat()
+#but not utime(). This leads to an anomaly. In any case we never want to touch the
+ #target of a link.
+ if os.utime in os.supports_follow_symlinks:
+ #...utime is well behaved
+ return os.utime(f, times, follow_symlinks=False)
+ elif not os.path.islink(f):
+ #...symlinks not an issue here
+ return os.utime(f, times)
+ else:
+ #...problem system. Do nothing.
+ logger.warning("Unable to set utime on symlink {}. Your Python build does not support it.".format(f))
+ return None
def lchmod(f, mode):
@@ -81,8 +91,9 @@ class _IOFile(str):
def update_remote_filepath(self):
# if the file string is different in the iofile, update the remote object
# (as in the case of wildcard expansion)
- if get_flag_value(self._file, "remote_object").file != self._file:
- get_flag_value(self._file, "remote_object")._iofile = self
+ remote_object = get_flag_value(self._file, "remote_object")
+ if remote_object._file != self._file:
+ remote_object._iofile = self
@property
def should_keep_local(self):
@@ -102,6 +113,13 @@ class _IOFile(str):
raise ValueError("This IOFile is specified as a function and "
"may not be used directly.")
+ def check(self):
+ if self._file.startswith("./"):
+ logger.warning("File path {} starts with './'. This is redundant "
+ "and strongly discouraged. It can also lead to "
+ "inconsistent results of the file-matching approach "
+ "used by Snakemake.".format(self._file))
+
@property
@_refer_to_remote
def exists(self):
@@ -150,8 +168,16 @@ class _IOFile(str):
raise WorkflowError("File {} seems to be a broken symlink.".format(
self.file))
+ @_refer_to_remote
def is_newer(self, time):
- return self.mtime > time
+ """ Returns true of the file is newer than time, or if it is
+ a symlink that points to a file newer than time. """
+ if self.is_remote:
+ #If file is remote but provider does not override the implementation this
+ #is the best we can do.
+ return self.mtime > time
+ else:
+ return os.stat(self, follow_symlinks=True).st_mtime > time or self.mtime > time
def download_from_remote(self):
if self.is_remote and self.remote_object.exists():
@@ -190,7 +216,7 @@ class _IOFile(str):
lchmod(self.file, mode)
def remove(self, remove_non_empty_dir=False):
- remove(self.file, remove_non_empty_dir)
+ remove(self.file, remove_non_empty_dir=remove_non_empty_dir)
def touch(self, times=None):
""" times must be 2-tuple: (atime, mtime) """
@@ -265,7 +291,10 @@ class _IOFile(str):
if isinstance(self._file, str):
self._file = AnnotatedString(self._file)
if isinstance(other._file, AnnotatedString):
- self._file.flags = getattr(other._file, "flags", {})
+ self._file.flags = getattr(other._file, "flags", {}).copy()
+ if "remote_object" in self._file.flags:
+ self._file.flags['remote_object'] = copy.copy(
+ self._file.flags['remote_object'])
def set_flags(self, flags):
if isinstance(self._file, str):
@@ -281,9 +310,19 @@ class _IOFile(str):
_wildcard_regex = re.compile(
- "\{\s*(?P<name>\w+?)(\s*,\s*(?P<constraint>([^\{\}]+|\{\d+(,\d+)?\})*))?\s*\}")
-
-# "\{\s*(?P<name>\w+?)(\s*,\s*(?P<constraint>[^\}]*))?\s*\}")
+ r"""
+ \{
+ (?=( # This lookahead assertion emulates an 'atomic group'
+ # which is required for performance
+ \s*(?P<name>\w+) # wildcard name
+ (\s*,\s*
+ (?P<constraint> # an optional constraint
+ ([^{}]+ | \{\d+(,\d+)?\})* # allow curly braces to nest one level
+ ) # ... as in '{w,a{3,5}}'
+ )?\s*
+ ))\1
+ \}
+ """, re.VERBOSE)
def wait_for_files(files, latency_wait=3):
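An aside on the (?=(...))\1 construction in the rewritten wildcard regex above: Python's re module has no atomic groups (until 3.11), but a lookahead never backtracks into its own content once it has matched, so capturing inside the lookahead and re-consuming the capture with \1 emulates one. A tiny demonstration:

    import re

    atomic = re.compile(r"(?=(a+))\1b")
    print(atomic.match("aaab"))   # matches; group 1 = 'aaa'
    print(atomic.match("aaaa"))   # None, without retrying shorter 'a+' matches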
@@ -311,6 +350,10 @@ def contains_wildcard(path):
return _wildcard_regex.search(path) is not None
+def contains_wildcard_constraints(pattern):
+ return any(match.group('constraint') for match in _wildcard_regex.finditer(pattern))
+
+
def remove(file, remove_non_empty_dir=False):
if os.path.exists(file):
if os.path.isdir(file):
@@ -319,9 +362,12 @@ def remove(file, remove_non_empty_dir=False):
else:
try:
os.removedirs(file)
- except OSError:
- # ignore non empty directories
- pass
+ except OSError as e:
+ # skip non empty directories
+ if e.errno == 39:
+ logger.info("Skipped removing empty directory {}".format(e.filename))
+ else:
+ logger.warning(str(e))
else:
os.remove(file)
@@ -336,9 +382,8 @@ def regex(filepattern):
if wildcard in wildcards:
if match.group("constraint"):
raise ValueError(
- "If multiple wildcards of the same name "
- "appear in a string, eventual constraints have to be defined "
- "at the first occurence and will be inherited by the others.")
+ "Constraint regex must be defined only in the first "
+ "occurence of the wildcard in a string.")
f.append("(?P={})".format(wildcard))
else:
wildcards.add(wildcard)
@@ -374,6 +419,9 @@ def apply_wildcards(pattern,
return re.sub(_wildcard_regex, format_match, pattern)
+
+
+
def not_iterable(value):
return isinstance(value, str) or isinstance(value, dict) or not isinstance(
value, Iterable)
@@ -539,6 +587,39 @@ def glob_wildcards(pattern, files=None):
return wildcards
+def update_wildcard_constraints(pattern,
+ wildcard_constraints,
+ global_wildcard_constraints):
+ """Update wildcard constraints
+
+ Args:
+ pattern (str): pattern on which to update constraints
+ wildcard_constraints (dict): dictionary of wildcard:constraint key-value pairs
+ global_wildcard_constraints (dict): dictionary of wildcard:constraint key-value pairs
+ """
+ def replace_constraint(match):
+ name = match.group("name")
+ constraint = match.group("constraint")
+ newconstraint = wildcard_constraints.get(name, global_wildcard_constraints.get(name))
+ if name in examined_names:
+ return match.group(0)
+ examined_names.add(name)
+ # Don't override if constraint already set
+ if not constraint is None:
+ if name in wildcard_constraints:
+ raise ValueError("Wildcard {} is constrained by both the rule and the file pattern. Consider removing one of the constraints.")
+ return match.group(0)
+ # Only update if a new constraint has actually been set
+ elif not newconstraint is None:
+ return "{{{},{}}}".format(name, newconstraint)
+ else:
+ return match.group(0)
+
+ examined_names = set()
+ return re.sub(_wildcard_regex, replace_constraint, pattern)
+
+
+
# TODO rewrite Namedlist!
class Namedlist(list):
"""
@@ -641,6 +722,9 @@ class Namedlist(list):
def plainstrings(self):
return self.__class__.__call__(toclone=self, plainstr=True)
+ def get(self, key, default_value=None):
+ return self.__dict__.get(key, default_value)
+
def __getitem__(self, key):
try:
return super().__getitem__(key)
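Usage of the new update_wildcard_constraints helper defined above (pattern and constraints invented):

    from snakemake.io import update_wildcard_constraints

    # a rule-level constraint is injected into the pattern...
    print(update_wildcard_constraints("out/{sample}.txt", {"sample": "[A-Z]+"}, {}))
    # -> 'out/{sample,[A-Z]+}.txt'

    # ...but a constraint already present in the pattern wins over a global one:
    print(update_wildcard_constraints(r"out/{sample,\d+}.txt", {}, {"sample": "[A-Z]+"}))
    # -> 'out/{sample,\d+}.txt'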
diff --git a/snakemake/jobs.py b/snakemake/jobs.py
index 53bab86..99ea315 100644
--- a/snakemake/jobs.py
+++ b/snakemake/jobs.py
@@ -38,21 +38,14 @@ class Job:
self._format_wildcards = (self.wildcards if format_wildcards is None
else Wildcards(fromdict=format_wildcards))
- (self.input, self.output, self.params, self.log, self.benchmark,
- self.ruleio,
- self.dependencies) = rule.expand_wildcards(self.wildcards_dict)
-
- self.resources_dict = {}
- for name, res in rule.resources.items():
- if callable(res):
- res = res(self.wildcards)
- if not isinstance(res, int):
- raise ValueError("Callable for resources must return int")
- self.resources_dict[name] = min(
- self.rule.workflow.global_resources.get(name, res), res)
-
- self.threads = self.resources_dict["_cores"]
- self.resources = Resources(fromdict=self.resources_dict)
+ self.input, input_mapping, self.dependencies = self.rule.expand_input(self.wildcards_dict)
+ self.output, output_mapping = self.rule.expand_output(self.wildcards_dict)
+ # other properties are lazy to be able to use additional parameters and check already existing files
+ self._params = None
+ self._log = None
+ self._benchmark = None
+ self._resources = None
+
self.shadow_dir = None
self._inputsize = None
@@ -61,7 +54,7 @@ class Job:
self.touch_output = set()
self.subworkflow_input = dict()
for f in self.output:
- f_ = self.ruleio[f]
+ f_ = output_mapping[f]
if f_ in self.rule.dynamic_output:
self.dynamic_output.add(f)
if f_ in self.rule.temp_output:
@@ -71,7 +64,7 @@ class Job:
if f_ in self.rule.touch_output:
self.touch_output.add(f)
for f in self.input:
- f_ = self.ruleio[f]
+ f_ = input_mapping[f]
if f_ in self.rule.dynamic_input:
self.dynamic_input.add(f)
if f_ in self.rule.subworkflow_input:
@@ -81,6 +74,45 @@ class Job:
for o in self.output:
self._hash ^= o.__hash__()
+ def is_valid(self):
+ """Check if job is valid"""
+ # these properties have to work in dry-run as well. Hence we check them here:
+ resources = self.rule.expand_resources(self.wildcards_dict, self.input)
+ self.rule.expand_params(self.wildcards_dict, self.input, resources)
+ self.rule.expand_benchmark(self.wildcards_dict)
+ self.rule.expand_log(self.wildcards_dict)
+
+ @property
+ def threads(self):
+ return self.resources._cores
+
+ @property
+ def params(self):
+ if self._params is None:
+ self._params = self.rule.expand_params(self.wildcards_dict,
+ self.input,
+ self.resources)
+ return self._params
+
+ @property
+ def log(self):
+ if self._log is None:
+ self._log = self.rule.expand_log(self.wildcards_dict)
+ return self._log
+
+ @property
+ def benchmark(self):
+ if self._benchmark is None:
+ self._benchmark = self.rule.expand_benchmark(self.wildcards_dict)
+ return self._benchmark
+
+ @property
+ def resources(self):
+ if self._resources is None:
+ self._resources = self.rule.expand_resources(self.wildcards_dict,
+ self.input)
+ return self._resources
+
@property
def is_shadow(self):
return self.rule.shadow_depth is not None
@@ -145,16 +177,13 @@ class Job:
else:
yield f
- @property
- def expanded_shadowed_output(self):
- """ Get the paths of output files, resolving shadow directory. """
+ def shadowed_path(self, f):
+ """ Get the shadowed path of IOFile f. """
if not self.shadow_dir:
- yield from self.expanded_output
- else:
- for f in self.expanded_output:
- file_to_yield = IOFile(os.path.join(self.shadow_dir, f), self.rule)
- file_to_yield.clone_flags(f)
- yield file_to_yield
+ return f
+ f_ = IOFile(os.path.join(self.shadow_dir, f), self.rule)
+ f_.clone_flags(f)
+ return f_
@property
def dynamic_wildcards(self):
@@ -334,6 +363,21 @@ class Job:
if protected:
raise ProtectedOutputException(self.rule, protected)
+ def remove_existing_output(self):
+ """Clean up both dynamic and regular output before rules actually run
+ """
+ if self.dynamic_output:
+ for f, _ in chain(*map(self.expand_dynamic,
+ self.rule.dynamic_output)):
+ os.remove(f)
+
+ for f, f_ in zip(self.output, self.rule.output):
+ try:
+ f.remove(remove_non_empty_dir=False)
+ except FileNotFoundError:
+ #No file == no problem
+ pass
+
def prepare(self):
"""
Prepare execution of job.
@@ -352,10 +396,6 @@ class Job:
"present when the DAG was created:\n{}".format(
self.rule, unexpected_output))
- if self.dynamic_output:
- for f, _ in chain(*map(self.expand_dynamic,
- self.rule.dynamic_output)):
- os.remove(f)
for f, f_ in zip(self.output, self.rule.output):
f.prepare()
@@ -367,6 +407,8 @@ class Job:
if self.benchmark:
self.benchmark.prepare()
+ self.remove_existing_output()
+
if not self.is_shadow:
return
# Create shadow directory structure
@@ -400,6 +442,12 @@ class Job:
link = os.path.join(self.shadow_dir, relative_source)
os.symlink(source, link)
+
+ def close_remote(self):
+ for f in (self.input + self.output):
+ if f.is_remote:
+ f.remote_object.close()
+
def cleanup(self):
""" Cleanup output files. """
to_remove = [f for f in self.expanded_output if f.exists]
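The Job rewrite above turns params, log, benchmark and resources into lazily evaluated, cached properties, so input- and resource-dependent functions only run when actually needed. The pattern in isolation (toy class):

    class LazyJob:
        def __init__(self):
            self._resources = None

        @property
        def resources(self):
            if self._resources is None:          # expanded on first access...
                print("expanding resources...")
                self._resources = {"_cores": 1}
            return self._resources               # ...then served from cache

    job = LazyJob()
    job.resources    # prints once
    job.resources    # silent: cached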
diff --git a/snakemake/logging.py b/snakemake/logging.py
index b604cca..3c0830b 100644
--- a/snakemake/logging.py
+++ b/snakemake/logging.py
@@ -9,7 +9,8 @@ import time
import sys
import os
import json
-from multiprocessing import Lock
+import multiprocessing
+import threading
import tempfile
from functools import partial
@@ -17,7 +18,7 @@ from snakemake.common import DYNAMIC_FILL
class ColorizingStreamHandler(_logging.StreamHandler):
- _output_lock = Lock()
+
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
RESET_SEQ = "\033[0m"
@@ -32,8 +33,14 @@ class ColorizingStreamHandler(_logging.StreamHandler):
'ERROR': RED
}
- def __init__(self, nocolor=False, stream=sys.stderr, timestamp=False):
+ def __init__(self, nocolor=False, stream=sys.stderr, timestamp=False, use_threads=False):
super().__init__(stream=stream)
+
+ if not use_threads:
+ self._output_lock = multiprocessing.Lock()
+ else:
+ self._output_lock = threading.Lock()
+
self.nocolor = nocolor or not self.can_color_tty
self.timestamp = timestamp
@@ -180,6 +187,8 @@ class Logger:
yield fmt
singleitems = ["benchmark"]
+ if self.printreason:
+ singleitems.append("reason")
for item in singleitems:
fmt = format_item(item, omit=None)
if fmt != None:
@@ -198,9 +207,6 @@ class Logger:
if resources:
yield "\tresources: " + resources
- if self.printreason:
- singleitems.append("reason")
-
level = msg["level"]
if level == "info":
self.logger.warning(msg["msg"])
@@ -217,8 +223,10 @@ class Logger:
elif level == "progress" and not self.quiet:
done = msg["done"]
total = msg["total"]
- self.logger.info("{} of {} steps ({:.0%}) done".format(
- done, total, done / total))
+ p = done / total
+ percent_fmt = ("{:.2%}" if p < 0.01 else "{:.0%}").format(p)
+ self.logger.info("{} of {} steps ({}) done".format(
+ done, total, percent_fmt))
elif level == "job_info":
if not self.quiet:
if msg["msg"] is not None:
@@ -263,7 +271,8 @@ def setup_logger(handler=None,
nocolor=False,
stdout=False,
debug=False,
- timestamp=False):
+ timestamp=False,
+ use_threads=False):
logger.setup()
if handler is not None:
# custom log handler
@@ -273,7 +282,8 @@ def setup_logger(handler=None,
stream_handler = ColorizingStreamHandler(
nocolor=nocolor,
stream=sys.stdout if stdout else sys.stderr,
- timestamp=timestamp)
+ timestamp=timestamp,
+ use_threads=use_threads)
logger.set_stream_handler(stream_handler)
logger.set_level(_logging.DEBUG if debug else _logging.INFO)
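The logging change replaces the class-level multiprocessing.Lock with a per-handler lock whose type follows the executor: threading.Lock needs no shared memory (/dev/shm), which is exactly why --force-use-threads was introduced. Condensed:

    import multiprocessing, threading

    def make_output_lock(use_threads):
        # threading.Lock avoids /dev/shm; multiprocessing.Lock survives fork()
        return threading.Lock() if use_threads else multiprocessing.Lock()

    with make_output_lock(use_threads=True):
        print("log output serialized")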
diff --git a/snakemake/parser.py b/snakemake/parser.py
index 6908f80..3179236 100644
--- a/snakemake/parser.py
+++ b/snakemake/parser.py
@@ -256,6 +256,13 @@ class Ruleorder(GlobalKeywordState):
self.error('Expected a descending order of rule names, '
'e.g. rule1 > rule2 > rule3 ...', token)
+
+class GlobalWildcardConstraints(GlobalKeywordState):
+ @property
+ def keyword(self):
+ return "global_wildcard_constraints"
+
+
# subworkflows
@@ -267,10 +274,15 @@ class SubworkflowWorkdir(SubworkflowKeywordState):
pass
+class SubworkflowConfigfile(SubworkflowKeywordState):
+ pass
+
+
class Subworkflow(GlobalKeywordState):
subautomata = dict(snakefile=SubworkflowSnakefile,
- workdir=SubworkflowWorkdir)
+ workdir=SubworkflowWorkdir,
+ configfile=SubworkflowConfigfile)
def __init__(self, snakefile, base_indent=0, dedent=0, root=True):
super().__init__(snakefile,
@@ -386,6 +398,13 @@ class Benchmark(RuleKeywordState):
pass
+class WildcardConstraints(RuleKeywordState):
+ @property
+ def keyword(self):
+ return "wildcard_constraints"
+
+
+
class Run(RuleKeywordState):
def __init__(self, snakefile, rulename,
base_indent=0,
@@ -541,6 +560,7 @@ class Rule(GlobalKeywordState):
log=Log,
message=Message,
benchmark=Benchmark,
+ wildcard_constraints=WildcardConstraints,
shadow=Shadow,
run=Run,
shell=Shell,
@@ -649,7 +669,8 @@ class Python(TokenAutomaton):
localrules=Localrules,
onsuccess=OnSuccess,
onerror=OnError,
- onstart=OnStart)
+ onstart=OnStart,
+ wildcard_constraints=GlobalWildcardConstraints)
def __init__(self, snakefile, base_indent=0, dedent=0, root=True):
super().__init__(snakefile,
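The new SubworkflowConfigfile keyword enables the override mentioned in the 3.8.0 changelog. A hypothetical calling Snakefile (all paths invented):

    subworkflow prep:
        snakefile: "../prep/Snakefile"
        workdir: "../prep"
        configfile: "config/prep-override.yaml"   # new: overrides the subworkflow's config

    rule all:
        input: prep("results/table.tsv")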
diff --git a/snakemake/remote/FTP.py b/snakemake/remote/FTP.py
index c11ff17..f796459 100644
--- a/snakemake/remote/FTP.py
+++ b/snakemake/remote/FTP.py
@@ -32,52 +32,54 @@ class RemoteObject(DomainObject):
super(RemoteObject, self).__init__(*args, keep_local=keep_local, provider=provider, **kwargs)
self.encrypt_data_channel = encrypt_data_channel
-
+
+ def close(self):
+ if hasattr(self, "conn") and isinstance(self.conn, ftputil.FTPHost):
+ self.conn.close()
+
# === Implementations of abstract class members ===
@contextmanager #makes this a context manager. after 'yield' is __exit__()
def ftpc(self):
- # if args have been provided to remote(), use them over those given to RemoteProvider()
- args_to_use = self.provider.args
- if len(self.args):
- args_to_use = self.args
-
- # use kwargs passed in to remote() to override those given to the RemoteProvider()
- # default to the host and port given as part of the file, falling back to one specified
- # as a kwarg to remote() or the RemoteProvider (overriding the latter with the former if both)
- kwargs_to_use = {}
- kwargs_to_use["host"] = self.host
- kwargs_to_use["username"] = None
- kwargs_to_use["password"] = None
- kwargs_to_use["port"] = int(self.port) if self.port else 21
- kwargs_to_use["encrypt_data_channel"] = self.encrypt_data_channel
-
- for k,v in self.provider.kwargs.items():
- kwargs_to_use[k] = v
- for k,v in self.kwargs.items():
- kwargs_to_use[k] = v
-
- ftp_base_class = ftplib.FTP_TLS if kwargs_to_use["encrypt_data_channel"] else ftplib.FTP
-
- ftp_session_factory = ftputil.session.session_factory(
- base_class=ftp_base_class,
- port=kwargs_to_use["port"],
- encrypt_data_channel= kwargs_to_use["encrypt_data_channel"],
- debug_level=None)
-
- conn = ftputil.FTPHost(kwargs_to_use["host"], kwargs_to_use["username"], kwargs_to_use["password"], session_factory=ftp_session_factory)
- yield conn
- conn.close()
+ if not hasattr(self, "conn") or (hasattr(self, "conn") and not isinstance(self.conn, ftputil.FTPHost)):
+ # if args have been provided to remote(), use them over those given to RemoteProvider()
+ args_to_use = self.provider.args
+ if len(self.args):
+ args_to_use = self.args
+
+ # use kwargs passed in to remote() to override those given to the RemoteProvider()
+ # default to the host and port given as part of the file, falling back to one specified
+ # as a kwarg to remote() or the RemoteProvider (overriding the latter with the former if both)
+ kwargs_to_use = {}
+ kwargs_to_use["host"] = self.host
+ kwargs_to_use["username"] = None
+ kwargs_to_use["password"] = None
+ kwargs_to_use["port"] = int(self.port) if self.port else 21
+ kwargs_to_use["encrypt_data_channel"] = self.encrypt_data_channel
+
+ for k,v in self.provider.kwargs.items():
+ kwargs_to_use[k] = v
+ for k,v in self.kwargs.items():
+ kwargs_to_use[k] = v
+
+ ftp_base_class = ftplib.FTP_TLS if kwargs_to_use["encrypt_data_channel"] else ftplib.FTP
+
+ ftp_session_factory = ftputil.session.session_factory(
+ base_class=ftp_base_class,
+ port=kwargs_to_use["port"],
+ encrypt_data_channel= kwargs_to_use["encrypt_data_channel"],
+ debug_level=None)
+
+ self.conn = ftputil.FTPHost(kwargs_to_use["host"], kwargs_to_use["username"], kwargs_to_use["password"], session_factory=ftp_session_factory)
+ yield self.conn
def exists(self):
if self._matched_address:
with self.ftpc() as ftpc:
return ftpc.path.exists(self.remote_path)
- if ftpc.path.exists(self.remote_path):
- return ftpc.path.isfile(self.remote_path)
return False
else:
- raise SFTPFileException("The file cannot be parsed as an FTP path in form 'host:port/abs/path/to/file': %s" % self.file())
+ raise FTPFileException("The file cannot be parsed as an FTP path in form 'host:port/abs/path/to/file': %s" % self.file())
def mtime(self):
if self.exists():
@@ -89,7 +91,7 @@ class RemoteObject(DomainObject):
pass
return ftpc.path.getmtime(self.remote_path)
else:
- raise SFTPFileException("The file does not seem to exist remotely: %s" % self.file())
+ raise FTPFileException("The file does not seem to exist remotely: %s" % self.file())
def size(self):
if self.exists():
@@ -111,7 +113,7 @@ class RemoteObject(DomainObject):
pass
ftpc.download(source=self.remote_path, target=self.local_path)
else:
- raise SFTPFileException("The file does not seem to exist remotely: %s" % self.file())
+ raise FTPFileException("The file does not seem to exist remotely: %s" % self.file())
def upload(self):
with self.ftpc() as ftpc:
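The FTP rewrite caches a single ftputil.FTPHost on the object instead of opening and closing a connection around every operation; the new close() (driven by job.close_remote()) tears it down. The shape of that pattern, with a stand-in for the real connection factory:

    from contextlib import contextmanager

    class CachedConnection:
        def __init__(self, factory):
            self._factory = factory          # stands in for ftputil.FTPHost(...)
            self.conn = None

        @contextmanager
        def connection(self):
            if self.conn is None:            # opened lazily, then reused
                self.conn = self._factory()
            yield self.conn                  # note: no close() per operation

        def close(self):
            self.conn = None

    c = CachedConnection(object)
    with c.connection() as a:
        pass
    with c.connection() as b:
        assert a is b                        # same cached connection
    c.close()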
diff --git a/snakemake/remote/SFTP.py b/snakemake/remote/SFTP.py
index bdfff0e..d466ca5 100644
--- a/snakemake/remote/SFTP.py
+++ b/snakemake/remote/SFTP.py
@@ -9,13 +9,13 @@ from contextlib import contextmanager
# module-specific
from snakemake.remote import AbstractRemoteProvider, DomainObject
from snakemake.exceptions import SFTPFileException, WorkflowError
-import snakemake.io
+import snakemake.io
try:
# third-party modules
import pysftp
except ImportError as e:
- raise WorkflowError("The Python 3 package 'pysftp' " +
+ raise WorkflowError("The Python 3 package 'pysftp' " +
"must be installed to use SFTP remote() file functionality. %s" % e.msg)
@@ -29,11 +29,11 @@ class RemoteObject(DomainObject):
def __init__(self, *args, keep_local=False, provider=None, **kwargs):
super(RemoteObject, self).__init__(*args, keep_local=keep_local, provider=provider, **kwargs)
-
+
# === Implementations of abstract class members ===
@contextmanager #makes this a context manager. after 'yield' is __exit__()
- def sftpc(self):
+ def sftpc(self):
# if args have been provided to remote(), use them over those given to RemoteProvider()
args_to_use = self.provider.args
if len(self.args):
@@ -67,11 +67,19 @@ class RemoteObject(DomainObject):
def mtime(self):
if self.exists():
with self.sftpc() as sftpc:
- attr = sftpc.stat(self.remote_path)
+ #As per local operation, don't follow symlinks when reporting mtime
+ attr = sftpc.lstat(self.remote_path)
return int(attr.st_mtime)
else:
raise SFTPFileException("The file does not seem to exist remotely: %s" % self.file())
+ def is_newer(self, time):
+ """ Returns true of the file is newer than time, or if it is
+ a symlink that points to a file newer than time. """
+ with self.sftpc() as sftpc:
+ return ( sftpc.stat( self.remote_path).st_mtime > time or
+ sftpc.lstat(self.remote_path).st_mtime > time )
+
def size(self):
if self.exists():
with self.sftpc() as sftpc:
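Both the io.py and the SFTP is_newer compare two timestamps: the symlink target (stat) and the link itself (lstat). The same test against a local filesystem:

    import os

    def is_newer(path, t):
        # newer if either the link target or the link itself postdates t
        return (os.stat(path).st_mtime > t       # follows symlinks
                or os.lstat(path).st_mtime > t)  # the link itself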
diff --git a/snakemake/remote/__init__.py b/snakemake/remote/__init__.py
index 8667584..0f689a8 100644
--- a/snakemake/remote/__init__.py
+++ b/snakemake/remote/__init__.py
@@ -6,11 +6,35 @@ __license__ = "MIT"
# built-ins
import os, sys, re
from abc import ABCMeta, abstractmethod
+from wrapt import ObjectProxy
+import copy
# module-specific
import snakemake.io
from snakemake.exceptions import RemoteFileException
+class StaticRemoteObjectProxy(ObjectProxy):
+ '''Proxy that implements static-ness for remote objects.
+
+ The constructor takes a real RemoteObject and returns a proxy that
+ behaves the same except for the exists() and mtime() methods.
+
+ '''
+ def exists(self):
+ return True
+
+ def mtime(self):
+ return float("-inf")
+
+ def __copy__(self):
+ copied_wrapped = copy.copy(self.__wrapped__)
+ return type(self)(copied_wrapped)
+
+ def __deepcopy__(self, memo):
+ copied_wrapped = copy.deepcopy(self.__wrapped__, memo)
+ return type(self)(copied_wrapped)
+
+
class AbstractRemoteProvider:
""" This is an abstract class to be used to derive remote provider classes. These might be used to hold common credentials,
and are then passed to RemoteObjects.
@@ -21,7 +45,7 @@ class AbstractRemoteProvider:
self.args = args
self.kwargs = kwargs
- def remote(self, value, *args, keep_local=False, **kwargs):
+ def remote(self, value, *args, keep_local=False, static=False, **kwargs):
if snakemake.io.is_flagged(value, "temp"):
raise SyntaxError(
"Remote and temporary flags are mutually exclusive.")
@@ -31,6 +55,8 @@ class AbstractRemoteProvider:
provider = sys.modules[self.__module__] # get module of derived class
remote_object = provider.RemoteObject(*args, keep_local=keep_local, provider=provider.RemoteProvider(*self.args, **self.kwargs), **kwargs)
+ if static:
+ remote_object = StaticRemoteObjectProxy(remote_object)
return snakemake.io.flag(
value,
@@ -55,6 +81,7 @@ class AbstractRemoteProvider:
def remote_interface(self):
pass
+
class AbstractRemoteObject:
""" This is an abstract class to be used to derive remote object classes for
different cloud storage providers. For example, there could be classes for interacting with
@@ -73,12 +100,18 @@ class AbstractRemoteObject:
@property
def _file(self):
+ if self._iofile is None:
+ return None
return self._iofile._file
def file(self):
return self._file
@abstractmethod
+ def close(self):
+ pass
+
+ @abstractmethod
def exists(self):
pass
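How the new static flag behaves end to end: wrapt.ObjectProxy forwards every attribute to the wrapped object except the methods the proxy overrides. A toy demonstration (class names invented; wrapt is the new dependency added in setup.py):

    from wrapt import ObjectProxy

    class Real:
        def exists(self): return False
        def size(self): return 42

    class StaticProxy(ObjectProxy):
        def exists(self): return True        # overridden: assume always present

    p = StaticProxy(Real())
    print(p.exists())   # True  -> static behaviour, no remote round-trip
    print(p.size())     # 42    -> everything else passes through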
diff --git a/snakemake/rules.py b/snakemake/rules.py
index f4ad9b1..060fef1 100644
--- a/snakemake/rules.py
+++ b/snakemake/rules.py
@@ -10,10 +10,11 @@ import inspect
import sre_constants
from collections import defaultdict, Iterable
-from snakemake.io import IOFile, _IOFile, protected, temp, dynamic, Namedlist, AnnotatedString
-from snakemake.io import expand, InputFiles, OutputFiles, Wildcards, Params, Log
+from snakemake.io import IOFile, _IOFile, protected, temp, dynamic, Namedlist, AnnotatedString, contains_wildcard_constraints, update_wildcard_constraints
+from snakemake.io import expand, InputFiles, OutputFiles, Wildcards, Params, Log, Resources
from snakemake.io import apply_wildcards, is_flagged, not_iterable
-from snakemake.exceptions import RuleException, IOFileException, WildcardError, InputFunctionException
+from snakemake.exceptions import RuleException, IOFileException, WildcardError, InputFunctionException, WorkflowError
+from snakemake.logging import logger
class Rule:
@@ -33,6 +34,7 @@ class Rule:
self._input = InputFiles()
self._output = OutputFiles()
self._params = Params()
+ self._wildcard_constraints = dict()
self.dependencies = dict()
self.dynamic_output = set()
self.dynamic_input = set()
@@ -62,6 +64,7 @@ class Rule:
self._input = InputFiles(other._input)
self._output = OutputFiles(other._output)
self._params = Params(other._params)
+ self._wildcard_constraints = dict(other._wildcard_constraints)
self.dependencies = dict(other.dependencies)
self.dynamic_output = set(other.dynamic_output)
self.dynamic_input = set(other.dynamic_input)
@@ -86,8 +89,7 @@ class Rule:
def dynamic_branch(self, wildcards, input=True):
def get_io(rule):
return (rule.input, rule.dynamic_input) if input else (
- rule.output, rule.dynamic_output
- )
+ rule.output, rule.dynamic_output)
def partially_expand(f, wildcards):
"""Expand the wildcards in f from the ones present in wildcards
@@ -98,8 +100,7 @@ class Rule:
# perform the partial expansion from f's string representation
s = str(f).replace('{', '{{').replace('}', '}}')
for key in wildcards.keys():
- s = s.replace('{{{{{}}}}}'.format(key),
- '{{{}}}'.format(key))
+ s = s.replace('{{{{{}}}}}'.format(key), '{{{}}}'.format(key))
# build result
anno_s = AnnotatedString(s)
anno_s.flags = f.flags
@@ -149,9 +150,12 @@ class Rule:
for name, values in wildcards.items()
if len(set(values)) == 1)
# TODO have a look into how to concretize dependencies here
- (branch._input, branch._output, branch._params, branch._log,
- branch._benchmark, _, branch.dependencies
- ) = branch.expand_wildcards(wildcards=non_dynamic_wildcards)
+ branch._input, _, branch.dependencies = branch.expand_input(non_dynamic_wildcards)
+ branch._output, _ = branch.expand_output(non_dynamic_wildcards)
+ branch.resources = dict(branch.expand_resources(non_dynamic_wildcards, branch._input).items())
+ branch._params = branch.expand_params(non_dynamic_wildcards, branch._input, branch.resources)
+ branch._log = branch.expand_log(non_dynamic_wildcards)
+ branch._benchmark = branch.expand_benchmark(non_dynamic_wildcards)
return branch, non_dynamic_wildcards
return branch
@@ -216,9 +220,9 @@ class Rule:
wildcards = item.get_wildcard_names()
if self.wildcard_names:
if self.wildcard_names != wildcards:
- raise SyntaxError(
- "Not all output files of rule {} "
- "contain the same wildcards.".format(self.name))
+ raise SyntaxError("Not all output files of rule {} "
+ "contain the same wildcards.".format(
+ self.name))
else:
self.wildcard_names = wildcards
@@ -236,6 +240,21 @@ class Rule:
# add the rule to the dependencies
if isinstance(item, _IOFile):
self.dependencies[item] = item.rule
+ if output:
+ if self.wildcard_constraints or self.workflow._wildcard_constraints:
+ try:
+ item = update_wildcard_constraints(
+ item, self.wildcard_constraints,
+ self.workflow._wildcard_constraints)
+ except ValueError as e:
+ raise IOFileException(
+ str(e),
+ snakefile=self.snakefile,
+ lineno=self.lineno)
+ else:
+ if contains_wildcard_constraints(item):
+ logger.warning(
+ "wildcard constraints in inputs are ignored")
_item = IOFile(item, rule=self)
if is_flagged(item, "temp"):
if output:
@@ -296,6 +315,13 @@ class Rule:
self.params.add_name(name)
@property
+ def wildcard_constraints(self):
+ return self._wildcard_constraints
+
+ def set_wildcard_constraints(self, **kwwildcard_constraints):
+ self._wildcard_constraints.update(kwwildcard_constraints)
+
+ @property
def log(self):
return self._log
@@ -308,8 +334,8 @@ class Rule:
def _set_log_item(self, item, name=None):
if isinstance(item, str) or callable(item):
self.log.append(IOFile(item,
- rule=self)
- if isinstance(item, str) else item)
+ rule=self) if isinstance(item, str) else
+ item)
if name:
self.log.add_name(name)
else:
@@ -322,12 +348,58 @@ class Rule:
except TypeError:
raise SyntaxError("Log files have to be specified as strings.")
- def expand_wildcards(self, wildcards=None):
- """
- Expand wildcards depending on the requested output
- or given wildcards dict.
- """
+ def check_wildcards(self, wildcards):
+ missing_wildcards = self.wildcard_names - set(wildcards.keys())
+ if missing_wildcards:
+ raise RuleException(
+ "Could not resolve wildcards in rule {}:\n{}".format(
+ self.name, "\n".join(self.wildcard_names)),
+ lineno=self.lineno,
+ snakefile=self.snakefile)
+
+ def apply_input_function(self, func, wildcards, **aux_params):
+ sig = inspect.signature(func)
+ _aux_params = {k: v for k, v in aux_params.items() if k in sig.parameters}
+ try:
+ value = func(Wildcards(fromdict=wildcards), **_aux_params)
+ except (Exception, BaseException) as e:
+ raise InputFunctionException(e, rule=self, wildcards=wildcards)
+ return value
+
+ def _apply_wildcards(self, newitems, olditems, wildcards,
+ concretize=apply_wildcards,
+ check_return_type=True,
+ mapping=None,
+ no_flattening=False,
+ aux_params=None):
+ if aux_params is None:
+ aux_params = dict()
+ for name, item in olditems.allitems():
+ start = len(newitems)
+ is_iterable = True
+
+ if callable(item):
+ item = self.apply_input_function(item, wildcards, **aux_params)
+
+ if not_iterable(item) or no_flattening:
+ item = [item]
+ is_iterable = False
+ for item_ in item:
+ if check_return_type and not isinstance(item_, str):
+ raise WorkflowError("Function did not return str or list "
+ "of str.", rule=self)
+ concrete = concretize(item_, wildcards)
+ newitems.append(concrete)
+ if mapping is not None:
+ mapping[concrete] = item_
+
+ if name:
+ newitems.set_name(
+ name, start,
+ end=len(newitems) if is_iterable else None)
+
+ def expand_input(self, wildcards):
def concretize_iofile(f, wildcards):
if not isinstance(f, _IOFile):
return IOFile(f, rule=self)
@@ -336,100 +408,125 @@ class Rule:
fill_missing=f in self.dynamic_input,
fail_dynamic=self.dynamic_output)
+ input = InputFiles()
+ mapping = dict()
+ try:
+ self._apply_wildcards(input, self.input, wildcards,
+ concretize=concretize_iofile,
+ mapping=mapping)
+ except WildcardError as e:
+ raise WorkflowError(
+ "Wildcards in input files cannot be "
+ "determined from output files:",
+ str(e), rule=self)
+
+ if self.dependencies:
+ dependencies = {
+ f: self.dependencies[f_]
+ for f, f_ in mapping.items() if f_ in self.dependencies
+ }
+ if None in self.dependencies:
+ dependencies[None] = self.dependencies[None]
+ else:
+ dependencies = self.dependencies
+
+ for f in input:
+ f.check()
+
+ return input, mapping, dependencies
+
+ def expand_params(self, wildcards, input, resources):
+ # TODO add output
def concretize_param(p, wildcards):
if isinstance(p, str):
return apply_wildcards(p, wildcards)
return p
- def check_string_type(f):
- if not isinstance(f, str):
- raise RuleException(
- "Input function did not return str or list of str.",
- rule=self)
-
- def _apply_wildcards(newitems, olditems, wildcards, wildcards_obj,
- concretize=apply_wildcards,
- check_return_type=check_string_type,
- ruleio=None,
- no_flattening=False):
- for name, item in olditems.allitems():
- start = len(newitems)
- is_iterable = True
-
- if callable(item):
- try:
- item = item(wildcards_obj)
- except (Exception, BaseException) as e:
- raise InputFunctionException(e, rule=self, wildcards=wildcards)
-
- if not_iterable(item) or no_flattening:
- item = [item]
- is_iterable = False
- for item_ in item:
- check_return_type(item_)
- concrete = concretize(item_, wildcards)
- newitems.append(concrete)
- if ruleio is not None:
- ruleio[concrete] = item_
-
- if name:
- newitems.set_name(
- name, start,
- end=len(newitems) if is_iterable else None)
-
- if wildcards is None:
- wildcards = dict()
- missing_wildcards = self.wildcard_names - set(wildcards.keys())
-
- if missing_wildcards:
- raise RuleException(
- "Could not resolve wildcards in rule {}:\n{}".format(
- self.name, "\n".join(self.wildcard_names)),
- lineno=self.lineno,
- snakefile=self.snakefile)
-
- ruleio = dict()
-
+ params = Params()
try:
- input = InputFiles()
- wildcards_obj = Wildcards(fromdict=wildcards)
- _apply_wildcards(input, self.input, wildcards, wildcards_obj,
- concretize=concretize_iofile,
- ruleio=ruleio)
-
- params = Params()
#When applying wildcards to params, the return type need not be
#a string, so the check is disabled.
- _apply_wildcards(params, self.params, wildcards, wildcards_obj,
- concretize=concretize_param,
- check_return_type=lambda x: None,
- no_flattening=True)
-
- output = OutputFiles(o.apply_wildcards(wildcards)
- for o in self.output)
- output.take_names(self.output.get_names())
-
- dependencies = {
- None if f is None else f.apply_wildcards(wildcards): rule
- for f, rule in self.dependencies.items()
- }
-
- ruleio.update(dict((f, f_) for f, f_ in zip(output, self.output)))
+ self._apply_wildcards(params, self.params, wildcards,
+ concretize=concretize_param,
+ check_return_type=False,
+ no_flattening=True,
+ aux_params={"input": input,
+ "resources": resources})
+ except WildcardError as e:
+ raise WorkflowError(
+ "Wildcards in params cannot be "
+ "determined from output files:",
+ str(e), rule=self)
+ return params
+
+
+ def expand_output(self, wildcards):
+ output = OutputFiles(o.apply_wildcards(wildcards)
+ for o in self.output)
+ output.take_names(self.output.get_names())
+ mapping = {f: f_ for f, f_ in zip(output, self.output)}
+
+ for f in output:
+ f.check()
+
+ return output, mapping
+
+
+ def expand_log(self, wildcards):
+ def concretize_logfile(f, wildcards):
+ if not isinstance(f, _IOFile):
+ return IOFile(f, rule=self)
+ else:
+ return f.apply_wildcards(wildcards,
+ fill_missing=False,
+ fail_dynamic=self.dynamic_output)
- log = Log()
- _apply_wildcards(log, self.log, wildcards, wildcards_obj,
- concretize=concretize_iofile)
+ log = Log()
+ try:
+ self._apply_wildcards(log,
+ self.log,
+ wildcards,
+ concretize=concretize_logfile)
+ except WildcardError as e:
+ raise WorkflowError(
+ "Wildcards in log files cannot be "
+ "determined from output files:",
+ str(e), rule=self)
+
+ for f in log:
+ f.check()
+
+ return log
+
+ def expand_benchmark(self, wildcards):
+ try:
benchmark = self.benchmark.apply_wildcards(
wildcards) if self.benchmark else None
- return input, output, params, log, benchmark, ruleio, dependencies
- except WildcardError as ex:
- # this can only happen if an input contains an unresolved wildcard.
- raise RuleException(
- "Wildcards in input, params, log or benchmark file of rule {} cannot be "
- "determined from output files:\n{}".format(self, str(ex)),
- lineno=self.lineno,
- snakefile=self.snakefile)
+ except WildcardError as e:
+ raise WorkflowError(
+ "Wildcards in benchmark file cannot be "
+ "determined from output files:",
+ str(e), rule=self)
+
+ if benchmark is not None:
+ benchmark.check()
+
+ return benchmark
+
+ def expand_resources(self, wildcards, input):
+ resources = dict()
+ for name, res in self.resources.items():
+ if callable(res):
+ res = self.apply_input_function(res,
+ wildcards,
+ input=input)
+ if not isinstance(res, int):
+ raise WorkflowError("Resources function did not return int.")
+ res = min(self.workflow.global_resources.get(name, res), res)
+ resources[name] = res
+ resources = Resources(fromdict=resources)
+ return resources
def is_producer(self, requested_output):
"""
@@ -470,6 +567,7 @@ class Rule:
if not bestmatch or bestmatchlen > l:
bestmatch = match.groupdict()
bestmatchlen = l
+ self.check_wildcards(bestmatch)
return bestmatch
@staticmethod
@@ -497,7 +595,7 @@ class Rule:
return self.name.__hash__()
def __eq__(self, other):
- return self.name == other.name
+ return self.name == other.name and self.output == other.output
class Ruleorder:
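
To illustrate the new wildcard constraint machinery above, a minimal
Snakefile sketch (rule, pattern and command names are hypothetical). Note
that, per the hunk above, constraints are applied to output files only;
constraints appearing in input files are ignored with a warning:

    wildcard_constraints:
        sample="[A-Za-z0-9]+"      # global constraint for all rules

    rule call:
        input:
            "mapped/{sample}.bam"
        output:
            "calls/{sample}.vcf"
        wildcard_constraints:
            sample="[a-z]+"        # per-rule override of the global constraint
        shell:
            "mycaller {input} > {output}"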
diff --git a/snakemake/scheduler.py b/snakemake/scheduler.py
index fa7f58f..246b605 100644
--- a/snakemake/scheduler.py
+++ b/snakemake/scheduler.py
@@ -43,7 +43,8 @@ class JobScheduler:
max_jobs_per_second=None,
latency_wait=3,
benchmark_repeats=1,
- greediness=1.0):
+ greediness=1.0,
+ force_use_threads=False):
""" Create a new instance of KnapsackJobScheduler. """
self.cluster = cluster
self.cluster_config = cluster_config
@@ -60,13 +61,16 @@ class JobScheduler:
self.resources = dict(self.workflow.global_resources)
- use_threads = os.name != "posix"
+ # we should use threads on a cluster, because shared memory /dev/shm may be full
+ # which prevents the multiprocessing.Lock() semaphore from being created
+ use_threads = force_use_threads or (os.name != "posix") or cluster or cluster_sync or drmaa
if not use_threads:
self._open_jobs = multiprocessing.Event()
self._lock = multiprocessing.Lock()
else:
self._open_jobs = threading.Event()
self._lock = threading.Lock()
+
self._errors = False
self._finished = False
self._job_queue = None
@@ -275,6 +279,7 @@ class JobScheduler:
def _error(self, job):
""" Clear jobs and stop the workflow. """
+ # TODO count down number of retries in job. If not zero, reschedule instead of failing.
with self._lock:
self._errors = True
self.running.remove(job)
@@ -350,7 +355,7 @@ Problem", Akcay, Li, Xu, Annals of Operations Research, 2012
for name in self.workflow.global_resources]
def job_weight(self, job):
- res = job.resources_dict
+ res = job.resources
return [self.calc_resource(name, res.get(name, 0))
for name in self.workflow.global_resources]
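
The new force_use_threads flag falls back to threading primitives when the
multiprocessing semaphore cannot be created, e.g. because /dev/shm is full.
A sketch of how it might be passed through the API (assuming the top-level
snakemake() entry point forwards it, as workflow.execute() does below):

    from snakemake import snakemake

    # use threading.Lock/Event instead of the multiprocessing equivalents
    snakemake("Snakefile", force_use_threads=True)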
diff --git a/snakemake/script.py b/snakemake/script.py
index 317f128..3ff3d05 100644
--- a/snakemake/script.py
+++ b/snakemake/script.py
@@ -83,6 +83,55 @@ class Snakemake:
self.log = log
self.config = config
+ def log_fmt_shell(self, stdout=True, stderr=True, append=False):
+ """
+ Return a shell redirection string to be used in `shell()` calls
+
+ This function allows scripts and wrappers to support optional `log` files
+ specified in the calling rule. If no `log` was specified, then an
+ empty string "" is returned, regardless of the values of `stdout`,
+ `stderr`, and `append`.
+
+ Parameters
+ ----------
+
+ stdout : bool
+ Send stdout to log
+
+ stderr : bool
+ Send stderr to log
+
+ append : bool
+ Do not overwrite the log file. Useful for sending the output of
+ multiple commands to the same log. Note, however, that the log
+ will not be truncated at the start of the run.
+
+ The following table describes the output:
+
+ --------  --------  --------  ------  --------------
+ stdout    stderr    append    log     return value
+ --------  --------  --------  ------  --------------
+ True      True      True      fn      >> fn 2>&1
+ True      False     True      fn      >> fn
+ False     True      True      fn      2>> fn
+ True      True      False     fn      > fn 2>&1
+ True      False     False     fn      > fn
+ False     True      False     fn      2> fn
+ any       any       any       None    ""
+ --------  --------  --------  ------  --------------
+ """
+ if not self.log:
+ return ""
+ lookup = {
+ (True, True, True): " >> {0} 2>&1",
+ (True, False, True): " >> {0}",
+ (False, True, True): " 2>> {0}",
+ (True, True, False): " > {0} 2>&1",
+ (True, False, False): " > {0}",
+ (False, True, False): " 2> {0}",
+ }
+ return lookup[(stdout, stderr, append)].format(self.log)
+
def script(basedir, path, input, output, params, wildcards, threads, resources,
log, config):
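
In a wrapper script the new helper is used like this (a minimal sketch;
"somecommand" is a placeholder, but the pattern matches the test wrappers
added further below):

    from snakemake.shell import shell

    # "" if the calling rule defined no log, otherwise e.g. " > {log} 2>&1"
    log = snakemake.log_fmt_shell(stdout=True, stderr=True)
    shell("somecommand {snakemake.input} {snakemake.output} {log}")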
diff --git a/snakemake/stats.py b/snakemake/stats.py
index 96292c9..80621b8 100644
--- a/snakemake/stats.py
+++ b/snakemake/stats.py
@@ -64,7 +64,7 @@ class Stats:
"priority": job.priority
if job.priority != snakemake.jobs.Job.HIGHEST_PRIORITY else
"highest",
- "resources": job.resources_dict
+ "resources": dict(job.resources.items())
}
for f, start, stop, duration, job in self.file_stats
}
diff --git a/snakemake/utils.py b/snakemake/utils.py
index daf742e..47fb2f9 100644
--- a/snakemake/utils.py
+++ b/snakemake/utils.py
@@ -12,6 +12,8 @@ import textwrap
from itertools import chain
from collections import Mapping
import multiprocessing
+import string
+import shlex
from snakemake.io import regex, Namedlist
from snakemake.logging import logger
@@ -149,22 +151,89 @@ def R(code):
robjects.r(format(textwrap.dedent(code), stepout=2))
-def format(_pattern, *args, stepout=1, **kwargs):
- """Format a pattern in Snakemake style.
+class SequenceFormatter(string.Formatter):
+ """string.Formatter subclass with special behavior for sequences.
+
+ This class delegates formatting of individual elements to another
+ formatter object. Non-list objects are formatted by calling the
+ delegate formatter's "format_field" method. List-like objects
+ (list, tuple, set, frozenset) are formatted by formatting each
+ element of the list according to the specified format spec using
+ the delegate formatter and then joining the resulting strings with
+ a separator (space by default).
+
+ """
+ def __init__(self, separator=" ", element_formatter=string.Formatter(),
+ *args, **kwargs):
+ self.separator = separator
+ self.element_formatter = element_formatter
+
+ def format_element(self, elem, format_spec):
+ """Format a single element
+
+ For sequences, this is called once for each element in a
+ sequence. For anything else, it is called on the entire
+ object. It is intended to be overridden in subclasses.
+
+ """
+ return self.element_formatter.format_field(elem, format_spec)
+
+ def format_field(self, value, format_spec):
+ if isinstance(value, (list, tuple, set, frozenset)):
+ return self.separator.join(self.format_element(v, format_spec) for v in value)
+ else:
+ return self.format_element(value, format_spec)
+
+
+class QuotedFormatter(string.Formatter):
+ """Subclass of string.Formatter that supports quoting.
+
+ Using this formatter, any field can be quoted after formatting by
+ appending "q" to its format string. By default, shell quoting is
+ performed using "shlex.quote", but you can pass a different
+ quote_func to the constructor. The quote_func simply has to take a
+ string argument and return a new string representing the quoted
+ form of the input string.
+
+ Note that if an element after formatting is the empty string, it
+ will not be quoted.
- This means that keywords embedded in braces are replaced by any variable
- values that are available in the current namespace.
"""
- class SequenceFormatter:
- def __init__(self, sequence):
- self._sequence = sequence
+ def __init__(self, quote_func=shlex.quote, *args, **kwargs):
+ self.quote_func = quote_func
+ super().__init__(*args, **kwargs)
- def __getitem__(self, i):
- return self._sequence[i]
+ def format_field(self, value, format_spec):
+ do_quote = format_spec.endswith("q")
+ if do_quote:
+ format_spec = format_spec[:-1]
+ formatted = super().format_field(value, format_spec)
+ if do_quote and formatted != '':
+ formatted = self.quote_func(formatted)
+ return formatted
- def __str__(self):
- return " ".join(self._sequence)
+
+class AlwaysQuotedFormatter(QuotedFormatter):
+ """Subclass of QuotedFormatter that always quotes.
+
+ Usage is identical to QuotedFormatter, except that it *always*
+ acts like "q" was appended to the format spec.
+
+ """
+
+ def format_field(self, value, format_spec):
+ if not format_spec.endswith("q"):
+ format_spec += "q"
+ return super().format_field(value, format_spec)
+
+
+def format(_pattern, *args, stepout=1, _quote_all=False, **kwargs):
+ """Format a pattern in Snakemake style.
+
+ This means that keywords embedded in braces are replaced by any variable
+ values that are available in the current namespace.
+ """
frame = inspect.currentframe().f_back
while stepout > 1:
@@ -176,12 +245,18 @@ def format(_pattern, *args, stepout=1, **kwargs):
variables = dict(frame.f_globals)
# add local variables from calling rule/function
variables.update(frame.f_locals)
+ if "self" in variables:
+ # self is the first arg of fmt.format as well. Not removing it would
+ # cause a multiple values error on Python <=3.4.2.
+ del variables["self"]
variables.update(kwargs)
- for key, value in list(variables.items()):
- if type(value) in (list, tuple, set, frozenset):
- variables[key] = SequenceFormatter(value)
+ fmt = SequenceFormatter(separator=" ")
+ if _quote_all:
+ fmt.element_formatter = AlwaysQuotedFormatter()
+ else:
+ fmt.element_formatter = QuotedFormatter()
try:
- return _pattern.format(*args, **variables)
+ return fmt.format(_pattern, *args, **variables)
except KeyError as ex:
raise NameError("The name {} is unknown in this context. Please "
"make sure that you defined that variable. "
@@ -278,7 +353,7 @@ def available_cpu_count():
if m:
res = bin(int(m.group(1).replace(',', ''), 16)).count('1')
if res > 0:
- return res
+ return min(res, multiprocessing.cpu_count())
except IOError:
pass
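
The practical effect of the new formatters: appending "q" to a field's
format spec shell-quotes the formatted value, and _quote_all=True quotes
every field. A small sketch (the variable name is arbitrary):

    from snakemake.utils import format

    fname = "test bam file.bam"
    print(format("touch {fname:q}"))                 # touch 'test bam file.bam'
    print(format("touch {fname}", _quote_all=True))  # same result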
diff --git a/snakemake/version.py b/snakemake/version.py
index 975f691..2ae7a96 100644
--- a/snakemake/version.py
+++ b/snakemake/version.py
@@ -1 +1 @@
-__version__ = "3.7.1"
+__version__ = "3.8.2"
diff --git a/snakemake/workflow.py b/snakemake/workflow.py
index 3cee989..8d78ceb 100644
--- a/snakemake/workflow.py
+++ b/snakemake/workflow.py
@@ -71,6 +71,7 @@ class Workflow:
self._onsuccess = lambda log: None
self._onerror = lambda log: None
self._onstart = lambda log: None
+ self._wildcard_constraints = dict()
self.debug = debug
self._rulecount = 0
@@ -211,10 +212,12 @@ class Workflow:
updated_files=None,
keep_target_files=False,
keep_shadow=False,
+ keep_remote_local=False,
allowed_rules=None,
max_jobs_per_second=None,
greediness=1.0,
- no_hooks=False):
+ no_hooks=False,
+ force_use_threads=False):
self.global_resources = dict() if resources is None else resources
self.global_resources["_cores"] = cores
@@ -291,7 +294,8 @@ class Workflow:
ignore_ambiguity=ignore_ambiguity,
force_incomplete=force_incomplete,
ignore_incomplete=ignore_incomplete or printdag or printrulegraph,
- notemp=notemp)
+ notemp=notemp,
+ keep_remote_local=keep_remote_local)
self.persistence = Persistence(
nolock=nolock,
@@ -344,6 +348,7 @@ class Workflow:
if not subsnakemake(subworkflow.snakefile,
workdir=subworkflow.workdir,
targets=subworkflow_targets,
+ configfile=subworkflow.configfile,
updated_files=updated):
return False
dag.updated_subworkflow_files.update(subworkflow.target(f)
@@ -432,7 +437,8 @@ class Workflow:
printshellcmds=printshellcmds,
latency_wait=latency_wait,
benchmark_repeats=benchmark_repeats,
- greediness=greediness)
+ greediness=greediness,
+ force_use_threads=force_use_threads)
if not dryrun and not quiet:
if len(dag):
@@ -447,8 +453,9 @@ class Workflow:
logger.resources_info(
"Provided resources: " + provided_resources)
ignored_resources = format_resource_names(
- set(resource for job in dag.needrun_jobs for resource in
- job.resources_dict if resource not in resources))
+ resource for job in dag.needrun_jobs
+ for resource in job.resources.keys()
+ if resource not in resources)
if ignored_resources:
logger.resources_info(
"Ignored resources: " + ignored_resources)
@@ -533,6 +540,9 @@ class Workflow:
def onerror(self, func):
self._onerror = func
+ def global_wildcard_constraints(self, **content):
+ self._wildcard_constraints.update(content)
+
def workdir(self, workdir):
if self.overwrite_workdir is None:
os.makedirs(workdir, exist_ok=True)
@@ -549,8 +559,8 @@ class Workflow:
def ruleorder(self, *rulenames):
self._ruleorder.add(*rulenames)
- def subworkflow(self, name, snakefile=None, workdir=None):
- sw = Subworkflow(self, name, snakefile, workdir)
+ def subworkflow(self, name, snakefile=None, workdir=None, configfile=None):
+ sw = Subworkflow(self, name, snakefile, workdir, configfile)
self._subworkflows[name] = sw
self.globals[name] = sw.target
@@ -562,6 +572,8 @@ class Workflow:
rule = self.get_rule(name)
def decorate(ruleinfo):
+ if ruleinfo.wildcard_constraints:
+ rule.set_wildcard_constraints(*ruleinfo.wildcard_constraints[0], **ruleinfo.wildcard_constraints[1])
if ruleinfo.input:
rule.set_input(*ruleinfo.input[0], **ruleinfo.input[1])
if ruleinfo.output:
@@ -645,6 +657,13 @@ class Workflow:
return decorate
+ def wildcard_constraints(self, *wildcard_constraints, **kwwildcard_constraints):
+ def decorate(ruleinfo):
+ ruleinfo.wildcard_constraints = (wildcard_constraints, kwwildcard_constraints)
+ return ruleinfo
+
+ return decorate
+
def message(self, message):
def decorate(ruleinfo):
ruleinfo.message = message
@@ -733,6 +752,7 @@ class RuleInfo:
self.params = None
self.message = None
self.benchmark = None
+ self.wildcard_constraints = None
self.threads = None
self.shadow_depth = None
self.resources = None
@@ -743,11 +763,12 @@ class RuleInfo:
class Subworkflow:
- def __init__(self, workflow, name, snakefile, workdir):
+ def __init__(self, workflow, name, snakefile, workdir, configfile):
self.workflow = workflow
self.name = name
self._snakefile = snakefile
self._workdir = workdir
+ self.configfile = configfile
@property
def snakefile(self):
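
With the new configfile parameter a subworkflow can carry its own
configuration, as exercised by the test_subworkflows change further below.
A minimal sketch (paths are hypothetical):

    subworkflow mapping:
        workdir: "sub"
        snakefile: "sub/Snakefile"
        configfile: "sub/config.yaml"   # optional, newly supported

    rule all:
        input: mapping("mapped/a.bam")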
diff --git a/tests/test_deferred_func_eval/Snakefile b/tests/test_deferred_func_eval/Snakefile
new file mode 100644
index 0000000..0c346d2
--- /dev/null
+++ b/tests/test_deferred_func_eval/Snakefile
@@ -0,0 +1,22 @@
+import os
+
+
+def get_mem_gb(wildcards, input):
+ size = 0
+ if os.path.exists(input[0]):
+ size = int(os.path.getsize(input[0]) / (1024 ** 3))
+ return max(size, 1)
+
+
+rule a:
+ input:
+ "test1.in",
+ "test2.in"
+ output:
+ "test.out"
+ params:
+ a=lambda wildcards, input, resources: "+".join(input)
+ resources:
+ mem_gb=get_mem_gb
+ shell:
+ "echo {params.a} > {output}"
diff --git a/tests/test_deferred_func_eval/expected-results/test.out b/tests/test_deferred_func_eval/expected-results/test.out
new file mode 100644
index 0000000..5e8a7f5
--- /dev/null
+++ b/tests/test_deferred_func_eval/expected-results/test.out
@@ -0,0 +1 @@
+test1.in+test2.in
diff --git a/tests/test_deferred_func_eval/test1.in b/tests/test_deferred_func_eval/test1.in
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_deferred_func_eval/test2.in b/tests/test_deferred_func_eval/test2.in
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_delete_output/Snakefile b/tests/test_delete_output/Snakefile
new file mode 100644
index 0000000..b805f43
--- /dev/null
+++ b/tests/test_delete_output/Snakefile
@@ -0,0 +1,23 @@
+# Having decided to address bug #300, I need a test.
+#
+# The output should be deleted before the job starts.
+# The output should be deleted on the head node for cluster jobs.
+# The path for the output should be created (as it already is)
+# The output should be touch'd on the head node to always be new.
+#
+# This test should work without any cluster. Do I need a special test for
+# the cluster case? Not sure.
+
+# Setup - touch a fake input file and a fake output file.
+
+shell("touch -t 201604010000 output.file")
+shell("touch input.file")
+
+rule main:
+ input: "output.file"
+
+rule make_the_file:
+ output: "output.file", "foo/output.foo.file"
+ input: "input.file"
+ # Rule fails if any output.file is already present
+ shell: "test ! -e output.file ; test -d foo ; test ! -e foo/* ; touch -t 201604010000 output.file ; touch foo/output.foo.file"
diff --git a/tests/test_delete_output/expected-results/foo/output.foo.file b/tests/test_delete_output/expected-results/foo/output.foo.file
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_delete_output/expected-results/output.file b/tests/test_delete_output/expected-results/output.file
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_format_params/Snakefile b/tests/test_format_params/Snakefile
new file mode 100644
index 0000000..3255d31
--- /dev/null
+++ b/tests/test_format_params/Snakefile
@@ -0,0 +1,5 @@
+rule a:
+ output: "test.out"
+ params: [1, 2, 3]
+ shell:
+ "echo {params[0]} > {output}"
diff --git a/tests/test_format_params/expected-results/test.out b/tests/test_format_params/expected-results/test.out
new file mode 100644
index 0000000..b85905e
--- /dev/null
+++ b/tests/test_format_params/expected-results/test.out
@@ -0,0 +1 @@
+1 2 3
diff --git a/tests/test_get_log_both/Snakefile b/tests/test_get_log_both/Snakefile
new file mode 100644
index 0000000..e2129ee
--- /dev/null
+++ b/tests/test_get_log_both/Snakefile
@@ -0,0 +1,6 @@
+rule:
+ input: "test.in"
+ output: "test.out"
+ log: "test.log"
+ wrapper:
+ 'file://wrapper.py'
diff --git a/tests/test_get_log_both/expected-results/test.log b/tests/test_get_log_both/expected-results/test.log
new file mode 100644
index 0000000..127ba51
--- /dev/null
+++ b/tests/test_get_log_both/expected-results/test.log
@@ -0,0 +1,2 @@
+a stderr message
+a stdout message
diff --git a/tests/test_get_log_both/expected-results/test.out b/tests/test_get_log_both/expected-results/test.out
new file mode 100644
index 0000000..3bd1f0e
--- /dev/null
+++ b/tests/test_get_log_both/expected-results/test.out
@@ -0,0 +1,2 @@
+foo
+bar
diff --git a/tests/test_get_log_both/test.in b/tests/test_get_log_both/test.in
new file mode 100644
index 0000000..3bd1f0e
--- /dev/null
+++ b/tests/test_get_log_both/test.in
@@ -0,0 +1,2 @@
+foo
+bar
diff --git a/tests/test_get_log_both/test.out b/tests/test_get_log_both/test.out
new file mode 100644
index 0000000..3bd1f0e
--- /dev/null
+++ b/tests/test_get_log_both/test.out
@@ -0,0 +1,2 @@
+foo
+bar
diff --git a/tests/test_get_log_both/wrapper.py b/tests/test_get_log_both/wrapper.py
new file mode 100644
index 0000000..22e04ed
--- /dev/null
+++ b/tests/test_get_log_both/wrapper.py
@@ -0,0 +1,7 @@
+from snakemake.shell import shell
+log = snakemake.log_fmt_shell(append=True)
+shell('''
+ cat {snakemake.input} > {snakemake.output}
+ (>&2 echo "a stderr message") {log}
+ (echo "a stdout message") {log}
+ ''')
diff --git a/tests/test_get_log_complex/Snakefile b/tests/test_get_log_complex/Snakefile
new file mode 100644
index 0000000..e2129ee
--- /dev/null
+++ b/tests/test_get_log_complex/Snakefile
@@ -0,0 +1,6 @@
+rule:
+ input: "test.in"
+ output: "test.out"
+ log: "test.log"
+ wrapper:
+ 'file://wrapper.py'
diff --git a/tests/test_get_log_complex/expected-results/test.log b/tests/test_get_log_complex/expected-results/test.log
new file mode 100644
index 0000000..8939eaf
--- /dev/null
+++ b/tests/test_get_log_complex/expected-results/test.log
@@ -0,0 +1,3 @@
+first line
+a stderr message
+a stdout message
diff --git a/tests/test_get_log_complex/expected-results/test.out b/tests/test_get_log_complex/expected-results/test.out
new file mode 100644
index 0000000..3bd1f0e
--- /dev/null
+++ b/tests/test_get_log_complex/expected-results/test.out
@@ -0,0 +1,2 @@
+foo
+bar
diff --git a/tests/test_get_log_complex/test.in b/tests/test_get_log_complex/test.in
new file mode 100644
index 0000000..3bd1f0e
--- /dev/null
+++ b/tests/test_get_log_complex/test.in
@@ -0,0 +1,2 @@
+foo
+bar
diff --git a/tests/test_get_log_complex/test.out b/tests/test_get_log_complex/test.out
new file mode 100644
index 0000000..3bd1f0e
--- /dev/null
+++ b/tests/test_get_log_complex/test.out
@@ -0,0 +1,2 @@
+foo
+bar
diff --git a/tests/test_get_log_complex/wrapper.py b/tests/test_get_log_complex/wrapper.py
new file mode 100644
index 0000000..69bb805
--- /dev/null
+++ b/tests/test_get_log_complex/wrapper.py
@@ -0,0 +1,11 @@
+from snakemake.shell import shell
+initial_log = snakemake.log_fmt_shell()
+stdout_log = snakemake.log_fmt_shell(stderr=False, append=True)
+stderr_log = snakemake.log_fmt_shell(stdout=False, append=True)
+shell('''
+ cat {snakemake.input} > {snakemake.output}
+ echo "should not appear since next line truncates" {initial_log}
+ echo "first line" {initial_log}
+ (>&2 echo "a stderr message") {stderr_log}
+ (echo "a stdout message") {stdout_log}
+ ''')
diff --git a/tests/test_get_log_none/Snakefile b/tests/test_get_log_none/Snakefile
new file mode 100644
index 0000000..441bd7b
--- /dev/null
+++ b/tests/test_get_log_none/Snakefile
@@ -0,0 +1,5 @@
+rule:
+ input: "test.in"
+ output: "test.out"
+ wrapper:
+ 'file://wrapper.py'
diff --git a/tests/test_get_log_none/expected-results/test.out b/tests/test_get_log_none/expected-results/test.out
new file mode 100644
index 0000000..3bd1f0e
--- /dev/null
+++ b/tests/test_get_log_none/expected-results/test.out
@@ -0,0 +1,2 @@
+foo
+bar
diff --git a/tests/test_get_log_none/test.in b/tests/test_get_log_none/test.in
new file mode 100644
index 0000000..3bd1f0e
--- /dev/null
+++ b/tests/test_get_log_none/test.in
@@ -0,0 +1,2 @@
+foo
+bar
diff --git a/tests/test_get_log_none/wrapper.py b/tests/test_get_log_none/wrapper.py
new file mode 100644
index 0000000..4cf7eee
--- /dev/null
+++ b/tests/test_get_log_none/wrapper.py
@@ -0,0 +1,7 @@
+from snakemake.shell import shell
+log = snakemake.log_fmt_shell(stdout=False)
+shell('''
+ cat {snakemake.input} > {snakemake.output}
+ (>&2 echo "a stderr message") {log}
+ (echo "a stdout message") {log}
+ ''')
diff --git a/tests/test_get_log_stderr/Snakefile b/tests/test_get_log_stderr/Snakefile
new file mode 100644
index 0000000..e2129ee
--- /dev/null
+++ b/tests/test_get_log_stderr/Snakefile
@@ -0,0 +1,6 @@
+rule:
+ input: "test.in"
+ output: "test.out"
+ log: "test.log"
+ wrapper:
+ 'file://wrapper.py'
diff --git a/tests/test_get_log_stderr/expected-results/test.log b/tests/test_get_log_stderr/expected-results/test.log
new file mode 100644
index 0000000..8faf40a
--- /dev/null
+++ b/tests/test_get_log_stderr/expected-results/test.log
@@ -0,0 +1 @@
+a stderr message
diff --git a/tests/test_get_log_stderr/expected-results/test.out b/tests/test_get_log_stderr/expected-results/test.out
new file mode 100644
index 0000000..3bd1f0e
--- /dev/null
+++ b/tests/test_get_log_stderr/expected-results/test.out
@@ -0,0 +1,2 @@
+foo
+bar
diff --git a/tests/test_get_log_stderr/test.in b/tests/test_get_log_stderr/test.in
new file mode 100644
index 0000000..3bd1f0e
--- /dev/null
+++ b/tests/test_get_log_stderr/test.in
@@ -0,0 +1,2 @@
+foo
+bar
diff --git a/tests/test_get_log_stderr/test.out b/tests/test_get_log_stderr/test.out
new file mode 100644
index 0000000..3bd1f0e
--- /dev/null
+++ b/tests/test_get_log_stderr/test.out
@@ -0,0 +1,2 @@
+foo
+bar
diff --git a/tests/test_get_log_stderr/wrapper.py b/tests/test_get_log_stderr/wrapper.py
new file mode 100644
index 0000000..d06b047
--- /dev/null
+++ b/tests/test_get_log_stderr/wrapper.py
@@ -0,0 +1,7 @@
+from snakemake.shell import shell
+log = snakemake.log_fmt_shell(stdout=False, append=True)
+shell('''
+ cat {snakemake.input} > {snakemake.output}
+ (>&2 echo "a stderr message") {log}
+ (echo "a stdout message") {log}
+ ''')
diff --git a/tests/test_get_log_stdout/Snakefile b/tests/test_get_log_stdout/Snakefile
new file mode 100644
index 0000000..e2129ee
--- /dev/null
+++ b/tests/test_get_log_stdout/Snakefile
@@ -0,0 +1,6 @@
+rule:
+ input: "test.in"
+ output: "test.out"
+ log: "test.log"
+ wrapper:
+ 'file://wrapper.py'
diff --git a/tests/test_get_log_stdout/expected-results/test.log b/tests/test_get_log_stdout/expected-results/test.log
new file mode 100644
index 0000000..1407dfa
--- /dev/null
+++ b/tests/test_get_log_stdout/expected-results/test.log
@@ -0,0 +1 @@
+a stdout message
diff --git a/tests/test_get_log_stdout/expected-results/test.out b/tests/test_get_log_stdout/expected-results/test.out
new file mode 100644
index 0000000..3bd1f0e
--- /dev/null
+++ b/tests/test_get_log_stdout/expected-results/test.out
@@ -0,0 +1,2 @@
+foo
+bar
diff --git a/tests/test_get_log_stdout/test.in b/tests/test_get_log_stdout/test.in
new file mode 100644
index 0000000..3bd1f0e
--- /dev/null
+++ b/tests/test_get_log_stdout/test.in
@@ -0,0 +1,2 @@
+foo
+bar
diff --git a/tests/test_get_log_stdout/test.out b/tests/test_get_log_stdout/test.out
new file mode 100644
index 0000000..3bd1f0e
--- /dev/null
+++ b/tests/test_get_log_stdout/test.out
@@ -0,0 +1,2 @@
+foo
+bar
diff --git a/tests/test_get_log_stdout/wrapper.py b/tests/test_get_log_stdout/wrapper.py
new file mode 100644
index 0000000..2ff4d3d
--- /dev/null
+++ b/tests/test_get_log_stdout/wrapper.py
@@ -0,0 +1,7 @@
+from snakemake.shell import shell
+log = snakemake.log_fmt_shell(stderr=False, append=True)
+shell('''
+ cat {snakemake.input} > {snakemake.output}
+ (>&2 echo "a stderr message") {log}
+ (echo "a stdout message") {log}
+ ''')
diff --git a/tests/test_io.py b/tests/test_io.py
new file mode 100644
index 0000000..6f22b11
--- /dev/null
+++ b/tests/test_io.py
@@ -0,0 +1,33 @@
+from snakemake.io import _wildcard_regex
+
+
+def test_wildcard_regex():
+ def matches(text):
+ return [ (match.group('name'), match.group('constraint'))
+ for match in _wildcard_regex.finditer(text) ]
+
+ # without constraints
+ assert matches('') == []
+ assert matches('{') == []
+ assert matches('}') == []
+ assert matches('{}') == []
+ assert matches('{0}') == [('0', None)]
+ assert matches('{abc}') == [('abc', None)]
+ assert matches('abc{def}{ghi}') == [('def', None), ('ghi', None)]
+
+ # with constraints
+ assert matches('{w,constraint}') == [('w', 'constraint')]
+ assert matches('{w , constraint}') == [('w', 'constraint')]
+ # fails because constraint is detected as 'constraint '
+ # assert matches('{w,constraint }') == [('w', 'constraint')]
+ assert matches('abc { w , constraint} def') == [('w', 'constraint')]
+
+ # multiple wildcards
+ assert matches('{a,1} {b,2} {c,3}') == [('a', '1'), ('b', '2'), ('c', '3')]
+
+ # more complicated constraints
+ assert matches(r'{w,([a-z]+|pat\|t*ern)}') == [('w', r'([a-z]+|pat\|t*ern)')]
+ assert matches(r'{w,([a-z]+|pat\|te{1,3}rn){5,7}}') == [('w', r'([a-z]+|pat\|te{1,3}rn){5,7}')]
+
+ # This used to be very slow with an older version of the regex
+ assert matches('{w, long constraint without closing brace') == []
diff --git a/tests/test_issue328/Snakefile b/tests/test_issue328/Snakefile
new file mode 100644
index 0000000..a748035
--- /dev/null
+++ b/tests/test_issue328/Snakefile
@@ -0,0 +1,26 @@
+from pytools.persistent_dict import PersistentDict
+
+
+storage = PersistentDict("storage")
+
+
+rule merge:
+ input: dynamic("in_{sample}.txt")
+ output: "out.txt"
+ shell:
+ "touch {output}"
+
+
+storage["split_counter"] = 0
+
+
+rule split:
+ input: "in.txt"
+ output: dynamic("in_{sample}.txt")
+ run:
+ if storage["split_counter"] > 0:
+ raise Exception("Rule split was executed multiple times.")
+ storage["split_counter"] += 1
+ shell("touch in_1.txt in_2.txt")
+
+
diff --git a/tests/test_issue328/expected-results/out.txt b/tests/test_issue328/expected-results/out.txt
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_issue328/in.txt b/tests/test_issue328/in.txt
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_remote_sftp/Snakefile b/tests/test_remote_sftp/Snakefile
new file mode 100644
index 0000000..cf95f80
--- /dev/null
+++ b/tests/test_remote_sftp/Snakefile
@@ -0,0 +1,10 @@
+from snakemake.remote.SFTP import RemoteProvider
+SFTP = RemoteProvider(username="demo", password="password")
+
+rule a:
+ input:
+ SFTP.remote("test.rebex.net/readme.txt")
+ output:
+ "readme.txt"
+ shell:
+ "cp {input} {output}"
diff --git a/tests/test_remote_sftp/expected-results/readme.txt b/tests/test_remote_sftp/expected-results/readme.txt
new file mode 100644
index 0000000..2afbaef
--- /dev/null
+++ b/tests/test_remote_sftp/expected-results/readme.txt
@@ -0,0 +1,10 @@
+Welcome,
+
+you are connected to an FTP or SFTP server used for testing purposes by Rebex FTP/SSL or Rebex SFTP sample code.
+Only read access is allowed and the FTP download speed is limited to 16KBps.
+
+For infomation about Rebex FTP/SSL, Rebex SFTP and other Rebex .NET components, please visit our website at http://www.rebex.net/
+
+For feedback and support, contact support at rebex.net
+
+Thanks!
diff --git a/tests/test_shadow/Snakefile b/tests/test_shadow/Snakefile
index 425f4a0..78421c9 100644
--- a/tests/test_shadow/Snakefile
+++ b/tests/test_shadow/Snakefile
@@ -1,5 +1,9 @@
rule all:
- input: ["simple_shallow.out", "simple_full.out"]
+ input: "simple_shallow.out", "simple_full.out",
+ "absolute_exists.out", "touched.out", "log_exists.out",
+ "protected.out", "benchmark_exists.out"
+
+shell.executable("/bin/bash")
rule shallow:
input: "test.in"
@@ -24,3 +28,65 @@ rule full:
echo simple_full >> {output}
test ! -f junk.out
"""
+
+rule absolute_link:
+ input: "test.in"
+ output: "absolute.out", "absolute_link.out"
+ shadow: "shallow"
+ shell:
+ """
+ echo 1 > absolute.out
+ DIRNAME=$(perl -e 'use Cwd "abs_path"; print abs_path(shift)' .)
+ ln -s $DIRNAME/absolute.out absolute_link.out
+ """
+
+# Snakemake has check_broken_symlink so we just need to test existence
+rule absolute_stat:
+ input: "absolute_link.out"
+ output: touch("absolute_exists.out")
+ shell:
+ """
+ test -f {input}
+ """
+
+rule log_stat:
+ input: "touched.out"
+ output: touch("log_exists.out")
+ shell:
+ """
+ test -f shadow.log
+ """
+
+rule touch:
+ output: touch("touched.out")
+ log: "shadow.log"
+ shadow: "shallow"
+ shell:
+ """
+ echo 1 > {log}
+ """
+
+rule protected:
+ output: protected("protected.out")
+ shadow: "shallow"
+ shell:
+ """
+ touch {output}
+ """
+
+rule benchmark:
+ output: touch("benchmark.out")
+ benchmark: "benchmark.txt"
+ shadow: "shallow"
+ shell:
+ """
+ touch {output}
+ """
+
+rule benchmark_stat:
+ input: "benchmark.out"
+ output: touch("benchmark_exists.out")
+ shell:
+ """
+ test -f benchmark.txt
+ """
diff --git a/tests/test_spaces_in_fnames/Snakefile b/tests/test_spaces_in_fnames/Snakefile
new file mode 100644
index 0000000..36a3e60
--- /dev/null
+++ b/tests/test_spaces_in_fnames/Snakefile
@@ -0,0 +1,21 @@
+rule indelrealigner:
+ input: bam="{base}.bam",intervals="{base}.intervals",bai='{base}.bam.bai'
+ output: bam=temp("{base} realigned.bam")
+ params: batch="-q ccr -l nodes=1:gpfs"
+ threads: 1
+ shell: "touch {output:q}"
+
+rule realignertargetcreator:
+ input: "{base}.bam", "{base}.bam.bai"
+ output: temp("{base}.intervals")
+ params: batch="-q ccr -l nodes=1:gpfs"
+ threads: 32
+ shell: "touch {output:q}"
+
+
+rule indexbam:
+ threads: 1
+ input: '{base}.bam'
+ output: temp('{base}.bam.bai')
+ params: batch="-q ccr -l nodes=1:gpfs"
+ shell: 'touch {output:q}'
diff --git a/tests/test_spaces_in_fnames/expected-results/test bam file realigned.bam b/tests/test_spaces_in_fnames/expected-results/test bam file realigned.bam
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_spaces_in_fnames/qsub b/tests/test_spaces_in_fnames/qsub
new file mode 100755
index 0000000..63a46f9
--- /dev/null
+++ b/tests/test_spaces_in_fnames/qsub
@@ -0,0 +1,6 @@
+#!/bin/bash
+echo `date` >> qsub.log
+tail -n1 $1 >> qsub.log
+# simulate printing of job id by a random number
+echo $RANDOM
+$1
diff --git a/tests/test_spaces_in_fnames/test bam file.bam b/tests/test_spaces_in_fnames/test bam file.bam
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_static_remote/S3MockedForStaticTest.py b/tests/test_static_remote/S3MockedForStaticTest.py
new file mode 100644
index 0000000..3042753
--- /dev/null
+++ b/tests/test_static_remote/S3MockedForStaticTest.py
@@ -0,0 +1,141 @@
+__author__ = "Christopher Tomkins-Tinch"
+__copyright__ = "Copyright 2015, Christopher Tomkins-Tinch"
+__email__ = "tomkinsc at broadinstitute.org"
+__license__ = "MIT"
+
+# built-ins
+import os, sys
+from contextlib import contextmanager
+import pickle
+import time
+import threading
+import functools
+
+# intra-module
+from snakemake.remote.S3 import RemoteObject as S3RemoteObject, RemoteProvider as S3RemoteProvider
+from snakemake.remote.S3 import S3Helper
+from snakemake.decorators import dec_all_methods
+from snakemake.exceptions import WorkflowError
+from snakemake.logging import logger
+
+try:
+ # third-party
+ import boto
+ from moto import mock_s3
+ import filechunkio
+except ImportError as e:
+ raise WorkflowError("The Python 3 packages 'moto', 'boto' and 'filechunkio' " +
+ "need to be installed to use S3Mocked remote() file functionality. %s" % e.msg)
+
+def noop():
+ pass
+
+def pickled_moto_wrapper(func):
+ """
+ This is a class decorator that in turn decorates all methods within
+ a class to mock out boto calls with moto-simulated ones.
+ Since the moto backends are not persistent across calls by default,
+ the wrapper also pickles the bucket state after each function call,
+ and restores it before execution. This way uploaded files are available
+ for follow-on tasks. Since snakemake may execute with multiple threads
+ it also waits for the pickled bucket state file to be available before
+ loading it in. This is a hacky alternative to using proper locks,
+ but works ok in practice.
+ """
+ def wrapper_func(self, *args, **kwargs):
+ moto_context_file = "motoState.p"
+
+ moto_context = mock_s3()
+ moto_context.start()
+
+ moto_context.backends["global"].reset = noop
+
+ # load moto buckets from pickle
+ if os.path.isfile(moto_context_file) and os.path.getsize(moto_context_file) > 0:
+ with file_lock(moto_context_file):
+ with open(moto_context_file, "rb") as f:
+ moto_context.backends["global"].buckets = pickle.load(f)
+
+ mocked_function = moto_context(func)
+ retval = mocked_function(self, *args, **kwargs)
+
+ with file_lock(moto_context_file):
+ with open(moto_context_file, "wb") as f:
+ pickle.dump(moto_context.backends["global"].buckets, f)
+
+ moto_context.stop()
+
+ return retval
+ functools.update_wrapper(wrapper_func, func)
+ wrapper_func.__wrapped__ = func
+ return wrapper_func
+
+@dec_all_methods(pickled_moto_wrapper, prefix=None)
+class RemoteProvider(S3RemoteProvider):
+ def __init__(self, *args, **kwargs):
+ super(RemoteProvider, self).__init__(*args, **kwargs)
+
+@dec_all_methods(pickled_moto_wrapper, prefix=None)
+class RemoteObject(S3RemoteObject):
+ """
+ This is a derivative of the S3 remote provider that mocks
+ out boto-based S3 calls using the "moto" Python package.
+ Only the initializer is different; it "uploads" the input
+ test file to the moto-simulated bucket at the start.
+ """
+
+ def __init__(self, *args, keep_local=False, provider=None, **kwargs):
+ super(RemoteObject, self).__init__(*args, keep_local=keep_local, provider=provider, **kwargs)
+
+ bucket_name = 'test-static-remote-bucket'
+ test_files = ('test.txt', 'out1.txt', 'out2.txt')
+
+ conn = boto.connect_s3()
+ if bucket_name not in [b.name for b in conn.get_all_buckets()]:
+ conn.create_bucket(bucket_name)
+
+ # "Upload" files that should be in S3 before tests...
+ s3c = S3Helper()
+ for test_file in test_files:
+ if not s3c.exists_in_bucket(bucket_name, test_file):
+ logger.debug("Pre-populating remote bucket {} with file {}".format(
+ bucket_name, test_file))
+ s3c.upload_to_s3(bucket_name, test_file)
+
+ def mtime(self):
+ if self.s3_key == 'test.txt':
+ logger.debug("test.txt is new")
+ return float('inf')
+ elif self.s3_key.startswith('out'):
+ logger.debug("{} is old".format(self.s3_key))
+ # For some reason this breaks if you return 0
+ return 5
+ else:
+ logger.debug("Using real mtime for {}".format(self.s3_key))
+ return super().mtime()
+
+
+# ====== Helpers =====
+
+def touch(fname, mode=0o666, dir_fd=None, **kwargs):
+ # create lock file faster
+ # https://stackoverflow.com/a/1160227
+ flags = os.O_CREAT | os.O_APPEND
+ with os.fdopen(os.open(fname, flags=flags, mode=mode, dir_fd=dir_fd)) as f:
+ os.utime(f.fileno() if os.utime in os.supports_fd else fname,
+ dir_fd=None if os.supports_fd else dir_fd, **kwargs)
+
+@contextmanager
+def file_lock(filepath):
+ lock_file = filepath + ".lock"
+
+ while os.path.isfile(lock_file):
+ time.sleep(2)
+
+ touch(lock_file)
+
+ try:
+ yield
+ finally:
+ if os.path.isfile(lock_file):
+ os.remove(lock_file)
diff --git a/tests/test_static_remote/Snakefile b/tests/test_static_remote/Snakefile
new file mode 100644
index 0000000..267c759
--- /dev/null
+++ b/tests/test_static_remote/Snakefile
@@ -0,0 +1,42 @@
+#import re, os, sys
+
+from S3MockedForStaticTest import RemoteProvider as S3RemoteProvider
+
+S3 = S3RemoteProvider()
+
+# remote dynamic file test
+# This makes use of a special provider that mocks up S3 using the moto
+# library so that boto calls hit local "buckets"
+rule all:
+ input:
+ # only keeping the files so we can copy them out to the cwd
+ S3.remote("test-static-remote-bucket/out1.txt", keep_local=True),
+ S3.remote("test-static-remote-bucket/out2.txt", keep_local=True)
+ run:
+ shell("for f in {input}; do mv $f ./; done")
+
+# This rule should run
+rule run_no_static:
+ input: S3.remote('test-static-remote-bucket/test.txt', static=False)
+ output: S3.remote('test-static-remote-bucket/out1.txt'),
+ run:
+ shell('''echo '{input} > {output}'; cat {input} | tee {output}''')
+
+# This rule should NOT run because Snakemake should assume that the
+# input file is static and has not changed.
+rule run_static:
+ input: S3.remote('test-static-remote-bucket/test.txt', static=True)
+ output: S3.remote('test-static-remote-bucket/out2.txt'),
+ run:
+ shell('''echo '{input} > {output}'; cat {input} | tee {output}''')
+ raise Exception("This rule should never run")
+
+# after we finish, we need to remove the pickle storing
+# the local moto "buckets" so we are starting fresh
+# next time this test is run. This file is created by
+# the moto wrapper defined in S3Mocked.py
+onsuccess:
+ shell("rm ./motoState.p")
+
+onerror:
+ shell("rm ./motoState.p")
diff --git a/tests/test_static_remote/__init__.py b/tests/test_static_remote/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_static_remote/expected-results/out1.txt b/tests/test_static_remote/expected-results/out1.txt
new file mode 100644
index 0000000..46e14ed
--- /dev/null
+++ b/tests/test_static_remote/expected-results/out1.txt
@@ -0,0 +1 @@
+This file should overwrite out1.txt but not out2.txt.
diff --git a/tests/test_static_remote/expected-results/out2.txt b/tests/test_static_remote/expected-results/out2.txt
new file mode 100644
index 0000000..c7c839f
--- /dev/null
+++ b/tests/test_static_remote/expected-results/out2.txt
@@ -0,0 +1 @@
+This file should not be overwritten by the input.
diff --git a/tests/test_static_remote/out1.txt b/tests/test_static_remote/out1.txt
new file mode 100644
index 0000000..ae49950
--- /dev/null
+++ b/tests/test_static_remote/out1.txt
@@ -0,0 +1 @@
+This file should get overwritten.
diff --git a/tests/test_static_remote/out2.txt b/tests/test_static_remote/out2.txt
new file mode 100644
index 0000000..c7c839f
--- /dev/null
+++ b/tests/test_static_remote/out2.txt
@@ -0,0 +1 @@
+This file should not be overwritten by the input.
diff --git a/tests/test_static_remote/test.txt b/tests/test_static_remote/test.txt
new file mode 100644
index 0000000..46e14ed
--- /dev/null
+++ b/tests/test_static_remote/test.txt
@@ -0,0 +1 @@
+This file should overwrite out1.txt but not out2.txt.
diff --git a/tests/test_subworkflows/Snakefile b/tests/test_subworkflows/Snakefile
index 4eff116..35e112a 100644
--- a/tests/test_subworkflows/Snakefile
+++ b/tests/test_subworkflows/Snakefile
@@ -2,6 +2,7 @@ import os.path
subworkflow test02:
workdir: config["subworkdir"]
+ configfile: "subconfig.yaml" # test optional config files
rule:
input: "test.out"
diff --git a/tests/test_subworkflows/subconfig.yaml b/tests/test_subworkflows/subconfig.yaml
new file mode 100644
index 0000000..b4e5778
--- /dev/null
+++ b/tests/test_subworkflows/subconfig.yaml
@@ -0,0 +1,2 @@
+test:
+ "xy"
diff --git a/tests/test_symlink_time_handling/Snakefile b/tests/test_symlink_time_handling/Snakefile
new file mode 100644
index 0000000..c878ef1
--- /dev/null
+++ b/tests/test_symlink_time_handling/Snakefile
@@ -0,0 +1,68 @@
+# vim: ft=python
+"""Snakemake should work where the input/output of a rule is a symlink, no problem.
+ Here we test that updates to symlinks are recognised and that the timestamps
+ on symlinks are updated properly.
+
+ Unfortunately this does not work on some Python builds where
+ os.supports_follow_symlinks does not include utime(). This includes the Miniconda
+ build used on continuum.io. Therefore this test is an expected failure on those
+ systems.
+
+ In practice, the impact of the bug is low - lutime() on a link will just be a no-op
+ and users will see a warning message. Anyone for whom this is a real problem should
+ install a fully-working version of Python.
+"""
+
+"""Description of the test:
+
+ input_file is a file that is 1 hour old
+ input_link is a link to input_file that is 4 hours old
+ output_link is a symlink to input_link that is 2 hours old
+
+ So output_link needs to be re-evaluated since the contents of input_file changed.
+
+ rule main outputs the time difference between input and output in hours, which
+ can be checked by Nose to ensure that output_link gets touched by Snakemake but
+ neither input_file nor input_link is changed.
+"""
+
+import os
+
+import time
+def timestr(hr_delta=0):
+ return time.strftime("%Y%m%d%H%M",
+ time.localtime(time.time() - (hr_delta * 3600)))
+
+shell("touch -t {} input_file".format(timestr(1)))
+shell("ln -s input_file input_link")
+shell("touch -h -t {} input_link".format(timestr(4)))
+
+shell("ln -s input_link output_link")
+shell("touch -h -t {} output_link".format(timestr(2)))
+
+shell("ls -lR > /dev/stderr")
+
+rule main:
+ output: "time_diff.txt"
+ input: "output_link"
+ run:
+ time_diff1 = int( (
+ os.stat("output_link", follow_symlinks=False).st_mtime -
+ os.stat("input_link", follow_symlinks=False).st_mtime
+ ) / (60*60) )
+ time_diff2 = int( (
+ os.stat("output_link", follow_symlinks=False).st_mtime -
+ os.stat("input_file", follow_symlinks=False).st_mtime
+ ) / (60*60) )
+ # I expect the result "4 1"
+ shell("ls -lR > /dev/stderr")
+ shell("echo {time_diff1} {time_diff2} | tee time_diff.txt 2>&1")
+
+rule make_output:
+ output: "output_link"
+ input: "input_link"
+ run:
+ #shell("rm -f {output}") - no longer necessary
+ shell("ln -s {input} {output}")
+ #This next command should be undone by Snakemake which now touches all output
+ shell("touch -h -t 201604011600 {output}")
diff --git a/tests/test_symlink_time_handling/expected-results/time_diff.txt b/tests/test_symlink_time_handling/expected-results/time_diff.txt
new file mode 100644
index 0000000..b05461b
--- /dev/null
+++ b/tests/test_symlink_time_handling/expected-results/time_diff.txt
@@ -0,0 +1 @@
+4 1
diff --git a/tests/test_wildcard_keyword/Snakefile b/tests/test_wildcard_keyword/Snakefile
new file mode 100644
index 0000000..4e27c9e
--- /dev/null
+++ b/tests/test_wildcard_keyword/Snakefile
@@ -0,0 +1,35 @@
+wildcard_constraints:
+ foo='globalbar',
+ bar='out/{SM,[a-z]+}_{PU,[0-9]+}'
+
+rule all:
+ input: "stringbar.txt", "localbar.txt", "globalbar.txt"
+
+
+##: NB: setting output: "{foo}.in" only works for globalwildcard rule
+## since constraints will be set to "globalbar"
+rule infile:
+ output: temp("{foo,(globalbar|localbar|stringbar)}.in")
+ shell: "touch {output}"
+
+
+rule stringwildcard:
+ input: "{foo}.in"
+ output: "{foo,stringbar}.txt"
+ log: "{foo}.log"
+ shell: "echo {input} {output} {log} > {output}; touch {log}"
+
+
+rule localwildcard:
+ input: "{foo}.in"
+ output: "{foo}.txt"
+ wildcard_constraints: foo="localbar"
+ log: "{foo}.log"
+ shell: "echo {input} {output} {log} > {output}; touch {log}"
+
+
+rule globalwildcard:
+ input: "{foo}.in"
+ output: "{foo}.txt"
+ log: "{foo}.log"
+ shell: "echo {input} {output} {log} > {output}; touch {log}"
diff --git a/tests/test_wildcard_keyword/expected-results/globalbar.log b/tests/test_wildcard_keyword/expected-results/globalbar.log
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_wildcard_keyword/expected-results/globalbar.txt b/tests/test_wildcard_keyword/expected-results/globalbar.txt
new file mode 100644
index 0000000..21ce158
--- /dev/null
+++ b/tests/test_wildcard_keyword/expected-results/globalbar.txt
@@ -0,0 +1 @@
+globalbar.in globalbar.txt globalbar.log
diff --git a/tests/test_wildcard_keyword/expected-results/localbar.log b/tests/test_wildcard_keyword/expected-results/localbar.log
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_wildcard_keyword/expected-results/localbar.txt b/tests/test_wildcard_keyword/expected-results/localbar.txt
new file mode 100644
index 0000000..84eed71
--- /dev/null
+++ b/tests/test_wildcard_keyword/expected-results/localbar.txt
@@ -0,0 +1 @@
+localbar.in localbar.txt localbar.log
diff --git a/tests/test_wildcard_keyword/expected-results/stringbar.log b/tests/test_wildcard_keyword/expected-results/stringbar.log
new file mode 100644
index 0000000..e69de29
diff --git a/tests/test_wildcard_keyword/expected-results/stringbar.txt b/tests/test_wildcard_keyword/expected-results/stringbar.txt
new file mode 100644
index 0000000..b9e1d22
--- /dev/null
+++ b/tests/test_wildcard_keyword/expected-results/stringbar.txt
@@ -0,0 +1 @@
+stringbar.in stringbar.txt stringbar.log
diff --git a/tests/tests.py b/tests/tests.py
index 168b616..b873051 100644
--- a/tests/tests.py
+++ b/tests/tests.py
@@ -11,6 +11,7 @@ from tempfile import mkdtemp
import hashlib
import urllib
from shutil import rmtree
+from shlex import quote
from snakemake import snakemake
@@ -68,12 +69,13 @@ def run(path,
subpath), '{} does not exist'.format(subpath)
subworkdir = os.path.join(tmpdir, "subworkdir")
os.mkdir(subworkdir)
- call('cp `find {} -maxdepth 1 -type f` {}'.format(subpath,
- subworkdir),
+ call('find {} -maxdepth 1 -type f -print0 | xargs -0 cp -t {}'.format(
+ quote(subpath), quote(subworkdir)),
shell=True)
config['subworkdir'] = subworkdir
- call('cp `find {} -maxdepth 1 -type f` {}'.format(path, tmpdir),
+ call('find {} -maxdepth 1 -type f -print0 | xargs -0 cp -t {}'.format(
+ quote(path), quote(tmpdir)),
shell=True)
success = snakemake(snakefile,
cores=cores,
@@ -234,6 +236,10 @@ def test_update_config():
run(dpath("test_update_config"))
+def test_wildcard_keyword():
+ run(dpath("test_wildcard_keyword"))
+
+
def test_benchmark():
run(dpath("test_benchmark"), check_md5=False)
@@ -300,25 +306,92 @@ def test_script():
def test_shadow():
run(dpath("test_shadow"))
+
def test_until():
run(dpath("test_until"),
until=["leveltwo_first", # rule name
"leveltwo_second.txt", # file name
"second_wildcard"]) # wildcard rule
+
def test_omitfrom():
- run(dpath("test_omitfrom"),
+ run(dpath("test_omitfrom"),
omit_from=["leveltwo_first", # rule name
"leveltwo_second.txt", # file name
"second_wildcard"]) # wildcard rule
+
def test_nonstr_params():
run(dpath("test_nonstr_params"))
+def test_delete_output():
+ run(dpath("test_delete_output"))
+
+
def test_input_generator():
run(dpath("test_input_generator"))
+
+def test_symlink_time_handling():
+ #See Snakefile for notes on why this fails on some systems
+ if os.utime in os.supports_follow_symlinks:
+ run(dpath("test_symlink_time_handling"))
+
+
+def test_issue328():
+ run(dpath("test_issue328"), forcerun=["split"])
+
+
+def test_get_log_none():
+ run(dpath("test_get_log_none"))
+
+
+def test_get_log_both():
+ run(dpath("test_get_log_both"))
+
+
+def test_get_log_stderr():
+ run(dpath("test_get_log_stderr"))
+
+
+def test_get_log_stdout():
+ run(dpath("test_get_log_stdout"))
+
+
+def test_get_log_complex():
+ run(dpath("test_get_log_complex"))
+
+
+def test_spaces_in_fnames():
+ run(dpath("test_spaces_in_fnames"),
+ # cluster="./qsub",
+ targets=["test bam file realigned.bam"],
+ verbose=True,
+ printshellcmds=True)
+
+
+def test_static_remote():
+ try:
+ import moto
+ import boto
+ import filechunkio
+
+ # only run the remote file test if the dependencies
+ # are installed, otherwise do nothing
+ run(dpath("test_static_remote"), cores=1)
+ except ImportError:
+ pass
+
+
+def test_deferred_func_eval():
+ run(dpath("test_deferred_func_eval"))
+
+
+def test_format_params():
+ run(dpath("test_format_params"), check_md5=True)
+
+
if __name__ == '__main__':
import nose
nose.run(defaultTest=__name__)